前言

为了以后写代码运行效率更高,遂补坑。

协程

协程不是计算机提供,程序员人为创造。

协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术,简而言之,其实就是通过一个线程实现代码块相互切换执行。例如:

1
2
3
4
5
6
7
8
9
10
11
def func1():
print(1)
...
print2()

def func2():
print(3)
...
print(4)
func1()
func2()

实现协程有这么几种方法:

  • greenbelt,早期模块
  • yield关键字
  • asyncio装饰器(python3.4)
  • async,await关键字(python3.5)【推荐】

greenlet

1
pip3 install greenlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from greenlet import greenlet

def func1():
print(1) # 第2步:输出1
gr2.switch() # 第3步,切换到func2函数
print(2) # 第6步,输出2
gr2.switch() # 第7步,切换到func2函数

def func2():
print(3) # 第4步,输出3
gr1.switch() # 第5步,切换到func1函数
print(4) # 第8步,输出4

gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch() # 第1步,去执行func1函数

yield关键字

1
2
3
4
5
6
7
8
9
10
11
12
def func1():
yield 1
yield from func2()
yield 2

def func2():
yield 3
yield 4

f1 = func1()
for item in f1:
print(item)

asyncio

在python3.4及之后的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

@asyncio.coroutines
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)

@asyncio.coroutines
def func2():
print(3)
yield from asyncio.sleep(2)# 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)

tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

注意:遇到IO阻塞自动切换,网络请求也是IO

async & await关键字

在python3.5及之后的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def func1():
print(1)
await asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)

async def func2():
print(3)
await asyncio.sleep(2)# 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)

tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

协程意义

在一个线程中如果遇到IO等待时间,线程不会傻傻等,利用空闲的时间再去完成一点其它的任务

案例:爬取三个网站(网络IO)

  • 普通方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import requests
    def get_page(url):
    response = requests.get(url)
    print(len(response.text))

    if __name__ == "__main__":
    urls = [
    "https://www.ghtwf01.cn/index.php/archives/350/",
    "https://www.ghtwf01.cn/index.php/archives/341/",
    "https://www.ghtwf01.cn/index.php/archives/295/"
    ]
    for url in urls:
    get_page(url)
  • 协程方式(异步)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import aiohttp
import asyncio

async def get_page(session, url):
async with session.get(url) as response:
html = await response.text()
print(len(html))

async def main():
urls = [
"https://www.ghtwf01.cn/index.php/archives/350/",
"https://www.ghtwf01.cn/index.php/archives/341/",
"https://www.ghtwf01.cn/index.php/archives/295/"
]
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(get_page(session, url)) for url in urls]
await asyncio.wait(tasks)

if __name__ == "__main__":
asyncio.run(main())

异步编程

事件循环

理解成一个死循环,去检查并执行某些代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 伪代码

任务列表 = [任务1, 任务2, 任务3]

while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将"可执行"和"已完成"的任务返回

for 就绪任务 in 可执行的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除已完成的任务

如果任务列表中的任务都已完成,则终止循环
1
2
3
4
5
6
import asyncio

# 去生成获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(任务)

如何执行协程函数

协程函数:定义函数时async def 函数名

协程对象:执行协程函数()得到的协程对象

1
2
3
4
async def func():
print("1")

result = func()

注意:执行协程函数创建协程对象,函数内部代码不会执行。

如果想要运行协程函数内部代码,必须将协程对象交给事件循环来处理

1
2
3
4
5
6
7
8
9
10
import asyncio

async def func():
print("1")

result = func()

# loop = asymcio.get_event_loop()
# loop.run_until_complete( result )
asyncio.run( result ) #python3.7

await

await + 可等待的对象(协程对象、Future、 Task对象——>暂且通俗理解为IO等待)

示例1:

1
2
3
4
5
6
7
8
import asyncio

async def func():
print(1)
response = await asyncio.sleep(2)
print("结束", response)

asyncio.run( func() )

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio


async def others():
print("start")
await asyncio.sleep(2)
print("end")
return '返回值'


async def func():
print("执行协程函数内部代码")

# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)
response = await others()


print("IO请求结束,结果为:", response)

asyncio.run(func())

示例3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time

async def others():
print("start")
await asyncio.sleep(2)
print("end")
return '返回值'

async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)
response1 = await others()
print("IO请求结束,结果为:", response1)

