首页 > 编程语言 >Python 异步 IO 、协程、asyncio、async/await、aiohttp

Python 异步 IO 、协程、asyncio、async/await、aiohttp

时间:2022-12-20 17:03:36浏览次数:64  
标签:协程 aiohttp Python await print async def asyncio

From :廖雪峰 异步IO :​​https://www.liaoxuefeng.com/wiki/1016959663602400/1017959540289152​

Python Async/Await入门指南 :​​https://zhuanlan.zhihu.com/p/27258289​


协程与任务 官网文档:​https://docs.python.org/zh-cn/3/library/asyncio-task.html


谈谈Python协程技术的演进:https://www.freebuf.com/company-information/153421.html

最后推荐一下《流畅的Python》,这本书中 第16章 协程的部分介绍的非常详细
《流畅的Python》pdf 下载地址

gevent 是 python 的一个并发框架,以微线程 greenlet 为核心,使用了 epoll 事件监听机制以及诸多其他优化而变得高效。

aiohttp 使用代理 ip 访问 https 网站报错的问

Python:使用 Future、asyncio 处理并发

异步  IO

在 IO 编程( 廖雪峰 Python IO 编程 :​​https://www.liaoxuefeng.com/wiki/1016959663602400/1017606916795776​​) 一节中,我们已经知道,CPU的速度远远快于磁盘、网络等IO。在一个线程中,CPU执行代码的速度极快,然而,一旦遇到IO操作,如读写文件、发送网络数据时,就需要等待IO操作完成,才能继续进行下一步操作。这种情况称为同步IO。

在IO操作的过程中,当前线程被挂起,而其他需要CPU执行的代码就无法被当前线程执行了。

因为一个 IO 操作就阻塞了当前线程,导致其他代码无法执行,所以我们必须使用多线程或者多进程来并发执行代码,为多个用户服务。每个用户都会分配一个线程,如果遇到IO导致线程被挂起,其他用户的线程不受影响。

多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。

由于我们要解决的问题是CPU高速执行能力和IO设备的龟速严重不匹配,多线程和多进程只是解决这一问题的一种方法。

另一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。

Python 异步 IO 、协程、asyncio、async/await、aiohttp_子程序

消息模型

由于GUI 线程处理键盘、鼠标等消息的速度非常快,所以用户感觉不到延迟。某些时候,GUI线程在一个消息处理的过程中遇到问题导致一次消息处理时间过长,此时,用户会感觉到整个GUI程序停止响应了,敲键盘、点鼠标都没有反应。这种情况说明在消息模型中,处理一个消息必须非常迅速,否则,主线程将无法及时处理消息队列中的其他消息,导致程序看上去停止响应。

消息模型 是 如何解决 同步IO 必须等待IO操作这一问题的呢 ?

在消息处理过程中,当遇到 IO 操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。

在 “发出IO请求” 到收到 “IO完成” 的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

协程 (Coroutines)

在学习异步IO模型前,我们先来了解协程,协程 又称 微线程,纤程,英文名 Coroutine

子程序( 又叫 函数 ) 协程

  • 子程序 在 所有语言中都是层级调用。比如: A 调用 B,B 在执行过程中又调用了 C,C 执行完毕返回,B 执行完毕返回,最后是 A 执行完毕。所以 子程序 即 函数 的调用是通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。
  • 协程的调用 和 子程序 不同。协程 看上去也是 子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个 子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如:子程序 A 和 B :

def A():
print('1')
print('2')
print('3')


def B():
print('x')
print('y')
print('z')

假设由协程执行,在执行 A 的过程中,可以随时中断,去执行 B,B 也可能在执行过程中中断再去执行 A,结果可能是:

1
2
x
y
3
z

但是在 A 中是没有调用 B 的,所以 协程的调用 比 函数调用 理解起来要难一些。

看起来 A、B 的执行有点像多线程,但 协程 的特点在于是一个线程执行。

协程 和 多线程比,协程有何优势?

  • 1. 最大的优势就是协程极高的执行效率。因为 子程序 切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
  • 2. 第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢?

最简单的方法是 多进程 + 协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

Python 对 协程 的支持 是通过 generator (生成器)实现的

在 generator 中,我们不但可以通过 for 循环来迭代,还可以不断调用 next() 函数获取由 yield 语句返回的下一个值。

但是 Python 的 yield 不但可以返回一个值,它还可以接收调用者发出的参数。

来看例子:

传统的 生产者-消费者 模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如果改用协程,生产者生产消息后,直接通过 yield 跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author :
# @File : text.py
# @Software : PyCharm
# @description : XXX


def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'


def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()


c = consumer()
produce(c)

执行结果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到 consumer函数 是一个 generator,把一个 consumer 传入 produce 后:

  1. 首先调用 c.send(None) 启动生成器;
  2. 然后,一旦生产了东西,通过 c.send(n) 切换到 consumer 执行;
  3. consumer 通过 yield拿到消息,处理,又通过yield把结果传回;
  4. produce 拿到 consumer 处理的结果,继续生产下一条消息;
  5. produce 决定不生产了,通过 c.close() 关闭 consumer,整个过程结束。

整个流程无锁,由一个线程执行,​​produce ​​​和 ​​consumer ​​协作完成任务,所以称为 “协程”,而非线程的抢占式多任务。

最后套用 Donald Knuth 的一句话总结协程的特点:“子程序就是协程的一种特例。”

参考源码:​​https://github.com/michaelliao/learn-python3/blob/master/samples/async/coroutine.py​

在 Python 中,异步函数  通常 被称作  协程

创建一个协程仅仅只需使用 async 关键字,或者使用 @asyncio.coroutine 装饰器。下面的任一代码,都可以作为协程工作,形式上也是等同的:

import asyncio

# 方式 1
async def ping_server(ip):
pass


# 方式 2
@asyncio.coroutine
def load_file(path):
pass

上面这两个 特殊的函数,在调用时会返回协程对象。熟悉 JavaScript 中 Promise 的同学,可以把这个返回对象当作跟 Promise 差不多。调用他们中的任意一个,实际上并未立即运行,而是返回一个协程对象,然后将其传递到 Eventloop 中,之后再执行。

  • 如何判断一个 函数是不是协程 ?
  • 如何判断一个 函数返回的是不是协程对象 ?

用 asyncio 提供的 @asyncio.coroutine 可以把一个 generator 标记为 coroutine 类型,然后在 coroutine 内部用 yield from 调用另一个 coroutine 实现异步操作。

Python 3.5 开始引入了新的语法 async 和 await

为了简化并更好地标识异步 IO,从 Python 3.5 开始引入了新的语法 async await,可以让 coroutine 的代码更简洁易读。

 async / await 是 python3.5 的新语法,需使用 Python3.5 版本 或 以上才能正确运行。

注意:async 和 await 是针对 coroutine 的新语法,要使用新的语法,只需要做两步简单的替换:

  • 把 @asyncio.coroutine 替换为 async 
  • 把 yield from 替换为 await

 Python 3.5 以前 版本原来老的语法使用 协程

import asyncio


@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep(1)
print("Hello again!")

Python 3.5 以后 用新语法重新编写如下:

import asyncio


async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")

在过去几年内,异步编程由于某些好的原因得到了充分的重视。虽然它比线性编程难一点,但是效率相对来说也是更高。

比如,利用 Python 的 异步协程 (async coroutine) ,在提交 HTTP 请求后,就没必要等待请求完成再进一步操作,而是可以一边等着请求完成,一边做着其他工作。这可能在逻辑上需要多些思考来保证程序正确运行,但是好处是可以利用更少的资源做更多的事。

即便逻辑上需要多些思考,但实际上在 Python 语言中,异步编程的语法和执行并不难。跟 Javascript 不一样,现在 Python 的异步协程已经执行得相当好了。

对于服务端编程,异步性似乎是 Node.js 流行的一大原因。我们写的很多代码,特别是那些诸如网站之类的高 I/O 应用,都依赖于外部资源。这可以是任何资源,包括从远程数据库调用到 POST 一个 REST 请求。一旦你请求这些资源的任一一个,你的代码在等待资源响应时便无事可做 (译者注:如果没有异步编程的话)。

有了异步编程,在等待这些资源响应的过程中,你的代码便可以去处理其他的任务。

Python async / await 手册

Python 部落:Python async/await 手册:​https://python.freelycode.com/contribution/detail/57

知乎:从 0 到 1,Python 异步编程的演进之路( 通过爬虫演示进化之路 ):​https://zhuanlan.zhihu.com/p/25228075

async / await 的使用

async 用来声明一个函数是协程然后使用 await 调用这个协程, await 必须在函数内部,这个函数通常也被声明为另一个协程await 的目的是等待协程控制流的返回yield 的目的 是 暂停并挂起函数的操作。

正常的函数在执行时是不会中断的,所以你要写一个能够中断的函数,就需要添加 async 关键。

  • async 用来声明一个函数为异步函数异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是sleep(5))消失后,也就是5秒到了再回来执行。
  • await 可以将耗时等待的操作挂起,让出控制权( await 语法来挂起自身的协程比如:异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。await 后面只能跟 异步程序 或 有 __await__ 属性 的 对象因为异步程序与一般程序不同

假设有两个异步函数 async a,async b,a 中的某一步有 await,当程序碰到关键字 await b() 后,异步程序挂起后去执行另一个异步b程序,就是从函数内部跳出去执行其他函数,当挂起条件消失后,不管b是否执行完,要马上从b程序中跳出来,回到原程序执行原来的操作。如果 await 后面跟的 b 函数不是异步函数,那么操作就只能等 b 执行完再返回,无法在 b 执行的过程中返回。如果要在 b 执行完才返回,也就不需要用 await 关键字了,直接调用 b 函数就行。所以这就需要 await 后面跟的是 异步函数了。在一个异步函数中,可以不止一次挂起,也就是可以用多个 await 。

看下 Python 中常见的几种函数形式:

# 1. 普通函数
def function():
return 1

# 2. 生成器函数
def generator():
yield 1

# 在3.5过后,我们可以使用async修饰将普通函数和生成器函数包装成异步函数和异步生成器。

# 3. 异步函数(协程)
async def async_function():
return 1

# 4. 异步生成器
async def async_generator():
yield 1

通过类型判断可以验证函数的类型

import types


# 1. 普通函数
def function():
return 1

# 2. 生成器函数
def generator():
yield 1

# 在3.5过后,我们可以使用async修饰将普通函数和生成器函数包装成异步函数和异步生成器。

# 3. 异步函数(协程)
async def async_function():
return 1

# 4. 异步生成器
async def async_generator():
yield 1


print(type(function) is types.FunctionType)
print(type(generator()) is types.GeneratorType)
print(type(async_function()) is types.CoroutineType)
print(type(async_generator()) is types.AsyncGeneratorType)

直接调用异步函数不会返回结果,而是返回一个coroutine对象:

print(async_function())
# <coroutine object async_function at 0x102ff67d8>

协程 需要通过其他方式来驱动,因此可以使用这个协程对象的 send 方法给协程发送一个值:

print(async_function().send(None))

不幸的是,如果通过上面的调用会抛出一个异常:StopIteration: 1

因为 生成器 / 协程 在正常返回退出时会抛出一个 StopIteration 异常,而原来的返回值会存放在 StopIteration 对象的 value 属性中,通过以下捕获可以获取协程真正的返回值: 

try:
async_function().send(None)
except StopIteration as e:
print(e.value)
# 1

通过上面的方式来新建一个 run 函数来驱动协程函数,在协程函数中,可以通过 await 语法来挂起自身的协程,并等待另一个 协程 完成直到返回结果:

def run(coroutine):
try:
coroutine.send(None)
except StopIteration as e:
return 'run() : return {0}'.format(e.value)

async def async_function():
return 1


async def await_coroutine():
result = await async_function()
print('await_coroutine() : print {0} '.format(result))

ret_val = run(await_coroutine())
print(ret_val)

要注意的是,await 语法只能出现在通过 async 修饰的函数中,否则会报 SyntaxError 错误。

而且 await 后面的对象需要是一个 Awaitable,或者实现了相关的协议。

查看 Awaitable 抽象类的代码,表明了只要一个类实现了__await__方法,那么通过它构造出来的实例就是一个 Awaitable:

class Awaitable(metaclass=ABCMeta):
__slots__ = ()

@abstractmethod
def __await__(self):
yield

@classmethod
def __subclasshook__(cls, C):
if cls is Awaitable:
return _check_methods(C, "__await__")
return NotImplemented

而且可以看到,Coroutine类 也继承了 Awaitable,而且实现了 send,throw 和 close 方法。所以 await 一个调用异步函数返回的协程对象是合法的。

class Coroutine(Awaitable):
__slots__ = ()

@abstractmethod
def send(self, value):
...

@abstractmethod
def throw(self, typ, val=None, tb=None):
...

def close(self):
...

@classmethod
def __subclasshook__(cls, C):
if cls is Coroutine:
return _check_methods(C, '__await__', 'send', 'throw', 'close')
return NotImplemented

接下来是异步生成器,来看一个例子:

假如我要到一家超市去购买土豆,而超市货架上的土豆数量是有限的:

class Potato:
@classmethod
def make(cls, num, *args, **kws):
potatos = []
for i in range(num):
potatos.append(cls.__new__(cls, *args, **kws))
return potatos

all_potatos = Potato.make(5)

现在我想要买50个土豆,每次从货架上拿走一个土豆放到篮子:

def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
sleep(.1)
else:
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break

def buy_potatos():
bucket = []
for p in take_potatos(50):
bucket.append(p)

对应到代码中,就是迭代一个生成器的模型,显然,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的),也许可以用多进程和多线程解决,而在现实生活中,更应该像是这样的:

