首页 > 编程语言 >python-asyncio

python-asyncio

时间:2023-05-20 21:00:50浏览次数:53  
标签:task run python def print loop asyncio

python -asyncio

目录

协程是在用户态实现的上下文切换技术

相比线程切换,协程上下文切换代价更小。
协程是单线程的,不存在多线程互斥访问共享变量,不用锁、信号量等机制
协程非常灵活,当出现I/O阻塞时,就去切换任务,I/O完成再唤醒,这就是所谓的 异步I/O,

实现化协程对象的方法有:

  • yield关键字
  • yield from关键字
  • asyncawait 关键字

yield

def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()
for item in f1:
    print(item)

asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。
此处使用的是Python 3.5之后出现的async/await来实现协程

通过 async/await 语法来声明协程是编写 asyncio 应用的推荐方式。

asyncio中几个重要概念

1.事件循环

管理所有的事件,在整个程序运行过程中不断循环执行并追踪事件发生的顺序将它们放在队列中,空闲时调用相应的事件处理者来处理这些事件。

2.Future

Future对象表示尚未完成的计算,还未完成的结果

3.Task

是Future的子类,作用是在运行某个任务的同时可以并发的运行多个任务。

asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:

asyncio 完整流程

1、定义/创建协程对象
2、将协程转为task任务
3、定义事件循环对象容器
4、将task任务扔进事件循环对象中触发
asyncio.async()
loop.create_task() 或 asyncio.ensure_future()

异步代码

import time
import asyncio

# 定义异步函数
async def hello():
    await asyncio.sleep(1)
    print(f'Hello World: {time.time()}')

def addtack(loop):
    for i in range(5):
        loop.create_task(hello())

if __name__ == '__main__':
    t1 = time.time()
    loop = asyncio.get_event_loop()      # 初始化时间
    addtack(loop)                        # 创建任务
    loop.run_until_complete(hello())     # 等待任务执行结束
    t2 = time.time()
    print(f"all time {t2 - t1}")

# -------------------------------------------------------
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
Hello World: 1684489947.7962332
all time 1.0133047103881836

定义和运行

使用async def语句定义一个协程函数,但这个函数不可直接运行

# 普通函数
def function():
    return 1
   
# 异步函数
async def asynchronous():
    return 1
# 异步函数不同于普通函数不可能被直接调用
async def aaa():
    print('hello')

print(aaa())

# 输出----------------------------------
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

如何运行一个协程呢,有三种方式:

asyncio.run()

1、使用函数,可直接运行

import asyncio

async def aaa():
    print('hello')

asyncio.run(aaa())
# 输出-------------------------
hello

await()

2、使用await()进行异步等待

  • await语法等待另一个协程,这将挂起当前协程,直到另一个协程返回结果。

asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程

import asyncio
async def aaa():
    print('hello')

async def main():
    await aaa()
    
asyncio.run(main())

asyncio.create_task()

使用asyncio.create_task() 函数来创建一个任务,放入事件循环中

import asyncio

async def aaa():
    print('hello')

async def main():
    asyncio.create_task(aaa())
asyncio.run(main())
import asyncio

async def asfunc():
    await asyncio.sleep(1)
    print('hello')

aaa=asfunc()
loop = asyncio.get_event_loop()
task = loop.create_task(aaa)
loop.run_until_complete(task)

创建Task

loop.create_task()

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print('beike tian')

coro = helperfunc()

loop = asyncio.get_event_loop()
print("loop:",loop)
task = loop.create_task(coro)
print('task:', task)

loop.run_until_complete(task)
print('task:', task)
loop.close()

# 输出 ----------------------------------------
loop: <ProactorEventLoop running=False closed=False debug=False>
task: <Task pending name='Task-1' coro=<helperfunc() running at E:\TzxNote\Note\lcodeNoteCards\edge_det.py:3>>
beike tian
task: <Task finished name='Task-1' coro=<helperfunc() done, defined at E:\TzxNote\Note\lcodeNoteCards\edge_det.py:3> result=None>

run_until_complete 是一个阻塞(blocking)调用,直到协程运行结束,它才返回

import asyncio

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

获取协程返回

有2种方案可以获取返回值。

