首页 > 其他分享 >协程相关及asyncio使用

协程相关及asyncio使用

时间:2024-11-06 17:58:32浏览次数:1  
标签:task 协程 await time print 相关 asyncio

上下文什么意思 ?

参考博客 https://blog.csdn.net/caizir913/article/details/108826764
上下文是针对中断来体现其具体含义的

在内核设计者的眼中,当一个任务在中断时,CPU会去执行中断对应的任务。中断结束后,
再执行之前的task时,原有任务的相关数据(在处理原任务所需要的数据)需要保存下来,
否则无法继续执行原有任务。如果把相关数据记录到一个变量里。那这个变量就可以称为原task的上下文了。

通俗的理解,上下文,也就是执行任务所需要的相关信息。
这个任务可以是一段代码,一个线程,一个进程,一个函数。
当这个“任务”,相关信息需要保存下来,就可以使用Context来记录了。

只要想有个"对象"来保存相关信息,这个"对象"就可以叫上下文了。

协程

单线程下实现并发效果,程序层面遇到io,控制任务的切换

或者简单点理解为,在单线程下,运行一个函数当出现io的时候,
会自动切换去运行函数,就是可以实现在不同函数间的来回切换运行的操作

greenlet实现协程

from greenlet import greenlet

def func1():
    print(1)  #2  打印
    gr2.switch()  #3  切换到执行func2函数
    print(3)  #6  打印   
    gr2.switch()  #7  切换到执行func2函数
    
    
def func2():
    print(2)  #4  打印
    gr1.switch()  #5  切换到执行func1函数
    print(4)  #8  打印   

gr1 = greenlet(func1)
gr2 = greenlet(func2)


gr1.switch()  #1  执行func1函数

"""
1
2
3
4
"""

gevent实现协程


import time
from gevent import monkey;monkey.patch_all()
# 固定编写 用于检测所有的IO操作(猴子补丁)

from gevent import spawn


def func1():
    print('func1 running')
    time.sleep(3)
    print('func1 over')


def func2():
    print('func2 running')
    time.sleep(5)
    print('func2 over')


if __name__ == '__main__':
    start_time = time.time()
    # func1()
    # func2()
    s1 = spawn(func1)  # 检测代码 一旦有IO自动切换(执行没有io的操作,变向的等待io结束)
    s2 = spawn(func2)
    s1.join()
    s2.join()
    print(time.time() - start_time)  # 8.01237154006958   协程 5.015487432479858

yield关键字实现协程

def func1():
    yield 1  
    yield from func2()  # 切换去运行func2函数
    yield 4
    
    
def func2():
    yield 2
    yield 3
       
        
f1 = func1()
for i in f1:
    print(i)


"""
1
2
3
4
"""

老版asyncio实现协程

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(1)  # 遇到io,自动切换到tasks列表中的其他任务,并运行
    print(3)

@asyncio.coroutine
def func2():
    print(2)
    yield from asyncio.sleep(2)  # 遇到io,自动切换到tasks列表中的其他任务,并运行
    print(4)