response2 = await others()
print("IO请求结束,结果为:", response2)

start_time = time.time()
asyncio.run(func())
end_time = time.time()
print("耗时:",end_time-start_time)

看上去像是串行,不过据说因为有await还是会切换到其它任务执行,应该是这个案例没有太多其他任务吧,我切换成串行没发现有时间优化,暂且记住可以优化,可添加到自己的程序里。

Task对象

白话:在事件循环中添加多个任务的。

Tasks用于并发调度协程,通过asyncio.create_task(协程对象)方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()函数以外,还可以用低层级的loop.create_task()ensure_future()函数。不建议手动实例化Task对象。

注意:asyncio.create_task()函数在Python3.7中被加入。在Python3.7之前

可以改用低层级的asyncio.ensure_future()函数。

示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio

async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'

async def main():
print("main开始")
# 创建Task对象,将当前执行func函数任务添加到事件循环
task1 = asyncio.create_task( func() )
task2 = asyncio.create_task( func() )

# 当前执行某协程遇到IO操作时,会自动化切换执行其他任务
ret1 = await task1
ret2 = await task2
print(ret1, ret2)

asyncio.run(main())

优化后示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"

async def main():
print("main开始")
task_list = [
asyncio.create_task( func() ),
asyncio.create_task( func() )
]
print("main结束")
done,pending = await asyncio.wait(task_list)
print(done)

asyncio.run( main() )

如果要将tasklist写在创建事件循环前,示例3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"

task_list = [
func(),
func(),
]
done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)

实战案例

异步redis

在使用python代码操作redis时,链接/操作/断开都是网络IO

1
pip3 install aioredis

示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import aioredis

async def execute(address, password):
print("开始执行", address)
# 网络IO操作:创建redis连接
redis = await aioredis.create_redis(address, password=password)

# 网络IO操作:在redis中设置哈希值car,内部再设三个键值对
await redis.hmset_dict('car', key1=1, key2=2, key3=3)

# 网络IO操作:去redis中获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()

# 网络IO操作:关闭redis连接
await redis.wait_closed()
print("结束", address)

asyncio.run( execute('redis://x.x.x.x','root') )

就是每一步网络IO操作就加上await,但因为没其它操作,所以速度优化感觉不明显,所以有示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import aioredis

async def execute(address, password):
print("开始执行", address)
# 网络IO操作:创建redis连接
redis = await aioredis.create_redis(address, password=password)

# 网络IO操作:在redis中设置哈希值car,内部再设三个键值对
await redis.hmset_dict('car', key1=1, key2=2, key3=3)

# 网络IO操作:去redis中获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()

# 网络IO操作:关闭redis连接
await redis.wait_closed()
print("结束", address)

task_list = [
execute('redis://x.x.x.x','root'),
execute('redis://x.x.x.x','root')
]
asyncio.run( asyncio.wait(task_list) )

异步mysql

1
pip3 install aiomysql

示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import aiomysql

async def execute():
# 网络IO操作:连接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3303, user='root', password='123', db='mysql', )
# 网络IO操作:创建CURSOR
cur = await conn.cursor()
# 网络IO操作:执行SQL
result = await cur.fetchall()
# 网络IO操作:关闭连接
await cur.close()
conn.close()

asyncio.run(execute())

一样示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import aiomysql

async def execute():
# 网络IO操作:连接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3303, user='root', password='123', db='mysql', )
# 网络IO操作:创建CURSOR
cur = await conn.cursor()
# 网络IO操作:执行SQL
result = await cur.fetchall()
# 网络IO操作:关闭连接
await cur.close()
conn.close()

task_list = [
execute(),
execute(),
]
asyncio.run(asyncio.wait(task_list))

异步爬虫

1
pip3 install aiohttp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import aiohttp
import asyncio

async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
text = await response.text()
print("得到结果:", url, len(text))

async def main():
async with aiohttp.ClientSession() as session:
url_list = [
"https://www.ghtwf01.cn/index.php/archives/350/",
"https://www.ghtwf01.cn/index.php/archives/341/",
"https://www.ghtwf01.cn/index.php/archives/295/"
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
done, pedding = await asyncio.wait(tasks)

if __name__ == "__main__":
asyncio.run( main() )

总结

最大的意义:通过一个线程利用其IO等待时间去做一些其它的事情。