方法一:通过task.result()

可通过调用 task.result() 方法来获取协程的返回值,但是只有运行完毕后才能获取,若没有运行完毕,result()方法不会阻塞去等待结果,而是抛出 asyncio.InvalidStateError 错误

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print('beike tian')
    return  "小贝壳"

coro = helperfunc()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('task:', task)
try:
    print('task.result:', task.result())
except asyncio.InvalidStateError:
    print('task状态未完成,捕获了 InvalidStateError 异常')

loop.run_until_complete(task)
print('task:', task)
print('task.result:', task.result())
loop.close()

# ----------------------------
task: <Task pending name='Task-1' coro=<helperfunc() running at E:\TzxNote\Note\lcodeNoteCards\edge_det.py:3>>
task状态未完成,捕获了 InvalidStateError 异常
beike tian
task: <Task finished name='Task-1' coro=<helperfunc() done, defined at E:\TzxNote\Note\lcodeNoteCards\edge_det.py:3> result='小贝壳'>
task.result: 小贝壳

方法二、asyncio.gather

import asyncio

async def func1(i):
    print(f"协程函数{i}马上开始执行。")
    await asyncio.sleep(2)
    return i

async def main():
    tasks = []
    for i in range(1, 5):
        tasks.append(func1(i))

    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"执行结果: {result}")

if __name__ == '__main__':
    asyncio.run(main())

取消任务

import asyncio
import time

async def helperfunc(nums):
    await asyncio.sleep(1)
    print(f'beike tian {nums}')
    return  "小贝壳"


if __name__ == "__main__":
    task1 = helperfunc(2)
    task2 = helperfunc(3)
    task3 = helperfunc(3)

    tasks = [task1, task2, task3]
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        # stop 调用之后,需要调用 run_forever,不然会报错
        loop.run_forever()
    finally:
        loop.close()

方法三:通过add_done_callback()回调

通过 Future 的 add_done_callback() 方法来添加回调函数,当任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。

import asyncio

async def helperfunc():
    await asyncio.sleep(1)
    print('beike tian')
    return  "小贝壳"

def my_callback(future):
    print('返回值:', future.result())

coro = helperfunc()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
task.add_done_callback(my_callback)

loop.run_until_complete(task)
loop.close()

# -----------------------------------------
beike tian
返回值: 小贝壳

多任务控制

通过asyncio.wait()可以控制多任务

asyncio.wait()是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环

asyncio.gatherasyncio.wait 功能相似。

方法1、asyncio.wait

import asyncio

async def coroutine_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    print('执行完毕name:', name)

loop = asyncio.get_event_loop()

tasks = [coroutine_example('Zarten_' + str(i)) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)
loop.close()

方法2、asyncio.gather

import asyncio
async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)
    
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

# 或者借助列表
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))

# -------------------------------------
Waiting 3
Waiting 1
<等待三秒钟>
Done

多任务返回

方法1、需要通过loop.create_task()创建task对象

import asyncio