tasks = [
    asyncio.ensure_future(func1()),asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()  # 获取事件循环
loop.run_until_complete(asyncio.wait(tasks))

# 注意 用了asyncio模块后,遇到io后,会自动切换,不需要像greenlet那样手动切换了

"""
1
2
3
4
"""

python3.5以后 asyncio实现协程

# 就是把老版的 @asyncio.coroutine装饰器换成了async关键字   以及把yield from换成 await 就行了
# asyncio.ensure_future(func1())  新版可以替换成asyncio.create_task(func1())

import asyncio

async def func1():
    print(1)
    await asyncio.sleep(1)  # 遇到io,自动切换到tasks列表中的其他任务,并运行
    print(3)
    
async def func2():
    print(2)
    await asyncio.sleep(2)  # 遇到io,自动切换到tasks列表中的其他任务,并运行
    print(4)

    
#----------------------写法1--------------------------------#
loop = asyncio.get_event_loop()

tasks = [
     asyncio.ensure_future(func1()),asyncio.ensure_future(func2())
]

loop.run_until_complete(asyncio.wait(tasks))
#----------------------------------------------------------#




#----------------------写法2--------------------------------#
async def main():
    tasks = [asyncio.ensure_future(func1()),asyncio.ensure_future(func2())]
    await asyncio.wait(tasks)

asyncio.run(main())

# 用run方法,不能直接 写完task列表后,直接asyncio.run(asyncio.wait(tasks))
# 要把tasks和asyncio.wait(tasks)代码放到main协程函数里面去

# 为什么要这样做?   因为asyncio.ensure_future(func1())  是要把协程对象包装然后注册到事件循环的任务列表里的

# 但是在执行asyncio.ensure_future(func1())时,事件循环event_loop还没有创建了!!!
# 所以要先asyncio.run(main()) 把事件循环创出来才行


event loop 也叫做事件循环

事件循环是asyncio的核心,它负责调度和执行任务

事件循环可以理解为一个死循环,检测并执行某些代码

# 伪代码

任务列表 = [任务1,任务2,任务3,任务4,...]

while True:
	可执行的任务列表 = [去任务列表中检查所有的任务,将可执行的任务放在该列表里]
	已完成的任务列表 = [去任务列表中检查所有的任务,将已完成的任务放在该列表里]
	
	for 就绪任务 in 可执行的任务列表:
		执行已就绪的任务
		
	for 已完成的任务 in 已完成的任务列表:
		在任务列表中移除已完成的任务
	
	如果任务列表中的所有任务都已完成,则终止循环!!!


loop = asyncio.get_event_loop()  # 生成事件循环
loop.run_until_complete(任务)  # 将任务放到任务列表,等待:事件循环去调度执行该任务到完成

事件循环的任务列表里的最小单位是任务对象,
所以loop.run_until_complete(协程对象) 协程对象还是会被包裹成一个任务的

asyncio参考博客

https://www.cnblogs.com/xyztank/articles/17571804.html

快速上手

async def main1():
    pass
# 函数在定义的时候左边有 async def 那么该函数就是协程函数


# 注意 在asyncio中,协程函数加括号不会立即执行,而是生成协程对象
# 如果想要运行协程函数内部的代码,必须要将协程对象交给事件循环来处理

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

res = main()
print(res)  # <coroutine object main at 0x000001B6A54BE5C0>

import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')


loop = asyncio.get_event_loop()
# 获取默认的事件循环对象

loop.run_until_complete(main())
# 将协程对象作为一个任务放到事件循环的任务列表中,运行事件循环,等待直到指定的协程对象执行完毕
# 这行代码自身是一个阻塞代码,直到协程执行完毕,主线程才会继续执行下面的代码!!!


loop.close()  # 关闭事件循环


# 需要注意的是,run_until_complete()方法接受一个可等待对象作为参数
# 可以是协程对象、任务对象或者Future对象。将可等待对象作为一个任务,注册到事件循环的任务列表里
# 它会持续运行事件循环,直到可等待对象执行完成。


# 在事件循环中,协程函数会按照调度规则进行执行。
# 当遇到await关键字时,协程对象的函数运行会暂时挂起,并将控制权让给其他协程。
# 当await后面的耗时操作完成后,事件循环会恢复被挂起的协程的执行。

-----------------------------------------------------------------

# 上面的event_loop三行代码可以简化为用run函数来代替
asyncio.run(main())

# 该函数做两件事
# 1 建立起事件循环
# 2 把协程对象包装成一个task,并把该任务,注册到事件循环的任务列表里面!!!

await关键字 重要!!!

# await关键字只能用在 协程函数里面
# 并且await关键字后面只能是可等待对象(协程对象,Future对象,task对象)

# 当运行到协程函数里面的 await asyncio.sleep(3) 时,会将代码执行控制权交还给事件循环
# 事件循环就可以去运行任务列表里,其他可执行的任务了

# 当await后面是协程对象的时候,虽然代码执行控制权,还是交还给事件循环,
# 但是这个时候事件循环不会去执行其他的任务,而是先去运行await后面的协程对象的函数代码!!!

# 同理await后面是task对象的时候,当前代码执行控制权还是先交还给事件循环,
# 然后控制权再交给任务对象去执行对应函数代码

import asyncio
import time


async def say_after(n, what):
    await asyncio.sleep(n)
    print(what)


async def main():
    print(f"start at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')
    await say_after(3, 'haha')
    
    print(f"finish at {time.strftime('%X')}")

asyncio.run(main())


#------------------------------------

# 结果为
"""
start at 13:21:55
hello
world
haha
finish at 13:22:01
"""

# 可以看到此时,出现io并没有切换,总共串行等了6s


# 也就是说在协程函数中,碰到await,并且await后面还是一个协程对象
# 对于当前的协程函数来说,执行控制权交还给事件循环,事件循环去执行await后面的协程对象的函数
# 运行say_after(1, 'hello')函数体代码,又碰到await asyncio.sleep(2)
# 还是一样执行控制权交还给事件循环,此时事件循环的任务列表里只有一个main()任务
# 想要继续执行main()任务里面的代码,必须要等待 await say_after(1, 'hello') 执行完
# 才能往下执行


# 由于上面的代码里面我们只将,协程对象main()变成一个任务,注册到了事件循环的列表里,
# 所以出现 await say_after(1, 'hello') 控制权给事件循环去执行say_after(1, 'hello')协程对象
# 然后在执行到await asyncio.sleep(1)时,控制权给事件循环,此时事件循环想去执行main()任务里面的代码
# 但是await say_after(1, 'hello') 还没有执行结束,所以只能继续等
# 当await asyncio.sleep(1) 睡完了,执行完print('hello')后,协程函数运行完了
# 这个时候代码控制权才交给main协程函数,继续执行await say_after(2, 'world')
# 然后依次类推


# 总结:
"""

协程函数里面有await关键字的时候,协程函数的执行控制权是会交给事件循环的,这个时候该协程会被挂起
如果await关键字后是另一个协程对象,控制权会给该协程对象去执行它的函数代码

也就是说,协程函数里面有await关键字的时候,
对于当前协程对象来说,只有await后面的可等待对象运行完了
才会继续执行当前协程函数的下一行代码


重要!!!
或者说,协程函数里面有await关键字的时候,对于当前协程对象来说,必须老老实实等,可等待对象运行完了,
才能继续往下走,是没法跳过可等待对象的!!!


"""

怎么实现在协程函数运行时,出现io的时候,也能主动进行切换

第1种方法

# 创建task对象的目的就是,将协程对象变成一个任务,添加到事件循环的任务列表中

import asyncio
import time

async def say_after(n, what):
    await asyncio.sleep(n)
    print(what)
	return what

async def main():
    # 注意执行asyncio.create_task(协程对象)该代码的时候
    # 会把协程对象包一下变成一个task对象,添加到事件循环的任务列表中
    # 此时只是将task对象添加到任务列表中,此时并不会去执行 协程对象的代码的!!!
    # 因为当前代码还没有io了,代码控制权还没有交给事件循环了!!!
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    task3 = asyncio.create_task(say_after(1, 'haha'))
	# 到这里时事件循环的任务列表里就有4个任务了
    
    print(f"start at {time.strftime('%X')}")
    res1 = await task1
    res2 = await task2
    res3 = await task3
    # print(res1, res2, res3)  # None None None
    print(f"finish at {time.strftime('%X')}")

asyncio.run(main())  


# 结果为
"""
start at 13:14:29
hello
haha
world
finish at 13:14:31
"""


"""
代码执行流程:
asyncio.run(main())  创建事件循环,将main()协程对象变成一个task,
并注册到任务列表中,并开始执行协程对象的代码
再把3个协程对象变成task,也注册到事件循环的任务列表中

await task1  等待,代码控制权交给事件循环,事件循环去执行task1对应的协程对象的函数
运行到await asyncio.sleep(1)后,代码控制权又交给事件循环,
此时task2与task3是可执行的,
事件循环就会任意运行一个假设是task2对应的协程对象的函数
就运行到了await asyncio.sleep(2),代码控制权又交给事件循环,此时只有task3是可执行的
事件循环就会运行task3对应的协程对象的函数
就运行到了await asyncio.sleep(1)

此时所有任务都在等待了,task1的睡1s最先结束,代码执行权回到task1,所以先打印了hello,任务1结束
此时虽然任务1结束了,但是任务2还没结束,所以main函数卡在await task2
task3的睡1s也结束了,代码执行权回到task3,所以先打印了haha,任务3结束
最后task2的睡2s也结束了,代码执行权回到task2,所以先打印了world,任务2结束
此时由于task2运行结束了,所以main函数继续往下运行await task3,由于task3已经运行完了
所以继续往下运行了,打印结束时间后,main函数运行结束了
事件循环的任务列表里任务没有了,事件循环也停止运行了
上面的代码中的3个等待   await task1    await task2     await task3
实际上写一个执行时间最长的await task2  效果是一样的!!!

----------------

所以最后,总共就睡了2秒钟,已经实现出现io自动切换了!!!

"""

第2种方法 (比第一种方法常用)

await asyncio.wait(task_list,timeout=None)

import asyncio
import time

async def say_after(n, what):
    await asyncio.sleep(n)
    print(what)
    return what


async def main():
    task_list = [asyncio.create_task(say_after(1, 'hello'),name='xioahong'),
                 asyncio.create_task(say_after(2, 'world')),
                 asyncio.create_task(say_after(1, 'haha')), 
                 ]
	# 可以给任务取名字,不起会默认分配
    # 到这里时事件循环的任务列表里就有4个任务了
    
    print(f"start at {time.strftime('%X')}")
    
    done,pending = await asyncio.wait(task_list,timeout=None)
    # 如果不要返回值直接 await asyncio.wait(task_list) 就行了
    
    print(done)  # 任务执行完了,会将任务的相关信息,放到done这个集合里面
    
    print(f"finish at {time.strftime('%X')}")

asyncio.run(main())


#------------------------------------------------------#

# 如果不想把task_list写在main函数里面,就要这样写,先不创建任务对象
# 因为一旦create_task创建任务对象,就会同时把协程对象变成任务对象,添加到事件循环的任务列表中
# 但此时事件循环还没创建了!!!

xiechengobj_list = [say_after(1, 'hello'),
             say_after(2, 'world'),
             say_after(1, 'haha'), 
             ]
asyncio.run(asyncio.wait(xiechengobj_list))

# asyncio.wait() 括号里放任务列表,或放协程对象列表都行,协程对象也会被转化成任务对象

第3种方法 await asyncio.gather(task1, task2, task3)

import asyncio
import time


async def say_after(n, what):
    await asyncio.sleep(n)
    print(what)
    return what


async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    task3 = asyncio.create_task(say_after(1, 'haha'))

    print(f"start at {time.strftime('%X')}")
    # res1 = await task1
    # res2 = await task2
    # res2 = await task3
    ret = await asyncio.gather(task1, task2, task3)  # 等于上面3行代码
    print(ret)
    print(f"finish at {time.strftime('%X')}")

asyncio.run(main())

# 结果
"""
start at 23:34:07
hello
haha
world
['hello', 'world', 'haha']
finish at 23:34:09
"""

# 这样也实现了出现io自动切换了


第4种方法

await asyncio.gather( say_after(1, 'hello'),say_after(2, 'world'))

# 还可以不用create_task方法,直接把协程对象放到gather的括号里面也行
# gather也能把协程对象变成task

import asyncio
import time

async def say_after(n, what):
    await asyncio.sleep(n)
    print(what)
    return what

async def main():
    # task1 = asyncio.create_task(say_after(1, 'hello'))
    # task2 = asyncio.create_task(say_after(2, 'world'))
    # task3 = asyncio.create_task(say_after(1, 'haha'))
	
    print(f"start at {time.strftime('%X')}")
    # ret = await asyncio.gather(task1, task2, task3)
    
    
    ret = await asyncio.gather(
        say_after(1, 'hello'),
        say_after(2, 'world'),
        say_after(1, 'haha'))
    # gather(协程对象) 的时候,协程对象会被隐式的变成task
    
    print(ret)  # 这些task任务运行的返回值,放到一个列表里,返回给 await等号左边的变量名
    
    print(f"finish at {time.strftime('%X')}")

asyncio.run(main())

# 结果
"""
start at 23:55:48
hello
haha
world
['hello', 'world', 'haha']
finish at 23:55:50
"""

# 归根结底就是要把3个say_after协程函数变成任务对象,放到事件循环的任务列表才行!!!

.
image
.
image
.
.

.

.

.

.

.

.

.

.

.

.

.

关键字总结

# 1 asyncio.run()
asyncio.run(main())    # 把协程对象包成task注册到event_loop中,并运行该任务


相当于下面这两行代码
loop = asyncio.get_event_loop()    # 获取默认的事件循环对象
loop.run_until_complete(main())
# 

------------------------------------------------------


# 2 await

await 后面是asyncio.sleep(2)  io操作的时候,会将当前代码控制权交给事件循环

await 后面是个协程对象的时候,会将当前代码控制权交给事件循环,并运行该协程函数

await 后面是task时,会将当前代码控制权交给事件循环,并运行该task

await 还可以把右边task运行的返回值,给到等号左边的变量名

------------------------------------------------------

# 3 create_task()
把协程对象变成task,再把task注册到事件循环的任务列表中!!!

------------------------------------------------------

# 4 gather()

括号里面放task,会将这些task注册到事件循环的任务列表中!
括号里面也可以直接放协程对象,也会被转化成task,并注册到事件循环的任务列表中!

代码执行控制权会交给event_loop
然后阻塞等待括号里面所有的task运行结束,

------------------------------------------------------

# 5 run_forever(  )

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
是一种使用 asyncio 库 创建无限循环的方式,
这个循环会持续运行,直到显式终止。

通常情况下,如果任务列表中的所有任务都已完成,事件循环就会终止了!!!

它的作用是启动一个事件循环并使其永远不会自动停止,从而允许异步操作持续执行。
通常用于编写长期运行的异步服务或应用程序,例如网络服务器或监控系统,
其中需要不间断地处理异步任务。

------------------------------------------------------

.
.
.
.
.
.
.
.
.
.

基本使用代码

import asyncio

async def func(n):
    print('start', n)
    # await 后面跟的是可能会发生阻塞的代码
    # await关键字必须写在一个async函数里
    await asyncio.sleep(1)
    print('end')


loop = asyncio.get_event_loop()

loop.run_until_complete(func(1))  # 这行代码整体是一个阻塞的代码

loop.run_until_complete(func(2))
# 此时不会切换,而是先运行完第一个协程函数后,再运行第二个协程函数,不会因为有io而切换
# 因为loop.run_until_complete(func(1))时,事件循环的任务列表里只有一个任务,所以不会切换!!

# loop.run_until_complete(func(1)) 
# 整体是一个阻塞的代码,直到协程函数执行完毕,主线程才会继续执行下面的代码!!!
# 将协程对象变成一个任务,放到事件循环的任务列表中,运行事件循环,直到指定的协程对象执行完毕


loop.run_until_complete(asyncio.wait([func(1), func(2)]))
# 只有这样才能实现异步了,协程里出现io会切换到另一个协程里

-------------------------------------------------------------

import asyncio
import time

async def func1():
    print(1)
    await asyncio.sleep(3)  # 遇到耗时后会自动切换到其他函数中执行
    print(2)

async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)

async def func3():
    print(5)
    await asyncio.sleep(2)
    print(6)

    
loop = asyncio.get_event_loop()

task_list = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2()),
    asyncio.ensure_future(func3())
]

