首页 > 其他分享 >Asyncio 协程异步笔记

Asyncio 协程异步笔记

时间:2023-09-19 17:44:55浏览次数:28  
标签:异步 协程 await print async Asyncio def asyncio

协程 & asyncio & 异步

1. 协程 (coroutine)

协程不是计算机提供,而是程序员人为创造。

协程(coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块互相切换运行。例如:

def func1():
    print(1)
    ...
    print(2)
    
def func2():
    print(3)
    ...
    print(4)
    
func1()
func2()


实现协程有这么几种方法:

  • greenlet,早期模块。
  • yield 关键字。
  • asyncio 装饰器(python 3.4)
  • asyncawait 关键字(python 3.5)


1.1 greenlet 实现协程

pip3 install greenlet
from greenlet import greenlet

def func1():
    print(1)  # 第 1 步:输出 1
    gr2.switch()  # 第 3 步:切换到 func2 函数
    print(2)  # 第 6 步:输出 2
    gr2.switch()  # 第 7 步 切换到 func2 函数,从上一次执行的位置继续向后执行
    
def func2():
    print(3)  # 第 4 步:输出 3
    gr1.switch()  # 第 5 步:切换到 func1 函数,从上一次执行的位置继续向后执行
    print(4)  # 第 8 步:输出 4
    
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch()  # 第 1 步:去执行 func1 函数


1.2 yield 关键字

def func1():
    yield 1
    yield from func2()
    yield 2
    
def func2():
    yield 3
    yield 4
    
f1 = func1()
for item in f1:
    print(item)

伪实现,仅能实现协程的功能。



1.3 asyncio

在 python 3.4 及之后的版本。

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中其它任务
    print(2)
    
@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中其它任务
    print(4)
    
tasks = [
    asyncio.ensure_future(func1())
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

注意:遇到 IO 阻塞自动切换。



1.4 aynsc & await 关键字

在 python 3.5 及之后的版本。

import asyncio

async def func1():
    print(1)
    # 网络 IO 请求:下载一张图片
    await asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中的其它任务。
    print(2)
    
async def func2():
    print(3)
        # 网络 IO 请求:下载一张图片
    await asyncio.sleep(2)  # 遇到 IO 耗时操作,自动化切换到 tasks 中的其它任务。
    print(4)
    
tasks = [
    asyncio.ensure_future(func1())
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))    


2. 协程的意义

在一个线程中如果遇到 IO 等待时间,线程不会傻等,而是利用空闲时间再去干点其它事情。



案例:下载三张图片(网络 IO):

  • 普通方式(同步)

    pip3 install requests
    
    import requests
    
    def download_image(url):
        print("开始下载:", url)
        response = requests.get(url)
        print("下载完成")
    
        file_name = url.rsplit("_")[-1]
        with open(file_name, mode="wb") as file_object:
            file_object.write(response.content)
    
    url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
                "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
                "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", 
    ]
    
    for item in url_list:
        download_image(item)
    


  • 协程方式(异步)

    pip3 install aiohttp
    
    import aiohttp
    import asyncio
    
    async def fetch(session, url):
        print("发送请求:", url)
        async with session.get(url, verify_ssl=False) as response:
            content = await response.content.read()
            file_name = url.rsplit("_")[-1]
            with open(file_name, mode="wb") as file_object:
                file_object.write(content)
                
    async def main():
        async with aiohttp.ClientSession() as session:
            url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
                "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
                "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", 
    ]
            tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
            await asyncio.wait(tasks)
            
            
    if __name__ == "__main__":
        aynscio.run(main())
    


3. 异步编程

3.1 事件循环(Event Loop)

理解成一个死循环,去检测并执行某些代码。

task_list = [task1, task2, task3, ...]

while True:
	executables, completes = [...], [...]  # 在 task_list 中检查所有任务,将可执行和已完成返回
	
	for executable in executables:
		execute executable
		
	for complete in completes:
		remove complete from task_list
		
	if task_list == []:  # 如果 task_list 中的任务都已完成,则终止循环
		break


import asyncio

# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()

# 将任务放到任务列表
loop.run_until_complete(asyncio.wait(tasks))


3.2 快速上手

协程函数(coroutine function):定义函数时 async def (加上 async 关键字)。

协程对象(coroutine object):执行协程函数得到的协程对象。

async def func():
    pass

result = func()

注意到 result = func() 中 call 了 func(),但并不会执行 func() 内部代码,只是得到了 func() 的协程对象。



若要执行协程函数内部代码,需要事件循环去处理协程函数得到的协程对象。

async def func():
    print("come here.")
    