import asyncio
import random


class Potato:
@classmethod
def make(cls, num, *args, **kws):
potatos = []
for i in range(num):
potatos.append(cls.__new__(cls, *args, **kws))
return potatos


all_potatos = Potato.make(5)


async def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
await ask_for_potato()
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break


async def ask_for_potato():
await asyncio.sleep(random.random())
all_potatos.extend(Potato.make(random.randint(1, 10)))


async def buy_potatos():
bucket = []
async for p in take_potatos(50):
bucket.append(p)
print(f'Got potato {id(p)}...')


def main():
loop = asyncio.get_event_loop()
res = loop.run_until_complete(buy_potatos())
loop.close()


if __name__ == '__main__':
main()

当货架上的土豆没有了之后,可以询问超市请求需要更多的土豆,这时候需要等待一段时间直到生产者完成生产的过程。

当生产者完成和返回之后,这是便能从 await 挂起的地方继续往下跑,完成消费的过程。而这整一个过程,就是一个异步生成器迭代的流程。

用 asyncio 运行这段代码,结果是这样的:

Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...
Got potato 4344843456...
Got potato 4344843400...
Got potato 4338641384...
Got potato 4338641160...
...

既然是异步的,在请求之后不一定要死等,而是可以做其他事情。比如除了土豆,我还想买番茄,这时只需要在事件循环中再添加一个过程:

def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
loop.close()

再来运行这段代码:

Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...
Got potato 4429331760...
Got potato 4429331704...
Got potato 4429346688...
Got potato 4429346072...
Got tomato 4429347360...
...

看下 AsyncGenerator 的定义,它需要实现 __aiter__ 和 __anext__ 两个核心方法,以及 asend,athrow,aclose 方法。

class AsyncGenerator(AsyncIterator):
__slots__ = ()

async def __anext__(self):
...

@abstractmethod
async def asend(self, value):
...

@abstractmethod
async def athrow(self, typ, val=None, tb=None):
...

async def aclose(self):
...

@classmethod
def __subclasshook__(cls, C):
if cls is AsyncGenerator:
return _check_methods(C, '__aiter__', '__anext__',
'asend', 'athrow', 'aclose')
return NotImplemented

异步生成器是在 3.6 之后才有的特性,同样的还有异步推导表达式,因此在上面的例子中,也可以写成这样:

bucket = [p async for p in take_potatos(50)]

类似的,还有 await 表达式:

result = [await fun() for fun in funcs if await condition()]

除了函数之外,类实例的普通方法也能用 async 语法修饰:

class ThreeTwoOne:
async def begin(self):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return

async def game():
t = ThreeTwoOne()
await t.begin()
print('start')

实例方法的调用同样是返回一个 coroutine:

function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne, ThreeTwoOne())
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())

同理 还有类方法:

class ThreeTwoOne:
@classmethod
async def begin(cls):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return

async def game():
await ThreeTwoOne.begin()
print('start')

根据PEP 492中,async 也可以应用到 上下文管理器中,__aenter__ 和 __aexit__ 需要返回一个 Awaitable:

class GameContext:
async def __aenter__(self):
print('game loading...')
await asyncio.sleep(1)

async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1)

async def game():
async with GameContext():
print('game start...')
await asyncio.sleep(2)

在3.7版本,contextlib 中会新增一个 asynccontextmanager 装饰器来包装一个实现异步协议的上下文管理器:

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_connection():
conn = await acquire_db_connection()
try:
yield
finally:
await release_db_connection(conn)

async 修饰符也能用在 __call__ 方法上:

class GameContext:
async def __aenter__(self):
self._started = time()
print('game loading...')
await asyncio.sleep(1)
return self

async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1)

async def __call__(self, *args, **kws):
if args[0] == 'time':
return time() - self._started

async def game():
async with GameContext() as ctx:
print('game start...')
await asyncio.sleep(2)
print('game time: ', await ctx('time'))

asyncio

​asyncio ​​是 Python 3.4 版本引入的标准库,直接内置了对 异步 IO 的支持。

asyncio 官方只实现了比较底层的协议,比如TCP,UDP。所以诸如 HTTP 协议之类都需要借助第三方库,比如 aiohttp 。

虽然异步编程的生态不够同步编程的生态那么强大,但是如果有高并发的需求不妨试试,下面说一下比较成熟的异步库

aiohttp:异步 http client/server框架。github地址: ​​https://github.com/aio-libs/aiohttp​​​ sanic:速度更快的类 flask web框架。github地址:​​https://github.com/channelcat/sanic​​​ uvloop 快速,内嵌于 asyncio 事件循环的库,使用 cython 基于 libuv 实现。github地址: ​​https://github.com/MagicStack/uvloop​

asyncio 的编程模型就是一个 消息循环我们从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了 异步IO

​python 用asyncio​​ 模块实现异步编程,该模块最大特点就是,只存在一个线程

由于只有一个线程,就不可能多个任务同时运行。asyncio 是 "多任务合作" 模式(cooperative multitasking),允许异步任务交出执行权给其他任务,等到其他任务完成,再收回执行权继续往下执行

asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。

Python 异步 IO 、协程、asyncio、async/await、aiohttp_子程序_02

什么是事件循环?

单线程就意味着所有的任务需要在单线程上排队执行,也就是前一个任务没有执行完成,后一个任务就没有办法执行。在CPU密集型的任务之中,这样其实还行,但是如果我们的任务都是IO密集型的呢?也就是我们大部分的任务都是在等待网络的数据返回,等待磁盘文件的数据,这就会造成CPU一直在等待这些任务的完成再去执行下一个任务。

有没有什么办法能够让单线程的任务执行不这么笨呢?其实我们可以将这些需要等待IO设备的任务挂在一边嘛!这时候,如果我们的任务都是需要等待的任务,那么单线程在执行时遇到一个就把它挂起来,这里可以通过一个数据结构(例如队列)将这些处于执行等待状态的任务放进去,为什么是执行等待状态呢?因为它们正在执行但是又不得不等待例如网络数据的返回等等。直到将所有的任务都放进去之后,单线程就可以开始它的接连不断的表演了:有没有任务完成的小伙伴呀!快来我这里执行!

此时如果有某个任务完成了,它会得到结果,于是发出一个信号:我完成了。那边还在循环追问的单线程终于得到了答复,就会去看看这个任务有没有绑定什么回调函数呀?如果绑定了回调函数就进去把回调函数给执行了,如果没有,就将它所在的任务恢复执行,并将结果返回。