start = time.time()
loop.run_until_complete(asyncio.wait(task_list))

end = time.time()
print(end - start)  # 只会等待3秒

# 结果为
"""
1
3
5
4
6
2
2.9925854206085205
"""

.
.
.

.

.

.

.

.

asyncio的future对象的使用

# future对象就是来配合await关键字的 await的等待,什么时候结束,靠的就是判断对象的值
# 对象的值就是靠的就是Future类里面的set_result方法,给对象赋值的

# 换句话说在协程函数里面 await关键字后面一定要跟可等待对象,就是因为可等待对象的类继承了Future类
# 可等待对象的的函数代码执行完后,会自动给对象执行set_result函数,所以任务对象就有值了
# 所以await判断任务对象有值了,就不再等了,代码就继续往下执行了
# 所以说如果一个io操作的代码不支持协程,又想放在await后面,那就要想办法先继承Future类


# future对象的类Future,是task对象的类Task的父类
# 当执行 await task对象  的时候,内部就是去执行父类Future里面的代码

import asyncio
import time

async def main():
    
    loop = asyncio.get_running_loop()  # 获取事件循环

    # 创建一个任务(future对象),该future对象没有绑定任何行为,此时任务不知道什么时候结束
    fut = loop.create_future()
	
    # 等待任务最终结果,没有结果会一直等待下去,此时程序就会一直阻塞住
    await fut

