| 在之前的深入理解asyncio(二)一文中我认为正确使用asyncio.shield的步骤是: 
 当时举的例子:
 先创建 GatheringFuture 对象 ts
取消任务
awAIt ts
 import asyncio
 
 
 async def a():
 print('Suspending a')
 await asyncio.sleep(2)
 print('Resuming a')
 return 'A'
 
 
 async def b():
 print('Suspending b')
 await asyncio.sleep(1)
 print('Resuming b')
 return 'B'
 
 
 async def c1():
 task1 = asyncio.shield(a())
 task2 = asyncio.create_task(b())
 task1.cancel()
 await asyncio.gather(task1, task2, return_exceptions=True)
 
 
 async def c2():
 task1 = asyncio.shield(b())
 task2 = asyncio.create_task(a())
 task1.cancel()
 await asyncio.gather(task1, task2, return_exceptions=True)
 
 async def c3():
 task1 = asyncio.shield(a())
 task2 = asyncio.create_task(b())
 ts = asyncio.gather(task1, task2, return_exceptions=True)
 task1.cancel()
 await ts按当时我的理解,c1和c2中先cancel再gather的用法是错误的,正确的是c3的写法。
 昨天一位买过我书的读者(网名: 秋月风夏, 感谢反馈)QQ问我为啥不能复现,一番讨论发现我被IPython提供的await欺骗了:
 In [1]: from coro import *
 
 In [2]: await c3()
 Suspending a
 Suspending b
 Resuming b  # 第一次执行await并没有完全执行a的逻辑
 
 In [3]: await c3()
 Suspending a
 Suspending b
 Resuming a
 Resuming b  # 之后执行await才会让a执行完整
 
 In [4]: await c1()
 Suspending a
 Suspending b
 Resuming a
 Resuming b
 
 In [5]: await c1()
 Suspending a
 Suspending b
 Resuming a
 Resuming b可以看到除了第一次,c1也都是正常完成的,不存在我说的「陷阱」。
 研究了半天IPython源码没发现这个问题是指什么造成的,但是我们可以不使用IPython提供的await支持:
 In [6]: asyncio.run(c1())
 Suspending a
 Suspending b
 Resuming b
 
 In [7]: asyncio.run(c1())
 Suspending a
 Suspending b
 Resuming b
 
 In [8]: asyncio.run(c3())
 Suspending a
 Suspending b
 Resuming b
 
 In [9]: asyncio.run(c3())
 Suspending a
 Suspending b
 Resuming b看到了吧?直接用asyncio.run运行这些发现他们都是不对的!!那回过头来,看看c2:
 In [10]: asyncio.run(c2())
 Suspending b
 Suspending a
 Resuming b
 Resuming a
 
 In [11]: asyncio.run(c2())
 Suspending b
 Suspending a
 Resuming b
 Resuming a它是正常的。而c1和c2的区别仅仅是取消协程a或是协程b的问题,那么为啥会造成取消的任务有些执行完整情况不同呢?大家可以先思考一下。
 提示:协程a/b的asyncio.sleep时间不同:a任务执行完整至少要3秒,b任务执行完成至少要1秒。
 所以在没有看源码确认前,我「猜」是asyncio.gather在执行任务时不考虑那些被取消了的任务的感受,现有任务都完成即结束。所以:
 
 所以,是「先cancel再gather」或是「先gather再cancel」没关系,要看取消的任务(们)来不来得及赶上gather最后一班车!
 如果取消任务a,当b任务1秒结束后,a还没完成,任务收集就结束了。所以任务a没有执行完整
