asyncio
模块里给了用户三个类型,Future
、Task
以及 Handle
。Python 语言本身提供了 Coroutine
类型。
要理解 asyncio
这个模块,就要从这四个类型的关系入手。
Coroutine
这是一个老生常谈的类型。它就是对 Generator
类型的一种封装,在较早版本的 Python 里你可以通过一个装饰器将生成器函数转换为协程函数。
通过 .send()
函数可以驱动一个 Coroutine
执行到下一个切换点,这个切换点是层层 await
语句最终等待的 Awaitable
对象 __await__
方法的 yield
点。
用一段代码来展示如何不使用 loop
去执行并获取一个 Coroutine
对象的返回值。可以尝试修改 while True
下的 c.send
发送的变量、yield
和 return
值并观察这段代码的输出变化。
class Awaitable:
def __await__(self):
x = yield "yield"
print(f"{x=}")
return "return"
async def f():
print("f")
return await Awaitable()
c = f()
count = 0
try:
r = c.send(None)
print(f"{count}. Send {r=}")
count += 1
while True:
r = c.send(None)
print(f"{count}. Send {r=}")
count += 1
except StopIteration as exc:
print("Return", exc.value)
finally:
c.close()
既然 Coroutine
只是一个特殊的生成器对象,那通过 asyncio.new_event_loop
创建的 loop
是如何知道有哪些协程对象需要被调用 .send()
的?于是在这里我们就需要 Task
来上场了。
Task
要将一个 Coroutine
对象加入 loop
,就需要使用 loop.create_task
方法,它会通过一个 Coroutine
对象创建一个 Task
对象。
在 Task
对象的 __step_run_and_handle_result
方法里可以看到这几行代码,让人熟悉的 .send
和 exc.value
。
coro = self._coro
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
self._must_cancel = False
super().cancel(msg=self._cancel_message)
else:
super().set_result(exc.value)
......
拥有了 Coroutine
和 Task
两个对象还不够,因为程序终究是要与网卡、磁盘或者其他设备进行交互的。在过去,往往都是采用回调函数的方式来编程,这种方式会导致代码的执行顺序不断的被各种回调函数打断,非常不利于阅读。这也是 asyncio
被创造的原因。于是就需要 Future
对象来构建 Coroutine
和现实世界的桥梁。
Future
Future
实现了 __await__
方法让它可以被 Coroutine
使用 await
进行等待;又实现了 set_result
/set_exception
方法用于回调设置返回结果。
def __await__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
if not self.done():
raise RuntimeError("await wasn't used with future")
return self.result() # May raise too.
注意到这里有这样一行代码 self._asyncio_future_blocking = True
,从名字不难看出它的特殊意义。回到 Task.__step_run_and_handle_result
方法里可以看到对包含 _asyncio_future_blocking
属性的对象进行了特殊处理:如果这个属性为真,也就意味着 Future
没有被设置值,_asyncio_future_blocking
被设置为了 False
,并且调用了 add_done_callback
追加回调函数,让这个 Future
在完成后调用 Task.__wakeup
来设置 Task
的结果。
def __step_run_and_handle_result(self, exc):
......
result._asyncio_future_blocking = False
result.add_done_callback(
self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel(
msg=self._cancel_message):
self._must_cancel = False
......
而 Future.add_done_task
里是这样处理回调函数的:如果 Future
已经被设置了结果,就调用 loop.call_soon
,如果没有设置结果,就加入 _callbacks
列表里。而在 Future
的 set_result
和 set_exception
方法里都调用了 self.__schedule_callbacks()
,__schedule_callbacks
只做了一件事——让之前设置的回调函数都仅被 loop.call_soon
调用一次。
def add_done_callback(self, fn, *, context=None):
"""Add a callback to be run when the future becomes done.
The callback is called with a single argument - the future object. If
the future is already done when this is called, the callback is
scheduled with call_soon.
"""
if self._state != _PENDING:
self._loop.call_soon(fn, self, context=context)
else:
if context is None:
context = contextvars.copy_context()
self._callbacks.append((fn, context))
def __schedule_callbacks(self):
"""Internal: Ask the event loop to call all callbacks.
The callbacks are scheduled to be called as soon as possible. Also
clears the callback list.
"""
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
loop.call_soon
的功能很简单:把函数对象和参数加入 loop
的待执行队列里,并返回一个 Handle
对象。
Handle
Handle
对象是一个非常简单的对象,它类似于 functools.partial
,是对函数与参数的绑定。并且提供了 cancel
函数用于指示 loop
不要执行这个 Handle
。
是的,到这里终于提到了 loop
执行某段代码。Handle
对象才是 loop
唯一实际接触并执行的对象。
在 Task
初始化函数里能看到这样一行代码,这就是 Task
被创建后立刻在 loop
里执行的原因。
def __init__(self, coro, *, loop=None, name=None, context=None,
eager_start=False):
......
self._loop.call_soon(self.__step, context=self._context)
......
Loop
在 loop.run_forever()
里有一个 while True
不断的执行 loop._run_once()
。
def run_forever(self):
"""Run until stop() is called."""
try:
self._run_forever_setup()
while True:
self._run_once()
if self._stopping:
break
finally:
self._run_forever_cleanup()
通过 _run_once
的函数注释不难看出这个函数的工作就是调用各种被安排的 Handle
对象。在其中它通过 self._process_events(self._selector.select(timeout))
来检查可以进行读写的描述符安排对应的 Handle
到可执行的队列中。
def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
......
loop.run_until_complete
和 asyncio.run
是对 loop.run_forever
的封装,通过把 loop.stop()
添加到回调函数里来达到执行完协程后跳出 run_forever
死循环的目的。
综述
Future
是对回调的封装,让回调的结果可以被 Coroutine
使用 await
等待。Task
是对 Coroutine
的封装,让一个 Coroutine
的执行过程可以被拆解为一个个 Handle
对象加入 loop
中执行。
在代码实现上 Task
继承了 Future
,这对理解整个 asyncio
的结构造成了一些误会(在几年前我刚学习 asyncio 时看过很多文章吐槽 Task
继承 Future
)。但其实这一继承只是为了让 Task
复用 Future
的 await
方法,并不是说明 Future
和 Task
这两个概念本身是继承关系。
同步阻塞
很多新手在使用 asyncio 时会有一种误区,认为把原来的函数改成 async def
就获得了神奇的性能提升。但看完本文之后你应该知道,并不会。
asyncio
所有的代码依旧在同一个线程里执行,如果你的 async def
函数里没有 await
,整个函数甚至会一次性全部执行完。而在你的 async def
里只要有任何一点阻塞的部分,整个 loop
就会在这儿停顿。不仅不会提升性能,反而是一个负优化。
如果你真的需要使用 asyncio
而又难以改造代码里的阻塞函数时,可以使用 asyncio.to_thread
把阻塞的同步函数丢进线程池中执行。