asyncio 就是一个 协程库

  • (1)事件循环 (event loop)。事件循环需要实现两个功能,一是顺序执行协程代码;二是完成协程的调度,即一个协程“暂停”时,决定接下来执行哪个协程。
  • (2)协程上下文的切换。基本上Python 生成器的 yeild 已经能完成切换,Python3中还有特定语法支持协程切换。

Python 的异步IO:API

官方文档:​​https://docs.python.org/zh-cn/3/library/asyncio.html​

Python 的 asyncio 是使用 async/await 语法编写并发代码的标准库。Python3.7 这个版本,asyncio又做了比较大的调整,把这个库的 API 分为了 高层级API低层级API,并引入asyncio.run() 这样的高级方法,让编写异步程序更加简洁。

这里先从全局认识 Python 这个异步IO库。

asyncio 的 高层级 API 主要提高如下几个方面:

  • 并发地运行Python协程并完全控制其执行过程;
  • 执行网络IO和IPC;
  • 控制子进程;
  • 通过队列实现分布式任务;
  • 同步并发代码。

asyncio 的 低层级API 用以支持开发异步库和框架:

  • 创建和管理事件循环(event loop),提供异步的API用于网络,运行子进程,处理操作系统信号等;
  • 通过 transports 实现高效率协议;
  • 通过 async/await  语法桥架基于回调的库和代码。

asyncio 高级 API

高层级API让我们更方便的编写基于asyncio的应用程序。这些API包括:

(1)协程和任务

协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。历史的 ​​@asyncio.coroutine​​​ 和 ​​yield from​​​ 已经被弃用,并计划在Python 3.10中移除。协程可以通过 ​​asyncio.run(coro, *, debug=False)​​ 函数运行,该函数负责管理事件循环并完结异步生成器。它应该被用作asyncio程序的主入口点,相当于main函数,应该只被调用一次。

任务被用于并发调度协程,可用于网络爬虫的并发。使用 ​​asyncio.create_task()​​ 就可以把一个协程打包为一个任务,该协程会自动安排为很快运行。

协程,任务和Future都是可等待对象。其中,Future是低层级的可等待对象,表示一个异步操作的最终结果。

(2)流

流是用于网络连接的高层级的使用 async/await的原语。流允许在不使用回调或低层级协议和传输的情况下发送和接收数据。异步读写TCP有客户端函数 ​​asyncio.open_connection()​​​ 和 服务端函数 ​​asyncio.start_server()​​​ 。它还支持 Unix Sockets: ​​asyncio.open_unix_connection()​​​ 和 ​​asyncio.start_unix_server()​​。

(3)同步原语

asyncio同步原语的设计类似于threading模块的原语,有两个重要的注意事项:
asyncio原语不是线程安全的,因此它们不应该用于OS线程同步(而是用threading)
这些同步原语的方法不接受超时参数; 使用​​​asyncio.wait_for()​​​函数执行超时操作。
asyncio具有以下基本同步原语:

  • Lock
  • Event
  • Condition
  • Semaphore
  • BoundedSemaphore

(4)子进程

asyncio提供了通过 async/await 创建和管理子进程的API。不同于Python标准库的subprocess,asyncio的子进程函数都是异步的,并且提供了多种工具来处理这些函数,这就很容易并行执行和监视多个子进程。创建子进程的方法主要有两个:

​coroutine asyncio.create_subprocess_exec()​​​​coroutine asyncio.create_subprocess_shell()​

(5)队列

asyncio 队列的设计类似于标准模块queue的类。虽然asyncio队列不是线程安全的,但它们被设计为专门用于 async/await 代码。需要注意的是,asyncio队列的方法没有超时参数,使用 ​​asyncio.wait_for()​​​函数进行超时的队列操作。
因为和标注模块queue的类设计相似,使用起来跟queue无太多差异,只需要在对应的函数前面加 await 即可。asyncio 队列提供了三种不同的队列:

  • class asyncio.Queue 先进先出队列
  • class asyncio.PriorityQueue 优先队列
  • class asyncio.LifoQueue 后进先出队列

(6)异常

asyncio提供了几种异常,它们是:

  • TimeoutError,
  • CancelledError,
  • InvalidStateError,
  • SendfileNotAvailableError
  • IncompleteReadError
  • LimitOverrunError

asyncio低级API

低层级API为编写基于asyncio的库和框架提供支持,有意编写异步库和框架的大牛们需要熟悉这些低层级API。主要包括:

(1)事件循环

事件循环是每个asyncio应用程序的核心。 事件循环运行异步任务和回调,执行网络IO操作以及运行子进程。

应用程序开发人员通常应该使用高级asyncio函数,例如​​asyncio.run()​​,并且很少需要引用循环对象或调用其方法。

Python 3.7 新增了 ​​asyncio.get_running_loop()​​函数。

(2)Futures

Future对象用于将基于低层级回调的代码与高层级的 async/await 代码进行桥接。
Future表示异步操作的最终结果。 不是线程安全的。
Future是一个可等待对象。 协程可以等待Future对象,直到它们有结果或异常集,或者直到它们被取消。
通常,Futures用于启用基于低层级回调的代码(例如,在使用asyncio传输实现的协议中)以与高层级 async/await 代码进行互操作。

(3)传输和协议(Transports和Protocols)

Transport 和 Protocol由低层级事件循环使用,比如函数​​loop.create_connection()​​。它们使用基于回调的编程风格,并支持网络或IPC协议(如HTTP)的高性能实现。

在最高级别,传输涉及字节的传输方式,而协议确定要传输哪些字节(在某种程度上何时传输)。

换种方式说就是:传输是套接字(或类似的I/O端点)的抽象,而协议是从传输的角度来看的应用程序的抽象。

另一种观点是传输和协议接口共同定义了一个使用网络I/O和进程间I/O的抽象接口。

传输和协议对象之间始终存在1:1的关系:协议调用传输方法来发送数据,而传输调用协议方法来传递已接收的数据。

大多数面向连接的事件循环方法(例如​​loop.create_connection()​​)通常接受protocol_factory参数,该参数用于为接受的连接创建Protocol对象,由Transport对象表示。 这些方法通常返回(传输,协议)元组。

(4)策略(Policy)

事件循环策略是一个全局的按进程划分的对象,用于控制事件循环的管理。 每个事件循环都有一个默认策略,可以使用策略API对其进行更改和自定义。

策略定义了上下文的概念,并根据上下文管理单独的事件循环。 默认策略将上下文定义为当前线程。

通过使用自定义事件循环策略,可以自定义​​get_event_loop()​​​,​​set_event_loop()​​​和​​new_event_loop()​​函数的行为。

(5)平台支持

asyncio模块设计为可移植的,但由于平台的底层架构和功能,某些平台存在细微的差异和限制。在Windows平台,有些是不支持的,比如 ​​loop.create_unix_connection()​​​ and ​​loop.create_unix_server()​​。而Linux和比较新的macOS全部支持。

总结

Python 3.7 通过对 asyncio 分组使得它的架构更加清晰,普通写异步IO的应用程序只需熟悉高层级API,需要写异步IO的库和框架时才需要理解低层级的API。

生产者 --- 消费者

Python 分布与并行 asyncio实现生产者消费者模

示例 1:

# coding=utf-8

import asyncio


async def consumer(n, q):
print('consumer {}: starting'.format(n))
while True:
print('consumer {}: waiting for item'.format(n))
item = await q.get()
print('consumer {}: has item {}'.format(n, item))
if item is None:
# None is the signal to stop.
q.task_done()
break
else:
await asyncio.sleep(0.01 * item)
q.task_done()
print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
print('producer: starting')
# Add some numbers to the queue to simulate jobs
for i in range(num_workers * 3):
await q.put(i)
print('producer: added task {} to the queue'.format(i))
# Add None entries in the queue
# to signal the consumers to exit
print('producer: adding stop signals to the queue')
for i in range(num_workers):
await q.put(None)
print('producer: waiting for queue to empty')
await q.join()
print('producer: ending')


async def main(num_cnotallow=1):
q = asyncio.Queue(maxsize=num_consumers)

consumer_list = [
# asyncio.create_task(consumer(i, q)) for i in range(num_consumers)
asyncio.ensure_future(consumer(i, q)) for i in range(num_consumers)
]

# produce_list = [asyncio.create_task(producer(q, num_consumers))]
produce_list = [asyncio.ensure_future(producer(q, num_consumers))]

task_list = consumer_list + produce_list
for item in task_list:
await item


if __name__ == '__main__':
asyncio.run(main(num_cnotallow=3))
pass

示例 2:

Python 的异步IO编程例子

以 Python 3.7 上的 asyncio 为例讲解如何使用 Python 的异步 IO。

创建第一个协程

Python 3.7 推荐使用 async/await 语法来声明协程,来编写异步应用程序。我们来创建第一个协程函数:首先打印一行“你好”,等待1秒钟后再打印 "大家同好"。

import asyncio


async def say_hi():
print('你好')
await asyncio.sleep(1)
print('大家同好')

asyncio.run(say_hi())

"""
你好
大家同好
"""

say_hi() 函数通过 async 声明为协程函数,较之前的修饰器声明更简洁明了。

在实践过程中,什么功能的函数要用 async 声明为协程函数呢?就是那些能发挥异步IO性能的函数,比如读写文件、读写网络、读写数据库,这些都是浪费时间的IO操作,把它们协程化、异步化从而提高程序的整体效率(速度)。

say_hi() 函数是通过 ​​asyncio.run()​​来运行的,而不是直接调用这个函数(协程)。因为,直接调用并不会把它加入调度日程,而只是简单的返回一个协程对象:

print(say_hi())  # <coroutine object say_hi at 0x000001264DB3FCC0>

真正运行一个协程

那么,如何真正运行一个协程呢?

asyncio 提供了三种机制:

  • (1)asyncio.run() 函数,这是异步程序的主入口,相当于C语言中的main函数。
  • (2)用 await 等待协程,比如上例中的 ​​await asyncio.sleep(1)​​ 。