async def coroutine_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    print('执行完毕name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = [loop.create_task(coroutine_example('Zarten_' + str(i))) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

for task in tasks:
    print(task.result())

loop.close()

方法2、回调add_done_callback()

import asyncio

def my_callback(future):
    print('返回值:', future.result())

async def coroutine_example(name):
    print('正在执行name:', name)
    await asyncio.sleep(1)
    print('执行完毕name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = []
for i in range(3):
    task = loop.create_task(coroutine_example('Zarten_' + str(i)))
    task.add_done_callback(my_callback)
    tasks.append(task)

wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

loop.close()

其他知识点

run_until_complete 实现的原理

run_forever 会一直运行,直到 stop 被调用

loop.run_forever() 会让线程一直运行。loop.run_until_complete() 借助了 run_forever 方法。
在 run_until_complete 的实现中,调用了 future.add_done_callback(_run_until_complete_cb)。
async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()

wait 与 gather 中的区别

gather 比 wait 更加高层。gather 可以将任务分组,一般优先使用 gather。在某些定制化任务需求的时候,会使用 wait。

from functools import partial
import asyncio
import time


async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [do_some_work(i) for i in range(10)]

    group1 = [do_some_work(i)  for i in range(2)]
    group2 = [do_some_work(i)  for i in range(2)]

    loop.run_until_complete(asyncio.gather(*group1, *group2))
    print(time.time() - start_time)

参考文献

https://docs.python.org/zh-cn/3/library/asyncio-task.html#running-an-asyncio-program

https://www.cnblogs.com/Red-Sun/p/16934843.html

https://zhuanlan.zhihu.com/p/137698989

https://www.cnblogs.com/MrReboot/p/16413332.html

https://zhuanlan.zhihu.com/p/59621713 动态添加协程

https://zhuanlan.zhihu.com/p/137698989 gather 和 wait 区别

标签:task,run,python,def,print,loop,asyncio
From: https://www.cnblogs.com/tian777/p/17417777.html

相关文章

  • python yield yield from
    pythonyield1:可迭代、迭代器、生成器2:如何运行/激活生成器3:生成器的执行状态4:从生成器过渡到协程:yield可迭代、迭代器、生成器fromcollections.abcimportIterable,Iterator,Generatorisinstance(obj,Iterable)#可迭代对象isinstance(obj,Iterator)......
  • Python潮流周刊#2:Rust 让 Python 再次伟大
    这里记录每周值得分享的Python及通用技术内容,部分为英文,已在小标题注明。(本期标题取自其中一则分享,不代表全部内容都是该主题,特此声明。)文章&教程1、Python修饰器的函数式编程介绍了装饰器的实现原理、带参装饰器、多装饰器、类装饰器和几个典型的示例。文章发布于2014年,代......
  • python中的装饰器原理和作用
    装饰器的作用就是用一个新函数封装旧函数(是旧函数代码不变的情况下增加功能)然后会返回一个新函数,新函数就叫做装饰器,一般为了简化装饰器会用语法糖@新函数来简化例子:这是一段代码,但功能太少,要对这个进行增强,但又不能改变代码。defhello():return"helloworld!"现在我......
  • Python数据可视化小结
    1.引言原始形式的数据对大多数人来说可能都是枯燥乏味的,但是如果掌握正确的可视化工具,给人的印象就会变得引人入胜。本文通过实际例子,让我们利用数据可视化工具来探索不一样的数据体验。闲话少说,我们直接开始吧!2.举个栗子让我们从创建一个数据集开始,假设以下数据集包含2010-2020......
  • Python自动化运维
    2-27在命令行窗口中启动的Python解释器中实现在Python自带的IDLE中实现print("Helloworld")编码规范每个import语句只导入一个模块,尽量避免一次导入多个模块不要在行尾添加分号“:”,也不要用分号将两条命令放在同一行建议每行不超过80个字符使用必要的空行可以增加代码的可读性运......
  • Python自动化
    3-20数据类型转换数据类型转换:   1.int(x):x代指对象,返回值是一个整数类型,对象->整数       x为字符串:字符串应全为整数       x为小数:只保留整数部分       x不能是负数       x为布尔类型:True=1False=0   2.float(x):x......
  • Python学习
    3-13字符串类型字符串类型:str   1.定义格式:       变量='内容'           打印一行       变量="内容"           打印一行       变量='''内容'''或者三引号           可以通过回车的方式换行,且打印出......
  • Python学习
    3-13字符串类型字符串类型:str   1.定义格式:       变量='内容'           打印一行       变量="内容"           打印一行       变量='''内容'''或者三引号           可以通过回车的方式换行,......
  • python内存管理机制
    1:引用计数机制实现垃圾回收对象引用一次,引用计数就增加1,销毁计数就减少1,当引用计数为零时,从内存中删除对象。还有一种情况,ab两对象互相引用时,del语句可以减少引用计数,但不会归零。会导致内存泄漏,解释器会定期执行一个循环检测,搜索不可访问对象的循环,并删除他们2:内存池机制为了......
  • Python编写输出斐波那契数列的前n项
    以下是一个使用Python编写的程序代码,可以计算并输出斐波那契数列的前n项(n由用户输入):n=int(input("请输入斐波那契数列的项数:"))a,b=0,1foriinrange(n):print(b,end="")a,b=b,a+b代码解释:用户输入斐波那契数列的项数n,并使用int()函数将输入的字符串......