result = func()

loop = async.get_event_loop()
loop.run_until_complete(result)


到了 python 3.7 之后,还有更简便的写法:

async def func():
    print("come here.")
    
result = func()

# loop = async.get_event_loop()
# loop.run_until_complete(result)
async.run(result)  # python 3.7


3.3 await 关键字

await 一般要加上 可等待的对象(协程对象、Future 对象、Task 对象),可以简单理解为 IO 等待(但实际上并没有这么简单)。



示例 1:

import asyncio

async def func():
    print("come here.")
    response = await asyncio.sleep(2)  # 没有什么意义,假设这是一个 IO 等待(例如网络请求)
    print("terminate", response)
    
asyncio.run(func())

在事件循环内部,执行协程对象 func() 时会先执行 print("come here."),接下来会进入 IO 等待,此时事件循环会跳出 func() 函数去执行其它任务,一旦 response 得到返回值(即结束 IO 等待),事件循环会在下一次循环中检测到 IO 等待已经结束,此刻才会继续执行 func() 后面的代码(即 print("terminate", response))。



示例 2(协程对象之间可以嵌套):

async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "返回值"

async def func():
    print("执行协程函数内部代码")
    
    # 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其它协程(任务)。
    response = await others()
    print("IO 请求结束,结果为:", response)
    

asyncio.run(func())


示例 3:

async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "返回值"

async def func():
    print("执行协程函数内部代码")
    
    # 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其它协程(任务)。
    response_1 = await others()
    print("IO 请求结束,结果为:", response_1)
    
    response_2 = await others()
    print("IO 请求结束,结果为:", response_2)
    

asyncio.run(func())

await 关键字的含义就是,等待对象的值得到返回结果之后再继续向下运行。



3.4 Task 对象

Tasks are used to schedule coroutines concurrently.

When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon.

简单来说,它可以在事件循环中添加多个任务。

Tasks 用于并发调度协程,通过 asyncio.create_task(协程对象) 的方式创建 Task 对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外的,还可以用低层级的 loop.create_task()ensure_future() 函数。不建议手动实例化 Task 对象。



示例 1(这种代码写得比较少):

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main 开始")
    
    # 创建协程,将协程封装到一个 Task 对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(fucn())
    
    # 当执行某协程遇到 IO 操作时,会自动化切换执行其它任务。
    # 此处的 await 时等待相对应的协程全都执行完毕并获取结果。
    result_1 = await task1
    result_2 = await task2
    print(result_1, result_2)
    

asyncio.run(main())


示例 2(这种代码应用得比较多):

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main 开始")
    
    # 创建协程任务列表
	task_list = [
        asyncio.create_task(func(), name="n1"),  # 给 task 命名,会在返回集中显示
        asyncio.create_task(func(), name="n2")
    ]
    
	# 不能直接把 task_list 以列表的形式加在 await 之后
    # 注意 await 关键字只接受 coroutine object, task object, future object
    # 此处 done 是一个集合,为 task_list 的返回值
    # pending 在 timeout 不为 None 时有意义,timeout 规定了最长等待时间,
    # 如果超过 timeout,那么还未完成的任务将添加到 pending 中。
	done, _ = await asyncio.wait(task_list, timeout=1)
    print(done)
    

asyncio.run(main())


示例 3:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

# 创建协程任务列表
task_list = [
    asyncio.create_task(func(), name="n1"),  # 给 task 命名,会在返回集中显示
    asyncio.create_task(func(), name="n2")
]

done, pending = asyncio.run(asyncio.wait(task_list))
print(done)


注意到以上代码会导致程序报错。原因是:asyncio.create_task() 会将协程对象立即添加到事件循环中,但是,事件循环是在 asyncio.run() 中被创造,因此此时并不存在事件循环。应该如此修改:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

# 创建协程对象列表
task_list = [
    func(), 
    func()
]

# 此时 asyncio 会在创建事件循环之后,在内部将 task_list 中的协程对象添加到事件循环中
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)


3.5 Future 对象

Future 类是 Task 类的父类,即 Task 类继承自 Future 类,Task 对象内部 await 结果的处理基于 Future 对象而来。

A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.



示例 1:

import asyncio

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 创建一个任务(Future 对象),这个任务什么都不干。
    future = loop.create_future()
    
    # 等待任务最终结果(Future 对象),没有结果则会一直等下去。
    await future
    
asyncio.run(main())

在上述代码中,由于创建的 Future 对象什么也不干,因此 await future 将一直卡住,无法获得返回结果,所以上述代码是没有实际意义的。但注意,如果某一个时刻突然给 future 赋值,那么 future 立刻可以获得返回结果,并且跳出 await



