目录
迭代器
可迭代对象 & 迭代器
现在我们回忆一下什么是可迭代对象?
iterables=[
"123",
[1,2,3],
(1,2,3),
{1,2,3}, # 集合
{1:'a',2:'b',3:'c'}
]
for iterable in iterables:
print(type(iterable))
for x in iterable:
print(x,end=', ')
print(' ')
def common_attrs(*objs):
"""计算对象之间的共同属性"""
assert len(objs) > 0
# 第一个元素的全局对象列表:__dir__()
# dir(objs[0])比objs[0].__dir__()多了__contains__,其它一样
attrs = set(dir(objs[0]))
for obj in objs[1:]:
attrs &= set(dir(obj)) # 取所有对象的共同属性,交集
attrs -= set(dir(object)) # 剔除基础对象object的属性
return attrs
# 计算可迭代对象的共同属性
iterable_common_attrs = common_attrs(*iterables)
print(iterable_common_attrs)
# 可迭代对象是所属类中实现了'__iter__()'的对象
# 之所以出现'__contains__', '__len__'是由于以上对象都是容器类型对象,容器类型对象有求容量及包含操作
# 文件对象也是可迭代对象
f = open('ex1.py', 'r')
# 加入到可迭代对象中
iterables.append(f)
iterable_common_attrs &= set(dir(f))
print(iterable_common_attrs)#这时只有__iter__
由此,我们得到两个结论:
- 1.凡是提供
__iter__()
实现的类创建的实例都是可迭代对象 - 2.常见的可迭代对象有:字符串、列表、元组、字典、集合、文件对象
同时注意到__iter__
方法对应的调用方法就是内置函数iter()
第二个问题:什么是迭代器?它同可迭代对象有什么关系?
# 由可迭代对象列表得到相应的迭代器列表
iterators = [iter(iterable) for iterable in iterables]
# 计算迭代器共同属性
iterators_common_attrs = common_attrs(*iterators)
print(iterators_common_attrs)
# 迭代器共同属性:'__next__', '__iter__'
由此我们得到一个结论:迭代器有两个接口__iter__
和__next__
,换言之,要想造一个迭代器我们需要提供这两个接口的实现。
自定义迭代器
1.设计迭代器
迭代器基本功能:
- 初始化时要传入可迭代对象,才能知道去哪里取数据
- 初始化迭代进度
- 每次迭代时,即每次调用
__next__
方法时:- 如果仍有元素可迭代,则返回本轮迭代的元素,同时更新当前迭代进度
- 如果已无元素可返回,则迭代结束,抛出
StopIteration
异常
迭代的三个关键步骤:
- 调用iter()构建迭代器
- 调用next()获取下一个值
- 捕获StopIteration异常,结束迭代
class Parachute:
def __init__(self, actions):
self.actions = actions
self.index = 0 # 初始化索引下标
def __next__(self): # 取下一个值
while self.index < len(self.actions):
action = self.actions[self.index]
self.index += 1 # 更新索引下标
return action
raise StopIteration
def __iter__(self): # 构建迭代器
return self # 迭代器自己实现__iter__方法,返回自己(99%都是写这一句)
actions = ['登机','穿降落伞','跳伞']
for x in Parachute(actions):
print(x)
2.迭代器协议
迭代器必须同时实现__next__
和__iter__
方法,称之为迭代器协议
迭代器一定是可迭代对象,可迭代对象不一定是迭代器,是不是有点绕了?
3.迭代器的意义
-
统一通过next()获取数据,屏蔽底层不同数据读取方式
-
容器类数据结构只关心数据静态存储,每次迭代需要额外的迭代器对象专门负责记录迭代过程中的状态信息
虽然__iter__
方法写法几乎是固定的,但是它才是点睛之笔!
4.存在两种可迭代对象
-
容器类型
- list,tuple,dict,set
- 只有
__iter__
接口 - 静态数据
- 需要额外迭代器支持
- 支持多次迭代
-
迭代器类型
- 文件、stringIO(这玩意本质是把字符串当成文件来用)等
- 同时实现
__iter__
和__next__
接口 - 动态的
- 只能迭代一次
5.迭代器的应用场景
from random import random
class Random:
def __iter__(self):
return self
def __next__(self):
return random()
这个迭代器不需要存储数据,甚至连StopIteration都不管
每次数据都是实时产生,不占用内存!
虽然迭代器是名副其实的数据生成器,但生成器在Python中特指包含yield的函数对象
早期Python有些可迭代对象没有定义__iter__
方法,如果一个对象没有__iter__
方法,但是定义了__getitem__
方法,同样是可以迭代的。
所以我们不能通过检查__iter__
方法是否存在来判断一个对象是否可迭代,应该直接使用iter()
,如果不可迭代,则会抛出TypeError异常
生成器
生成器就是迭代器
关键字yield的特别之处在于:
- 只能在函数内使用
- 在函数内任何地方出现了yield关键字,哪怕无法执行到,函数都被看作一个生成器函数
def gen():
print('ok')
if False:
yield
g=gen()
import inspect
# 是函数也是生成器函数
print(inspect.isfunction(gen))
print(inspect.isgeneratorfunction(gen))
# 生成器函数不是生成器
print(inspect.isgenerator(gen))
print(inspect.isgenerator(g))
含有yield的函数就是一个生成器函数,调用生成器函数返回的结果为生成器
yield是语句or表达式?
- yield语句:python2.2时代
- yield表达式:python2.5时代
yield对函数做了啥?
改变函数性质
1.调用生成器函数不是直接执行其中代码,而是返回一个对象
2.生成器函数内代码,要通过生成器对象来执行
从这一点来说,生成器函数作用和类差不多
生成器就是迭代器,运行方式和迭代器一致
- 通过next()调用
- 每次next()在遇到yield后返回结果(作为next()返回值)
- 如果函数执行结束(遇到return)则抛出StopIteration异常
示例
def gen_666(meet_yield):
print('hi')
if meet_yield:
print('yield')
yield 66
print('back')
print('bye')
return 'result'
# g1 = gen_666(False) # 相当于实例化了类,但并未调用类的方法
# x1 = next(g1)
# return数据被异常带出来
# print(x1) # 结果被StopIteration带出来,结果在简单生成器里是没用的,在升级到复杂协程后这个结果就有大用
g1 = gen_666(True)
x1 = next(g1)
# return数据被yield带出来
print(x1) # yield的数据返回
x1=next(g1)
print(x1)
在循环中使用yield,要想迭代多次,可以在函数内多次使用yield语句
def count(start=0,step=1):
n=start
while True:
yield n # 走到yield时暂停向下执行,将数据出栈,第二次next()时往下执行
n+=step
生成器的4个状态
- 当调用生成器函数得到生成器对象时
- 此时的生成器对象可理解为处于【初始】状态
- 通过next()调用生成器对象,对应的生成器函数代码开始运行
- 此时生成器对象处于【运行中】状态
- 如果遇到yield语句,next()返回时
- yield语句右边对象作为next()返回值
- 生成器在yield语句所在位置【暂停】,当再次使用next()时继续从该为止继续运行
- 如果执行到函数结束,则抛出StopIteration异常
- 不管使用了return语句显示返回值或者默认返回None值,返回值只能作为异常值一并抛出
- 此时的生成器对象处于【结束】状态
- 对于已经结束的生成器对象,再次调用next()将抛出StopIteration异常,这时不含返回值,即返回值只能抛出一次
用yield重构迭代器
动作 | class实现迭代器 | yield生成迭代器 |
---|---|---|
定义 | class 迭代器 | def 迭代器() |
next() | def next(self): return v | yield v |
StopIteration | raise StopIteration | return |
iter() | def iter(self): return self | 隐含 |
生成器三种应用场景
- 定义一个容器类的可迭代对象,为该对象实现
__iter__
接口 - 定义一个处理其他可迭代对象的迭代器
- 定义一个不依赖数据存储的数据生成器
1.定义一个容器类的可迭代对象
class MyData:
# 其余代码省略
@property
def size(self):
return self.size
def get_value(self,index):
return index
def __iter__(self):
index=-1
while index<2:
index+=1
yield self.get_value(index)
mydata=MyData()
2.实现有处理数据能力的迭代器
# 我们来重写之前的Parachute,可见实现的功能一样但是代码大大简化了!
def Parachute(actions):
for action in actions:
yield action
actions = ['登机','穿降落伞','跳伞']
for x in Parachute(actions):
print(x)
3.数据生成器
def countdown(start):
while start>0:
start-=1
yield start
for e in countdown(5):
print(e)
def countup(start):
while start<10:
start+=1
yield start
for e in countup(5):
print(e)
经过一番回忆之后,我们尝试解答下面的问题:
- 生成器函数和普通函数区别
- 生成器函数和生成器对象关系
- 生成器函数可以暂停执行的秘密
生成器进阶
函数运行机制
函数对象和代码对象
这两个对象保存的是函数的静态信息
def func():
pass
func # 函数对象
func.__code__ # 代码对象
代码对象随函数对象一起创建,是函数对象重要属性
代码对象重要属性以co_开头
func_code=func.__code__
for attr in dir(func_code):
if attr.startswith('co_'):
print(f'{attr}\t: {getattr(func_code,attr)}')
函数运行帧
函数对象和代码对象保存了函数基本信息,当函数运行时,需要对象来保存运行时状态
这个对象就是帧对象(frame object)
每次调用函数,都会自动创建一个帧对象,记录当次运行的状态
获取函数运行帧信息我们要借助于inspect模块的currentframe()这个API。
import inspect
def foo():
# 获取到函数的运行帧并返回
return inspect.currentframe()
f1 = foo() # 由于被变量引用,所以帧不会被垃圾回收
print(f1)
以下让我们以可视化方式来看函数对象、代码对象和帧对象间关系
先安装graphviz
# ubuntu
sudo apt install graphviz
# centos
sudo yum install graphviz
再安装驱动
pip install graphviz xdot==1.2 # xdot1.3安装报错,提示需要更新c++库文件,这时可下载合适的wlh或直接降低版本安装
pip install objgraph
运行以下代码
import inspect
import subprocess
from objgraph import show_backrefs
def foo():
# 获取到函数的运行帧并返回
return inspect.currentframe()
f1 = foo() # 由于被变量引用,所以帧不会被垃圾回收
print(f1)
f2 = foo() # 再调用一次得到另一个帧
# 函数对象、代码对象和帧对象间关系
filename = 'out.dot'
show_backrefs(foo.__code__, filename=filename)
subprocess.run(f'dot -T jpg ./{filename} -o {filename.split(".")[0]}.jpg', shell=True)
帧对象重要属性以f_开头
- f_code:执行代码对象
- f_back:指向上一帧,也就是调用者的帧
- f_locals:局部变量
- f_globals:全局变量
- f_lineno:当前对应行号
函数运行栈
当一个函数中调用了另一个函数,此时前一个函数还没有结束,所以这两个函数的帧对象时同时存在的
比如,程序一般始于main函数,然后又调用其他函数,因此一个程序运行期间同时存在多个帧对象。函数间调用关系是先执行的后退出,所以帧对象间的关系也是先入后出,正好以栈的形式保存。
因此,函数的运行帧又称为栈帧。(函数运行过程中入栈出栈,函数帧对象又放在栈上,这个帧对象又称为栈帧)
注意:一个线程只有一个函数运行栈
import subprocess
import inspect
from objgraph import show_refs
# 展示函数调用时的栈
def foo():
return inspect.currentframe()
def bar():
return foo()#返回foo函数运行时的帧对象
f1=bar()
filename = 'foo.dot'
show_refs(f1, filename=filename)
subprocess.run(f'dot -T jpg ./{filename} -o {filename.split(".")[0]}.jpg', shell=True)
生成器函数有何不同?
生成器函数仍然时函数对象,当然包括代码对象
import inspect
def gen_foo():
for _ in range(10):
yield inspect.currentframe()#每次迭代都返回当前帧
import subprocess
from objgraph import show_refs
filename='gen_foo.dot'
show_refs(gen_foo, filename=filename)
subprocess.run(f'dot -T jpg ./{filename} -o {filename.split(".")[0]}.jpg', shell=True)
调用生成器函数不会直接运行(不像普通函数那样创建帧对象并压入函数栈),而是得到一个生成器对象,那么秘密自然藏在生成器对象里!
生成器对象有个特殊属性:gi_frame,保留了帧对象的属性,保留对代码对象的引用
生成器的特殊之处在于它自带一个帧,而且当每次使用next()对生成器进行迭代时,都用这个帧对象(gi_frame)来保存状态
gf=gen_foo()
filename='gf.dot'
show_refs(gf, filename=filename)
subprocess.run(f'dot -T jpg ./{filename} -o {filename.split(".")[0]}.jpg', shell=True)
验证
#展示生成器迭代的过程中都是同一个帧对象
gf=gen_foo()
#存为变量,不然迭代结束后改属性会清空
gi_frame=gf.gi_frame
#保存所有迭代的结果
frames=list(gf)
print(gf.gi_frame)#None
for f in frames:
print(f is gi_frame)#True
结论:生成器迭代时使用同一个gi_frame帧来保存状态
生成器的frame对象在暂停状态下看不到调用的关系图
总结:
- 生成器函数不直接运行,而是借助于生成器对象来间接运行
- 创建生成器对象同时创建帧对象,并且由生成器对象保持引用
- 每次使用next()调用生成器时,就是将生成器引用的帧对象入栈
- 当next()返回时,也就是代码遇到yield暂停时,就是将帧出栈
- 直到迭代结束,帧最后一次出栈,并销毁
简言之:
1.创建生成器对象时同时创建帧对象gi_frame,每次调用生成器时拿着同一个帧对象入栈遇到yield时帧对象出栈,帧对象最后一次出栈后自动销毁。
2.调用next()是帧对象入栈,yield使帧对象出栈
def gen_frame_graph(filename):
for _ in range(3):
#运行时生成图形
graph=show_refs(inspect.currentframe(),filename=filename)
yield graph
#定义两个普通函数便于观察栈的变化
def func1(g):
return next(g)
def func2(g):
return next(g)
filename='gi_frame1.dot'
gfg=gen_frame_graph(filename)
hh=func1(gfg)
subprocess.run(f'dot -T jpg ./{filename} -o {filename.split(".")[0]}.jpg', shell=True)
filename='gi_frame2.dot'
gfg=gen_frame_graph(filename)
hh=func2(gfg)
subprocess.run(f'dot -T jpg ./{filename} -o {filename.split(".")[0]}.jpg', shell=True)
同步和异步
普通函数:
- 调用函数:构建帧对象并入栈
- 函数执行结束:帧对象出栈并销毁
def sync_task_runner():
task_a()
task_b() # 只有tsk_a执行完毕才开始执行
生成器函数:
- 创建生成器:构建帧对象
- 多次通过next触发执行帧入栈
- 多次遇到yield帧出栈(保留)
- 迭代结束:帧出栈并销毁
从生成器到协程
生成器对象是一个用来迭代执行生成器函数的迭代器
让一个函数可以迭代运行其中代码才是生成器对象最根本作用,而不仅是字面意义上的生成数据的东西
迭代产出数据只是迭代代码的自然结果
当用生成器来实现迭代器时,关注的重点是yield value返回出来的数据
如果把焦点集中到被迭代执行的代码上,就能对生成器有全新的视角——协程
协程
对比generator和coroutine
def generator_func():
yield
gen=generator_func()
print(gen)
print(sorted(set(dir(gen))-set(dir(object))))
async def coroutine_func():
await coroutine_func()
coro=coroutine_func()
print(coro)
print(sorted(set(dir(coro))-set(dir(object))))
"""
继承:
['__del__', '__name__', '__qualname__', 'close','send', 'throw']
改名:
'gi_code' > cr_code
'gi_frame'>cr_frame
'gi_running'>cr_running
基于生成器的协程取消:
'gi_yieldfrom' > 改成coro的cr_await
__iter__ > 改成coro的__await__
__next__ coro不需要了
"""
yield表达式
表达式可被解析为一个值
x=yield
y=yield+1
print((yield))
yield表达式如何获取值
def show_yield_val():
x=yield
print(f'x is {x}')
g=show_yield_val()
next(g)#None
next(g)#Stop...
为生成器增加一个send(),该方法可以接受一个入参
send方法将参数送给生成器,使生成器恢复运行同时,将该入参作为yield表达式的值
def show_yield_val():
print('开始')
x=yield
print(f'x is {x}')
g=show_yield_val()
# g.send('hi')#第一次只能是None
g.send(None)#第一次只能是None
# TypeError: can't send non-None value to a just-started generator
g.send('hi')
对于刚创建的生成器,总是要在第一次send(None),使其运行到yield地方暂停,这个步骤术语为prime
可以称为激活=prime
yield表达式的优先级
def add_yield_val():
#x=yield+1#bug!,相当于下面句子
#x=(yield 1)
x=(yield)+1
print(f'x is {x}')
g=add_yield_val()
g.send(None)#prime
g.send(1)
send()用法
- send是生成器对象的方法
- 对于生成器对象g,next(g)等价于g.send(None)
- 只有当生成器处在暂停状态时,才能传入非None的值 解释了为什么有激活步骤
- send方法是为了协程而增加的API,所以
- 如果将生成器视作协程,就应该只用send方法
- 如果视作迭代器,就仍用next
所以,后面统一使用g.send(None)方式,不再使用next(g)方式
yield表达式作为函数入参
def gen_echo():
while True:
print((yield))
echo=gen_echo()
echo.send(None) # 激活
for action in 'hello':
echo.send(action) # 相当于next(echo)
echo.send(StopIteration('')) # 传进去什么就会打印什么
echo.close()
使用close()结束生成器
当生成器作为迭代器来用时,它的生命周期取决于有多少元素可迭代
当作协程来用时,通常可视作是在执行一个任务,希望任务的终止能够变得可控
新增的close方法就是用来结束一个协程
如果协程代码比较复杂,可能需要在结束时做一些善后处理,比如释放资源等
类似于StopIteration的实现机制,结束协程也靠异常来实现
def gen_echo_v2():
while True:
try:
x = yield # g.send(1)将1入栈,遇到yield1就出栈,用x来接这个结果
except GeneratorExit: # 协程里用的也是这个异常!栈空了就走这里销毁栈帧
print('exit')
# 不结束导致报错:RuntimeError: generator ignored GeneratorExit
return # 必须要结束,或者用break,必须结束不能回到yield,不结束又去取数对已销毁的栈帧取数引发错误
else:
print(x)
g=gen_echo_v2()
g.send(None)
# g.send(1)
# g.close()
# 以下两种都表示垃圾回收
# del g # 显式删除
g=11 # 重新赋值也会触发垃圾回收,表示上面的生成器对象不要了,这时它的栈帧肯定是要销毁的,因此先走一遍出栈销毁的过程
使用throw()将异常抛给yield
def gen_echo_v3():
while True:
try:
x=yield # 出数据=yield
except GeneratorExit:
print('exit')
break
except KeyboardInterrupt:
print('按下了Ctrl-C')
else:
print(x)
g=gen_echo_v3()
g.send(None)
# g.throw(KeyboardInterrupt)
# g.send(2)#生成器还是可用的
g.throw(RuntimeError)#没办法处理的异常
g.send(2)#进入Stop...
# yield也可以接收异常
协程的几个功能点
def coro_average():
"""计算移动平均值"""
count=0
total=0
avg=None
while True:
try:
val=yield avg # 出数据=yield 收数据
except GeneratorExit:
return total,count,avg
else:
total+=val
count+=1
avg=total/count
g=coro_average()
g.send(None)
arr=[g.send(i) for i in range(10)]
print(arr)
# 0 1 2 3 4 5
# 0 .5 1 1.5 2 2.5
1.在yield的位置产出数据
2.在yield的位置暂停
3.在yield的位置恢复,并接受新的参数
4.在yield的位置传入结束信号
5.在yield的位置传入其他异常
基于生成器的协程
基于生成器的协程指的是基于yield from创建的生成器,并且还要搭配asyncio.coroutine
装饰器来使用
生成器的3种模式
- pull:特点在于能不断产出数据,也就是迭代器
- push:特点在于能不断向生成器发送数据,比如计算移动平均的例子,是早期的协程
- task:任务式(asyncio的协程)
data vs event
- pull/push都是受数据驱动
- task是受事件驱动
def pull_style():
while still_have_data:
yield data
def push_style():
while still_have_data:
input_data=yield output_data
def task_style():
??=yield ??
yield是函数出栈再到入栈的过程
socket变得可读就是一个事件
那么event是如何运作的?
事件通常都是通过回调函数来处理的
import selectors
call_later(3, func)
register(sock, selectors.EVENT_READ,read)#EVENT_READ表示ev可读
灵魂发问
- 为啥要让出yield执行权(也就是出栈)?
- 遇到什么样的事件要yield?
- 在出栈前如何设置事件(回调)?
- 凭啥恢复执行(也就是入栈)?
- 是谁促成了事件的发生?
- 是谁(感知到了事件的发生)让出栈的协程再次入栈(也就是说,谁来调用send)?
大家都说有了协程不用写回调函数,又是怎么回事?
yield from语法
yield from入门例子
yield from语法具体实现
result=yield from expr
,expr是可迭代对象,看作是生成器即可
# yield from实现:
_i=iter(expr)#__iter__
while True:#不遇到StopIteration不算完
try:
_y=_i.send(None)#总是None,无所谓prime
except StopIteration as _e:
_r=_e.value
break
else:
yield _y#照原样yield出去,不再接受send传入的值,因为总是None
result=_r#StopIteration带出来的值就是结果
当然以上是简化版,完整版需要考虑异常,不过我们可以改写成可以运行的版本:
def yield_from(expr):
_i = iter(expr) # __iter__
while True: # 不遇到StopIteration不算完
try:
_y = next(_i)
except StopIteration as _e:
_r = _e.value
break
else:
yield _y # 照原样yield出去,不再接受send传入的值,因为总是None
result = _r
return result
# ex=[1,2,3,4]
ex = '1234'
ret = yield_from(ex)
for i in ret:
print(i)
定义一个任务
一个同步模式的简单任务
def one_task():
print(f'begin task')
...#其它步骤
print(f' begin big_step')
big_result = big_step()
print(f' end big_step with {big_result}')
...
print(f'end task')
def big_step():
...#其它小步骤
print(f' begin small_step')
small_result = small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
def small_step():
print(f' 努力工作中')
return 123
one_task()
改进
def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_coro = big_step()
while True:
try:
x = big_coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
big_result = e.value # StopIteration的value就是返回值
break
else:
# 阻塞过程
func,arg=x
func(arg)
print(f' end big_step with {big_result}')
...
print(f'end task')
def big_step():
... # 其它小步骤
print(f' begin small_step')
small_coro = small_step() # 改为协程
while True:
try:
x = small_coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
small_result = e.value # StopIteration的value就是返回值
break
else:
yield x # 当前解决不了问题,再向上抛
print(f' end samall_step with {small_result}')
...
return small_result * 1000
import time
from time import sleep
def small_step():
print(' 休息一下马上回来')
t1 = time.time()
yield sleep, 2 # 模拟阻塞,一加yield调用它的也要改变
assert time.time() - t1 > 2, '睡眠时间不足'
print(f' 努力工作中')
return 123
one_task()
阶段性总结
- 协程自己不能消除阻塞,只会将阻塞发生地方向上移动
- 最终阻塞仍然要被解决
改用yield from的协程化改造!
def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_result = yield from big_step()
print(f' end big_step with {big_result}')
...
print(f'end task')
def big_step():
... # 其它小步骤
print(f' begin small_step')
small_result = yield from small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
import time
from time import sleep
def small_step():
print(' 休息一下马上回来')
t1 = time.time()
# 两次yield
yield from YieldFromable((sleep, 2)) # 模拟阻塞,yiedl有传染性,一加yield调用它的也要改变
assert time.time() - t1 > 2, '睡眠时间不足'
print(f' 努力工作中')
return 123
class YieldFromable: # 包裹可迭代对象
def __init__(self, value):
self.value = value
def __iter__(self): # 传递出去
yield self
class Task: # 通用任务驱动器
def __init__(self, coro):
self.coro = coro
def run(self):
while True:
try:
x = self.coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
result = e.value # StopIteration的value就是返回值
break
else:
assert isinstance(x, YieldFromable)
# 阻塞发生在这里!
func, arg = x.value
func(arg)
return result
t = Task(one_task())
t.run()
做了两点改进:
- 用yield from语法改写
- 增加了可迭代对象的包裹类和通用任务驱动器
阶段总结
- 主动协程是最先出栈的位置
- 被动协程可能有很多层
- yield from大大简化被动协程的编码
asyncio
对上面最后一个版本做以下替换后,仍然可正常运行
1.将yield from替换为await (只是更换关键字罢了)
2.__iter__
替换为__await__
(只是更换关键字罢了)
3.包含await的函数替换为async def
可不用加,但是新语法要求必须加,好处是可读性更好
4.将YieldFromable改名为Awaitable (不是必要的)
两种awaitable
- await coro
- 实现了
__await__
方法的类
asyncio有两个Awaitable:Future和Task,Task继承自Future
Future核心代码
class Future:
def __await__(self):
if not self.done():
yield self
return self.result()
__iter__ = __await__ # 这里看出__await__就是__iter__
注意:yield具有主动出栈的能力
改写后的代码
async def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_result = await big_step()
print(f' end big_step with {big_result}') # 顶层的值通过异常StopIteration传递出来
...
print(f'end task')
async def big_step():
... # 其它小步骤
print(f' begin small_step')
small_result = await small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
import time
from time import sleep
async def small_step():
print(' 休息一下马上回来')
t1 = time.time()
# 两次yield
await Awaitable((sleep, 2)) # 模拟阻塞,yiedl有传染性,一加yield调用它的也要改变
assert time.time() - t1 > 2, '睡眠时间不足'
print(f' 努力工作中')
return 123
class Awaitable: # 包裹可迭代对象
def __init__(self, value):
self.value = value
def __await__(self): # 传递出去
yield self
class Task: # 通用任务驱动器
def __init__(self, coro):
self.coro = coro
self._done = False
self._result = None # 存放结果
def run(self):
print('------0-------')
if not self._done: # 任务没结束才可进来
try:
x = self.coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
self._result = e.value # StopIteration的value就是返回值
self._done = True
else:
assert isinstance(x, Awaitable)
# 阻塞发生在这里! 把执行权让出来
# func, arg = x.value
# func(arg)
else: # 其它情况下进来应该报错
print('task is done.')
print('-------1------')
if __name__ == '__main__':
t = Task(one_task())
t.run()
for _ in range(10):
print('doing something...')
sleep(.2)
t.run()
自制事件循环
第一个版本:
import collections
import heapq
import time
from time import sleep
import random
class EventLoop: # 任务调度器
def __init__(self):
self._ready = collections.deque()
self._scheduled = [] # 定时任务,要保证最先开始的总在最前面,中间出栈入栈可能顺序会乱掉,每次排序会影响性能,考虑用堆!
self._stopping = False
def call_soon(self, callback, *args):
self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args)) # 先开始的定时任务先进去,取堆顶
def stop(self):
self._stopping = True
def run_forever(self):
while True:
self.run_once() # 保证执行一轮
if self._stopping:
break
def run_once(self):
now = time.time()
if self._scheduled: # 看下有没有定时任务
if self._scheduled[0][0] < now: # 这里为了简便,没有考虑多个定时任务同时满足条件的情况,只取第一个定时任务执行
_, cb, args = heapq.heappop(self._scheduled)
self._ready.append((cb, args))
num = len(self._ready)
for i in range(num):
cb, args = self._ready.popleft()
cb(*args) # 执行准备好的任务
async def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_result = await big_step()
print(f' end big_step with {big_result}') # 顶层的值通过异常StopIteration传递出来
...
print(f'end task')
async def big_step():
... # 其它小步骤
print(f' begin small_step')
small_result = await small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
async def small_step():
print(' 休息一下马上回来')
t1 = time.time()
# 两次yield Awaitable(sleep, 2)
await Awaitable((sleep, 2)) # 模拟阻塞,yiedl有传染性,一加yield调用它的也要改变
assert time.time() - t1 > 2, '睡眠时间不足'
print(f' 努力工作中')
return 123
class Awaitable: # 包裹可迭代对象
def __init__(self, value):
self.value = value
def __await__(self): # 传递出去
yield self
# return self.value
class Task: # 通用任务驱动器
def __init__(self, coro):
self.coro = coro
self._done = False
self._result = None # 存放结果
def run(self):
print('------0-------')
if not self._done: # 任务没结束才可进来
try:
x = self.coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
self._result = e.value # StopIteration的value就是返回值
self._done = True
else:
assert isinstance(x, Awaitable)
# 阻塞发生在这里! 把执行权让出来
# func, arg = x.value
# func(arg)
else: # 其它情况下进来应该报错
print('task is done.')
print('-------1------')
if __name__ == '__main__':
loop = EventLoop()
t = Task(one_task())
loop.call_soon(t.run) # 任务塞进去
loop.call_later(2, t.run)
loop.call_later(2.1,loop.stop)
loop.run_forever()
做了这几件事情:
- 自制事件循环
- 即时任务用队列管理
- 定时任务用堆管理
- 时间到了将定时任务放到即时任务队列中去
- 任务驱动器里把阻塞操作交给事件循环调度
第二个版本:
import collections
import heapq
import itertools
import time
from time import sleep
import random
task_id_counter = itertools.count(1) # 自增数据生成器
class EventLoop: # 任务调度器
def __init__(self):
self._ready = collections.deque()
self._scheduled = [] # 定时任务,要保证最先开始的总在最前面,中间出栈入栈可能顺序会乱掉,每次排序会影响性能,考虑用堆!
self._stopping = False
def call_soon(self, callback, *args):
self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args)) # 先开始的定时任务先进去,取堆顶
def stop(self):
self._stopping = True
def run_forever(self):
while True:
self.run_once() # 保证执行一轮
if self._stopping:
break
def run_once(self):
now = time.time()
if self._scheduled: # 看下有没有定时任务
if self._scheduled[0][0] < now: # 这里为了简便,没有考虑多个定时任务同时满足条件的情况,只取第一个定时任务执行
_, cb, args = heapq.heappop(self._scheduled)
self._ready.append((cb, args))
num = len(self._ready)
for i in range(num):
cb, args = self._ready.popleft()
cb(*args) # 执行准备好的任务
class Awaitable: # 包裹可迭代对象
def __init__(self, value):
self.value = value
def __await__(self): # 传递出去
yield self
# return self.value
class Task: # 通用任务驱动器
def __init__(self, coro):
self.coro = coro
self._done = False
self._result = None # 存放结果
self._id = f'Task-{next(task_id_counter)}'
def run(self):
print(f'-----{self._id}--------')
if not self._done: # 任务没结束才可进来
try:
x = self.coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
self._result = e.value # StopIteration的value就是返回值
self._done = True
else:
assert isinstance(x, Awaitable)
loop.call_later(x.value, self.run) # 让事件循环帮我们调度耗时任务
else: # 其它情况下进来应该报错
print('task is done.')
print(f'-------{self._id}------')
async def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_result = await big_step()
print(f' end big_step with {big_result}') # 顶层的值通过异常StopIteration传递出来
...
print(f'end task')
async def big_step():
... # 其它小步骤
print(f' begin small_step')
small_result = await small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
async def small_step():
print(' 休息一下马上回来')
t1 = time.time()
sleep_time = random.random()
await Awaitable(sleep_time) # 模拟阻塞,yiedl有传染性,一加yield调用它的也要改变
assert time.time() - t1 > sleep_time, '睡眠时间不足'
print(f' 努力工作中')
return sleep_time
if __name__ == '__main__':
loop = EventLoop()
for i in range(10):
t = Task(one_task())
loop.call_soon(t.run) # 任务塞进去
loop.call_later(1, loop.stop)
loop.run_forever()
Future
以上是主动阻塞,但是正常情况都是被动阻塞的通常都希望拿到一个值,要解决
import collections
import heapq
import itertools
import time
from time import sleep
import random
task_id_counter = itertools.count(1) # 自增数据生成器
class EventLoop: # 任务调度器
def __init__(self):
self._ready = collections.deque()
self._scheduled = [] # 定时任务,要保证最先开始的总在最前面,中间出栈入栈可能顺序会乱掉,每次排序会影响性能,考虑用堆!
self._stopping = False
def call_soon(self, callback, *args):
self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args)) # 先开始的定时任务先进去,取堆顶
def stop(self):
self._stopping = True
def run_forever(self):
while True:
self.run_once() # 保证执行一轮
if self._stopping:
break
def run_once(self):
now = time.time()
if self._scheduled: # 看下有没有定时任务
if self._scheduled[0][0] < now: # 这里为了简便,没有考虑多个定时任务同时满足条件的情况,只取第一个定时任务执行
_, cb, args = heapq.heappop(self._scheduled)
self._ready.append((cb, args))
num = len(self._ready)
for i in range(num):
cb, args = self._ready.popleft()
cb(*args) # 执行准备好的任务
class Future: # 未来才会发生的结果
def __init__(self):
self._result = None # 创建时是不知道结果的,结果可能就是None
self._done = False # 标志位,表示还没拿到result
self._callbacks = []
def set_result(self, result):
if self._done:
raise RuntimeError('future already done') # 值只能设置一次
self._result = result
self._done = True
for cb in self._callbacks:
cb() # 通常要把自己传进去: cb(self)
def result(self):
if self._done:
return self._result
else:
raise RuntimeError('future is not done')
def add_done_callback(self, callback):
self._callbacks.append(callback)
def __await__(self): # 传递出去
yield self
return self.result() # await的结果通过return捕获的
class Task: # 通用任务驱动器
def __init__(self, coro):
self.coro = coro
self._done = False
self._result = None # 存放结果
self._id = f'Task-{next(task_id_counter)}'
def run(self):
print(f'-----{self._id}--------')
if not self._done: # 任务没结束才可进来
try:
x = self.coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
self._result = e.value # StopIteration的value就是返回值
self._done = True
else:
assert isinstance(x, Future)
# task的任务是我何时能恢复执行?
x.add_done_callback(self.run) # task在拿到值之后才去恢复的
else: # 其它情况下进来应该报错
print('task is done.')
print(f'-------{self._id}------')
async def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_result = await big_step()
print(f' end big_step with {big_result}') # 顶层的值通过异常StopIteration传递出来
...
print(f'end task')
async def big_step():
... # 其它小步骤
print(f' begin small_step')
small_result = await small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
async def small_step():
print(' 休息一下马上回来')
global loop # 这里这么做是为了简化处理
fut = Future()
# 指派一个目标来执行set_result
loop.call_later(random.random(), fut.set_result, 123) # 这里值是确定,这是后面要改进之处!
ret = await fut # 模拟阻塞,yiedl有传染性,一加yield调用它的也要改变
print(f' 努力工作中')
return ret
if __name__ == '__main__':
loop = EventLoop()
for i in range(5):
t = Task(one_task())
loop.call_soon(t.run) # 任务塞进去
loop.call_later(1, loop.stop)
loop.run_forever()
存在问题:值是已知和确定的,希望更贴合实际
import collections
import heapq
import itertools
import threading
import time
from time import sleep
import random
task_id_counter = itertools.count(1) # 自增数据生成器
class EventLoop: # 任务调度器
def __init__(self):
self._ready = collections.deque()
self._scheduled = [] # 定时任务,要保证最先开始的总在最前面,中间出栈入栈可能顺序会乱掉,每次排序会影响性能,考虑用堆!
self._stopping = False
def call_soon(self, callback, *args):
self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args)) # 先开始的定时任务先进去,取堆顶
def stop(self):
self._stopping = True
def run_forever(self):
while True:
self.run_once() # 保证执行一轮
if self._stopping:
break
def run_once(self):
now = time.time()
if self._scheduled: # 看下有没有定时任务
if self._scheduled[0][0] < now: # 这里为了简便,没有考虑多个定时任务同时满足条件的情况,只取第一个定时任务执行
_, cb, args = heapq.heappop(self._scheduled)
self._ready.append((cb, args))
num = len(self._ready)
for i in range(num):
cb, args = self._ready.popleft()
cb(*args) # 执行准备好的任务
class Future: # 未来才会发生的结果
def __init__(self):
self._result = None # 创建时是不知道结果的,结果可能就是None
self._done = False # 标志位,表示还没拿到result
self._callbacks = []
def set_result(self, result):
if self._done:
raise RuntimeError('future already done') # 值只能设置一次
self._result = result
self._done = True
for cb in self._callbacks:
cb() # 通常要把自己传进去: cb(self)
def result(self):
if self._done:
return self._result
else:
raise RuntimeError('future is not done')
def add_done_callback(self, callback):
self._callbacks.append(callback)
def __await__(self): # 传递出去
yield self
return self.result() # await的结果通过return捕获的
class Task: # 通用任务驱动器
def __init__(self, coro):
self.coro = coro
self._done = False
self._result = None # 存放结果
self._id = f'Task-{next(task_id_counter)}'
def run(self):
print(f'-----{self._id}--------')
if not self._done: # 任务没结束才可进来
try:
x = self.coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
self._result = e.value # StopIteration的value就是返回值
self._done = True
else:
assert isinstance(x, Future)
# task的任务是我何时能恢复执行?
x.add_done_callback(self.run) # task在拿到值之后才去恢复的
else: # 其它情况下进来应该报错
print('task is done.')
print(f'-------{self._id}------')
async def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_result = await big_step()
print(f' end big_step with {big_result}') # 顶层的值通过异常StopIteration传递出来
...
print(f'end task')
async def big_step():
... # 其它小步骤
print(f' begin small_step')
small_result = await small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
async def small_step():
print(' 休息一下马上回来')
global loop
fut = Future()
# 指派一个目标来执行set_result
fake_io_read(fut)
ret = await fut # 模拟阻塞,yiedl有传染性,一加yield调用它的也要改变
print(f' 努力工作中')
return ret
def fake_io_read(future):
def read():
sleep(random.random()) # IO阻塞,sleep不能发生在当前线程,否则仍然会阻塞,启一个线程!
future.set_result(random.randint(1, 100))
threading.Thread(target=read).start()
if __name__ == '__main__':
loop = EventLoop()
for i in range(5):
t = Task(one_task())
loop.call_soon(t.run) # 任务塞进去
loop.call_later(1, loop.stop)
loop.run_forever()
整理代码
import collections
import heapq
import itertools
import threading
import time
from time import sleep
import random
task_id_counter = itertools.count(1) # 自增数据生成器
class EventLoop: # 任务调度器
def __init__(self):
self._ready = collections.deque()
self._scheduled = [] # 定时任务,要保证最先开始的总在最前面,中间出栈入栈可能顺序会乱掉,每次排序会影响性能,考虑用堆!
self._stopping = False
def call_soon(self, callback, *args):
self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args)) # 先开始的定时任务先进去,取堆顶
def stop(self):
self._stopping = True
def run_forever(self):
while True:
self.run_once() # 保证执行一轮
if self._stopping:
break
def run_once(self):
now = time.time()
if self._scheduled: # 看下有没有定时任务
if self._scheduled[0][0] < now: # 这里为了简便,没有考虑多个定时任务同时满足条件的情况,只取第一个定时任务执行
_, cb, args = heapq.heappop(self._scheduled)
self._ready.append((cb, args))
num = len(self._ready)
for i in range(num):
cb, args = self._ready.popleft()
cb(*args) # 执行准备好的任务
class Future: # 未来才会发生的结果
def __init__(self):
# 改进点:
# 1.asyncio里Future和eventloop要挂钩的
# 挂钩的原因:cb不是自己执行而是要交给loop执行
global loop # 这里不引入变量了
self._loop = loop
self._result = None # 创建时是不知道结果的,结果可能就是None
self._done = False # 标志位,表示还没拿到result
self._callbacks = []
def set_result(self, result):
if self._done:
raise RuntimeError('future already done') # 值只能设置一次
self._result = result
self._done = True
for cb in self._callbacks:
# cb() # 通常要把自己传进去: cb(self)
# 2.asyncio里cb交给事件循环来执行
# 所有东西都是交给eventloop来执行的
self._loop.call_soon(cb)
def result(self):
if self._done:
return self._result
else:
raise RuntimeError('future is not done')
def add_done_callback(self, callback):
self._callbacks.append(callback)
def __await__(self): # 传递出去
yield self
return self.result() # await的结果通过return捕获的
class Task(Future): # 通用任务驱动器
def __init__(self, coro):
super().__init__()
self.coro = coro
self._id = f'Task-{next(task_id_counter)}'
# 3.交给eventloop让它跑起来
self._loop.call_soon(self.run)
def run(self):
print(f'-----{self._id}--------')
if not self._done: # 任务没结束才可进来
try:
x = self.coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
self.set_result(e.value)
else:
assert isinstance(x, Future)
# task的任务是我何时能恢复执行?
x.add_done_callback(self.run) # task在拿到值之后才去恢复的
else: # 其它情况下进来应该报错
print('task is done.')
print(f'-------{self._id}------')
async def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_result = await big_step()
print(f' end big_step with {big_result}') # 顶层的值通过异常StopIteration传递出来
...
print(f'end task')
async def big_step():
... # 其它小步骤
print(f' begin small_step')
small_result = await small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
async def small_step():
print(' 休息一下马上回来')
global loop
fut = Future()
# 指派一个目标来执行set_result
fake_io_read(fut)
ret = await fut # 模拟阻塞,yiedl有传染性,一加yield调用它的也要改变
print(f' 努力工作中')
return ret
def fake_io_read(future):
def read():
sleep(random.random()) # IO阻塞,sleep不能发生在当前线程,否则仍然会阻塞,启一个线程!
future.set_result(random.randint(1, 100))
threading.Thread(target=read).start()
if __name__ == '__main__':
loop = EventLoop()
for i in range(5):
t = Task(one_task())
loop.call_later(1, loop.stop)
loop.run_forever()
最后一个版本:
import collections
import heapq
import itertools
import threading
import time
from time import sleep
import random
task_id_counter = itertools.count(1) # 自增数据生成器
class EventLoop: # 任务调度器
def __init__(self):
self._ready = collections.deque()
self._scheduled = [] # 定时任务,要保证最先开始的总在最前面,中间出栈入栈可能顺序会乱掉,每次排序会影响性能,考虑用堆!
self._stopping = False
def call_soon(self, callback, *args):
self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args)) # 先开始的定时任务先进去,取堆顶
def stop(self):
self._stopping = True
def run_forever(self):
while True:
self.run_once() # 保证执行一轮
if self._stopping:
break
def run_once(self):
now = time.time()
if self._scheduled: # 看下有没有定时任务
if self._scheduled[0][0] < now: # 这里为了简便,没有考虑多个定时任务同时满足条件的情况,只取第一个定时任务执行
_, cb, args = heapq.heappop(self._scheduled)
self._ready.append((cb, args))
num = len(self._ready)
for i in range(num):
cb, args = self._ready.popleft()
cb(*args) # 执行准备好的任务
class Future: # 未来才会发生的结果
def __init__(self):
# 改进点:
# 1.asyncio里Future和eventloop要挂钩的
# 挂钩的原因:cb不是自己执行而是要交给loop执行
global loop # 这里不引入变量了
self._loop = loop
self._result = None # 创建时是不知道结果的,结果可能就是None
self._done = False # 标志位,表示还没拿到result
self._callbacks = []
def set_result(self, result):
if self._done:
raise RuntimeError('future already done') # 值只能设置一次
self._result = result
self._done = True
for cb in self._callbacks:
# cb() # 通常要把自己传进去: cb(self)
# 2.asyncio里cb交给事件循环来执行
# 所有东西都是交给eventloop来执行的
self._loop.call_soon(cb)
def result(self):
if self._done:
return self._result
else:
raise RuntimeError('future is not done')
def add_done_callback(self, callback):
self._callbacks.append(callback)
def __await__(self): # 传递出去
yield self
return self.result() # await的结果通过return捕获的
class Task(Future): # 通用任务驱动器
def __init__(self, coro):
super().__init__()
self.coro = coro
self._id = f'Task-{next(task_id_counter)}'
# 3.交给eventloop让它跑起来
self._loop.call_soon(self.run)
self._start_time = time.time()
def run(self):
print(f'-----{self._id}--------')
if not self._done: # 任务没结束才可进来
try:
x = self.coro.send(None) # 跑起来,send目的是要拿到值
except StopIteration as e: # 没数据了
self.set_result(e.value)
global total_block_time
total_block_time += time.time() - self._start_time # 看下阻塞了多久
else:
assert isinstance(x, Future)
# task的任务是我何时能恢复执行?
x.add_done_callback(self.run) # task在拿到值之后才去恢复的
else: # 其它情况下进来应该报错
print('task is done.')
print(f'-------{self._id}------')
async def one_task():
print(f'begin task')
... # 其它步骤
print(f' begin big_step')
big_result = await big_step()
print(f' end big_step with {big_result}') # 顶层的值通过异常StopIteration传递出来
...
print(f'end task')
async def big_step():
... # 其它小步骤
print(f' begin small_step')
small_result = await small_step()
print(f' end samall_step with {small_result}')
...
return small_result * 1000
async def small_step():
print(' 休息一下马上回来')
global loop
fut = Future()
# 指派一个目标来执行set_result
fake_io_read(fut)
ret = await fut # 模拟阻塞,yiedl有传染性,一加yield调用它的也要改变
print(f' 努力工作中')
return ret
def fake_io_read(future):
def read():
sleep(random.random()) # IO阻塞,sleep不能发生在当前线程,否则仍然会阻塞,启一个线程!
future.set_result(random.randint(1, 100))
threading.Thread(target=read).start()
def until_all_done(tasks):
tasks = [t for t in tasks if not t._done] # 所有任务都完毕了才结束
if tasks:
loop.call_soon(until_all_done, tasks)
else:
loop.stop()
if __name__ == '__main__':
loop = EventLoop()
total_block_time = 0
start_time = time.time()
all_tasks = [Task(one_task()) for i in range(1000)]
loop.call_later(.9, until_all_done, all_tasks)
loop.run_forever()
print(total_block_time, time.time() - start_time) # 总共阻塞时间之和,耗时
实际测试结果表明:开1w协程去产生随机数据这件事在我们自己写的asyncio实现上耗时约3.5s,还有巨大的改进空间。不过我们当初定下的手写asyncio库的目的差不多已经实现了,先这样吧!
标签:__,协程库,self,yield,剖析,._,print,def,asyncio From: https://www.cnblogs.com/aloha-72pierre/p/17873554.html