asyncio.run(main())

#-------------------------------------------------------#

import asyncio

async def sat_after(fut):
    print(4)
    await asyncio.sleep(2)
    #4 代码的控制权交给事件循环,但由于事件循环的任务列表中只有一个任务,且也在等待
    
    # 所以在此等待2s后,继续往下执行
    print(5)
    fut.set_result('hello')  #5 给future对象设置值


async def main():
    loop = asyncio.get_running_loop()   #1 获取事件循环
    print(1)

    #2 创建一个任务(future对象),该任务什么都不干
    fut = loop.create_future()
    print(2)

    #3 sat_after协程对象注册到到事件循环的任务列表中
    #3 loop.create_task(sat_after(fut))整体返回的是协程对象包装成的任务对象
    #3 当前代码的控制权交给事件循环,并去执行任务对象对应的函数代码,就去运行sat_after函数了
    print(3)
    await loop.create_task(sat_after(fut))

    print(6)
    res = await fut  #6 一旦future对象有值了,就相当于future这个任务执行完了,就不再等待了

    print(7)
    print(res)  #7 给future对象设置什么值,这里res就拿到什么值

    

asyncio.run(main())  # 开始

# await配合futur对象使用,就可阻塞住主线程代码,等待future对象有了结果后,主线程代码才继续往下执行