示例 2(没什么意义,用于理解 Future 对象的作用,即帮助我们等待结果):

async def set_after(future):
    await asyncio.sleep(2)
    future.set_result("666")
    
async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 创建一个任务(Future 对象),没有绑定任何行为,则这个任务永远不知道什么时候结束。
    future = loop.create_future()
    
    # 创建一个任务(Task 对象),绑定了 set_after 函数,函数内部在 2s 之后会给 future 赋值。
    # 即手动设置 future 任务的最终结果,那么 future 就可以结束了。
    await loop.create_task(set_after(future))
    
    # 等待 Future 对象获取最终结果,否则一直等待下去。
    data = await future
    print(data)
    
asyncio.run(main())


3.6 concurrent 中的 Future 对象

首先注意到,concurrent 中的 Future 对象(concurrent.futures.Future)和 asyncio 中的 Future 对象没有关系。concurrent 中的 Future 对象是当使用线程池、进程池实现异步操作时使用到的对象。

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
   	return value
    
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)

# 或创建进程池
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    # 让 pool 拿出一个线程去执行 func 函数
    future = pool.submit(func, i)
    print(future)

实际中可能会存在两种 Future 对象交叉使用。例如:crm 项目中 80% 都基于协程异步编程 + MySQL,但 MySQL 不支持异步,因此在 MySQL 中使用进程池、线程池做异步编程。



示例 1:

import time
import asyncio
import concurrent.futures

def func1():
    # 某个耗时操作
    time.sleep(2)
    return "complete"

async def main():
    loop = asyncio.get_running_loop()
    
    # 1. Run in the default loop's executor (default to ThreadPoolExecutor)
    # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去
    # 执行 func1 函数,并返回一个 concurrent.futures.Future 对象
    # 第二步:调用 asyncio.wrap_future 将 concurrent.future.Future 对象
    # 包装为 asyncio.Future 对象。
    # 因为 concurrent.futures.Future 对象不支持 await 语法,所以需要包装为 
    # asyncio.Future 对象才能使用。
   	future = loop.run_in_executor(None, func1)  # 返回一个 Future
    # 上面这一步内部会调用 asyncio.wrap_future 将返回的 concurrent.futures.Future 
    # 对象转换为 asyncio.Future 对象
    # 默认 None 意味着创建线程池,若想使用进程池请参考以下注释代码
    result = await future
    print("default thread pool", result)
    
    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
        # result = await loop.run_in_executor(pool, func1)
        # print("custom thread pool", result)
    
    # 3. Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
        # result = await loop.run_in_executor(pool, func1)
        # print("custom process pool", result)

asyncio.run(main())


案例:asyncio + 不支持异步的模块(爬虫)

import asyncio
import requests

async def download_image(url):
    # 发送网络请求,下载图片(遇到网络下载图片的 IO 请求,自动化切换到其它任务)
    print("开始下载:", url)
    
    loop = asyncio.get_event_loop()
    
    # requests 模块默认不支持异步操作,所以就用线程池配合实现了
    future = loop.run_in_executor(None, requests.get, url)
    
    response = await future
    print("下载完成")
    
    # 图片保存到本地文件
    file_name = url.rsplit("_")[-1]
    with open(file_name, mode="wb") as file_object:
        file_object.write(response.content)
        
if __name__ == "__main__":
    url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
            "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
            "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg", 
]
    tasks = [download_image(url) for url in url_list]
   	loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

耗费资源更大,不得已而为之。



3.7 异步迭代器

  • 什么是异步迭代器?

    实现了 __aiter__()__anext__() 方法的对象。__anext__() 必须返回一个 awaitable 对象。async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。

  • 什么是异步可迭代对象?

    可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。由 PEP 492 引入。



示例:

import asyncio

class Reader(object):
    """
    自定义异步迭代器(同时也是异步可迭代对象)
    """
    
    def __init__(self):
        self.count = 0
    
    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val
    
# 以下代码会报错,因为 async for 必须写在协程函数内。
# obj = Reader()
# async for item in obj:
    # print(item)
    
async def func():
    obj = Reader()
    async for item in obj:
        print(item)
        
asyncio.run(func())


3.8 异步上下文管理器

  • 什么是异步上下文管理器?

    此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。



示例:

import asyncio

class AsyncContextManager(object):
    def __init__(self):
        self.conn = conn
    
    async def do_something(self):
        # 异步操作数据库
        return 666
    
    async def __aenter__(self):
        # 异步连接数据库
        self.conn = await asyncio.sleep(1)  # 可以换成连接数据库代码
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        # 异步关闭数据库链接
        await asyncio.sleep(1)
        
        
