目录
并发编程补充
一、asyncio 模块
asyncio
是Python 3.4版本引入的标准库,用于实现异步编程。它基于事件循环(Event Loop)模型,通过协程(Coroutines)来实现任务的切换和调度(也叫协程模块)。在asyncio
中,我们可以使用async
关键字定义协程函数,并使用await
关键字挂起协程的执行- 异步编程的核心思想是避免阻塞,即当一个任务在等待某个操作完成时,可以让出CPU执行权,让其他任务继续执行。这样可以充分利用CPU的时间片,提高程序的整体效率
- 一旦决定使用异步,则系统每一层都必须是异步,“开弓没有回头箭”
1. asyncio
中的几个重要概念
-
事件循环
- 事件循环是异步编程的核心机制,它负责协调和调度协程的执行,以及处理IO操作和定时器等事件。它会循环监听事件的发生,并根据事件的类型选择适当的协程进行调度
- 在
asyncio
库中,可以通过asyncio.get_event_loop()
方法获取默认的事件循环对象,也可以使用asyncio.new_event_loop()
方法创建新的事件循环对象
-
Future
- future表示还没有完成的工作结果。事件循环可以通过监视一个future对象的状态来指示它已经完成。future对象有几个状态:Pending、Running、Done、Cancelled
- 创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消,状态为cancel
-
Task
-
task是Future的一个子类,它知道如何包装和管理一个协程的执行。任务所需的资源
可用时,事件循环会调度任务,并生成一个结果,从而可以由其他协程消费
-
asyncio.Task
用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:asyncio.async() 、loop.create_task() 或 asyncio.ensure_future()
-
-
coroutine
- 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用
2. asyncio
模块的常用方法
(1)run_until_complete()
- 阻塞调用,直到协程运行结束才返回。参数是future、协程对象,传入协程对象时内部会自动变为future
- 在最早的Python 3.4中,协程函数是通过
@asyncio.coroutine 和 yeild from
实现的
import asyncio
@asyncio.coroutine
def func1(i):
print("协程函数{}马上开始执行。".format(i))
yield from asyncio.sleep(2)
print("协程函数{}执行完毕!".format(i))
if __name__ == '__main__':
# 获取事件循环
loop = asyncio.get_event_loop()
# 执行协程任务
loop.run_until_complete(func1(1))
# 关闭事件循环
loop.close()
(2)asyncio.run()
- 这是异步程序的主入口,相当于C语言中的main函数。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次
- Python 3.7之前执行协程任务都是分三步进行的,代码有点冗余。Python 3.7提供了一个更简便的
asyncio.run
方法,下面代码为asyncio.run
出现前后的对比:
# asyncio.run出现之前
import asyncio
# 这是一个协程函数
async def func1(i):
print("协程函数{}马上开始执行。".format(i))
await asyncio.sleep(2)
print("协程函数{}执行完毕!".format(i))
if __name__ == '__main__':
# 获取事件循环
loop = asyncio.get_event_loop()
# 执行协程任务
loop.run_until_complete(func1(1))
# 关闭事件循环
# asyncio.run方法出现之后
import asyncio
async def func1(i):
print(f"协程函数{i}马上开始执行。")
await asyncio.sleep(2)
print(f"协程函数{i}执行完毕!")
if __name__ == '__main__':
(3)asyncio.sleep()
- 模拟IO操作,这样的休眠不会阻塞事件循环,前面加上await后会把控制权交给主事件循环,在休眠(IO操作)结束后恢复这个协程
- 注意:若在协程中需要有延时操作,应该使用
await asyncio.sleep()
,而不是使用time.sleep()
,因为使用time.sleep()
后会释放GIL,阻塞整个主线程,从而阻塞整个事件循环
(4)async 和 await
- python在3.5以后引入
async
和await
,代替了asyncio.coroutine
装饰器,基于他编写的协程代码其实就是上一示例的加强版,让代码可以更加简便可读 async
用在定义函数之前,声明当前定义的函数时一个异步函数- 异步函数不能直接调用,直接调用异步函数不会返回结果,而是返回一个coroutine对象。需要放在异步框架中调用
await
的作用是挂起一个函数,在挂起时,该挂起函数仍在执行,而当前线程会去执行其他异步函数,而不是阻塞住等其全部执行完;当挂起函数执行完成后,线程会从其它异步函数返回继续执行挂起函数之后的内容await
只可对异步函数使用(本质是实现了 __await__ 方法的类
),且只能在一个异步函数中使用
(5)asyncio.create_task()
-
asyncio.create_task()
其是对loop.create_task()
的封装,相当于创建并行的任务 -
接收一个协程,返回一个
asyncio.Task
的实例,也是asyncio.Future
的实例,毕竟Task是Future的子类。返回值可直接传入run_until_complete()
-
返回的Task对象可以看到协程的运行情况
-
我们只执行了单个协程任务(函数)。实际应用中,我们先由协程函数创建协程任务,然后把它们加入协程任务列表,最后一起交由事件循环执行。
根据协程函数创建协程任务有多种方法,其中最新的是Python 3.7版本提供的
asyncio.create_task
方法,如下所示:
# 方法1:使用ensure_future方法。future代表一个对象,未执行的任务。
task1 = asyncio.ensure_future(func1(1))
task2 = asyncio.ensure_future(func1(2))
# 方法2:使用loop.create_task方法
task1 = loop.create_task(func1(1))
task2 = loop.create_task(func1(2))
# 方法3:使用Python 3.7提供的asyncio.create_task方法
task1 = asyncio.create_task(func1(1))
task2 = asyncio.create_task(func1(2))
- 简单实例
import asyncio
async def coroutine_example():
await asyncio.sleep(1)
print('zhihu ID: Zarten')
coro = coroutine_example()
loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('运行情况:', task)
loop.run_until_complete(task)
print('再看下运行情况:', task)
loop.close()
# 输出结果:
'''
运行情况:<Tosk pending coro=<coroutine_example() running at E:/Profile/pyproject/test.py>>
zhihu ID:Zarten
再看下运行情况:<Task finished coro=<coroutine_example() done,definde at E:/Profile/pyproject/test.py>result=None>
从输出结果可看到,当task为finished状态时,有个result()的方法,我们可以通过这个方法来获取协程的返回值
'''
(6)asyncio.wait() 和 asyncio.gather()
-
wait()
方法将task任务列表加入到当前的事件循环中,等待多个异步任务完成等;(注意:必须先创建事件循环,后加入任务列表,否则会报错) -
asyncio.wait()
是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环 -
相同
- 从功能上看,
asyncio.wait
和asyncio.gather
实现的效果是相同的,都是把所有 Task 任务结果收集起来
- 从功能上看,
-
不同:
-
asyncio.wait
使用一个set保存它创建的Task实例,因为set是无序的所以这也就是我们的任务不是顺序执行的原因。会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;而
asyncio.gather
返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果,如果列表中传入的不是create_task方法创建的协程任务,它会自动将函数封装成协程任务
-
i. asyncio.wait实例
- 最常见的写法是:
await asyncio.wait(task_list)
import asyncio
import arrow
def current_time():
'''
获取当前时间
:return:
'''
cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
return cur_time
async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
await asyncio.sleep(sleep_time)
print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(func(i))
task_list.append(task)
#会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task
done, pending = await asyncio.wait(task_list, timeout=None)
for done_task in done:
print((f"[{current_time()}] 得到执行结果 {done_task.result()}"))
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
if __name__ == '__main__':
main()
ii. asyncio.gather 实例
- *注意
await asyncio.gather(*task_list)
,注意这里 task_list 前面有一个 (把list解构)
import asyncio
import arrow
def current_time():
'''
获取当前时间
:return:
'''
cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
return cur_time
async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
await asyncio.sleep(sleep_time)
print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(func(i))
task_list.append(task)
results = await asyncio.gather(*task_list)
for result in results:
print((f"[{current_time()}] 得到执行结果 {result}"))
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
if __name__ == '__main__':
main()
(7)add_done_callback
回调
- 我们还可以给每个协程任务通过add_done_callback的方法给单个协程任务添加回调函数
#-*- coding:utf-8 -*-
import asyncio
async def func1(i):
print(f"协程函数{i}马上开始执行。")
await asyncio.sleep(2)
return i
# 回调函数
def callback(future):
print(f"执行结果:{future.result()}")
async def main():
tasks = []
for i in range(1, 5):
task = asyncio.create_task(func1(i))
# 注意这里,增加回调函数
task.add_done_callback(callback)
tasks.append(task)
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
(8)其他
很多协程任务都是很耗时的,当你使用wait方法收集协程任务时,可通过timeout选项设置任务切换前单个任务最大等待时间长度,如下所示:
# 获取任务执行结果,如下所示: done,pending = await asyncio.wait(tasks, timeout=10)
asyncio.current_task: 返回当前运行的Task实例,如果没有正在运行的任务则返回 None。如果 loop 为 None 则会使用 get_running_loop()获取当前事件循环。
asyncio.all_tasks: 返回事件循环所运行的未完成的Task对象的集合
3. 简单的协程实例
- 基于python3.5及之后版本
1.
import asyncio
async def hsw_test():
print('---你好')
await asyncio.sleep(2)
print('---我的朋友')
asyncio.run(hsw_test())
2. 同步运行多个协程
import asyncio,time
async def hsw_test(msg,delay):
await asyncio.sleep(delay)
print(msg)
async def main():
print(f"start {time.strftime('%H:%M:%S')}")
await hsw_test('你好',2)
await hsw_test('我的朋友', 4)
print(f"end {time.strftime('%H:%M:%S')}")
asyncio.run(main())
'''
start 15:13:09
你好
我的朋友
end 15:13:15
从起止时间可以看出,两个协程是 顺序 执行的,总共耗时2+4=6秒
'''
3. 并发运行多个协程
import asyncio,time
async def hsw_test(msg,delay):
await asyncio.sleep(delay)
print(msg)
async def main():
task1 = asyncio.create_task(hsw_test('你好',2))
task2 = asyncio.create_task(hsw_test('我的朋友', 4))
print(f"start {time.strftime('%H:%M:%S')}")
await task1
await task2
print(f"end {time.strftime('%H:%M:%S')}")
asyncio.run(main())
"""
start 15:17:23
你好
我的朋友
end 15:17:27
从起止时间可以看出,两个协程是 并发 执行的,总共耗时等于携程中的最大耗时4秒
asyncio.create_task() 在后面的异步编程中非常常用,相当于创建了多条生产线,让协程在不同生产线之间切换执行
"""
4. asyncio.create_task()并发运行多个协程
import asyncio
# 定义异步函数
async def foo2():
print('----start foo')
await asyncio.sleep(6)
print('foo foo')
await asyncio.sleep(3)
print('----end foo')
# 定义异步函数
async def bar2():
print('----start bar')
await asyncio.sleep(3)
print('bar bar')
await asyncio.sleep(6)
print('----end bar')
async def sdf():
tasks = []
tasks.append(asyncio.create_task(foo2())) # 创建两个异步任务
tasks.append(asyncio.create_task(bar2()))
await asyncio.wait(tasks) # asyncio.wait()可以控制多任务,asyncio.wait()是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环;run_until_complete()方法作用是阻塞调用,直到协程运行结束才返回。参数是future,传入协程对象时内部会自动变为future。Future对象表示尚未完成的计算,还未完成的结果
if __name__ == '__main__':
asyncio.run(sdf())
# 打印结果
'''
----start foo
----start bar
bar bar
foo foo
----end foo
----end bar
'''
标签:__,task,协程,补充,编程,并发,print,await,asyncio
From: https://www.cnblogs.com/Mcoming/p/17674668.html