再看下面的例子,我们定义了协程 ​​say_delay()​​ ,在 main() 协程中调用两次,第一次延迟1秒后打印“你好”,第二次延迟2秒后打印 "大家同好"。这样我们通过 await 运行了两个协程。

import asyncio
import datetime


async def say_delay(msg=None, delay=None):
await asyncio.sleep(delay)
print(msg)


async def main():
print(f'begin at {datetime.datetime.now().replace(microsecnotallow=0)}')
await say_delay('你好', 2)
await say_delay('大家同好', 1)
print(f'end at {datetime.datetime.now().replace(microsecnotallow=0)}')

asyncio.run(main())

'''
begin at 2020-12-19 00:55:01
你好
大家同好
end at 2020-12-19 00:55:04
'''

从起止时间可以看出,两个协程是顺序执行的,总共耗时1+2=3秒。

  • (3)通过 asyncio.create_task()
import asyncio
import datetime


async def say_delay(msg=None, delay=None):
await asyncio.sleep(delay)
print(msg)


async def main():
task_1 = asyncio.create_task(say_delay('你好', 2))
task_2 = asyncio.create_task(say_delay('大家同好', 1))
print(f'begin at {datetime.datetime.now().replace(microsecnotallow=0)}')
await task_1
await task_2
print(f'end at {datetime.datetime.now().replace(microsecnotallow=0)}')

asyncio.run(main())

'''
begin at 2020-12-19 00:58:20
大家同好
你好
end at 2020-12-19 00:58:22
'''

从运行结果的起止时间可以看出,两个协程是并发执行的了,总耗时等于最大耗时2秒。

​asyncio.create_task()​​​ 是一个很有用的函数,在爬虫中它可以帮助我们实现大量并发去下载网页。在 Python 3.6中与它对应的是 ​ensure_future()​。

生产者 --- 消费者

示例 代码:

# coding=utf-8

import asyncio


async def consumer(n, q):
print('consumer {}: starting'.format(n))
while True:
print('consumer {}: waiting for item'.format(n))
item = await q.get()
print('consumer {}: has item {}'.format(n, item))
if item is None:
# None is the signal to stop.
q.task_done()
break
else:
await asyncio.sleep(0.01 * item)
q.task_done()
print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
print('producer: starting')
# Add some numbers to the queue to simulate jobs
for i in range(num_workers * 3):
await q.put(i)
print('producer: added task {} to the queue'.format(i))
# Add None entries in the queue
# to signal the consumers to exit
print('producer: adding stop signals to the queue')
for i in range(num_workers):
await q.put(None)
print('producer: waiting for queue to empty')
await q.join()
print('producer: ending')


async def main(num_cnotallow=1):
q = asyncio.Queue(maxsize=num_consumers)

consumer_list = [
asyncio.create_task(consumer(i, q)) for i in range(num_consumers)
]

produce_list = [asyncio.create_task(producer(q, num_consumers))]

task_list = consumer_list + produce_list
for item in task_list:
await item


if __name__ == '__main__':
asyncio.run(main(num_cnotallow=3))
pass

可等待对象(awaitables)

可等待对象,就是可以在 await 表达式中使用的对象,前面我们已经接触了两种可等待对象的类型:协程任务,还有一个是低层级的 Future

asyncio 模块的许多 API 都需要传入可等待对象,比如 run(), create_task() 等等。

(1)协程

协程是可等待对象,可以在其它协程中被等待。协程两个紧密相关的概念是:

  • 协程函数:通过 async def 定义的函数;
  • 协程对象:调用协程函数返回的对象。

Python 异步 IO 、协程、asyncio、async/await、aiohttp_Python_03

运行上面这段程序,结果为:

co is 
now is 1548512708.2026224
now is 1548512708.202648

可以看到,直接运行协程函数 whattime()得到的co是一个协程对象,因为协程对象是可等待的,所以通过 await 得到真正的当前时间。now2是直接await 协程函数,也得到了当前时间的返回值。

(2)任务

前面我们讲到,任务是用来调度协程的,以便并发执行协程。当一个协程通过 ​asyncio.create_task() 被打包为一个 任务,该协程将自动加入调度队列中,但是还未执行

create_task() 的基本使用前面例子已经讲过。它返回的 task 通过 await 来等待其运行完。如果,我们不等待,会发生什么?“准备立即运行”又该如何理解呢?先看看下面这个例子:

Python 异步 IO 、协程、asyncio、async/await、aiohttp_Python_04

运行这段代码的情况是这样的:首先,1秒钟后打印一行,这是第13,14行代码运行的结果:

calling:0, now is 09:15:15

接着,停顿1秒后,连续打印4行:

calling:1, now is 09:15:16
calling:2, now is 09:15:16
calling:3, now is 09:15:16
calling:4, now is 09:15:16

从这个结果看,​​asyncio.create_task()​​​产生的4个任务,我们并没有 ​​await​​​,它们也执行了。关键在于第18行的 ​​await​​,如果把这一行去掉或是 sleep 的时间小于1秒(比whattime()里面的sleep时间少即可),就会只看到第一行的输出结果而看不到后面四行的输出。这是因为,main() 不 sleep 或 sleep 少于1秒钟,main() 就在 whattime() 还未来得及打印结果(因为,它要sleep 1秒)就退出了,从而整个程序也退出了,就没有 whattime() 的输出结果。

再来理解一下 “准备立即执行” 这个说法。它的意思就是,create_task() 只是打包了协程并加入调度队列还未执行,并准备立即执行,什么时候执行呢?在 “主协程”(调用create_task()的协程)挂起的时候,这里的“挂起”有两个方式:

  • 一是,通过 await task
  • 另一个是,主协程通过 await sleep

我们知道,asyncio 是通过事件循环实现异步的。在主协程 main()里面,没有遇到 await 时,事件就是执行 main() 函数,遇到 await 时,事件循环就去执行别的协程,即 create_task() 生成的 whattime()的4个任务,这些任务一开始就是 await sleep 1秒。这时候,主协程和4个任务协程都挂起了,CPU空闲,事件循环等待协程的消息。

如果 main() 协程只 sleep了 0.1秒,它就先醒了,给事件循环发消息,事件循环就来继续执行 main() 协程,而 main() 后面已经没有代码,就退出该协程,退出它也就意味着整个程序退出,4个任务就没机会打印结果;

如果 main()协程sleep时间多余1秒,那么4个任务先唤醒,就会得到全部的打印结果;

如果main()的18行sleep等于1秒时,和4个任务的sleep时间相同,也会得到全部打印结果。这是为什么呢?

我猜想是这样的:4个任务生成在前,第18行的sleep在后,事件循环的消息响应可能有个先进先出的顺序。后面深入asyncio的代码专门研究一下这个猜想正确与否。

示例:

# -*- coding: utf-8 -*-

"""
@File : aio_test.py
@Author : XXX
@Time : 2020/12/25 23:54
"""

import asyncio
import datetime


async def hi(msg=None, sec=None):
print(f'enter hi(), {msg} @{datetime.datetime.now().replace(microsecnotallow=0)}')
await asyncio.sleep(sec)
print(f'leave hi(), {msg} @{datetime.datetime.now().replace(microsecnotallow=0)}')
return sec


async def main_1():
print(f'main() begin at {datetime.datetime.now().replace(microsecnotallow=0)}')
tasks = []
for i in range(1, 5):
tsk = asyncio.create_task(hi(i, i))
tasks.append(tsk)
for tsk in tasks:
ret_val = await tsk
print(f'ret_val:{ret_val}')
print(f'main() end at {datetime.datetime.now().replace(microsecnotallow=0)}')


async def main_2():
# ***** 注意:main_2 中睡眠了2秒,导致睡眠时间大于2秒的协程没有执行完成 *****
print(f'main() begin at {datetime.datetime.now().replace(microsecnotallow=0)}')
tasks = []
for i in range(1, 5):
tsk = asyncio.create_task(hi(i, i))
tasks.append(tsk)
await asyncio.sleep(2)
print(f'main() end at {datetime.datetime.now().replace(microsecnotallow=0)}')


async def main_3():
# ***** 注意:main_3方法并没有实现并发执行,只是顺序执行 *****
print(f'main() begin at {datetime.datetime.now().replace(microsecnotallow=0)}')
tasks = []
for i in range(1, 5):
tsk = asyncio.create_task(hi(i, i))
await tsk
print(f'main() end at {datetime.datetime.now().replace(microsecnotallow=0)}')


print('*' * 50)
asyncio.run(main_1())
print('*' * 50)
asyncio.run(main_2())
print('*' * 50)
asyncio.run(main_3())
print('*' * 50)

(3)Future

它是一个低层级的可等待对象,表示一个异步操作的最终结果。目前,我们写应用程序还用不到它,暂不学习。

asyncio异步IO协程总结

协程就是我们异步操作的片段。通常,写程序都会把全部功能分成很多不同功能的函数,目的是为了结构清晰;进一步,把那些涉及耗费时间的IO操作(读写文件、数据库、网络)的函数通过 async def 异步化,就是异步编程。

那些异步函数(协程函数)都是通过消息机制被事件循环管理调度着,整个程序的执行是单线程的,但是某个协程A进行IO时,事件循环就去执行其它协程非IO的代码。当事件循环收到协程A结束IO的消息时,就又回来执行协程A,这样事件循环不断在协程之间转换,充分利用了IO的闲置时间,从而并发的进行多个IO操作,这就是异步IO。

写异步IO程序时记住一个准则:需要IO的地方异步。其它地方即使用了协程函数也是没用的。

不 使用 asyncio 的 消息循环 让协程运行

先看下 不使用  ​asyncio 的​消息循环

async def func_1():
print("func_1 start")
print("func_1 end")


async def func_2():
print("func_2 start")
print("func_2 a")
print("func_2 b")
print("func_2 c")
print("func_2 end")


f_1 = func_1()
print(f_1)

f_2 = func_2()
print(f_2)