# await 任务对象    为什么当任务对象的函数代码执行完了,为什么就不再等待了?
# 是因为任务对象的函数代码执行完后,会自动给任务对象执行set_result函数,所以任务对象也有值了
# 所以等待就结束了!!!
# (而且好像设置的值,就是协程函数的返回值,不是太确定!!!)

"""
1
2
3
4
5
6
7
hello
"""

.

.

.

.

concurrent模块的进程池与线程池

"""

因为硬件的发展赶不上软件,有物理极限.
如果我们在编写代码的过程中无限制的创建进程或者线程,可能会导致计算机崩溃!!!

池的作用: 降低程序的执行效率,但是保证了计算机硬件的安全

进程池: 提前创建好固定数量的进程,也就是说同时只有固定数量的进程去运行对应的函数,其他的函数要等待进程去运行

线程池: 提前创建好固定数量的线程,也就是说同时只有固定数量的线程去运行对应的函数,其他的函数要等待进程去运行

"""

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random
from threading import current_thread

# 1.产生含有固定数量线程的线程池
pool = ThreadPoolExecutor(10)

def task(n):
    print('task is running')
    time.sleep(random.randint(1, 3))
    print('task is over', n, current_thread().name)

# 2.将任务提交给线程池即可
for i in range(20):
    fut = pool.submit(task, 123)
    # 朝线程池提交任务,123是要传给task函数的参数

