前言

为了以后写代码运行效率更高,遂补坑。

协程

协程不是计算机提供,程序员人为创造。

协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术,简而言之,其实就是通过一个线程实现代码块相互切换执行。例如:

def func1():
  print(1)
  ...
  print2()
 
def func2():
  print(3)
  ...
  print(4)
func1()
func2()

实现协程有这么几种方法:

  • greenbelt,早期模块
  • yield关键字
  • asyncio装饰器(python3.4)
  • async,await关键字(python3.5)【推荐】

greenlet

pip3 install greenlet
from greenlet import greenlet

def func1():
  print(1) # 第2步:输出1
  gr2.switch() # 第3步,切换到func2函数
  print(2) # 第6步,输出2
  gr2.switch() # 第7步,切换到func2函数

def func2():
  print(3) # 第4步,输出3
  gr1.switch() # 第5步,切换到func1函数
  print(4) # 第8步,输出4

gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch() # 第1步,去执行func1函数

yield关键字

def func1():
  yield 1
  yield from func2()
  yield 2

def func2():
  yield 3
  yield 4
  
f1 = func1()
for item in f1:
  print(item)

asyncio

在python3.4及之后的版本

import asyncio

@asyncio.coroutines
def func1():
  print(1)
  yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
  print(2)

@asyncio.coroutines
def func2():
  print(3)
  yield from asyncio.sleep(2)# 遇到IO耗时操作,自动化切换到tasks中的其他任务
  print(4)

tasks = [
  asyncio.ensure_future(func1()),
  asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

注意:遇到IO阻塞自动切换,网络请求也是IO

async & await关键字

在python3.5及之后的版本

import asyncio

async def func1():
  print(1)
  await asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
  print(2)

async def func2():
  print(3)
  await asyncio.sleep(2)# 遇到IO耗时操作,自动化切换到tasks中的其他任务
  print(4)

tasks = [
  asyncio.ensure_future(func1()),
  asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

协程意义

在一个线程中如果遇到IO等待时间,线程不会傻傻等,利用空闲的时间再去完成一点其它的任务

案例:爬取三个网站(网络IO)

  • 普通方式

    import requests
    def get_page(url):
        response = requests.get(url)
        print(len(response.text))
    
    if __name__ == "__main__":
        urls = [
            "https://www.ghtwf01.cn/index.php/archives/350/",
            "https://www.ghtwf01.cn/index.php/archives/341/",
            "https://www.ghtwf01.cn/index.php/archives/295/"
        ]
        for url in urls:
            get_page(url)
  • 协程方式(异步)
import aiohttp
import asyncio

async def get_page(session, url):
    async with session.get(url) as response:
        html = await response.text()
        print(len(html))

async def main():
    urls = [
        "https://www.ghtwf01.cn/index.php/archives/350/",
        "https://www.ghtwf01.cn/index.php/archives/341/",
        "https://www.ghtwf01.cn/index.php/archives/295/"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(get_page(session, url)) for url in urls]
        await asyncio.wait(tasks)

if __name__ == "__main__":
    asyncio.run(main())

异步编程

事件循环

理解成一个死循环,去检查并执行某些代码。

# 伪代码

任务列表 = [任务1, 任务2, 任务3]

while True:
    可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将"可执行"和"已完成"的任务返回
    
    for 就绪任务 in 可执行的任务列表:
        执行已就绪的任务
    for 已完成的任务 in 已完成的任务列表:
        在任务列表中移除已完成的任务
    
    如果任务列表中的任务都已完成,则终止循环
import asyncio

# 去生成获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(任务)

如何执行协程函数

协程函数:定义函数时async def 函数名

协程对象:执行协程函数()得到的协程对象

async def func():
  print("1")

result = func()

注意:执行协程函数创建协程对象,函数内部代码不会执行。

如果想要运行协程函数内部代码,必须将协程对象交给事件循环来处理

import asyncio

async def func():
  print("1")

result = func()

# loop = asymcio.get_event_loop()
# loop.run_until_complete( result )
asyncio.run( result ) #python3.7

await

await + 可等待的对象(协程对象、Future、 Task对象——>暂且通俗理解为IO等待)

示例1:

import asyncio

async def func():
    print(1)
    response = await asyncio.sleep(2)
    print("结束", response)

asyncio.run( func() )

示例2:

import asyncio


async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return '返回值'


async def func():
    print("执行协程函数内部代码")

    # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)
    response = await others()


    print("IO请求结束,结果为:", response)

asyncio.run(func())

示例3:

import asyncio
import time

async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return '返回值'

async def func():
    print("执行协程函数内部代码")
    # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)
    response1 = await others()
    print("IO请求结束,结果为:", response1)

    response2 = await others()
    print("IO请求结束,结果为:", response2)

start_time = time.time()
asyncio.run(func())
end_time = time.time()
print("耗时:",end_time-start_time)

看上去像是串行,不过据说因为有await还是会切换到其它任务执行,应该是这个案例没有太多其他任务吧,我切换成串行没发现有时间优化,暂且记住可以优化,可添加到自己的程序里。

Task对象

白话:在事件循环中添加多个任务的。

Tasks用于并发调度协程,通过asyncio.create_task(协程对象)方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()函数以外,还可以用低层级的loop.create_task()ensure_future()函数。不建议手动实例化Task对象。

注意:asyncio.create_task()函数在Python3.7中被加入。在Python3.7之前

可以改用低层级的asyncio.ensure_future()函数。

示例1:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return '返回值'

async def main():
    print("main开始")
    # 创建Task对象,将当前执行func函数任务添加到事件循环
    task1 = asyncio.create_task( func() )
    task2 = asyncio.create_task( func() )
        
    # 当前执行某协程遇到IO操作时,会自动化切换执行其他任务
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(main())

优化后示例2:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main开始")
    task_list = [
        asyncio.create_task( func() ),
        asyncio.create_task( func() )
    ]
    print("main结束")
    done,pending = await asyncio.wait(task_list)
    print(done)

asyncio.run( main() )

如果要将tasklist写在创建事件循环前,示例3:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

task_list = [
    func(),
    func(),
]
done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)

实战案例

异步redis

在使用python代码操作redis时,链接/操作/断开都是网络IO

pip3 install aioredis

示例1:

import asyncio
import aioredis

async def execute(address, password):
    print("开始执行", address)
    # 网络IO操作:创建redis连接
    redis = await aioredis.create_redis(address, password=password)
    
    # 网络IO操作:在redis中设置哈希值car,内部再设三个键值对
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    
    # 网络IO操作:去redis中获取值
    result = await  redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()
    
    # 网络IO操作:关闭redis连接
    await  redis.wait_closed()
    print("结束", address)

asyncio.run( execute('redis://x.x.x.x','root') )

就是每一步网络IO操作就加上await,但因为没其它操作,所以速度优化感觉不明显,所以有示例2:

import asyncio
import aioredis

async def execute(address, password):
    print("开始执行", address)
    # 网络IO操作:创建redis连接
    redis = await aioredis.create_redis(address, password=password)

    # 网络IO操作:在redis中设置哈希值car,内部再设三个键值对
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)

    # 网络IO操作:去redis中获取值
    result = await  redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()

    # 网络IO操作:关闭redis连接
    await  redis.wait_closed()
    print("结束", address)