try:
print('f_1.send')
f_1.send(None)
except StopIteration as e:
# 这里也是需要去捕获StopIteration方法
pass

try:
print('f_2.send')
f_2.send(None)
except StopIteration as e:
pass

运行结果:

<coroutine object func_1 at 0x0000020121A07C40>
<coroutine object func_2 at 0x0000020121B703C0>
f_1.send
func_1 start
func_1 end
f_2.send
func_2 start
func_2 a
func_2 b
func_2 c
func_2 end

示例代码2:

async def test(x):
return x * 2

print(test(100))

try:
# 既然是协程,我们像之前yield协程那样
test(100).send(None)
except BaseException as e:
print(type(e))
ret_val = e.value
print(ret_val)

示例代码3:

def simple_coroutine():
print('-> start')
x = yield
print('-> recived', x)


sc = simple_coroutine()

next(sc)

try:
sc.send('zhexiao')
except BaseException as e:
print(e)

对上述例子的分析:yield 的右边没有表达式,所以这里默认产出的值是None。刚开始先调用了next(...)是因为这个时候生成器还没有启动,没有停在yield那里,这个时候也是无法通过send发送数据。所以当我们通过 next(...)激活协程后 ,程序就会运行到x = yield,这里有个问题我们需要注意, x = yield这个表达式的计算过程是先计算等号右边的内容,然后在进行赋值,所以当激活生成器后,程序会停在yield这里,但并没有给x赋值。当我们调用 send 方法后 yield 会收到这个值并赋值给 x,而当程序运行到协程定义体的末尾时和用生成器的时候一样会抛出StopIteration异常

如果协程没有通过 next(...) 激活(同样我们可以通过send(None)的方式激活),但是我们直接send,会提示如下错误:

Python 异步 IO 、协程、asyncio、async/await、aiohttp_Python_05

最先调用 next(sc) 函数这一步通常称为“​​预激​​”(prime)协程 (即,让协程执行到第一个 yield 表达式,准备好作为活跃的协程使用)。

协程在运行过程中有四个状态:

  1. GEN_CREATE: 等待开始执行
  2. GEN_RUNNING: 解释器正在执行,这个状态一般看不到
  3. GEN_SUSPENDED: 在yield表达式处暂停
  4. GEN_CLOSED: 执行结束

通过下面例子来查看协程的状态:

Python 异步 IO 、协程、asyncio、async/await、aiohttp_子程序_06

示例代码4:(使用协程计算移动平均值)

def averager():
total = 0.0
count = 0
avg = None

while True:
num = yield avg
total += num
count += 1
avg = total / count


# run
ag = averager()
# 预激协程
print(next(ag)) # None

print(ag.send(10)) # 10
print(ag.send(20)) # 15

这里是一个死循环,只要不停 send 值 给 协程,可以一直计算下去。

解释:

  • 1. 调用 next(ag) 函数后,协程会向前执行到 yield 表达式,产出 average 变量的初始值 None。
  • 2. 此时,协程在 yield 表达式处暂停。
  • 3. 使用 send() 激活协程,把发送的值赋给 num,并计算出 avg 的值。
  • 4. 使用 print 打印出 yield 返回的数据。

单步 调试 上面程序。

使用 asyncio 的 消息循环 让协程运行

使用 asyncio 异步 IO 调用 协程

示例代码 1:

import asyncio


async def func_1():
print("func_1 start")
print("func_1 end")
# await asyncio.sleep(1)


async def func_2():
print("func_2 start")
print("func_2 a")
print("func_2 b")
print("func_2 c")
print("func_2 end")
# await asyncio.sleep(1)


f_1 = func_1()
print(f_1)

f_2 = func_2()
print(f_2)


# 获取 EventLoop:
loop = asyncio.get_event_loop()
tasks = [func_1(), func_2()]

# 执行 coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

示例代码 2:

import asyncio
import time

start = time.time()


def tic():
return 'at %1.1f seconds' % (time.time() - start)


async def gr1():
# Busy waits for a second, but we don't want to stick around...
print('gr1 started work: {}'.format(tic()))
# 暂停两秒,但不阻塞时间循环,下同
await asyncio.sleep(2)
print('gr1 ended work: {}'.format(tic()))


async def gr2():
# Busy waits for a second, but we don't want to stick around...
print('gr2 started work: {}'.format(tic()))
await asyncio.sleep(2)
print('gr2 Ended work: {}'.format(tic()))


async def gr3():
print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
await asyncio.sleep(1)
print("Done!")

# 事件循环
ioloop = asyncio.get_event_loop()

# tasks中也可以使用 asyncio.ensure_future(gr1())..
tasks = [
ioloop.create_task(gr1()),
ioloop.create_task(gr2()),
ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()


"""
结果:
gr1 started work: at 0.0 seconds
gr2 started work: at 0.0 seconds
Let's do some stuff while the coroutines are blocked, at 0.0 seconds
Done!
gr2 Ended work: at 2.0 seconds
gr1 ended work: at 2.0 seconds
"""

多个 coroutine 可以封装成一组 Task 然后并发执行。

  • ​asyncio.wait(...)​​​ 协程的参数是一个由 future 或 协程 构成的可迭代对象;wait 会分别
    把各个协程包装进一个 Task 对象。最终的结果是,wait 处理的所有对象都通过某种方式变成 Future 类的实例。wait 是协程函数,因此返回的是一个协程或生成器对象。
  • ​ioloop.run_until_complete​​​ 方法的参数是一个 future 或 协程。如果是协程,​​run_until_complete​​方法与 wait 函数一样,把协程包装进一个 Task 对象中。
  • 在 asyncio 包中,future和协程关系紧密,因为可以使用 yield from 从 asyncio.Future 对象中产出结果。这意味着,如果 foo 是协程函数(调用后返回协程对象),抑或是返回Future 或 Task 实例的普通函数,那么可以这样写:res = yield from foo()。这是 asyncio 包的 API 中很多地方可以互换协程与期物的原因之一。 例如上面的例子中 tasks 可以改写成协程列表:​​tasks = [gr1(), gr(2), gr(3)]​

详细的各个类说明,类方法,传参,以及方法返回的是什么类型都可以在​​官方文档​​上仔细研读,多读几遍,方有体会。

示例代码 3:

import asyncio
import time
import aiohttp
import async_timeout

msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}

urls = [msg.format(i) for i in range(5400, 5500)]


async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url) as response:
return response.status


async def main(url):
async with aiohttp.ClientSession() as session:
status = await fetch(session, url)
return status


if __name__ == '__main__':
start = time.time()
loop = asyncio.get_event_loop()
tasks = [main(url) for url in urls]
# 返回一个列表,内容为各个tasks的返回值
status_list = loop.run_until_complete(asyncio.gather(*tasks))
print(len([status for status in status_list if status == 200]))
end = time.time()
print("cost time:", end - start)

示例代码 4:

用 ​​asyncio ​​​实现  ​​Hello world ​​代码如下:

import asyncio


@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用 asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")


# 获取 EventLoop:
loop = asyncio.get_event_loop()

# 执行 coroutine
loop.run_until_complete(hello())
loop.close()

或者直接使用新语法 asyncawait

import asyncio


async def hello():
print("Hello world!")
# 异步调用 asyncio.sleep(1):
r = await asyncio.sleep(1)
print("Hello again!")


# 获取 EventLoop:
loop = asyncio.get_event_loop()

# 执行 coroutine
loop.run_until_complete(hello())
loop.close()

​@asyncio.coroutine ​​​把一个 generator 标记为 coroutine类型,然后,我们就把这个 ​​coroutine ​​​扔到 ​​EventLoop ​​中执行。

​hello() ​​​会首先打印出 ​​Hello world!​​​,然后,​​yield from ​​​语法可以让我们方便地调用另一个 ​​generator​​​。由于​​asyncio.sleep() ​​​也是一个 ​​coroutine​​​,所以线程不会等待 ​​asyncio.sleep()​​​,而是直接中断并执行下一个消息循环。当​​asyncio.sleep() ​​​返回时,线程就可以从 ​​yield from ​​​拿到返回值(此处是​​None​​),然后接着执行下一行语句。

把 ​​asyncio.sleep(1)​​​看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行 ​​EventLoop ​​​中其他可以执行的​​coroutine​​了,因此可以实现并发执行。

我们用 Task 封装两个 ​​coroutine ​​试试:

import threading
import asyncio


async def hello():
print('1 : Hello world! (%s)' % threading.currentThread())
await asyncio.sleep(5)
print('2 : Hello again! (%s)' % threading.currentThread())


loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

观察执行过程:

1 : Hello world! (<_MainThread(MainThread, started 12200)>)
1 : Hello world! (<_MainThread(MainThread, started 12200)>)
( 暂停约 5 秒 )
2 : Hello again! (<_MainThread(MainThread, started 12200)>)
2 : Hello again! (<_MainThread(MainThread, started 12200)>)

由打印的当前线程名称可以看出,两个 ​​coroutine ​​是由同一个线程并发执行的。

如果把 ​​asyncio.sleep() ​​​换成真正的IO操作,则多个 ​​coroutine ​​就可以由一个线程并发执行。

我们用 ​​asyncio ​​的异步网络连接来获取 sina、sohu 和 163 的网站首页:

import asyncio


async def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = await connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
await writer.drain()
while True:
line = await reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()


loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

执行结果如下:

wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段时间)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0

可见 3 个连接 由一个线程通过 ​​coroutine ​​并发完成。

参考源码:

async_hello.py:​​https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_hello.py​​​ async_wget.py:​​https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_wget.py​