# 这个ThreadPoolExecutor是一个类,调用submit方法最后也会调threading.Thread类生成线程对象

#-------------------------------------------------#

# 1.产生含有固定数量线程的线程池 与 固定数量进程的进程池
pool = ProcessPoolExecutor(5)

def task1(n):
    print('task is running')
    return '我是task函数的返回值'

def func(*args, **kwargs):
    print('from func')

# 2.将任务提交给线程池即可
for i in range(20):
    # print(res.result())  # 不能直接获取
    pool.submit(task, 123).add_done_callback(func)  # 朝进程池提交20个任务,是异步操作

    
"""

先朝进程池提交20个任务,但是进程池只有5,
所以一次性只能运行5个子进程每个子进程运行一个任务,
一旦运行的这5个子进程里面有一个子进程运行完了,有返回值了,
立刻自动调用func函数运行,并把返回值传给该func函数
add_done_callback() 就是异步回调机制

"""

.

.

.

.

.

.

concurrent模块的Future类

# 使用线程池或进程池来实现异步操作时,会用到该类

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(2)
    print(value)
    return value


# 创建线程池,池里面最多5个线程
pool = ThreadPoolExecutor(max_workers=5)
# 或 pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)
    # 产生一个Future对象,如果从池中拿一个线程,该对象的state就是running,就用该线程去执行func函数并传参
    # 如果从池中没拿到线程,该对象的state就是pending就等待,
    # 直到从池中拿到线程,该对象的state才会变成running,才会用该线程去执行func函数并传参
    print(fut)
# 所以for循环走完,10个Future对象都已经创建了,5个从池中拿到线程了,5个在等
print("-----------------------------")


"""
<Future at 0x25f93187be0 state=running>
<Future at 0x25f9322da00 state=running>
<Future at 0x25f9322dd60 state=running>
<Future at 0x25f93237130 state=running>
<Future at 0x25f932374c0 state=running>
<Future at 0x25f93237850 state=pending>
<Future at 0x25f93237970 state=pending>
<Future at 0x25f93237a90 state=pending>
<Future at 0x25f93237bb0 state=pending>
<Future at 0x25f93237cd0 state=pending>
-----------------------------
4
1
2
3
0
9
7
5
8
6
"""

.

.

.

.

# 在使用协程异步编程的时候,如果某一个第三方模块不支持基于协程的异步的时候

# 那想要实现异步,该第三方模块的代码部分,就只能使用多线程或多进程的时候实现异步了


# 代码实现
import asyncio
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func():
    time.sleep(2)
    print('hello')
    return 'hello'


async def main():

    loop = asyncio.get_event_loop()

    fut = loop.run_in_executor(None, func)  # None代表创建线程池
    # run_in_executor函数内部会默认创建线程池,然后产生一个Future对象,这里的Future对象是concurrent模块的
    # 将concurrent模块产生的Future对象,再转换成支持异步的asyncio里面future对象
    # 这样run_in_executor产生的Future对象,就可以和asyncio模块的Future对象一样,支持await关键字了

    res = await fut
    print("default thread pool",res)

asyncio.run(main())


#------------------------------------------------------------#

# asyncio模块和一个不支持异步的模块一起使用,都实现异步
import asyncio
import requests


async def fetch(url):
    print(f'start 下载 {url}')
    loop = asyncio.get_event_loop()
    # 通过run_in_executor生成了一个concurrent的Future对象,并包装成asyncio的Future对象
    # 然后从线程池中拿一个线程,去执行requests.get方法传url参数
    
    fut = loop.run_in_executor(None, requests.get, url)
    res = await fut
    # 等待Future对象执行完,也就是requests.get函数执行完
    # 执行完的返回值,还会通过set_result设置给Future对象,res 就拿到返回值了
    
    print(f'end 下载完成 {url}')
    # 图片保存到本地
    file_name = url.rsplit("/")[-1]
    with open(file_name,mode='wb') as f:
        f.write(res.content)