task_list = [
    execute('redis://x.x.x.x','root'),
    execute('redis://x.x.x.x','root')
]
asyncio.run( asyncio.wait(task_list) )

异步mysql

pip3 install aiomysql

示例1:

import asyncio
import aiomysql

async def execute():
    # 网络IO操作:连接MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3303, user='root', password='123', db='mysql', )
    # 网络IO操作:创建CURSOR
    cur = await conn.cursor()
    # 网络IO操作:执行SQL
    result = await cur.fetchall()
    # 网络IO操作:关闭连接
    await cur.close()
    conn.close()

asyncio.run(execute())

一样示例2:

import asyncio
import aiomysql

async def execute():
    # 网络IO操作:连接MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3303, user='root', password='123', db='mysql', )
    # 网络IO操作:创建CURSOR
    cur = await conn.cursor()
    # 网络IO操作:执行SQL
    result = await cur.fetchall()
    # 网络IO操作:关闭连接
    await cur.close()
    conn.close()

task_list = [
    execute(),
    execute(),
]
asyncio.run(asyncio.wait(task_list))

异步爬虫

pip3 install aiohttp
import aiohttp
import asyncio

async def fetch(session, url):
    print("发送请求:", url)
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print("得到结果:", url, len(text))

async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            "https://www.ghtwf01.cn/index.php/archives/350/",
            "https://www.ghtwf01.cn/index.php/archives/341/",
            "https://www.ghtwf01.cn/index.php/archives/295/"
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        done, pedding = await asyncio.wait(tasks)

if __name__ == "__main__":
    asyncio.run( main() )

总结

最大的意义:通过一个线程利用其IO等待时间去做一些其它的事情。