上下文什么意思 ?
参考博客 https://blog.csdn.net/caizir913/article/details/108826764
上下文是针对中断来体现其具体含义的
在内核设计者的眼中,当一个任务在中断时,CPU会去执行中断对应的任务。中断结束后,
再执行之前的task时,原有任务的相关数据(在处理原任务所需要的数据)需要保存下来,
否则无法继续执行原有任务。如果把相关数据记录到一个变量里。那这个变量就可以称为原task的上下文了。
通俗的理解,上下文,也就是执行任务所需要的相关信息。
这个任务可以是一段代码,一个线程,一个进程,一个函数。
当这个“任务”,相关信息需要保存下来,就可以使用Context来记录了。
只要想有个"对象"来保存相关信息,这个"对象"就可以叫上下文了。
协程
单线程下实现并发效果,程序层面遇到io,控制任务的切换
或者简单点理解为,在单线程下,运行一个函数当出现io的时候,
会自动切换去运行函数,就是可以实现在不同函数间的来回切换运行的操作
greenlet实现协程
from greenlet import greenlet
def func1():
print(1) #2 打印
gr2.switch() #3 切换到执行func2函数
print(3) #6 打印
gr2.switch() #7 切换到执行func2函数
def func2():
print(2) #4 打印
gr1.switch() #5 切换到执行func1函数
print(4) #8 打印
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() #1 执行func1函数
"""
1
2
3
4
"""
gevent实现协程
import time
from gevent import monkey;monkey.patch_all()
# 固定编写 用于检测所有的IO操作(猴子补丁)
from gevent import spawn
def func1():
print('func1 running')
time.sleep(3)
print('func1 over')
def func2():
print('func2 running')
time.sleep(5)
print('func2 over')
if __name__ == '__main__':
start_time = time.time()
# func1()
# func2()
s1 = spawn(func1) # 检测代码 一旦有IO自动切换(执行没有io的操作,变向的等待io结束)
s2 = spawn(func2)
s1.join()
s2.join()
print(time.time() - start_time) # 8.01237154006958 协程 5.015487432479858
yield关键字实现协程
def func1():
yield 1
yield from func2() # 切换去运行func2函数
yield 4
def func2():
yield 2
yield 3
f1 = func1()
for i in f1:
print(i)
"""
1
2
3
4
"""
老版asyncio实现协程
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(1) # 遇到io,自动切换到tasks列表中的其他任务,并运行
print(3)
@asyncio.coroutine
def func2():
print(2)
yield from asyncio.sleep(2) # 遇到io,自动切换到tasks列表中的其他任务,并运行
print(4)
tasks = [
asyncio.ensure_future(func1()),asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop() # 获取事件循环
loop.run_until_complete(asyncio.wait(tasks))
# 注意 用了asyncio模块后,遇到io后,会自动切换,不需要像greenlet那样手动切换了
"""
1
2
3
4
"""
python3.5以后 asyncio实现协程
# 就是把老版的 @asyncio.coroutine装饰器换成了async关键字 以及把yield from换成 await 就行了
# asyncio.ensure_future(func1()) 新版可以替换成asyncio.create_task(func1())
import asyncio
async def func1():
print(1)
await asyncio.sleep(1) # 遇到io,自动切换到tasks列表中的其他任务,并运行
print(3)
async def func2():
print(2)
await asyncio.sleep(2) # 遇到io,自动切换到tasks列表中的其他任务,并运行
print(4)
#----------------------写法1--------------------------------#
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(func1()),asyncio.ensure_future(func2())
]
loop.run_until_complete(asyncio.wait(tasks))
#----------------------------------------------------------#
#----------------------写法2--------------------------------#
async def main():
tasks = [asyncio.ensure_future(func1()),asyncio.ensure_future(func2())]
await asyncio.wait(tasks)
asyncio.run(main())
# 用run方法,不能直接 写完task列表后,直接asyncio.run(asyncio.wait(tasks))
# 要把tasks和asyncio.wait(tasks)代码放到main协程函数里面去
# 为什么要这样做? 因为asyncio.ensure_future(func1()) 是要把协程对象包装然后注册到事件循环的任务列表里的
# 但是在执行asyncio.ensure_future(func1())时,事件循环event_loop还没有创建了!!!
# 所以要先asyncio.run(main()) 把事件循环创出来才行
event loop 也叫做事件循环
事件循环是asyncio的核心,它负责调度和执行任务
事件循环可以理解为一个死循环,检测并执行某些代码
# 伪代码
任务列表 = [任务1,任务2,任务3,任务4,...]
while True:
可执行的任务列表 = [去任务列表中检查所有的任务,将可执行的任务放在该列表里]
已完成的任务列表 = [去任务列表中检查所有的任务,将已完成的任务放在该列表里]
for 就绪任务 in 可执行的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除已完成的任务
如果任务列表中的所有任务都已完成,则终止循环!!!
loop = asyncio.get_event_loop() # 生成事件循环
loop.run_until_complete(任务) # 将任务放到任务列表,等待:事件循环去调度执行该任务到完成
事件循环的任务列表里的最小单位是任务对象,
所以loop.run_until_complete(协程对象) 协程对象还是会被包裹成一个任务的
asyncio参考博客
https://www.cnblogs.com/xyztank/articles/17571804.html
快速上手
async def main1():
pass
# 函数在定义的时候左边有 async def 那么该函数就是协程函数
# 注意 在asyncio中,协程函数加括号不会立即执行,而是生成协程对象
# 如果想要运行协程函数内部的代码,必须要将协程对象交给事件循环来处理
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
res = main()
print(res) # <coroutine object main at 0x000001B6A54BE5C0>
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
loop = asyncio.get_event_loop()
# 获取默认的事件循环对象
loop.run_until_complete(main())
# 将协程对象作为一个任务放到事件循环的任务列表中,运行事件循环,等待直到指定的协程对象执行完毕
# 这行代码自身是一个阻塞代码,直到协程执行完毕,主线程才会继续执行下面的代码!!!
loop.close() # 关闭事件循环
# 需要注意的是,run_until_complete()方法接受一个可等待对象作为参数
# 可以是协程对象、任务对象或者Future对象。将可等待对象作为一个任务,注册到事件循环的任务列表里
# 它会持续运行事件循环,直到可等待对象执行完成。
# 在事件循环中,协程函数会按照调度规则进行执行。
# 当遇到await关键字时,协程对象的函数运行会暂时挂起,并将控制权让给其他协程。
# 当await后面的耗时操作完成后,事件循环会恢复被挂起的协程的执行。
-----------------------------------------------------------------
# 上面的event_loop三行代码可以简化为用run函数来代替
asyncio.run(main())
# 该函数做两件事
# 1 建立起事件循环
# 2 把协程对象包装成一个task,并把该任务,注册到事件循环的任务列表里面!!!
await关键字 重要!!!
# await关键字只能用在 协程函数里面
# 并且await关键字后面只能是可等待对象(协程对象,Future对象,task对象)
# 当运行到协程函数里面的 await asyncio.sleep(3) 时,会将代码执行控制权交还给事件循环
# 事件循环就可以去运行任务列表里,其他可执行的任务了
# 当await后面是协程对象的时候,虽然代码执行控制权,还是交还给事件循环,
# 但是这个时候事件循环不会去执行其他的任务,而是先去运行await后面的协程对象的函数代码!!!
# 同理await后面是task对象的时候,当前代码执行控制权还是先交还给事件循环,
# 然后控制权再交给任务对象去执行对应函数代码
import asyncio
import time
async def say_after(n, what):
await asyncio.sleep(n)
print(what)
async def main():
print(f"start at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
await say_after(3, 'haha')
print(f"finish at {time.strftime('%X')}")
asyncio.run(main())
#------------------------------------
# 结果为
"""
start at 13:21:55
hello
world
haha
finish at 13:22:01
"""
# 可以看到此时,出现io并没有切换,总共串行等了6s
# 也就是说在协程函数中,碰到await,并且await后面还是一个协程对象
# 对于当前的协程函数来说,执行控制权交还给事件循环,事件循环去执行await后面的协程对象的函数
# 运行say_after(1, 'hello')函数体代码,又碰到await asyncio.sleep(2)
# 还是一样执行控制权交还给事件循环,此时事件循环的任务列表里只有一个main()任务
# 想要继续执行main()任务里面的代码,必须要等待 await say_after(1, 'hello') 执行完
# 才能往下执行
# 由于上面的代码里面我们只将,协程对象main()变成一个任务,注册到了事件循环的列表里,
# 所以出现 await say_after(1, 'hello') 控制权给事件循环去执行say_after(1, 'hello')协程对象
# 然后在执行到await asyncio.sleep(1)时,控制权给事件循环,此时事件循环想去执行main()任务里面的代码
# 但是await say_after(1, 'hello') 还没有执行结束,所以只能继续等
# 当await asyncio.sleep(1) 睡完了,执行完print('hello')后,协程函数运行完了
# 这个时候代码控制权才交给main协程函数,继续执行await say_after(2, 'world')
# 然后依次类推
# 总结:
"""
协程函数里面有await关键字的时候,协程函数的执行控制权是会交给事件循环的,这个时候该协程会被挂起
如果await关键字后是另一个协程对象,控制权会给该协程对象去执行它的函数代码
也就是说,协程函数里面有await关键字的时候,
对于当前协程对象来说,只有await后面的可等待对象运行完了
才会继续执行当前协程函数的下一行代码
重要!!!
或者说,协程函数里面有await关键字的时候,对于当前协程对象来说,必须老老实实等,可等待对象运行完了,
才能继续往下走,是没法跳过可等待对象的!!!
"""
怎么实现在协程函数运行时,出现io的时候,也能主动进行切换
第1种方法
# 创建task对象的目的就是,将协程对象变成一个任务,添加到事件循环的任务列表中
import asyncio
import time
async def say_after(n, what):
await asyncio.sleep(n)
print(what)
return what
async def main():
# 注意执行asyncio.create_task(协程对象)该代码的时候
# 会把协程对象包一下变成一个task对象,添加到事件循环的任务列表中
# 此时只是将task对象添加到任务列表中,此时并不会去执行 协程对象的代码的!!!
# 因为当前代码还没有io了,代码控制权还没有交给事件循环了!!!
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))
task3 = asyncio.create_task(say_after(1, 'haha'))
# 到这里时事件循环的任务列表里就有4个任务了
print(f"start at {time.strftime('%X')}")
res1 = await task1
res2 = await task2
res3 = await task3
# print(res1, res2, res3) # None None None
print(f"finish at {time.strftime('%X')}")
asyncio.run(main())
# 结果为
"""
start at 13:14:29
hello
haha
world
finish at 13:14:31
"""
"""
代码执行流程:
asyncio.run(main()) 创建事件循环,将main()协程对象变成一个task,
并注册到任务列表中,并开始执行协程对象的代码
再把3个协程对象变成task,也注册到事件循环的任务列表中
await task1 等待,代码控制权交给事件循环,事件循环去执行task1对应的协程对象的函数
运行到await asyncio.sleep(1)后,代码控制权又交给事件循环,
此时task2与task3是可执行的,
事件循环就会任意运行一个假设是task2对应的协程对象的函数
就运行到了await asyncio.sleep(2),代码控制权又交给事件循环,此时只有task3是可执行的
事件循环就会运行task3对应的协程对象的函数
就运行到了await asyncio.sleep(1)
此时所有任务都在等待了,task1的睡1s最先结束,代码执行权回到task1,所以先打印了hello,任务1结束
此时虽然任务1结束了,但是任务2还没结束,所以main函数卡在await task2
task3的睡1s也结束了,代码执行权回到task3,所以先打印了haha,任务3结束
最后task2的睡2s也结束了,代码执行权回到task2,所以先打印了world,任务2结束
此时由于task2运行结束了,所以main函数继续往下运行await task3,由于task3已经运行完了
所以继续往下运行了,打印结束时间后,main函数运行结束了
事件循环的任务列表里任务没有了,事件循环也停止运行了
上面的代码中的3个等待 await task1 await task2 await task3
实际上写一个执行时间最长的await task2 效果是一样的!!!
----------------
所以最后,总共就睡了2秒钟,已经实现出现io自动切换了!!!
"""
第2种方法 (比第一种方法常用)
await asyncio.wait(task_list,timeout=None)
import asyncio
import time
async def say_after(n, what):
await asyncio.sleep(n)
print(what)
return what
async def main():
task_list = [asyncio.create_task(say_after(1, 'hello'),name='xioahong'),
asyncio.create_task(say_after(2, 'world')),
asyncio.create_task(say_after(1, 'haha')),
]
# 可以给任务取名字,不起会默认分配
# 到这里时事件循环的任务列表里就有4个任务了
print(f"start at {time.strftime('%X')}")
done,pending = await asyncio.wait(task_list,timeout=None)
# 如果不要返回值直接 await asyncio.wait(task_list) 就行了
print(done) # 任务执行完了,会将任务的相关信息,放到done这个集合里面
print(f"finish at {time.strftime('%X')}")
asyncio.run(main())
#------------------------------------------------------#
# 如果不想把task_list写在main函数里面,就要这样写,先不创建任务对象
# 因为一旦create_task创建任务对象,就会同时把协程对象变成任务对象,添加到事件循环的任务列表中
# 但此时事件循环还没创建了!!!
xiechengobj_list = [say_after(1, 'hello'),
say_after(2, 'world'),
say_after(1, 'haha'),
]
asyncio.run(asyncio.wait(xiechengobj_list))
# asyncio.wait() 括号里放任务列表,或放协程对象列表都行,协程对象也会被转化成任务对象
第3种方法 await asyncio.gather(task1, task2, task3)
import asyncio
import time
async def say_after(n, what):
await asyncio.sleep(n)
print(what)
return what
async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))
task3 = asyncio.create_task(say_after(1, 'haha'))
print(f"start at {time.strftime('%X')}")
# res1 = await task1
# res2 = await task2
# res2 = await task3
ret = await asyncio.gather(task1, task2, task3) # 等于上面3行代码
print(ret)
print(f"finish at {time.strftime('%X')}")
asyncio.run(main())
# 结果
"""
start at 23:34:07
hello
haha
world
['hello', 'world', 'haha']
finish at 23:34:09
"""
# 这样也实现了出现io自动切换了
第4种方法
await asyncio.gather( say_after(1, 'hello'),say_after(2, 'world'))
# 还可以不用create_task方法,直接把协程对象放到gather的括号里面也行
# gather也能把协程对象变成task
import asyncio
import time
async def say_after(n, what):
await asyncio.sleep(n)
print(what)
return what
async def main():
# task1 = asyncio.create_task(say_after(1, 'hello'))
# task2 = asyncio.create_task(say_after(2, 'world'))
# task3 = asyncio.create_task(say_after(1, 'haha'))
print(f"start at {time.strftime('%X')}")
# ret = await asyncio.gather(task1, task2, task3)
ret = await asyncio.gather(
say_after(1, 'hello'),
say_after(2, 'world'),
say_after(1, 'haha'))
# gather(协程对象) 的时候,协程对象会被隐式的变成task
print(ret) # 这些task任务运行的返回值,放到一个列表里,返回给 await等号左边的变量名
print(f"finish at {time.strftime('%X')}")
asyncio.run(main())
# 结果
"""
start at 23:55:48
hello
haha
world
['hello', 'world', 'haha']
finish at 23:55:50
"""
# 归根结底就是要把3个say_after协程函数变成任务对象,放到事件循环的任务列表才行!!!
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
关键字总结
# 1 asyncio.run()
asyncio.run(main()) # 把协程对象包成task注册到event_loop中,并运行该任务
相当于下面这两行代码
loop = asyncio.get_event_loop() # 获取默认的事件循环对象
loop.run_until_complete(main())
#
------------------------------------------------------
# 2 await
await 后面是asyncio.sleep(2) io操作的时候,会将当前代码控制权交给事件循环
await 后面是个协程对象的时候,会将当前代码控制权交给事件循环,并运行该协程函数
await 后面是task时,会将当前代码控制权交给事件循环,并运行该task
await 还可以把右边task运行的返回值,给到等号左边的变量名
------------------------------------------------------
# 3 create_task()
把协程对象变成task,再把task注册到事件循环的任务列表中!!!
------------------------------------------------------
# 4 gather()
括号里面放task,会将这些task注册到事件循环的任务列表中!
括号里面也可以直接放协程对象,也会被转化成task,并注册到事件循环的任务列表中!
代码执行控制权会交给event_loop
然后阻塞等待括号里面所有的task运行结束,
------------------------------------------------------
# 5 run_forever( )
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
是一种使用 asyncio 库 创建无限循环的方式,
这个循环会持续运行,直到显式终止。
通常情况下,如果任务列表中的所有任务都已完成,事件循环就会终止了!!!
它的作用是启动一个事件循环并使其永远不会自动停止,从而允许异步操作持续执行。
通常用于编写长期运行的异步服务或应用程序,例如网络服务器或监控系统,
其中需要不间断地处理异步任务。
------------------------------------------------------
.
.
.
.
.
.
.
.
.
.
基本使用代码
import asyncio
async def func(n):
print('start', n)
# await 后面跟的是可能会发生阻塞的代码
# await关键字必须写在一个async函数里
await asyncio.sleep(1)
print('end')
loop = asyncio.get_event_loop()
loop.run_until_complete(func(1)) # 这行代码整体是一个阻塞的代码
loop.run_until_complete(func(2))
# 此时不会切换,而是先运行完第一个协程函数后,再运行第二个协程函数,不会因为有io而切换
# 因为loop.run_until_complete(func(1))时,事件循环的任务列表里只有一个任务,所以不会切换!!
# loop.run_until_complete(func(1))
# 整体是一个阻塞的代码,直到协程函数执行完毕,主线程才会继续执行下面的代码!!!
# 将协程对象变成一个任务,放到事件循环的任务列表中,运行事件循环,直到指定的协程对象执行完毕
loop.run_until_complete(asyncio.wait([func(1), func(2)]))
# 只有这样才能实现异步了,协程里出现io会切换到另一个协程里
-------------------------------------------------------------
import asyncio
import time
async def func1():
print(1)
await asyncio.sleep(3) # 遇到耗时后会自动切换到其他函数中执行
print(2)
async def func2():
print(3)
await asyncio.sleep(2)
print(4)
async def func3():
print(5)
await asyncio.sleep(2)
print(6)
loop = asyncio.get_event_loop()
task_list = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2()),
asyncio.ensure_future(func3())
]
start = time.time()
loop.run_until_complete(asyncio.wait(task_list))
end = time.time()
print(end - start) # 只会等待3秒
# 结果为
"""
1
3
5
4
6
2
2.9925854206085205
"""
.
.
.
.
.
.
.
.
asyncio的future对象的使用
# future对象就是来配合await关键字的 await的等待,什么时候结束,靠的就是判断对象的值
# 对象的值就是靠的就是Future类里面的set_result方法,给对象赋值的
# 换句话说在协程函数里面 await关键字后面一定要跟可等待对象,就是因为可等待对象的类继承了Future类
# 可等待对象的的函数代码执行完后,会自动给对象执行set_result函数,所以任务对象就有值了
# 所以await判断任务对象有值了,就不再等了,代码就继续往下执行了
# 所以说如果一个io操作的代码不支持协程,又想放在await后面,那就要想办法先继承Future类
# future对象的类Future,是task对象的类Task的父类
# 当执行 await task对象 的时候,内部就是去执行父类Future里面的代码
import asyncio
import time
async def main():
loop = asyncio.get_running_loop() # 获取事件循环
# 创建一个任务(future对象),该future对象没有绑定任何行为,此时任务不知道什么时候结束
fut = loop.create_future()
# 等待任务最终结果,没有结果会一直等待下去,此时程序就会一直阻塞住
await fut
asyncio.run(main())
#-------------------------------------------------------#
import asyncio
async def sat_after(fut):
print(4)
await asyncio.sleep(2)
#4 代码的控制权交给事件循环,但由于事件循环的任务列表中只有一个任务,且也在等待
# 所以在此等待2s后,继续往下执行
print(5)
fut.set_result('hello') #5 给future对象设置值
async def main():
loop = asyncio.get_running_loop() #1 获取事件循环
print(1)
#2 创建一个任务(future对象),该任务什么都不干
fut = loop.create_future()
print(2)
#3 sat_after协程对象注册到到事件循环的任务列表中
#3 loop.create_task(sat_after(fut))整体返回的是协程对象包装成的任务对象
#3 当前代码的控制权交给事件循环,并去执行任务对象对应的函数代码,就去运行sat_after函数了
print(3)
await loop.create_task(sat_after(fut))
print(6)
res = await fut #6 一旦future对象有值了,就相当于future这个任务执行完了,就不再等待了
print(7)
print(res) #7 给future对象设置什么值,这里res就拿到什么值
asyncio.run(main()) # 开始
# await配合futur对象使用,就可阻塞住主线程代码,等待future对象有了结果后,主线程代码才继续往下执行
# await 任务对象 为什么当任务对象的函数代码执行完了,为什么就不再等待了?
# 是因为任务对象的函数代码执行完后,会自动给任务对象执行set_result函数,所以任务对象也有值了
# 所以等待就结束了!!!
# (而且好像设置的值,就是协程函数的返回值,不是太确定!!!)
"""
1
2
3
4
5
6
7
hello
"""
.
.
.
.
concurrent模块的进程池与线程池
"""
因为硬件的发展赶不上软件,有物理极限.
如果我们在编写代码的过程中无限制的创建进程或者线程,可能会导致计算机崩溃!!!
池的作用: 降低程序的执行效率,但是保证了计算机硬件的安全
进程池: 提前创建好固定数量的进程,也就是说同时只有固定数量的进程去运行对应的函数,其他的函数要等待进程去运行
线程池: 提前创建好固定数量的线程,也就是说同时只有固定数量的线程去运行对应的函数,其他的函数要等待进程去运行
"""
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random
from threading import current_thread
# 1.产生含有固定数量线程的线程池
pool = ThreadPoolExecutor(10)
def task(n):
print('task is running')
time.sleep(random.randint(1, 3))
print('task is over', n, current_thread().name)
# 2.将任务提交给线程池即可
for i in range(20):
fut = pool.submit(task, 123)
# 朝线程池提交任务,123是要传给task函数的参数
# 这个ThreadPoolExecutor是一个类,调用submit方法最后也会调threading.Thread类生成线程对象
#-------------------------------------------------#
# 1.产生含有固定数量线程的线程池 与 固定数量进程的进程池
pool = ProcessPoolExecutor(5)
def task1(n):
print('task is running')
return '我是task函数的返回值'
def func(*args, **kwargs):
print('from func')
# 2.将任务提交给线程池即可
for i in range(20):
# print(res.result()) # 不能直接获取
pool.submit(task, 123).add_done_callback(func) # 朝进程池提交20个任务,是异步操作
"""
先朝进程池提交20个任务,但是进程池只有5,
所以一次性只能运行5个子进程每个子进程运行一个任务,
一旦运行的这5个子进程里面有一个子进程运行完了,有返回值了,
立刻自动调用func函数运行,并把返回值传给该func函数
add_done_callback() 就是异步回调机制
"""
.
.
.
.
.
.
concurrent模块的Future类
# 使用线程池或进程池来实现异步操作时,会用到该类
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(2)
print(value)
return value
# 创建线程池,池里面最多5个线程
pool = ThreadPoolExecutor(max_workers=5)
# 或 pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
# 产生一个Future对象,如果从池中拿一个线程,该对象的state就是running,就用该线程去执行func函数并传参
# 如果从池中没拿到线程,该对象的state就是pending就等待,
# 直到从池中拿到线程,该对象的state才会变成running,才会用该线程去执行func函数并传参
print(fut)
# 所以for循环走完,10个Future对象都已经创建了,5个从池中拿到线程了,5个在等
print("-----------------------------")
"""
<Future at 0x25f93187be0 state=running>
<Future at 0x25f9322da00 state=running>
<Future at 0x25f9322dd60 state=running>
<Future at 0x25f93237130 state=running>
<Future at 0x25f932374c0 state=running>
<Future at 0x25f93237850 state=pending>
<Future at 0x25f93237970 state=pending>
<Future at 0x25f93237a90 state=pending>
<Future at 0x25f93237bb0 state=pending>
<Future at 0x25f93237cd0 state=pending>
-----------------------------
4
1
2
3
0
9
7
5
8
6
"""
.
.
.
.
# 在使用协程异步编程的时候,如果某一个第三方模块不支持基于协程的异步的时候
# 那想要实现异步,该第三方模块的代码部分,就只能使用多线程或多进程的时候实现异步了
# 代码实现
import asyncio
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func():
time.sleep(2)
print('hello')
return 'hello'
async def main():
loop = asyncio.get_event_loop()
fut = loop.run_in_executor(None, func) # None代表创建线程池
# run_in_executor函数内部会默认创建线程池,然后产生一个Future对象,这里的Future对象是concurrent模块的
# 将concurrent模块产生的Future对象,再转换成支持异步的asyncio里面future对象
# 这样run_in_executor产生的Future对象,就可以和asyncio模块的Future对象一样,支持await关键字了
res = await fut
print("default thread pool",res)
asyncio.run(main())
#------------------------------------------------------------#
# asyncio模块和一个不支持异步的模块一起使用,都实现异步
import asyncio
import requests
async def fetch(url):
print(f'start 下载 {url}')
loop = asyncio.get_event_loop()
# 通过run_in_executor生成了一个concurrent的Future对象,并包装成asyncio的Future对象
# 然后从线程池中拿一个线程,去执行requests.get方法传url参数
fut = loop.run_in_executor(None, requests.get, url)
res = await fut
# 等待Future对象执行完,也就是requests.get函数执行完
# 执行完的返回值,还会通过set_result设置给Future对象,res 就拿到返回值了
print(f'end 下载完成 {url}')
# 图片保存到本地
file_name = url.rsplit("/")[-1]
with open(file_name,mode='wb') as f:
f.write(res.content)
url_list = [
'https://pic.616pic.com/photoone/00/00/16/618ce649d05877001.jpg',
'https://img.tukuppt.com/photo-big/00/10/60/61963c2ab6c097864.jpg',
'https://pic.3zitie.cn/zhuangshi/2017/09/324/pic/img/0275.jpg',
]
# 生成协程对象列表
tasks = [fetch(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
.
.
.
.
.
.
后续关于 asyncio还有一点知识
异步迭代器
异步上下文管理器
等等
用到到b站上搜相关视频再学吧
.
.
.
.
.
.
.
.
.
.
.
.
asyncio模块 实现协程的底层原理,就利用yield关键字,代码示范
用空再看
# asyncio协程实现的原理,就利用yield关键字,执行生成器,遇到yield,就会从生成器的代码里面返回出去了
# 这就是asyncio模块,实现切换协程的基础原理!!!
# 同时利用两个yield关键字
# 就能把原来time模块里的sleep,写成协程里面的sleep函数了!!!
import time
# 自己写一个sleep函数
def sleep(n):
print('start sleep') # 7
# print(type(time.time())) # 浮点型数字
# print(time.time() + n)
yield time.time() + n # 8 将要睡结束的时间,返回值给了i
print('end sleep')
def func(n):
print(123) # 4
g = sleep(n) # 5 变成一个生成器
# yield from g
# yield from sleep(n) 还可以和上面的代码合并
# await sleep(n) 这也就是await的底层代码实现
# yield from g 就是把g的值,for循环然后再一个个的yield
for i in g: # 6
# print(i) # 只有一个浮点型数字
yield i # 9 最终将值给了ret1
print(456)
start_time = time.time()
n = 1
g1 = func(1) # 1 变成一个生成器1
g2 = func(1.01) # 2 变成一个生成器2
ret1 = next(g1) # 3 运行生成器1
ret2 = next(g2) # 10 运行生成器2 重复上面4-9的代码 就是 11-16
# 上面的代码都执行的很快,没有io操作
time_dic = {ret1: g1, ret2: g2} # 17
while time_dic: # 18
min_time = min(time_dic) # 19 取最近的时间
time.sleep(min_time - time.time())
# 20 这一步玩的骚啊,用当时记录要睡到的时间减现在实际的时间,就是现在实际要睡的时间!!!
# 再一次循环的时候,取第二个最近的时间,我们是在之前第一次循环睡的基础上再睡,不是串行的睡,而是并行的睡!!
try:
next(time_dic[min_time]) # 21 开始运行sleep生成器里yield后面的代码 end sleep
# 由于此时函数里面已经没有yield代码了,所以会报错,所以就会走print(456)
except StopIteration:
pass
del time_dic[min_time]
print(time.time() - start_time)
# 代码执行结果
123
start sleep
123
start sleep # 上面几行代码几乎一起打印的,然后等了约1s
end sleep
456 # 又等了约1s 又执行了下面的代码!!!
end sleep
456
2.0105111598968506
----------------------------------------------------------
# 总结:
多个函数的sleep操作,没有在sleep函数中真睡,只是在sleep函数中记录了,要睡到什么时间结束,
并将该时间返回出去,这样最后将多个函数的sleep要睡到时间,放到一个容器里
通过while循环,每次取出,io最短的那个时间,阻塞对应的时间后,
并用next再切回到对应的生成器里面去执行剩下的代码,
# 整个过程,决定切出来是协程函数做的,是协程函数做的
# 决定再切回到协程函中,是外面的while循环来通过判断来做的
# asyncio模块基本就是这么玩的
# 所以asyncio模块里面的event loop的事件循环 干的事情就是:
循环找到io最短的那个时间,阻塞对应的时间后,再切回到对应的生成器里面去执行剩下的代码
----------------------------------------------------
最后把代码再封装一下,就已经像asyncio模块的雏形了
def sleep(n):pass
def func(n):pass
def run_until_complete(g1, g2):
ret1 = next(g1) # 3 运行生成器1
ret2 = next(g2) # 10 运行生成器2 重复上面4-9的代码 就是 11-16
# 上面的代码都执行的很快,没有io操作
time_dic = {ret1: g1, ret2: g2} # 17
while time_dic: # 18
min_time = min(time_dic) # 19 取最近的时间
time.sleep(min_time - time.time())
# 20 这一步玩的骚啊,用当时记录要睡到的时间减现在实际的时间,就是现在实际要睡的时间!!!
# 再一次循环的时候,取第二个最近的时间,我们是在之前第一次循环睡的基础上再睡,不是串行的睡,而是并行的睡!!
try:
next(time_dic[min_time]) # 21 开始运行从6-9
except StopIteration:
pass
del time_dic[min_time]
start_time = time.time()
n = 1
g1 = func(1) # 1 变成一个生成器1
g2 = func(2) # 2 变成一个生成器2
run_until_complete(g1, g2)
print(time.time() - start_time)
------------------------------------------------------
.
.
.