示例代码 5: ( 协程 的 返回值

一个协程里可以启动另外一个协程,并等待它完成返回结果,采用 await 关键字

import asyncio


async def outer():
print('in outer')
print('waiting for result1')
result1 = await phase1()
print('waiting for result2')
result2 = await phase2(result1)
return (result1, result2)


async def phase1():
print('in phase1')
return 'result1'


async def phase2(arg):
print('in phase2')
return 'result2 derived from {}'.format(arg)


event_loop = asyncio.get_event_loop()
try:
return_value = event_loop.run_until_complete(outer())
print('return value: {!r}'.format(return_value))
finally:
event_loop.close()

运行结果:

in outer
waiting for result1
in phase1
waiting for result2
in phase2
return value: ('result1', 'result2 derived from result1')

前面都是关于 asyncio 的例子,那么除了asyncio,就没有其他协程库了吗?asyncio 作为 python 的标准库,自然受到很多青睐,但它有时候还是显得太重量了,尤其是提供了许多复杂的轮子和协议,不便于使用。

你可以理解为,asyncio 是使用 async/await 语法开发的 协程库,而不是有 asyncio 才能用 async/await,
除了 asyncio 之外,curio 和 trio 是更加轻量级的替代物,而且也更容易使用。

curio 的作者是 David Beazley,下面是使用 curio 创建 tcp server 的例子,据说这是 dabeaz 理想中的一个异步服务器的样子:

from curio import run, spawn
from curio.socket import *

async def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
print('Server listening at', address)
async with sock:
while True:
client, addr = await sock.accept()
await spawn(echo_client, client, addr)

async def echo_client(client, addr):
print('Connection from', addr)
async with client:
while True:
data = await client.recv(100000)
if not data:
break
await client.sendall(data)
print('Connection closed')

if __name__ == '__main__':
run(echo_server, ('',25000))

无论是 asyncio 还是 curio,或者是其他异步协程库,在背后往往都会借助于 IO的事件循环来实现异步,下面用几十行代码来展示一个简陋的基于事件驱动的echo服务器:

from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from selectors import DefaultSelector, EVENT_READ

selector = DefaultSelector()
pool = {}

def request(client_socket, addr):
client_socket, addr = client_socket, addr
def handle_request(key, mask):
data = client_socket.recv(100000)
if not data:
client_socket.close()
selector.unregister(client_socket)
del pool[addr]
else:
client_socket.sendall(data)
return handle_request

def recv_client(key, mask):
sock = key.fileobj
client_socket, addr = sock.accept()
req = request(client_socket, addr)
pool[addr] = req
selector.register(client_socket, EVENT_READ, req)

def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
selector.register(sock, EVENT_READ, recv_client)
try:
while True:
events = selector.select()
for key, mask in events:
callback = key.data
callback(key, mask)
except KeyboardInterrupt:
sock.close()

if __name__ == '__main__':
echo_server(('',25000))

验证一下:

# terminal 1
$ nc localhost 25000
hello world
hello world

# terminal 2
$ nc localhost 25000
hello world
hello world

现在知道:

  • 完成 异步的代码 不一定要用 async/await ,使用了 async/await 的代码也不一定能做到异步,
  • async/await 是协程的语法糖,使协程之间的调用变得更加清晰,
  • 使用 async 修饰的函数调用时会返回一个协程对象,
  • await 只能放在 async 修饰的函数里面使用,await 后面必须要跟着一个 协程对象Awaitable
  • await 的目的是等待协程控制流的返回而实现暂停并挂起函数的操作是yield。

async/await 以及 协程 是Python未来实现异步编程的趋势,我们将会在更多的地方看到他们的身影,例如协程库的 curio 和 trio,web 框架的 sanic,数据库驱动的 asyncpg 等等。在Python 3主导的今天,作为开发者,应该及时拥抱和适应新的变化,而基于async/await的协程凭借良好的可读性和易用性日渐登上舞台,看到这里,你还不赶紧上车?

Python 模块 asyncio – 协程之间的同步

Python 模块 asyncio – 协程之间的同步:​​https://www.quxihuan.com/posts/python-module-asyncio-synchronization/​

await 和 yield from

Python3.3 的 yield from 语法可以把生成器的操作委托给另一个生成器,生成器的调用方可以直接与子生成器进行通信:

def sub_gen():
yield 1
yield 2
yield 3

def gen():
return (yield from sub_gen())

def main():
for val in gen():
print(val)
# 1
# 2
# 3

利用这一特性,使用 yield from 能够编写出类似协程效果的函数调用,在3.5之前,asyncio 正是使用@asyncio.coroutine 和 yield from 语法来创建协程:​​https://docs.python.org/3.4/library/asyncio-task.html​

@asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y

@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

然而,用 yield from 容易在表示协程和生成器中混淆,没有良好的语义性,所以在 Python 3.5 推出了更新的 async/await 表达式来作为协程的语法。

因此类似以下的调用是等价的:

async with lock:
...

with (yield from lock):
...
######################
def main():
return (yield from coro())

def main():
return (await coro())

那么,怎么把生成器包装为一个协程对象呢?这时候可以用到 types 包中的 coroutine 装饰器(如果使用asyncio做驱动的话,那么也可以使用 asyncio 的 coroutine 装饰器),@types.coroutine装饰器会将一个生成器函数包装为协程对象:

import asyncio
import types

@types.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y

async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

尽管两个函数分别使用了新旧语法,但他们都是协程对象,也分别称作 native coroutine 以及 generator-based coroutine,因此不用担心语法问题。

下面观察一个 asyncio 中 Future 的例子:

import asyncio

future = asyncio.Future()


async def test_1():
await asyncio.sleep(1)
future.set_result('data')


async def test_2():
print(await future)


loop = asyncio.get_event_loop()
tasks_list = [test_1(), test_2()]
loop.run_until_complete(asyncio.wait(tasks_list))
loop.close()

两个协程在事件循环中,协程 test_1 在执行第一句后挂起自身切到 asyncio.sleep,而协程 test_2 一直等待 future 的结果,让出事件循环,计时器结束后 test_1 执行第二句并设置了 future 的值,被挂起的 test_2 恢复执行,打印出 future 的结果 'data' 。

future 可以被 await 证明了 future 对象是一个 Awaitable,进入 Future 类的源码可以看到有一段代码显示了 future 实现了__await__ 协议:

class Future:
...
def __iter__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.

if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression

当执行 await future 这行代码时,future中的这段代码就会被执行,首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的 Task(任务)等待 future 完成。

当 future 执行 set_result 方法时,会触发以下的代码,设置结果,标记 future 已经完成:

def set_result(self, result):
...
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self._schedule_callbacks()

最后 future 会调度自身的回调函数,触发 Task._step() 告知 Task 驱动 future 从之前挂起的点恢复执行,不难看出,future 会执行下面的代码:

class Future:
...
def __iter__(self):
...
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.

最终返回结果给调用方。

Yield from

调用协程 的方式有有很多,yield from 就是其中的一种。这种方式在 Python3.3 中被引入,在 Python3.5 中以 async/await 的形式进行了优化。yield from 表达式的使用方式如下:

import asyncio

@asyncio.coroutine
def get_json(client, url):
file_content = yield from load_file('/Users/scott/data.txt')

正如所看到的,yield from 被使用在用 @asyncio.coroutine 装饰的函数内,如果想把 yield from 在这个函数外使用,将会抛出如下语法错误:

File "main.py", line 1
file_content = yield from load_file('/Users/scott/data.txt')
^
SyntaxError: 'yield' outside function

为了避免语法错误,yield from 必须在调用函数的内部使用(这个调用函数通常被装饰为协程)。

Async / await

较新的语法是使用 async/await 关键字。 async 从 Python3.5 开始被引进,跟 @asyncio.coroutine 装饰器一样,用来声明一个函数是一个协程。只要把它放在函数定义之前,就可以应用到函数上,使用方式如下:

async def ping_server(ip):
# ping code here...

实际调用这个函数时,使用 await 而不用 yield from ,当然,使用方式依然差不多:

async def ping_local(ip):
return await ping_server('192.168.1.1')

再强调一遍,跟 yield from 一样,不能在函数外部使用 await ,否则会抛出语法错误。 (译者注: async 用来声明一个函数是协程,然后使用 await调用这个协程, await 必须在函数内部,这个函数通常也被声明为另一个协程)

Python3.5 对这两种调用协程的方法都提供了支持,但是推荐 async/await 作为首选。

Event Loop

如果你还不知道如何开始和操作一个 Eventloop ,那么上面有关协程所说的都起不了多大作用。 Eventloop 在执行异步函数时非常重要,重要到只要执行协程,基本上就得利用 Eventloop 。

Eventloop 提供了相当多的功能:

  • 注册,执行 和 取消 延迟调用(异步函数)
  • 创建 客户端 与 服务端 传输用于通信
  • 创建 子程序 和 通道 跟 其他的程序 进行通信
  • 指定 函数 的 调用 到 线程池

Eventloop 有相当多的配置和类型可供使用,但大部分程序只需要如下方式预定函数即可:

import asyncio

async def speak_async():
print('OMG asynchronicity!')

loop = asyncio.get_event_loop()
loop.run_until_complete(speak_async())
loop.close()

有意思的是代码中的最后三行,首先获取默认的 Eventloop ( asyncio.get_event_loop() ),然后预定和运行异步任务,并在完成后结束循环。

loop.run_until_complete() 函数实际上是阻塞性的,也就是在所有异步方法完成之前,它是不会返回的。但因为我们只在一个线程中运行这段代码,它没法再进一步扩展,即使循环仍在运行。

可能你现在还没觉得这有多大的用处,因为我们通过调用其他 IO 来结束 Eventloop 中的阻塞(译者注:也就是在阻塞时进行其他 IO ),但是想象一下,如果在网页服务器上,把整个程序都封装在异步函数内,到时就可以同时运行多个异步请求了。

也可以将 Eventloop 的线程中断,利用它去处理所有耗时较长的 IO 请求,而主线程处理程序逻辑或者用户界面。

一个案例

让我们实际操作一个稍大的案例。下面这段代码就是一个非常漂亮的异步程序,它先从 Reddit 抓取 JSON 数据,解析它,然后打印出当天来自 /r/python,/r/programming 和 /r/compsci 的置顶帖。

所示的第一个方法 get_json() ,由 get_reddit_top() 调用,然后只创建一个 GET 请求到适当的网址。当这个方法和 await 一起调用后, Eventloop 便能够继续为其他的协程服务,同时等待 HTTP 响应达到。一旦响应完成, JSON 数据就返回到 get_reddit_top() ,得到解析并打印出来。

import signal
import sys
import asyncio
import aiohttp
import json

loop = asyncio.get_event_loop()
client = aiohttp.ClientSession(loop=loop)


async def get_json(client, url):
async with client.get(url) as response:
assert response.status == 200
return await response.read()


async def get_reddit_top(subreddit, client):
data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')

j = json.loads(data1.decode('utf-8'))
for i in j['data']['children']:
score = i['data']['score']
title = i['data']['title']
link = i['data']['url']
print(str(score) + ': ' + title + ' (' + link + ')')

print('DONE:', subreddit + '\n')


def signal_handler(signal, frame):
loop.stop()
client.close()
sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)

