本文首发于微信公众号「Python之美」: https://mp.weixin.qq.com/s/B9ZAazfXEtAPtptewhWteQ Asyncio.gather vs asyncio.wAIt
在上篇文章已经看到多次用asyncio.gather了,还有另外一台用法是asyncio.wait,他们都可以让多个协程并发执行。那为啥提供2个方法呢?他们有什么区别,适用场景是如何样的呢?其实我之前也是有点困惑,直到我读了asyncio的源码。我们先看2个协程的例子:
async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
return 'A'
async def b():
print('Suspending b')
await asyncio.sleep(1)
print('Resuming b')
return 'B'在IPython里面用gather执行一下:
In : return_value_a, return_value_b = await asyncio.gather(a(), b())
Suspending a
Suspending b
Resuming b
Resuming a
In : return_value_a, return_value_b
Out: ('A', 'B')Ok,asyncio.gather方法的名字说明了它的用途,gather的意思是「搜集」,也就是能够收集协程的结果,而且要注意,它会按输入协程的顺序保存的对应协程的执行结果。
接着我们说asyncio.await,先执行一下:
In : done, pending = await asyncio.wait([a(), b()])
Suspending b
Suspending a
Resuming b
Resuming a
In : done
Out:
{<Task finished coro=<a() done, defined at <ipython-input-5-5ee142734d16>:1> result=&#39;A&#39;>,
<Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result=&#39;B&#39;>}
In : pending
Out: set()
In : task = list(done)[0]
In : task
Out: <Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result=&#39;B&#39;>
In : task.result()
Out: &#39;B&#39;asyncio.wait的返回值有2项,第一项表示完成的任务列表(done),第二项表示等待(Future)完成的任务列表(pending),每个任务都是一台Task实例,由于这2个任务都已经完成,所以可以执行task.result()获得协程返回值。
Ok, 说到这里,我总结下它俩的区别的第一层区别:
- asyncio.gather封装的Task全程黑盒,只告诉你协程结果。
- asyncio.wait会返回封装的Task(包含已完成和挂起的任务),如果你关注协程执行结果你需要从对应Task实例里面用result方法自个拿。
为啥说「第一层区别」,asyncio.wait看名字可以理解为「等待」,所以返回值的第二项是pending列表,但是看上面的例子,pending是空集合,那么在什么情况下,pending里面不为空呢?这就是第二层区别:asyncio.wait支持选择返回的时机。
asyncio.wait支持一台接收参数return_when,在默认情况下,asyncio.wait会等待全部任务完成(return_when=&#39;ALL_COMPLETED&#39;),它还支持FIRST_COMPLETED(第一台协程完成就返回)和FIRST_EXCEPTION(出现第一台异常就返回):
In : done, pending = await asyncio.wait([a(), b()], return_when=asyncio.tasks.FIRST_COMPLETED)
Suspending a
Suspending b
Resuming b
In : done
Out: {<Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result=&#39;B&#39;>}
In : pending
Out: {<Task pending coro=<a() running at <ipython-input-5-5ee142734d16>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108065e58>()]>>}看到了吧,这次只有协程b完成了,协程a或是pending状态。
在大部分情况下,用asyncio.gather是足够的,如果你有特殊需求,可以选择asyncio.wait,举2个例子:
- 需要拿到封装好的Task,以便取消或者添加成功回调等
- 业务上需要FIRST_COMPLETED/FIRST_EXCEPTION即返回的
asyncio.create_task vs loop.create_task vs asyncio.ensure_future
创建一台Task一共有3种方法,如这小节的标题。在上篇文章我说过,从Python 3.7开始可以统一的使用更高阶的asyncio.create_task。其实asyncio.create_task就是用的loop.create_task:
def create_task(coro):
loop = events.get_running_loop()
return loop.create_task(coro)loop.create_task接受的参数需要是一台协程,但是asyncio.ensure_future除了接受协程,还可以是Future对象或者awaitable对象:
- 如果参数是协程,其实底层或是用的loop.create_task,返回Task对象
- 如果是Future对象会直接返回
- 如果是一台awaitable对象会await这个对象的__await__方法,再执行一次ensure_future,最后返回Task或者Future
所以就像ensure_future名字说的,确保这个是一台Future对象:Task是Future 子类,前面说过一般情况下开发者不需要自个创建Future
其实前面说的asyncio.wait和asyncio.gather里面都用了asyncio.ensure_future。对于绝大多数场景要并发执行的是协程,所以直接用asyncio.create_task就足够了~
shield
接着说asyncio.shield,用它可以屏蔽取消操作。一直到这里,我们还没有见识过Task的取消。看一台例子:
In : loop = asyncio.get_event_loop()
In : task1 = loop.create_task(a())
In : task2 = loop.create_task(b())
In : task1.cancel()
Out: True
In : await asyncio.gather(task1, task2)
Suspending a
Suspending b
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
cell_name in async-def-wrapper()
CancelledError:在上面的例子中,task1被取消了后再用asyncio.gather收集结果,直接抛CancelledError错误了。这里有个细节,gather支持return_exceptions参数:
In : await asyncio.gather(task1, task2, return_exceptions=True)
Out: [concurrent.futures._base.CancelledError(), &#39;B&#39;]可以看到,task2依然会执行完成,但是task1的返回值是一台CancelledError错误,也就是任务被取消了。如果一台创建后就不希望被任何情况取消,可以使用asyncio.shield保护任务能顺利完成。不过要注意一台陷阱,先看错误的写法:
In : task1 = asyncio.shield(a())
In : task2 = loop.create_task(b())
In : task1.cancel()
Out: True
In : await asyncio.gather(task1, task2, return_exceptions=True)
Suspending a
Suspending b
Resuming b
Out: [concurrent.futures._base.CancelledError(), &#39;B&#39;]可以看到依然是CancelledError错误,且协程a未执行完成,正确的用法是这样的:
In : task1 = asyncio.shield(a())
In : task2 = loop.create_task(b())
In : ts = asyncio.gather(task1, task2, return_exceptions=True)
In : task1.cancel()
Out: True
In : await ts
Suspending a
Suspending b
Resuming a
Resuming b
Out: [concurrent.futures._base.CancelledError(), &#39;B&#39;]可以看到虽然结果是一台CancelledError错误,但是看输出能确认协程实际上是执行了的。所以正确步骤是:
- 先创建 GatheringFuture 对象 ts
- 取消任务
- await ts
asynccontextmanager
如果你了解Python,之前可能听过或者用过contextmanager ,一台上下文管理器。通过一台计时的例子就理解它的作用:
from contextlib import contextmanager
async def a():
await asyncio.sleep(3)
return &#39;A&#39;
async def b():
await asyncio.sleep(1)
return &#39;B&#39;
async def s1():
return await asyncio.gather(a(), b())
@contextmanager
def timed(func):
start = time.perf_counter()
yield asyncio.run(func())
print(f&#39;Cost: {time.perf_counter() - start}&#39;)timed函数用了contextmanager装饰器,把协程的运行结果yield出来,执行结束后还计算了耗时:
In : from contextmanager import *
In : with timed(s1) as rv:
...: print(f&#39;Result: {rv}&#39;)
...:
Result: [&#39;A&#39;, &#39;B&#39;]
Cost: 3.0052654459999992大家先体会一下。在Python 3.7添加了asynccontextmanager,也就是异步版本的contextmanager,适合异步函数的执行,上例可以这么改:
@asynccontextmanager
async def async_timed(func):
start = time.perf_counter()
yield await func()
print(f&#39;Cost: {time.perf_counter() - start}&#39;)
async def main():
async with async_timed(s1) as rv:
print(f&#39;Result: {rv}&#39;)
In : asyncio.run(main())
Result: [&#39;A&#39;, &#39;B&#39;]
Cost: 3.00414147500004async版本的with要用async with,另外要注意yield await func()这句,相当于yield + await func()
PS: contextmanager 和 asynccontextmanager 最好的理解方法是去看源码注释,可以看延伸阅读链接2,另外延伸阅读链接3包含的PR中相关的测试代码部分也能帮助你理解
代码目录
本文代码可以在 mp项目 找到
延伸阅读
- https://github.com/python/cpython/blob/3.7/Lib/asyncio/tasks.py#L574
- https://github.com/python/cpython/blob/3.7/Lib/contextlib.py#L243
- https://github.com/python/cpython/pull/360/
|