# 以下代码会报错,因为 async with 必须写在协程函数内。
# obj = AsyncContextManager()
# async with obj:
    # result = await obj.do_something()
    # print(result)

# 或者
# async with AsyncContextManager() as f:
    # result = await f.do_something()
    # print(result)
    
async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)
        
asyncio.run(func())


4. uvloop

uvloopasyncio 事件循环的替代方案,可以提高事件循环效率,性能接近于 go 语言。

pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Your asyncio code here.

# 内部的事件循环自动会变为 uvloop
asyncio.run()

注意:asgi 是支持异步的 wsgi 网关接口(e.g. uvicorn,内部使用的就是 uvloop)。

标签:异步,协程,await,print,async,Asyncio,def,asyncio
From: https://www.cnblogs.com/chetianjian/p/17715303.html

相关文章

  • KingbaseES V8R3集群运维案例之---流复制异步同步及全同步模式配置
    案例说明:通过案例描述KingbaseESV8R3集群异步、同步及全同步强一致性配置,本案例为一主二备的架构。适用版本:KingbaseESV8R3集群架构:集群复制配置参数说明:1)sync_flag[kingbase@node101bin]$cat../etc/HAmodule.conf|grep-isync_#1->synchronouscluster,0->async......
  • SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据
    一、背景:    利用ThreadPoolTaskExecutor多线程异步批量插入,提高百万级数据插入效率。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。ThreadPoolTaskExecutor是ThreadPoolExecutor的封装,所以,性能更加优秀,推荐ThreadPoolTaskExecutor。二、具体细节:2.1、配置app......
  • c# 异步 与 Link
    异步操作:"异步"指的是代码执行不按照顺序进行,而是通过使用回调函数、Promise、async/await等机制来实现非阻塞式的执行。在异步执行的情况下,代码不会等待前一段代码执行完成,而是继续执行后续的代码。当异步操作完成后,系统会通知代码进行相应的处理。 采用async/await实现......
  • 不知道如何入门Kotlin?《Android版kotlin协程入门进阶实战》带你从入门,带你飞
    作为一名Android开发者,掌握Kotlin语言对于职业发展具有重要意义。随着Google正式将Kotlin确立为Android开发的官方编程语言,Kotlin的地位在Android开发领域迅速攀升。因此,仅仅依靠Java语言进行开发已经不能满足当前市场需求。作为一名Android开发者,学习和掌握Kotl......
  • CompletableFuture 异步多线程D优雅!
    一个示例回顾Future一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度。JDK5新增了Future接口,用于描述一个异步计算的结果。虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get()的方式阻塞调用线程,或者使用轮......
  • Python并发编程——IO模型、阻塞IO、非阻塞IO、多路复用、异步IO、IO模型比较、select
    文章目录每日测验一IO模型介绍二阻塞IO(blockingIO)三非阻塞IO(non-blockingIO)四多路复用IO(IOmultiplexing)五异步IO(AsynchronousI/O)六IO模型比较分析七selectors模块网络并发知识点梳理网络并发知识点梳理每日测验简述死锁现象你用过哪些队列阐述进......
  • 解决SpringBoot Async异步方法获取不到Security Context
     SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);这样设置的话很不安全,不废话,直接上代码,改造一下AsyncConfig就可以了,线程也安全/***@description:线程池的配置*/@ConfigurationpublicclassAsyncConfig{privates......
  • 记录--如何解决异步请求中的返回值问题
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助在Web开发中,异步请求是一个常见的操作。然而,在异步请求中正确地获取返回值却可能会变得棘手。本文将介绍如何解决异步请求中的返回值问题,并提供一种解决方案。一、问题描述在某个Web应用程序中,用户遇到了无......
  • 基于Spring事务的可靠异步调用实践
    SpringTxAsync组件是仓储平台组(WMS6)自主研发的一个专门用于解决可靠异步调用问题的组件。通过使用SpringTxAsync组件,我们成功地解决了在仓储平台(WMS6)中的异步调用需求。经过近二年多的实践并经历了两次618活动以及两次双11活动,该组件已经在我们的所有应用中稳定运行并成功应用于......
  • 同步异步 阻塞非阻塞
    同步异步描述的事任务的提交方式 描述的事一段代码或者函数同步:任务提交后,原地等待任务的返回结果,等待的过程中不做任何事(干等)程序层面上表现出来的感觉就是卡住了例子:importtime deffunc():time.sleep(3)print('helloworld')  if__name++=='__main__......