asyncio.ensure_future(get_reddit_top('python', client))
asyncio.ensure_future(get_reddit_top('programming', client))
asyncio.ensure_future(get_reddit_top('compsci', client))
loop.run_forever()

注意,如果多次运行这段代码,打印出来的 subreddit 数据在顺序上会有些许变化。这是因为每当我们调用一次代码都会释放对线程的控制,容许线程去处理另一个 HTTP 调用。这将导致谁先获得响应,谁就先打印出来。

结论

即使 Python 内置的异步操作没有 Javascript 那么顺畅,但这并不意味着就不能用它来把应用变得更有趣、更有效率。只要花半个小时的时间去了解它的来龙去脉,你就会感觉把异步操作应用到你的程序中将会是多美好的一件事。

aiohttp

​asyncio ​​​可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把​​asyncio​​用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用 单线程 + coroutine ​​实现多用户的高并发支持。

asyncio 实现了TCP、UDP、SSL等协议,​aiohttp 则是基于 asyncio 实现的 HTTP 框架。

我们先安装 ​​aiohttp​​:pip install aiohttp

然后编写一个HTTP服务器,分别处理以下URL:

  • ​/​​​ - 首页返回​​b'​

Index

  • ​'​​;
  • ​/hello/{name}​​​ - 根据URL参数返回文本​​hello, %s!​​。

代码如下:

import asyncio

from aiohttp import web


async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')


async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))


async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv


loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

注意 ​​aiohttp​​​的初始化函数​​init()​​​也是一个​​coroutine​​​,​​loop.create_server()​​​则利用​​asyncio​​创建TCP服务。

参考源码:aio_web.py :​​https://github.com/michaelliao/learn-python3/blob/master/samples/async/aio_web.py​

 一切从爬虫开始

【续篇】Python 协程之从放弃到死亡再到重生:​​https://www.secpulse.com/archives/64912.html​

从一个简单的爬虫开始,这个爬虫很简单,访问指定的URL,并且获取内容并计算长度,这里我们给定5个URL。第一版的代码十分简单,顺序获取每个URL的内容,当第一个请求完成、计算完长度后,再开始第二个请求。

spider_normal.py

# filename: spider_normal.py
import time
import requests

targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]


def spider():
results = {}
for url in targets:
r = requests.get(url)
length = len(r.content)
results[url] = length
return results


def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))


def main():
start_time = time.time()
results = spider()
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(results)


if __name__ == '__main__':
main()

我们多运行几次看看结果。

Python 异步 IO 、协程、asyncio、async/await、aiohttp_生成器_07

大约需要花费14-16秒不等,这段代码并没有什么好看的,我们把关注点放在后面的代码上。现在我们使用多线程来改写这段代码。

# filename: spider_thread.py
import time
import threading
import requests

final_results = {}

targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]


def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))


def spider(url):
r = requests.get(url)
length = len(r.content)
final_results[url] = length


def main():
ts = []
start_time = time.time()
for url in targets:
t = threading.Thread(target=spider, args=(url,))
ts.append(t)
t.start()
for t in ts:
t.join()
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)


if __name__ == '__main__':
main()

我们多运行几次看看结果。

Python 异步 IO 、协程、asyncio、async/await、aiohttp_Python_08

从这两段代码中,已经可以看出并发对于处理任务的好处了,但是使用原生的​​threading​​​模块还是略显麻烦,Python已经给我们内置了一个处理并发任务的库​​concurrent​​​,我们借用这个库修改一下我们的代码,之所以修改成这个库的原因还有一个,那就是引出我们后面会谈到的​​Future​​。

# filename: spider_thread.py
import time
from concurrent import futures
import requests

final_results = {}

targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]


def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))


def spider(url):
r = requests.get(url)
length = len(r.content)
final_results[url] = length
return True


def main():
start_time = time.time()
with futures.ThreadPoolExecutor(10) as executor:
res = executor.map(spider, targets)
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)


if __name__ == '__main__':
main()

执行一下,会发现耗时与上一个版本一样,稳定在10s左右。

Python 异步 IO 、协程、asyncio、async/await、aiohttp_子程序_09

可以看到我们调用了​​concurrent​​​库中的​​futures​​​,那么到底什么是​​futures​​​?简单的讲,这个对象代表一种异步的操作,可以表示为一个需要延时进行的操作,当然这个操作的状态可能已经完成,也有可能尚未完成,如果你写JS的话,可以理解为是类似​​Promise​​​的对象。在Python中,标准库中其实有两个​​Future​​​类,一个是​​concurrent.futures.Future​​​,另外一个是​​asyncio.Future​​​,这两个类很类似,不完全相同,这些实现差异以及API的差异我们先按下暂且不谈,有兴趣的同学可以参考下相关的文档。​​Future​​​是我们后面讨论的​​asyncio​​异步编程的基础,因此这里多说两句。

​Future​​​代表的是一个未来的某一个时刻一定会执行的操作(可能已经执行完成了,但是无论如何他一定有一个确切的运行时间),一般情况下用户无需手动从零开始创建一个Future,而是应当借助框架中的API生成。比如调用​​concurrent.futures.Executor.submit()​​​时,框架会为"异步操作"进行一个排期,来决定何时运行这个操作,这时候就会生成一个​​Future​​对象。

现在,我们来看看如何使用​​asyncio​​​进行异步编程,与多线程编程不同的是,多个协程总是运行在同一个线程中的,一旦其中的一个协程发生阻塞行为,那么整个线程都被阻塞,进而所有的协程都无法继续运行。​​asyncio.Future​​​和​​asyncio.Task​​​都可以看做是一个异步操作,后者是前者的子类,​​BaseEventLoop.create_task()​​​会接收一个协程作为参数,并且对这个任务的运行时间进行排期,返回一个​​asyncio.Task​​​类的实例,这个对象也是对于协程的一层包装。如果想获取​​asyncio.Future​​​的执行结果,应当使用​​yield from​​​来获取,这样控制权会被自动交还给EventLoop,我们无需处理"等待​​Future​​​或​​Task​​​运行完成"这个操作。于是就有了一个很愉悦的编程方式,如果一个函数A是协程、或返回​​Task​​​或​​Future​​​的实例的函数,就可以通过​​result = yield from A()​​​来获取返回值。下面我们就使用​​asyncio​​​和​​aiohttp​​来改写我们的爬虫。

import asyncio
import time

import aiohttp

final_results = {}

targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]


def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))


