在使用 Python 的 asyncio 库实现异步编程的过程中,协程与事件循环这两个概念可以说有着千丝万缕的联系,常常是形影不离的出现,如胶似漆般的存在,asyncio 库到底是如何调度协程的? 下面以 Python 3.8 中的 asyncio.sleep
定时器为例研究一手 asyncio 的源码实现。
几个主要的概念
首先需要对 asyncio 中的几个主要函数和模块做一个初步认识:
asyncio.run
是启动事件循环的入口,接收一个协程作为参数。asyncio.BaseEventLoop
就是事件循环基类了,子类常用的是_UnixSelectorEventLoop
,但核心调度逻辑都在基类中,其中最主要的是run_forever
函数用来启动事件循环;另一个主要的函数是create_task
,用来创建一个Task
对象并放到事件循环中,准备在下一次循环时执行。asyncio.events.Handle
和asyncio.events.TimerHandle
是放到loop
中的处理对象,其中_callback
属性保存的是一个回调函数,处理对象执行时调用的就是这个函数,回调函数参数放在_args
属性中。asyncio.futures.Future
作为一个事件在未来完成的占位符,当事件完成后可通过Future.set_result
方法将事件的结果设置进去。asyncio.tasks.Task
是Future
类的子类,可以理解为是对协程的包装,在Future
基础上增加了启动协程和恢复协程的能力,主要逻辑在Task.__step
函数中。
从简单例子开始
先从最简单的一段代码开始
import asyncio
async def main():
print("main start")
print("main end")
asyncio.run(main())
这段代码启动一个 main
协程,协程输出两行内容后完成结束,这里先不加入任何 await
异步操作,主要看一下事件循环是怎样初始化和启动的,只保留了关键代码。
loop 的初始化
首先看 asyncio.run
函数,内容比较简单,初始化一个事件循环 loop
,然后调用 loop.run_until_complete(main)
启动并传入 main
协程。
# asyncio.runners.run
def run(main, *, debug=False):
# 初始化一个事件循环 loop
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
loop.set_debug(debug)
# 启动事件循环
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
Task 的初始化
接着来到 asyncio.base_events.BaseEventLoop.run_until_complete
,首先调用了 asyncio.tasks.ensure_future
函数,目的是将传入的 main
协程转换成一个 Task
对象,在创建 Task
的过程中会将 Task
对象加入到 loop
的队列中,之后调用 self.run_forever
启动事件循环。
确切的说应该是将 Task.__step
函数包装到 Handle
对象中,之后加入到 loop
的队列中,稍后会看到这个细节。
# asyncio.base_events.BaseEventLoop
class BaseEventLoop(events.AbstractEventLoop):
def run_until_complete(self, future):
new_task = not futures.isfuture(future)
# 将 future 包装成 Task 对象
future = tasks.ensure_future(future, loop=self)
if new_task:
future.add_done_callback(_run_until_complete_cb)
try:
# 启动loop
self.run_forever()
except:
pass
return future.result()
def create_task(self, coro, *, name=None):
if self._task_factory is None:
# 初始化 Task 对象
task = tasks.Task(coro, loop=self, name=name)
if task._source_traceback:
del task._source_traceback[-1]
else:
pass
return task
再看一下 Task.__init__
,其中 _coro
保存了传入的协程 coro
对象,实际上可以将 Task
视为一个协程的包装,在初始化的后面调用了 loop.call_soon(self.__step, context=self._context)
函数,将 Task
对象自己的 __step
函数加入到 loop
队列,当 loop
启动后便会执行这个函数。
# asyncio.tasks.Task
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None, name=None):
# 保存了协程函数
self._coro = coro
self._context = contextvars.copy_context()
# 调用 loop call_soon 将 self.__step 加入到 loop 的队列中
self._loop.call_soon(self.__step, context=self._context)
再看一下 loop.call_soon
做了什么,接受一个 callback
参数,在这里就代表 Task.__step
,接着会调用 _call_soon
函数,在 _call_soon
函数中初始化了 events.Handle
对象,然后将 handle
对象加入到 loop._ready
队列中。
# asyncio.base_events.BaseEventLoop
class BaseEventLoop(events.AbstractEventLoop):
def call_soon(self, callback, *args, context=None):
handle = self._call_soon(callback, args, context)
return handle
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
self._ready.append(handle)
return handle
在看一眼 Handle
的初始化,主要就是将 callback
保存下来,并且用 args
表示 callback
的参数。
Handle 的一个主要的函数是 _run
,当 loop
启动后会从 loop._ready
队列中取出 Handle
执行,执行的就是 _run
函数,_run
函数中 self._context.run(self._callback, *self._args)
其实就是在原有 context
环境下执行回调函数并传入 args
参数。
class Handle:
def __init__(self, callback, args, loop, context=None):
if context is None:
context = contextvars.copy_context()
self._context = context
self._loop = loop
# 持有 callback 回调函数
self._callback = callback
# callback 的参数
self._args = args
self._cancelled = False
self._repr = None
def _run(self):
try:
# 执行 callback 回调函数
self._context.run(self._callback, *self._args)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
pass
self = None
到这里先总结一下,通过 asyncio.run(main())
添加了一个协程,然后将协程 main
包装成 Task
,并将 Task.__step
包装成 Handle
放到 loop._ready
队列中,接下来就是真正启动 loop
了。
loop 的启动
asyncio.base_events.BaseEventLoop.run_until_complete
,在封装完 main
协程后会先添加一个回调函数 _run_until_complete_cb
,这个回调函数会在 main
协程执行完后执行,作用就是将 loop
设置成关闭。
接着的 run_forever
函数就是启动 loop
了。
# asyncio.base_events.BaseEventLoop.run_until_complete
class BaseEventLoop(events.AbstractEventLoop):
def run_until_complete(self, future):
# 启动 loop
self.run_forever()
def run_forever(self):
while True:
self._run_once()
if self._stopping:
break
run_forever
中做了一些初始检查和设置,然后进入 while
循环并在循环中调用 _run_once
,_run_once
就是一次事件循环的核心调度逻辑了。
loop 调度的核心逻辑
核心调度逻辑在 _run_once
中。loop
主要有两个队列存放协程任务对应的 Handle
,一个是 _scheduled
用来存放定时类协程,它是一个最小堆实现的优先队列,例如使用 asyncio.sleep
就会存进去一个 TimerHandle
对象;另一个是 _ready
用来存放准备好执行的协程,而 _scheduled
中有准备好的协程会取出来放入 _ready
中,loop
最终执行 Handle
都是从 _ready
中取出的。
_run_once
中做的事情分四个部分,第一部分是清理 _scheduled
;第二部分是调用多路复用 IO 并处理就绪的描述符;第三部分是将到期的 _scheduled
转移到 _ready
;第四部分遍历 _ready
并逐一启动处理函数 handle._run
;
# asyncio.base_events.BaseEventLoop
class BaseEventLoop(events.AbstractEventLoop):
def _run_once(self):
# 第一部分 清理 _scheduled
# 队列中已经取消的 handle 数量超过阈值
# 则直接重建最小堆
# 而下面 else 的逻辑则是一个个的移除和取消
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
# 第二部分 利用多路复用获取就绪的描述符
# 首先计算一个timeout 用来决定 select 应该阻塞多久,传入<=0则不会阻塞
# _process_events 函数就是典型的根据描述符和事件类型做相应处理,如可读可写
# 将可读可写的函数放到 _ready 队列中在下一次循环时触发
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
event_list = self._selector.select(timeout)
self._process_events(event_list)
# 第三部分 _scheduled 中到期的 handle 转移到 _ready
# _scheduled 是按照 handle._when 到期时间排列的最小堆
# 每次检查堆顶 Handle 看是否到期
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# 第四部分 启动处理函数 handle._run
# 从 _ready 队列中取出 handle
# 调用 handle._run
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None
Handle._run
没啥说的,直接调用 Handle._callback
,并且将 Handle._args
作为参数传进去。
# asyncio.events.Handle._run
class Handle:
def _run(self):
self._context.run(self._callback, *self._args)
还记得 loop
是怎么启动的吗?将 main
协程包装成 Task
,在创建 Task
时将 Task.__step
作为 callback
生成了一个 Handle
并放到了 loop._ready
中,所以这里 Handle._run
其实执行的就是根据 main
协程生成出来的 Task.__step
。Task.__step
是协程启动和协程暂停恢复的关键
协程的启动
Task._coro
属性保存了协程,通过 result = coro.send(None)
启动协程,由此进入到 main
协程中,打印出 main start
和 main end
。
之后 main
协程结束,抛出 StopIteration
异常,调用 super().set_result(exc.value)
给 Task._result
设置结果并将 _state
标记成 _FINISHED
,之后调用 __schedule_callbacks
触发 Task
上注册的回调函数,在这里 mian
协程注册的就是 _run_until_complete_cb
用来结束 loop
的,将回调函数放在传给 loop.call_soon
等待下一轮事件循环来触发。
# asyncio.tasks.Task
class Task(futures._PyFuture):
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
super().set_result(exc.value)
except exceptions.CancelledError:
super().cancel()
except (KeyboardInterrupt, SystemExit) as exc:
super().set_exception(exc)
raise
except BaseException as exc:
super().set_exception(exc)
else:
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
if futures._get_loop(result) is not self._loop:
new_exc = RuntimeError(
f'Task {self!r} got Future ' f'{result!r} attached to a different loop')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
elif blocking:
if result is self:
new_exc = RuntimeError(
f'Task cannot await on itself: {self!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else:
result._asyncio_future_blocking = False
result.add_done_callback(
self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel():
self._must_cancel = False
else:
new_exc = RuntimeError(
f'yield was used instead of yield from '
f'in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
elif result is None:
self._loop.call_soon(self.__step, context=self._context)
elif inspect.isgenerator(result):
new_exc = RuntimeError(
f'yield was used instead of yield from for '
f'generator in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else:
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
_leave_task(self._loop, self)
self = None
def set_result(self, result):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
def __schedule_callbacks(self):
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
到这里就可能看到一个协程是如何传给 loop
并启动的了,也知道了 loop
的大概流程。下面在 main
中加入 asyncio.sleep
看看定时器是如何调度的。
asyncio.sleep 如何定时
main
中加入一个 asyncio.sleep
看看定时是如何实现的
async def main():
print("main start")
await asyncio.sleep(3)
print("main end")
loop
的初始化和启动还是一样的,直接看看 Task.__step
是如何调度的,其中调用 result = coro.send(None)
会启动协程,首先输出 main start
,然后调用 asyncio.sleep(3)
# asyncio.tasks.sleep
async def sleep(delay, result=None, *, loop=None):
if delay <= 0:
await __sleep0()
return result
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
return await future
finally:
h.cancel()
# futures._set_result_unless_cancelled
def _set_result_unless_cancelled(fut, result):
"""Helper setting the result only if the future was not cancelled."""
if fut.cancelled():
return
fut.set_result(result)
# asyncio.base_events.BaseEventLoop
class BaseEventLoop(events.AbstractEventLoop):
def call_later(self, delay, callback, *args, context=None):
timer = self.call_at(self.time() + delay, callback, *args,
context=context)
if timer._source_traceback:
del timer._source_traceback[-1]
return timer
def call_at(self, when, callback, *args, context=None):
timer = events.TimerHandle(when, callback, args, self, context)
if timer._source_traceback:
del timer._source_traceback[-1]
heapq.heappush(self._scheduled, timer)
timer._scheduled = True
return timer
协程的挂起
首先常见一个空的 Future
对象 future
,然后调用的 loop.call_later(delay, futures._set_result_unless_cancelled, future, result)
,然后一路向下调用 loop.call_at
,最后生成了一个 TimerHandle
对象 push 进 loop._scheduled
堆中。
TimerHandle
其实就比 Handle
多了个 _when
属性表示何时可以恢复运行,当时间到了会调用 TimerHandle._run
执行 TimerHandle
的 callback
,也就是 _set_result_unless_cancelled(future, result)
用来给 future
设置结果。
asyncio.sleep
的函数签名是 asyncio.sleep(delay, result=None)
,一般不传第二个参数所以结果是 None
,如果传的话之后会将结果设置到 future
对象里面。
asyncio.sleep
函数的最后将 future
返回并挂起自己,控制权又交还给 Task.__step
中 result = coro.send(None)
的位置,result
接到的就是 future
对象。
result 接到 future 后向下执行到 result.add_done_callback(self.__wakeup, context=self._context)
给 future
设置一个回调函数 Task.__wakeup
,到这里本轮循环就结束了。
# asyncio.tasks.Task
class Task(futures._PyFuture):
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
try:
if exc is None:
# 接到 future
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
super().set_result(exc.value)
except exceptions.CancelledError:
super().cancel()
except (KeyboardInterrupt, SystemExit) as exc:
super().set_exception(exc)
raise
except BaseException as exc:
super().set_exception(exc)
else:
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
if futures._get_loop(result) is not self._loop:
new_exc = RuntimeError(
f'Task {self!r} got Future ' f'{result!r} attached to a different loop')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
elif blocking:
if result is self:
new_exc = RuntimeError(
f'Task cannot await on itself: {self!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else:
result._asyncio_future_blocking = False
# 接到 future 之后执行到这里
result.add_done_callback(
self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel():
self._must_cancel = False
else:
new_exc = RuntimeError(
f'yield was used instead of yield from '
f'in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
elif result is None:
self._loop.call_soon(self.__step, context=self._context)
elif inspect.isgenerator(result):
new_exc = RuntimeError(
f'yield was used instead of yield from for '
f'generator in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else:
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
_leave_task(self._loop, self)
self = None
def set_result(self, result):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
def __schedule_callbacks(self):
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
到目前为止 loop
的状态是 _scheduled
堆中有一个 TimerHandle
对象,对象的 _when
表示剩余启动的秒数,对象的 _callback
指向的是 futures._set_result_unless_cancelled
参数是一个 future
,这个 future
的 callbacks
回调列表中有一个 main
协程生成的 Task.__wakeup
。
协程的恢复
本轮循环结束,下一轮循环时会检查 loop._scheduled
发现 TimerHandle
已经到期,将其放到 loop._ready
队列中,紧接着就取出执行 TimerHandle._run
,也就是执行 futures._set_result_unless_cancelled(future, None)
,其实就是给 future
设置结果、标记完成、执行 future
的回调函数。
# asyncio.futures._set_result_unless_cancelled
def _set_result_unless_cancelled(fut, result):
if fut.cancelled():
return
fut.set_result(result)
# asyncio.futures.Future
class Future:
def set_result(self, result):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
def __schedule_callbacks(self):
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
还记得 future
是怎么来的以及 future
里面是啥吗?future
是在 asyncio.sleep
时生成并通过 await
返回的,返回给 Task.__step
后通过 add_done_callback(self.__wakeup)
为其添加了一个回调函数。
所以到此为止干的事儿就是遍历 future
的 callbacks
逐一通过 loop.call_soon()
添加到 loop
中,等待下一轮事件循环执行,这里添加的就是 main
Task
的 __wakeup
函数。
进入下一轮循环,loop._ready
中有一个 Handle
,其内部的 _coro
代表的是 main
Task
的 __wakeup
,取出来执行 Handle._run
实际上就是执行 main
Task.__wakeup
。
__wakeup
也很简单就是确认 future
是已完成状态并调用 __step
,控制权有交给了之前挂起的 main
Task
。
class Task(futures._PyFuture):
def __wakeup(self, future):
try:
future.result()
except BaseException as exc:
# This may also be a cancellation.
self.__step(exc)
else:
self.__step()
self = None # Needed to break cycles when an exception occurs.
当 Task.__step
再一次执行到 result = coro.send(None)
时,便会恢复之前的 sleep
协程接着执行 return
,回到了 main
函数中,继续执行并输出 main end
最后完成,抛出 StopIteration
异常,被 Task.__step
捕获,整个协程结束,之后事件循环做收尾工作也关闭,事件循环也关闭,到这里整个程序就结束了。
总结
asyncio 中的定时通过 asyncio.sleep
实现,原理是在事件循环中维护一个最小堆实现的优先队列 _scheduled
,其中保存的都是定时任务处理对象 Handle
,越早到期 Handle
就会越早被取出来并加入到 loop._ready
队列,在下一轮循环时取出并从挂起的位置恢复执行。
由于协程代码在执行时会切换控制权导致代码逻辑跳来跳去,有时会被绕晕,借助定时器的调度可以让整个事件循环的逻辑更加清晰。