前言 为了以后写代码运行效率更高,遂补坑。
协程 协程不是计算机提供,程序员人为创造。
协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术,简而言之,其实就是通过一个线程实现代码块相互切换执行。例如:
1 2 3 4 5 6 7 8 9 10 11 def func1 (): print (1 ) ... print2() def func2 (): print (3 ) ... print (4 ) func1() func2()
实现协程有这么几种方法:
greenbelt,早期模块
yield关键字
asyncio装饰器(python3.4)
async,await关键字(python3.5)【推荐】
greenlet
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from greenlet import greenletdef func1 (): print (1 ) gr2.switch() print (2 ) gr2.switch() def func2 (): print (3 ) gr1.switch() print (4 ) gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch()
yield关键字 1 2 3 4 5 6 7 8 9 10 11 12 def func1 (): yield 1 yield from func2() yield 2 def func2 (): yield 3 yield 4 f1 = func1() for item in f1: print (item)
asyncio 在python3.4及之后的版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncio@asyncio.coroutines def func1 (): print (1 ) yield from asyncio.sleep(2 ) print (2 ) @asyncio.coroutines def func2 (): print (3 ) yield from asyncio.sleep(2 ) print (4 ) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
注意:遇到IO阻塞自动切换,网络请求也是IO
async & await关键字 在python3.5及之后的版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioasync def func1 (): print (1 ) await asyncio.sleep(2 ) print (2 ) async def func2 (): print (3 ) await asyncio.sleep(2 ) print (4 ) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
协程意义 在一个线程中如果遇到IO等待时间,线程不会傻傻等,利用空闲的时间再去完成一点其它的任务
案例:爬取三个网站(网络IO)
普通方式
1 2 3 4 5 6 7 8 9 10 11 12 13 import requestsdef get_page (url ): response = requests.get(url) print (len (response.text)) if __name__ == "__main__" : urls = [ "https://www.ghtwf01.cn/index.php/archives/350/" , "https://www.ghtwf01.cn/index.php/archives/341/" , "https://www.ghtwf01.cn/index.php/archives/295/" ] for url in urls: get_page(url)
协程方式(异步)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import aiohttpimport asyncioasync def get_page (session, url ): async with session.get(url) as response: html = await response.text() print (len (html)) async def main (): urls = [ "https://www.ghtwf01.cn/index.php/archives/350/" , "https://www.ghtwf01.cn/index.php/archives/341/" , "https://www.ghtwf01.cn/index.php/archives/295/" ] async with aiohttp.ClientSession() as session: tasks = [asyncio.create_task(get_page(session, url)) for url in urls] await asyncio.wait(tasks) if __name__ == "__main__" : asyncio.run(main())
异步编程 事件循环 理解成一个死循环,去检查并执行某些代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 # 伪代码 任务列表 = [任务1, 任务2, 任务3] while True: 可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将"可执行"和"已完成"的任务返回 for 就绪任务 in 可执行的任务列表: 执行已就绪的任务 for 已完成的任务 in 已完成的任务列表: 在任务列表中移除已完成的任务 如果任务列表中的任务都已完成,则终止循环
1 2 3 4 5 6 import asyncioloop = asyncio.get_event_loop() loop.run_until_complete(任务)
如何执行协程函数 协程函数:定义函数时async def 函数名
协程对象:执行协程函数()
得到的协程对象
1 2 3 4 async def func (): print ("1" ) result = func()
注意:执行协程函数创建协程对象,函数内部代码不会执行。
如果想要运行协程函数内部代码,必须将协程对象交给事件循环来处理
1 2 3 4 5 6 7 8 9 10 import asyncioasync def func (): print ("1" ) result = func() asyncio.run( result )
await await + 可等待的对象(协程对象、Future、 Task对象——>暂且通俗理解为IO等待)
示例1:
1 2 3 4 5 6 7 8 import asyncioasync def func (): print (1 ) response = await asyncio.sleep(2 ) print ("结束" , response) asyncio.run( func() )
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import asyncioasync def others (): print ("start" ) await asyncio.sleep(2 ) print ("end" ) return '返回值' async def func (): print ("执行协程函数内部代码" ) response = await others() print ("IO请求结束,结果为:" , response) asyncio.run(func())
示例3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import asyncioimport timeasync def others (): print ("start" ) await asyncio.sleep(2 ) print ("end" ) return '返回值' async def func (): print ("执行协程函数内部代码" ) response1 = await others() print ("IO请求结束,结果为:" , response1) response2 = await others() print ("IO请求结束,结果为:" , response2) start_time = time.time() asyncio.run(func()) end_time = time.time() print ("耗时:" ,end_time-start_time)
看上去像是串行,不过据说因为有await还是会切换到其它任务执行,应该是这个案例没有太多其他任务吧,我切换成串行没发现有时间优化,暂且记住可以优化,可添加到自己的程序里。
Task对象 白话:在事件循环中添加多个任务的。
Tasks用于并发调度协程,通过asyncio.create_task(协程对象)
方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()
函数以外,还可以用低层级的loop.create_task()
或ensure_future()
函数。不建议手动实例化Task对象。
注意:asyncio.create_task()
函数在Python3.7中被加入。在Python3.7之前
可以改用低层级的asyncio.ensure_future()
函数。
示例1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import asyncioasync def func (): print (1 ) await asyncio.sleep(2 ) print (2 ) return '返回值' async def main (): print ("main开始" ) task1 = asyncio.create_task( func() ) task2 = asyncio.create_task( func() ) ret1 = await task1 ret2 = await task2 print (ret1, ret2) asyncio.run(main())
优化后示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioasync def func (): print (1 ) await asyncio.sleep(2 ) print (2 ) return "返回值" async def main (): print ("main开始" ) task_list = [ asyncio.create_task( func() ), asyncio.create_task( func() ) ] print ("main结束" ) done,pending = await asyncio.wait(task_list) print (done) asyncio.run( main() )
如果要将tasklist写在创建事件循环前,示例3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioasync def func (): print (1 ) await asyncio.sleep(2 ) print (2 ) return "返回值" task_list = [ func(), func(), ] done,pending = asyncio.run( asyncio.wait(task_list) ) print (done)
实战案例 异步redis 在使用python代码操作redis时,链接/操作/断开都是网络IO
示例1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncioimport aioredisasync def execute (address, password ): print ("开始执行" , address) redis = await aioredis.create_redis(address, password=password) await redis.hmset_dict('car' , key1=1 , key2=2 , key3=3 ) result = await redis.hgetall('car' , encoding='utf-8' ) print (result) redis.close() await redis.wait_closed() print ("结束" , address) asyncio.run( execute('redis://x.x.x.x' ,'root' ) )
就是每一步网络IO操作就加上await,但因为没其它操作,所以速度优化感觉不明显,所以有示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioimport aioredisasync def execute (address, password ): print ("开始执行" , address) redis = await aioredis.create_redis(address, password=password) await redis.hmset_dict('car' , key1=1 , key2=2 , key3=3 ) result = await redis.hgetall('car' , encoding='utf-8' ) print (result) redis.close() await redis.wait_closed() print ("结束" , address) task_list = [ execute('redis://x.x.x.x' ,'root' ), execute('redis://x.x.x.x' ,'root' ) ] asyncio.run( asyncio.wait(task_list) )
异步mysql
示例1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import asyncioimport aiomysqlasync def execute (): conn = await aiomysql.connect(host='127.0.0.1' , port=3303 , user='root' , password='123' , db='mysql' , ) cur = await conn.cursor() result = await cur.fetchall() await cur.close() conn.close() asyncio.run(execute())
一样示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioimport aiomysqlasync def execute (): conn = await aiomysql.connect(host='127.0.0.1' , port=3303 , user='root' , password='123' , db='mysql' , ) cur = await conn.cursor() result = await cur.fetchall() await cur.close() conn.close() task_list = [ execute(), execute(), ] asyncio.run(asyncio.wait(task_list))
异步爬虫
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import aiohttpimport asyncioasync def fetch (session, url ): print ("发送请求:" , url) async with session.get(url, verify_ssl=False ) as response: text = await response.text() print ("得到结果:" , url, len (text)) async def main (): async with aiohttp.ClientSession() as session: url_list = [ "https://www.ghtwf01.cn/index.php/archives/350/" , "https://www.ghtwf01.cn/index.php/archives/341/" , "https://www.ghtwf01.cn/index.php/archives/295/" ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] done, pedding = await asyncio.wait(tasks) if __name__ == "__main__" : asyncio.run( main() )
总结 最大的意义:通过一个线程利用其IO等待时间去做一些其它的事情。