url_list = [
    'https://pic.616pic.com/photoone/00/00/16/618ce649d05877001.jpg',
    'https://img.tukuppt.com/photo-big/00/10/60/61963c2ab6c097864.jpg',
    'https://pic.3zitie.cn/zhuangshi/2017/09/324/pic/img/0275.jpg',
]

# 生成协程对象列表
tasks = [fetch(url) for url in url_list]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

.

.

.

.

.

.

后续关于 asyncio还有一点知识
异步迭代器
异步上下文管理器
等等
用到到b站上搜相关视频再学吧

.

.

.

.

.

.

.

.

.

.

.

.

asyncio模块 实现协程的底层原理,就利用yield关键字,代码示范

用空再看

# asyncio协程实现的原理,就利用yield关键字,执行生成器,遇到yield,就会从生成器的代码里面返回出去了
# 这就是asyncio模块,实现切换协程的基础原理!!!
# 同时利用两个yield关键字
# 就能把原来time模块里的sleep,写成协程里面的sleep函数了!!!


import time


# 自己写一个sleep函数
def sleep(n):
    print('start sleep')  # 7
    # print(type(time.time()))  # 浮点型数字
    # print(time.time() + n)
    yield time.time() + n  # 8 将要睡结束的时间,返回值给了i
    print('end sleep')


def func(n):
    print(123)  # 4
    g = sleep(n)  # 5 变成一个生成器
    # yield from g
    # yield from sleep(n)  还可以和上面的代码合并
    # await sleep(n)  这也就是await的底层代码实现
    # yield from g 就是把g的值,for循环然后再一个个的yield
    for i in g:  # 6
        # print(i)  # 只有一个浮点型数字
        yield i  # 9 最终将值给了ret1

    print(456)


start_time = time.time()
n = 1
g1 = func(1)  # 1 变成一个生成器1
g2 = func(1.01)  # 2 变成一个生成器2

ret1 = next(g1)  # 3 运行生成器1
ret2 = next(g2)  # 10 运行生成器2  重复上面4-9的代码 就是 11-16
# 上面的代码都执行的很快,没有io操作

time_dic = {ret1: g1, ret2: g2}  # 17

while time_dic:  # 18
    min_time = min(time_dic)  # 19 取最近的时间
    time.sleep(min_time - time.time())
    # 20 这一步玩的骚啊,用当时记录要睡到的时间减现在实际的时间,就是现在实际要睡的时间!!!
    # 再一次循环的时候,取第二个最近的时间,我们是在之前第一次循环睡的基础上再睡,不是串行的睡,而是并行的睡!!

    try:
        next(time_dic[min_time])  # 21  开始运行sleep生成器里yield后面的代码 end sleep
        # 由于此时函数里面已经没有yield代码了,所以会报错,所以就会走print(456)
    except StopIteration:
        pass
    del time_dic[min_time]
print(time.time() - start_time)


# 代码执行结果
123
start sleep
123
start sleep   # 上面几行代码几乎一起打印的,然后等了约1s
end sleep
456           # 又等了约1s 又执行了下面的代码!!!
end sleep
456
2.0105111598968506

----------------------------------------------------------

# 总结:
多个函数的sleep操作,没有在sleep函数中真睡,只是在sleep函数中记录了,要睡到什么时间结束,
并将该时间返回出去,这样最后将多个函数的sleep要睡到时间,放到一个容器里
通过while循环,每次取出,io最短的那个时间,阻塞对应的时间后,
并用next再切回到对应的生成器里面去执行剩下的代码,

# 整个过程,决定切出来是协程函数做的,是协程函数做的
# 决定再切回到协程函中,是外面的while循环来通过判断来做的

# asyncio模块基本就是这么玩的

# 所以asyncio模块里面的event loop的事件循环 干的事情就是:
循环找到io最短的那个时间,阻塞对应的时间后,再切回到对应的生成器里面去执行剩下的代码

----------------------------------------------------

最后把代码再封装一下,就已经像asyncio模块的雏形了

def sleep(n):pass
def func(n):pass

def run_until_complete(g1, g2):
    ret1 = next(g1)  # 3 运行生成器1
    ret2 = next(g2)  # 10 运行生成器2  重复上面4-9的代码 就是 11-16
    # 上面的代码都执行的很快,没有io操作

    time_dic = {ret1: g1, ret2: g2}  # 17
    
    while time_dic:  # 18
        min_time = min(time_dic)  # 19 取最近的时间
        time.sleep(min_time - time.time())
        # 20 这一步玩的骚啊,用当时记录要睡到的时间减现在实际的时间,就是现在实际要睡的时间!!!
        # 再一次循环的时候,取第二个最近的时间,我们是在之前第一次循环睡的基础上再睡,不是串行的睡,而是并行的睡!!
    
        try:
            next(time_dic[min_time])  # 21  开始运行从6-9
        except StopIteration:
            pass
        del time_dic[min_time]


