Python 并发编程之多协程


一、协程简述

​ 协程不是计算机提供,程序员人为创造。

​ 协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。

​ 实现协程的方法有:

  • greenlet、早期模块
  • yield 关键字
  • asyncio 装饰器(py3.4)
  • async、await 关键字(py3.5)【推荐】

1、greenlet 实现

pip install greenlet
from greenlet import greenlet

def func1():
    print(1)
    gr2.switch()
    print(2)
    gr2.switch()
    
def func2():
    print(3)
    gr1.switch()
    print(4)

gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch()

2、yield 关键字实现

def func1():
    yield 1
    yield from func2()
    yield 2

def func2():
    yield 3
    yield 4

f1 = func1()
for item in f1:
    print(item)

3、asyncio 实现

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务
    yield from asyncio.sleep(2)
    print(2)

@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2()),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

​ 特别之处:遇到 IO 阻塞就自动切换


4、async&await 关键字实现

import asyncio

async def func1():
    print(1)
    # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务
    # 这里是设置该协程的休眠
    await asyncio.sleep(2)
    print(2)

async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2()),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

二、基于协程的异步

1、事件循环

# 伪代码
任务列表 = [任务1, 任务2, 任务3]
while True:
    可执行任务列表,已完成任务列表 = 检查列表中所有任务,将可执行和已完成的任务返回
    for 就绪任务 in 可执行的任务列表:
    	执行已就绪的任务
    for 已完成的任务 in 已完成的任务列表:
    	在任务列表中移除已完成的任务
    如果任务列表中的任务都已完成,则终止循环
import asyncio
# 生成事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(任务)
# 在 py3.7,一行代码即可:
asyncio.run(任务)

2、async 关键字

​ 协程函数:使用 async 关键字定义的函数。

​ 协程对象:执行协程函数得到的对象。

​ 注:执行协程函数会创建协程对象,函数内部代码不会执行。如果想要运行协程函数内部代码,需要将协程对象交给事件循环。

import asyncio

async def func():
    print('执行 async 函数内部代码~')
asyncio.run(func())

3、await 关键字

​ await 后要跟可等待对象。(包括协程对象Future对象Task对象

import asyncio

async def other():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return 'value'

async def func():
    print("执行协程函数内部代码")
    resp = await other()
    print("阻塞结束,结果为:", resp)

asyncio.run(func())

await 就是等待值获取后再往下执行,等待时事件循环切换到其他任务。


4、Task 对象

​ Task 是对协程的包装。在事件循环中添加多个任务,即可并发调度协程。而且任务是可以包含各种状态的,便于对异步操作状态的控制

​ Task 用于并发调度协程,通过 asyncio.create_task(协程对象) 的方式创建 Task 对象,可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task()asyncio.ensure_future() 函数。不建议手动实例化 Task 对象。

​ 注意:asyncio.create_task() 函数在 Python3.7 中被加入。在 Python3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

​ 关于 Task 对象,更多详见:https://docs.python.org/3.8/library/asyncio-task.html?highlight=task#asyncio.Task

​ 示例:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "value"

async def main():
    print("main 开始")
    # 可以分别 await 多个任务,但添加多任务时,一般像下面这样写:
    # 同时可以取名方便最后区分
    task_list = [
        asyncio.create_task(func(), name='f1'),
        asyncio.create_task(func(), name='f2')
    ]
    print("main 添加任务结束")
    # 可以设置超时,超时未完成即在 pending 中
    # 从 >py3.8 开始,向该方法传递协程的可迭代对象将导致混淆行为,所以最好像这里一样传递 Task 可迭代对象
    done, pending = await asyncio.wait(task_list, timeout=None)
    for t in done:
        print(t.get_name(), t.result())

asyncio.run(main())

​ 也可以对 Task 对象使用 add_done_callback()添加回调。

​ 对于某些情况,可能会先定义 task_list,此时的写法将会有所不同:(由于上面所述原因,py 3.8 后开始已经不支持这么做,因此下面的代码仅限于 <=py 3.8 使用

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "value"

task_list = [
    func(), func()
]
# 会先启动 event_loop,再添加任务
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)

所以最好创建一个 main() 协程函数,在其内添加 Task,最后让 event_loop 接管 main 协程


5、Future 对象

Future 是一个特殊的低级可等待对象,表示异步操作的最终结果。

​ Task 类继承于 Future 类,同时 Task 对象内部 await 结果的处理基于 Future 对象。

​ 官方文档:


6、asyncio + 同步模块的异步实现

​ 以一个爬虫为例:

import asyncio
import requests

async def download_image(url):
    print('开始下载', url)
    
    lop = asyncio.get_event_loop()
    # 同步模块不支持异步协程,因此使用进程池
    future = loop.run_in_executor(None, requests.get, url)
    resp = await future
    print('下载完成')
    # 之后进行一些处理/保存工作
    
async def main():
    url_list = [
        'https://www.xxx.com',
        'https://www.yyy.com',
        'https://www.zzz.com'
    ]
    tasks = [asyncio.create_task(download_image(url)) for url in url_list]
   	done, pending = await asyncio.wait(tasks)
    
asyncio.run(main())

7、异步迭代器

​ 异步迭代器:实现了 __aiter__()__anext__() 方法的对象。 __anext__() 必须返回一个 awaitable 对象。 async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 stopAsyncrIteration 异常。

​ 异步可迭代对象:可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个异步迭代器。

import asyncio

class Reader:
    """自定义异步迭代器"""
    def __init__(self):
        self.count = 0
    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None
        return self.count
    def __aiter__(self):
        return self
    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise StopAsyncIteration
        return val

async def func():
	obj = Reader()
	# async 关键字只能在协程函数中使用
	async for item in obj:
		print(item)
asyncio.run(func())

8、异步上下文管理器

​ 此种对象通过 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。

import asyncio

class AsyncDbManager:
    def __init__(self):
        self.conn = conn
    async def do_something(self):
        # 异步操作数据库 
        return 666
    async def __aenter__(self):
        # 异步链接数据库
        self.conn = await asyncio.sleep(1)
        return self
    async def __aexit__(self, exc_type, exc, tb):
        # 异步关闭数据库
        await asyncio.sleep(1)

async def main():
    async with AsyncDbManager() as f:
        result = await f.do_something()
        print(result)
        
asyncio.run(main())

三、uvloop

​ uvloop 可以使 asyncio 更快。事实上,它至少比 nodejs、gevent 和其他 Python 异步框架要快两倍 。基于 uvloop 的 asyncio 的速度几乎接近了 Go 程序的速度。

pip install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 写 asyncio 的代码,与之前一致

# 内部的事件循环会自动变为 uvloop
asyncio.run(...)

Q.E.D.


f"{言语过于单薄,于此引用心声}"