如果取消任务b,协程b要早于协程a的执行时间
 学习不能靠猜,刚才我读了源码和大家分享下gather是如何处理取消了的任务(Task)的。
 我们就拿前面说的c3里面的逻辑来说。
 在asyncio.gather中首先会对所有Task(task1和task2)加一台done_callback,由于task1被取消了,所以task1.done()的结果为True(只要不是pending状态就为done),在asyncio.futures.Future的实现中,task1会立刻回调(详细的可以看延伸阅读链接1):
 static PyObject *
 future_add_done_callback(FutureObj *fut, PyObject *arg, PyObject *ctx)
 {
 ...
 if (fut->fut_state != STATE_PENDING) {
 /* The future is done/cancelled, so schedule the callback
 right away. */
 if (call_soon(fut->fut_loop, arg, (PyObject*) fut, ctx)) {
 return NULL;
 }
 }
 }接着看一下gather里面的回调_done_callback(详细的可以看延伸阅读链接2):
 def _done_callback(fut):
 nonlocal nfinished
 nfinished += 1
 ...
 if nfinished == nfuts:
 results = []
 
 for fut in children:
 if fut.cancelled():
 res = exceptions.CancelledError()
 else:
 res = fut.exception()
 if res is None:
 res = fut.result()
 results.append(res)
 ...每次有任务完成回调,一开始先让nfinished累加1,nfuts是任务总数。如果完成数(nfinished)等于任务总数时就开始对全部任务设置结果,如果任务被取消就会设置为exceptions.CancelledError()(哪怕它正常完成了也不会通过result方法把结果返回出来)。
 前面说到task1在gather一开始就回调了,但是由于nfuts为2,而nfinished为1,所以不符合条件,需要等待task2(也就是未取消的任务完成)才能返回。
 asyncio.shield所谓的保护其实就是让协程a作为一台Inner Future(内部的),再在事件循环上创建一台新的 Outer Future(外部的),所以a的逻辑继续进行(inner),而我们取消的task1只是outer。
 那如何用Shield才正确?
 
 第一种情况:符合短板理论。也就是说「取消的任务耗时小于正常任务耗时」,那么在gather搜集结果时被取消的任务已经完成。可以感受到前面例子中的 c2 是正确的。
 接着说第二种,也就是官网提到的场景。在官方文档中对它的描述非常模糊,就说了一句:
 
  Protect an awaitable object from being cancelled.我的理解就是「保护一台可await对象,防止其被取消」。这里就不得不吐槽官方文档不够明确的描述和例子了。我觉得应该这样说:
 假如有个Task(叫做something)被shield保护,如下:
 outer = shield(something())
 res = await outer如果outer被取消了,不会影响Task本身(something)的执行。
 所以官网里面说了一句:
 
  From the point of view of something(), the cancellation did not happen.也就是「从something() 的角度看来,取消操作并没有发生」。官网没有给出完整的例子,我用2个例子来帮助理解:
 async def stop_after(loop, when):
 await asyncio.sleep(when)
 loop.stop()
 
 
 def c4():
 loop = asyncio.get_event_loop()
 outer = asyncio.shield(a())
 outer.cancel()
 loop.create_task(stop_after(loop, 3))
 
 loop.run_forever()
 print(outer.cancelled())注意c4不是异步函数,在里面使用loop.run_forever让事件循环一直运行下去,里面有3个任务:a()、outer和stop_after(loop, 3),第三个任务会在3秒后把停掉,这个例子可以用来验证上面说的「如果outer被取消了,不会影响Task本身(something)的执行」
 运行一下:
 In [14]: c4()
 Suspending a
 Resuming a
 True可以看到这样做,虽然outer取消了,但是异步函数a的逻辑执行完整了。
 基于这种思路,我突然想到为啥「IPython用await第一次没有执行完整,之后每次都能执行完整」,说结论前先看另外一台例子:
 async def cancel_after(task, when):
 await asyncio.sleep(when)
 task.cancel()
 
 
 async def d(n):
 print(f'Suspending a with {n}')
 await asyncio.sleep(2)
 print(f'Resuming a with {n}')
 return 'A'
 
 
 async def c5():
 loop = asyncio.get_event_loop()
 n = random.randint(1, 100)
 outer = asyncio.shield(d(n))
 loop.create_task(cancel_after(outer, 1))
 try:
 await outer
 except asyncio.CancelledError:
 print('Cancelled!')这个例子中有2个任务,outer和cancel_after(outer, 1),cancel_after会在1秒后把outer取消掉。另外这次用了新的一步函数d,接受一台随机参数(一会就能感受到用意)而且这次要重新进入IPython交互环境(防止之前的测试对其产生影响),而且不再用aysncio.run了:
 In [1]: from coro import *
 
 In [2]: import asyncio
 
 In [3]: import random
 
 In [4]: loop = asyncio.get_event_loop()
 
 In [5]: loop.run_until_complete(c5())
 Suspending a with 72
 Cancelled!  # 注意,这就是第一次,也没有把d运行完整
 
 In [6]: loop.run_until_complete(c5())
 Suspending a with 48  # 启动了下一轮随机数
 Resuming a with 72  # 结束了上一轮随机数
 Cancelled!
 
 In [7]: loop.run_until_complete(c5())
 Suspending a with 26
 Resuming a with 48
 Cancelled!
 
 In [8]: loop.run_until_complete(c5())
 Suspending a with 38
 Resuming a with 26
 Cancelled!每次事件循环会把全部任务轮询一遍,outer.cancel虽然取消了Outer任务,但是Inner任务或是running状态的(有兴趣的同学可以写个例子证实一下),等待下次事件循环( loop.run_until_complete的目标是等待c5执行结束,它并不关注d运行状态)。而之前在IPython里面用await就会造成第一次不完整也解释通了:
 async def c6():
 n = random.randint(1, 100)
 task1 = asyncio.shield(d(n))
 task2 = asyncio.create_task(b())
 task1.cancel()
 await asyncio.gather(task1, task2, return_exceptions=True)
 
 ❯ ipython
 
 In [1]: from coro import *
 
 In [2]: await c6()
 Suspending a with 34
 Suspending b
 Resuming b
 
 In [3]: await c6()
 Suspending a with 41
 Suspending b
 Resuming a with 34
 Resuming b
 
 In [4]: await c6()
 Suspending a with 71
 Suspending b
 Resuming a with 41
 Resuming b看到了吧。那为啥说用asyncio.run不行呢:
 In [5]: asyncio.run(c6())
 Suspending a with 62
 Suspending b
 Resuming b
 
 In [6]: asyncio.run(c6())
 Suspending a with 94
 Suspending b
 Resuming b其实可以猜出来:每次都创建了一台新的loop。当然也得看源码确认一下(详见延伸阅读链接3):
 def run(main, *, debug=False):
 ...
 loop = events.new_event_loop()
 try:
 events.set_event_loop(loop)
 loop.set_debug(debug)
 return loop.run_until_complete(main)
 finally:
 ...里面用events.new_event_loop创建了新的事件循环,所以拿不到之前的事件循环上的待执行任务。
 感觉这类场景这个比较适合Web框架中使用。
 后记
 
 希望这些内容让你对asyncio和asyncio.shield有更深的了解。
 延伸阅读
 
 
 
 https://github.com/python/cpython/blob/b9a0376b0dedf16a2f82fa43d851119d1f7a2707/Modules/_asynciomodule.c#L627-L632
https://github.com/python/cpython/blob/0baa72f4b2e7185298d09cf64c7b591efcd22af0/Lib/asyncio/tasks.py#L690-L724
https://github.com/python/cpython/blob/master/Lib/asyncio/runners.py#L39-L41
 |