async def get_content(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.read()
return len(content)


async def spider(url):
length = await get_content(url)
final_results[url] = length
return True


def main():
loop = asyncio.get_event_loop()
cor = [spider(url) for url in targets]
start_time = time.time()
result = loop.run_until_complete(asyncio.gather(*cor))
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
print("loop result: ", result)


if __name__ == '__main__':
main()

结果非常惊人

Python 异步 IO 、协程、asyncio、async/await、aiohttp_子程序_10

这里可能有同学会问为什么没看到​​yield from​​​以及​​@asyncio.coroutine​​​,那是因为在Python3.5以后,增加了​​async def​​​和​​awiat​​​语法,等效于​​@asyncio.coroutine​​​和​​yield from​​​,详情可以参考上一篇文章。在​​main()​​​函数中,我们先获取一个可用的事件循环,紧接着将生成好的协程任务添加到这个循环中,并且等待执行完成。在每个​​spider()​​​中,执行到​​await​​​的时候,会交出控制权(如果不明白请向前看一下委托生成器的部分),并且切到其他的协程继续运行,等到​​get_content()​​​执行完成返回后,那么会恢复​​spider()​​​协程的执行。​​get_content()​​​函数中只是通过​​async with​​​调用​​aiohttp​​库的最基本方法获取页面内容,并且返回了长度,仅此而已。

在修改为协程版本后,爬虫性能有了巨大的提升,从最初了15s,到10s,再到现在的2s左右,简直是质的飞跃。这只是一个简单的爬虫程序,相比多线程,性能提高了近5倍,如果是其他更加复杂的大型程序,也许性能提升会更多。​​asyncio​​​这套异步编程框架,通过简单的事件循环以及协程机制,在需要等待的情况下主动交出控制权,切换到其他协程进行运行。到这里就会有人问,为什么要将​​requests​​​替换为​​aiohttp​​​,能不能用​​requests​​​?答案是不能,还是我们前面提到过的,在协程中,一切操作都要避免阻塞,禁止所有的阻塞型调用,因为所有的协程都是运行在同一个线程中的!​​requests​​​库是阻塞型的调用,当在等待I/O时,并不能将控制权转交给其他协程,甚至还会将当前线程阻塞,其他的协程也无法运行。如果你在异步编程的时候需要用到一些其他的异步组件,可以到​​https://github.com/aio-libs/​​这里找找,也许就有你需要的异步库。

关于​​asyncio​​​的异步编程资料目前来说还不算很多,​​官方文档​​​应该算是相当不错的参考文献了,其中非常推荐的两部分是:​​Develop with asyncio​​​和​​Tasks and coroutines​​​,各位同学有兴趣的话可以自行阅读。​​asyncio​​​这个异步框架中包含了非常多的内容,甚至还有​​TCP Server/Client​​​的相关内容,如果想要掌握​​asyncio​​​这个异步编程框架,还需要多加练习。顺带一提,​​asyncio​​​非常容易与其他的框架整合,例如​​tornado​​​已经有实现了​​asyncio.AbstractEventLoop​​​的接口的类​​AsyncIOMainLoop​​​,还有人将​​asyncio​​集成到QT的事件循环中了,可以说是非常的灵活了。

Python 协程总结

Python 之所以能够处理网络 IO 高并发,是因为借助了高效的IO模型,能够最大限度的调度IO,然后事件循环使用协程处理IO,协程遇到IO操作就将控制权抛出,那么在IO准备好之前的这段事件,事件循环就可以使用其他的协程处理其他事情,然后协程在用户空间,并且是单线程的,所以不会像多线程,多进程那样频繁的上下文切换,因而能够节省大量的不必要性能损失。

注: 不要再协程里面使用time.sleep之类的同步操作,因为协程再单线程里面,所以会使得整个线程停下来等待,也就没有协程的优势了

理解


协程,又称为微线程,看上去像是子程序,但是它和子程序又不太一样,它在执行的过程中,可以在中断当前的子程序后去执行别的子程序,再返回来执行之前的子程序,但是它的相关信息还是之前的。

优点:

  1. 极高的执行效率,因为子程序切换而不是线程切换,没有了线程切换的开销;
  2. 不需要多线程的锁机制,因为只有一个线程在执行;

如果要充分利用CPU多核,可以通过使用多进程+协程的方式

使用


打开 asyncio 的源代码,可以发现asyncio中的需要用到的文件如下:


Python 异步 IO 、协程、asyncio、async/await、aiohttp_子程序_11

下面的则是接下来要总结的文件

文件

解释

base_events

基础的事件,提供了BaseEventLoop事件

coroutines

提供了封装成协程的类

events

提供了事件的抽象类,比如BaseEventLoop继承了AbstractEventLoop

futures

提供了Future类

tasks

提供了Task类和相关的方法

coroutines


函数

解释

coroutine(func)

为函数加上装饰器

iscoroutinefunction(func)

判断函数是否使用了装饰器

iscoroutine(obj)

判断该对象是否是装饰器

如果在函数使用了​​coroutine​​​装饰器,就可以通过​​yield from​​​去调用​​async def​​​声明的函数,如果已经使用​​async def​​声明,就没有必要再使用装饰器了,这两个功能是一样的。

import asyncio


@asyncio.coroutine
def hello_world():
print("Hello World!")


async def hello_world2():
print("Hello World2!")


print('------hello_world------')
print(asyncio.iscoroutinefunction(hello_world))

print('------hello_world2------')
print(asyncio.iscoroutinefunction(hello_world2))

print('------event loop------')
loop = asyncio.get_event_loop()

# 一直阻塞该函数调用到函数返回
loop.run_until_complete(hello_world())
loop.run_until_complete(hello_world2())
loop.close()

上面的代码分别使用到了​​coroutine​​​装饰器和​​async def​​,其运行结果如下:

------hello_world------
True
------hello_world2------
True
------event loop------
Hello World!
Hello World2!

注意:不可以直接调用协程,需要一个​​event loop​​去调用。

如果想要在一个函数中去得到另外一个函数的结果,可以使用​​yield from​​​或者​​await​​,例子如下:

import asyncio


async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y


async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))


loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

函数 ​​print_sum​​​ 会一直等到函数 ​​compute​​ 返回结果,执行过程如下:

base_events


这个文件里面漏出来的只有​​BaseEventLoop​​一个类,它的相关方法如下:

函数

解释

create_future()

创建一个future对象并且绑定到事件上

create_task()

创建一个任务

run_forever()

除非调用stop,否则事件会一直运行下去

run_until_complete(future)

直到 future 对象执行完毕,事件才停止

stop()

停止事件

close()

关闭事件

is_closed()

判断事件是否关闭

time()

返回事件运行时的时间

call_later(delay, callback, *args)

设置一个回调函数,并且可以设置延迟的时间

call_at(when, callback, *args)

同上,但是设置的是绝对时间

call_soon(callback, *args)

马上调用

events


函数

解释

get_event_loop()

返回一个异步的事件

...

...

返回的就是BaseEventLoop的对象。


future


Future类的相关方法如下:

方法

解释

cancel()

取消掉future对象

cancelled()

返回是否已经取消掉

done()

如果future已经完成则返回true

result()

返回future执行的结果

exception()

返回在future中设置了的exception

add_done_callback(fn)

当future执行时执行回调函数

remove_done_callback(fn)

删除future的所有回调函数

set_result(result)

设置future的结果

set_exception(exception)

设置future的异常

设置 future 的例子如下:

import asyncio


async def slow_operation(future):
await asyncio.sleep(1) # 睡眠
future.set_result('Future is done!') # future设置结果


loop = asyncio.get_event_loop()
future = asyncio.Future() # 创建future对象
asyncio.ensure_future(slow_operation(future)) # 创建任务
loop.run_until_complete(future) # 阻塞直到future执行完才停止事件
print(future.result())
loop.close()

​run_until_complete​​​方法在内部通过调用了future的​​add_done_callback​​,当执行future完毕的时候,就会通知事件。

下面这个例子则是通过使用future的​​add_done_callback​​方法实现和上面例子一样的效果:

import asyncio


async def slow_operation(future):
await asyncio.sleep(1)
future.set_result('Future is done!')


def got_result(future):
print(future.result())
loop.stop() # 关闭事件


loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result) # future执行完毕就执行该回调
try:
loop.run_forever()
finally:
loop.close()

一旦​​slow_operation​​​函数执行完毕的时候,就会去执行​​got_result​​函数,里面则调用了关闭事件,所以不用担心事件会一直执行。


task


Task类是Future的一个子类,也就是Future中的方法,task都可以使用,类方法如下:

方法

解释

current_task(loop=None)

返回指定事件中的任务,如果没有指定,则默认当前事件

all_tasks(loop=None)

返回指定事件中的所有任务

cancel()

取消任务

并行执行三个任务的例子:

import asyncio


async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))
loop.close()

执行结果为

Task A: Compute factorial(2)...Task B: Compute factorial(2)...Task C: Compute factorial(2)...Task A: factorial(2) = 2Task B: Compute factorial(3)...Task C: Compute factorial(3)...Task B: factorial(3) = 6Task C: Compute factorial(4)...Task C: factorial(4) = 24

可以发现,ABC同时执行,直到future执行完毕才退出。

下面一些方法是和task相关的方法

方法

解释

as_completed(fs, *, loop=None, timeout=None)

返回是协程的迭代器

ensure_future(coro_or_future, *, loop=None)

调度执行一个 coroutine object:并且它封装成future。返回任务对象

async(coro_or_future, *, loop=None)

丢弃的方法,推荐使用ensure_future

wrap_future(future, *, loop=None)

Wrap a concurrent.futures.Future object in a Future object.

gather(*coros_or_futures, loop=None, return_exceptinotallow=False)

从给定的协程或者future对象数组中返回future汇总的结果

sleep(delay, result=None, *, loop=None)

创建一个在给定时间(以秒为单位)后完成的协程

shield(arg, *, loop=None)

等待future,屏蔽future被取消

wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

等待由序列futures给出的Futures和协程对象完成。协程将被包裹在任务中。返回含两个集合的Future:(done,pending)

wait_for(fut, timeout, *, loop=None)

等待单个Future或coroutine object完成超时。如果超时为None,则阻止直到future完成

标签:协程,aiohttp,Python,await,print,async,def,asyncio
From: https://blog.51cto.com/csnd/5956126

相关文章

  • Python 大规模异步新闻爬虫、google、百度、有道、百度指数
    参考:​​https://www.yuanrenxue.com/crawler/news-crawler-urlpool.html​​url_pool.py#-*-coding:utf-8-*-#@Author:佛祖保佑,永无bug#@Date:#@File......
  • python 之将xmind转为excel用例文件
    1.xmind文件模板如下所示(最后一个子级为预置条件)2.excel用例模板3.获取xmind文件数据并转成字典形式fromxmindparserimportxmind_to_dict#xmind_to_dict可读取......
  • python+excel=openpyxl(一)
     原计划写一个openpyxl的操作文档,普及下python如何来操作excel,结果人家官方的文档已经写的非常完美了,就临时改主意把人家的文档翻译了一遍。可以阅读英文文档的同学,建议......
  • Python学习笔记--元组+字符串
    元组元组一旦定义完成,就不能再被修改同样,元组也可以进行嵌套操作当然,若是在元组里面嵌套一个list,那么list里面的元素是可以进行修改的!案例:实现:字符串查找索......
  • python-FunctionType动态创建函数,并改变函数名称
    方法一但无法编写foo函数体里面内容importtypesdeffoo(x,y):#print(x,y)return1x=1y=2f=types.FunctionType(foo.__code__,{},name='te......
  • 解决python无法导入自定义类的问题
    问题:在自定义了类之后,想在另外一个文件导入自定义类,无法导入目录:在class_test.py中自定了类在test.py中导入类A,出现问题解决方法:test.py:importsysimportos......
  • Python3 Robot Framework CustomLibrary 封装系统关键字(使用自定义函数)
    1.创建一个python文件,确认能够执行,放入任意一个目录(如C:\CustomLibrary\helloworld.py)。#-*-coding:utf-8-*-defhi(name):u'''接收一个名字,并问候.例如|......
  • 你可能不知道的 Python 技巧
    英文|​​PythonTipsandTrick,YouHaven'tAlreadySeen​​原作|MartinHeinz(​​https://martinheinz.dev​​)译者|豌豆花下猫声明:本文获得原作者授权翻译,......
  • Python 任务自动化工具 tox 教程
    在我刚翻译完的Python打包​​系列文章​​中,作者提到了一个神奇的测试工具tox,而且他本人就是tox的维护者之一。趁着话题的相关性,本文将对它做简单的介绍,说不定大家在......
  • 2019 年 stackoverflow 网站最受欢迎的 20 个 Python 问题
    在最新一期的“Python开发者周刊”(Pycoder'sweekly)里,我看到一则有意思的分享,故转出来分享给大家。该分享来自是一份”pythonweeklyreports“,统计了2019年里stackoverf......