start_time = time.time()
n = 1
g1 = func(1)  # 1 变成一个生成器1
g2 = func(2)  # 2 变成一个生成器2
run_until_complete(g1, g2)

print(time.time() - start_time)

------------------------------------------------------

.
.
.

标签:task,协程,await,time,print,相关,asyncio
From: https://www.cnblogs.com/tengyifan888/p/18530709

相关文章

  • ePWM相关记录
    此处记录TMS320F28xePWM模块相关理解。此处先介绍几个名词概念TBCTR(时基计数器):时基计数器保存当前的计数值,用于生成PWM信号周期。TBPRD(时基周期寄存器):这个寄存器存储PWM信号的周期值,计数器从0开始计数,直到TBPRD的值。TBPHS(时基相位寄存器):这个寄存器控制PWM信号的相位偏移,主......
  • 在 Windows 11 中,如果在 WSL2 中使用了 mirrored 或 virtioproxy 模式,而子系统的 IP
    在Windows11中,如果在WSL2中使用了mirrored或virtioproxy模式,而子系统的IP地址与主机地址相同,通常这与WSL2的网络配置和虚拟化模式相关。1. 理解 mirrored 和 virtioproxy 模式mirrored模式:通常在虚拟化环境中,mirrored网络模式意味着虚拟机(或者在此情况下......
  • GPT-Academic 其它插件相关使用教程
    一.虚空终端“GPT插件虚空终端”似乎是一个结合了GPT技术和某种终端(可能是命令行或控制台)的插件或工具。以下是一些可能的解释和应用:GPT插件:这可能是一个利用GPT(生成式预训练变换器)技术的插件,用于提供自然语言处理功能,比如文本生成、对话系统等。虚空终端:这个名称可能指的......
  • 二叉树的一些相关操作
    前言    链式结构二叉树的访问基本使用递归来进行访问,同时也可以体现出递归的暴力美学。因为有涉及到二叉树的相关操作,因此先要手动构造一个二叉树,在对其进行一些操作。        本篇一切对二叉树的操作都基于我创建的二叉树        这个是我构造的......
  • 内存函数的相关知识点
    1strerrorchar*strerror(interrnum)从语言的库函数在运行的时候,如果发生错误,就会将错误码放在一个变量中,这个变量是errnor.//strerror(errno)//fopen//FILE*fopen(constchar*filename,constchar*mode);//如果打开文件成功,就返回一个有效的指针,如......
  • 简单介绍机器学习与深度学习以及相关算法
    机器学习算法线性回归解释:线性回归是一种简单的预测算法,它通过寻找输入变量和输出变量之间的线性关系来进行预测。例子:假设你想预测一个房子的价格,可以根据房子的面积(输入)和价格(输出)画一条直线,线性回归就是找到这条最合适的直线。逻辑回归解释:尽管名字中有“回归”,逻......
  • Kafka笔记系列-概念相关
    消息队列的主要功能连接服务、消息路由、消息传递、数据持久化、日志记录消息队列基本分类1、点对点生产者发送消息到队列中,消费者从队列中取出并消费。消息在消费以后,队列中不再有存储,队列可以有多个消费者,但是一个消息只能被一个消费者消费2、发布订阅模式生产者发布消息......
  • docker相关知识
    docker-compose自定义执行文件:docker-compose-fdocker-compose.rel.yamlup-d docker前期准备://给某个ubuntu添加docker权限,否则每次都需要去root权限下执行docker-compose命令sudogroupadddockersudogpasswd-aubuntudockersudosystemctlrestartdo......
  • apisix~相关组件的图形化说明
    参考:https://docs.api7.ai/apisix/key-concepts/pluginsApacheAPISIX是Apache软件基金会下的顶级项目,由API7.ai开发并捐赠。它是一个高性能的云原生API网关,具有动态、实时等特点。APISIX网关可作为所有业务的流量入口,为用户提供了丰富的功能,包括动态路由、动态上游、动态证书、......
  • Pandas相关性分析
    1.相关性分析定义  在Pandas中,数据相关性分析是通过计算不同变量之间的相关系数来了解它们之间的关系。在Pandas中,数据相关性是一项重要的分析任务,它帮助我们理解数据中各个变量之间的关系。2.使用corr()方法计算数据集中每列之间的关系df.corr(method='pearson',......