首页 > 编程语言 >python异步编程

python异步编程

时间:2024-03-22 17:31:56浏览次数:37  
标签:异步 await python 编程 func print async def asyncio

1、协程

1.1 协程是什么

协程不是计算机提供,程序员人为创造。协程(Coroutine)也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。线程是通过时间片抢占来执行程序的,相比与线程的抢占式调度协程de 协作式调度是程序自身控制的切换的,协程的核心思想式主动让出和恢复。协程对比于线程减少了资源消耗。

1.2 协程的发展

python中的tornado、fastapi、django3都支持协程的并发

fastapi->uvicorn->awsgi->uvloop

django3->awsgi

1.2.1 greenlet

greenlet是一个python早期使用协程的第三方的模块

安装

pip install greenlet

from greenlet import greenlet


def func1():
    print(1)        # 第1步:输出 1
    gr2.switch()    # 第3步:切换到 func2 函数
    print(2)        # 第6步:输出 2
    gr2.switch()    # 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行


def func2():
    print(3)        # 第4步:输出 3
    gr1.switch()    # 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
    print(4)        # 第8步:输出 4


gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步:去执行 func1 函数
1.2.2 yield

基于python生成器yield和yield from 关键词也能实现协程代码

ef func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()
for item in f1:
    print(item)
1.2.3 asyncio

在python3.4 出现了aysncio之后,python官方正式支持协程,ayncio比greenlet更加强大,遇到IO操作会自动切换。

import asyncio


@asyncio.coroutine
def func1():
    print("func1开始.....")
    yield from asyncio.sleep(2)
    print("func1完成")


@asyncio.coroutine
def func2():
    print("func2开始.....")
    yield from asyncio.sleep(2)
    print("func2完成")


# 为协程对象创建task
tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
# 获取事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1.2.4 async & await

为了让asyncio的操作更加简洁,python3.5引入了async和await关键词,

其中async修饰的函数就是一个协程的对象,而await将会等待协程对象\task对象\future对象的执行

import asyncio


async def func1():
    print("func1开始.....")
    await asyncio.sleep(2)
    print("func1完成")


async def func2():
    print("func2开始.....")
    await asyncio.sleep(2)
    print("func2完成")


# 为协程对象创建task
tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
# 获取事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

2、协程中的定义

2.1 事件循环

事件循环可以把它当成一个while,这个循环一直在检查任务的状态,将未执行完成的任务执行,剔除已经执行,遇到IO任务进行任务切换。



任务列表 = [ 任务1, 任务2, 任务3,... ]

while True:
    可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
    
    for 就绪任务 in 已准备就绪的任务列表:
        执行已就绪的任务
        
    for 已完成的任务 in 已完成的任务列表:
        在任务列表中移除 已完成的任务

	如果 任务列表 中的任务都已完成,则终止循环

2.2 协程对象

使用async定义的是一个协程函数,加()是并不会执行而是生成一个协程对象,并且放入到事件循环中才能执行。

放入到事件循环中有两种方式

直接加入事件循环中

import asyncio
​
async def func():
    print("正在执行....")
 
# loop = asyncio.get_event_loop()
# loop.run_until_complete( func() )
asyncio.run(func()) 

构造成一个task放入

import asyncio
​
​
async def func():
    print("正在执行....1")
​
tasks = [asyncio.ensure_future(func())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

2.3 await

await + 可等待的对象(协程对象, Future, Task)

协程对象

# 协程对象1
import asyncio
​
async def func():
    print("开始执行func1")
    await asyncio.sleep(2)
    print("结束执行func1")
​
asyncio.run(func())

# 协程对象2
import asyncio


async def func1():
    print("开始func1")
    await asyncio.sleep(2)
    print('结束func1')
    return 'ok'


async def func2():
    print("开始func2")
    response = await func1()
    print(f"结束func2:{response}")


asyncio.run(func2())
asyncio.run( func() )

Futrue对象

# Future对象
import asyncio
​
​
async def execute_fut(fut):
    print("开始执行fut")
    await asyncio.sleep(2)
    fut.set_result(222)
    print("结束执行fut")
​
​
async def main():
    print("开始执行main")
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
​
    # 创建一个任务Future对象
    fut = loop.create_future()
    await execute_fut(fut)
    # Future没有结果则会一直等下去。
    response = await fut
    print(response)
    print("结束执行main")
​
asyncio.run(main())

Task对象

import asyncio
​
​
async def func():
    print("开始执行func")
    await asyncio.sleep(2)
    print("结束执行func")
    return "ok"
​
​
async def main():
    print("开始执行main")
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    response1 = await task1
    response2 = await task2
    print(response1, response2)
    print("结束执行main")
​
asyncio.run(main())

等同于

import asyncio
​
​
async def func():
    print("开始执行func")
    await asyncio.sleep(2)
    print("结束执行func")
    return "ok"
​
​
tasks = [
    asyncio.ensure_future(func()),
    asyncio.ensure_future(func())
]
​
print("开始执行main")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print("结束执行main")

2.4 Task对象

在协程中可以将多个协程对象封装成一个Task去执行,Task中的协程对象会并发执行,可以通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task()ensure_future() 函数。

# 协程对象串行执行
import asyncio
​
​
async def func():
    print("开始执行func")
    await asyncio.sleep(2)
    print("结束执行func")
    return "ok"
​
​
async def main():
    print("开始执行main")
    response1 = await func()
    response2 = await func()
    print(response1, response2)
    print("结束执行main")
​
asyncio.run(main())
​
# task中的协程会并发执行
import asyncio
​
​
async def func():
    print("开始执行func")
    await asyncio.sleep(2)
    print("结束执行func")
    return "ok"
​
​
async def main():
    print("开始执行main")
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    response1 = await task1
    response2 = await task2
    print(response1, response2)
    print("结束执行main")
​
asyncio.run(main())

对比上面两个例子,1中会串行打印开始执行func1,而2中会同时打印开始执行func1`

import asyncio
​
​
async def func():
    print("开始执行func")
    await asyncio.sleep(2)
    print("结束执行func")
    return "ok"
​
​
async def main():
    print("开始执行main")
    task1 = asyncio.create_task(func(), name="func1")
    task2 = asyncio.create_task(func(), name="func2")
    done, pending = await asyncio.wait([task1, task2], timeout=3)
    print(done)
    print(pending)
    print("结束执行main")
​
asyncio.run(main())

上面的代码中可以加多个Task加入到一个task_list中,并使用asyncio.wait等待结果,Task对象在超时时间未完成将出现在pending中,已经完成的将出现在done中。注意:asyncio.wait传入的是一个列表,列表中元素的Task对象。

import asyncio
​
​
async def func():
    print("开始执行func")
    await asyncio.sleep(2)
    print("结束执行func")
    return "ok"
​
​
print("开始执行main")
asyncio.run(asyncio.wait([func(), func()]))
print("结束执行main")
代码可以简写成上面的形式直接在run中执行协程对象的列表

2.5 asyncio.Future对象

Task继承Future,Task对象内部await结果的处理基于Future对象来的。

实例1:

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
​
    # 创建一个任务(Future对象),这个任务什么都不干。
    fut = loop.create_future()
​
    # 等待任务最终结果(Future对象),没有结果则会一直等下去。
    await fut
​
asyncio.run(main())

实例2:

import asyncio
​
​
async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("ok")
​
​
async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
​
    # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
    fut = loop.create_future()
​
    # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
    # 即手动设置future任务的最终结果,那么fut就可以结束了。
    await loop.create_task(  set_after(fut) )
​
    # 等待 Future对象获取, 等到结果返回
    data = await fut
    print(data)
​
asyncio.run( main() )

2.6 concurrent 中的Future对象

此Future是线程池或进程池中创建的Future对象,跟协程的Future无关联

import time
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
​
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
​
​
def func(value):
    print(f"开始执行func:{value}")
    time.sleep(3)
    print(f"执行完成func:{value}")
    return 123
​
​
print("开始执行main")
for i in range(10):
    print(f"创建Future:{i + 1}")
    fut = pool.submit(func, i+1)
​
​
time.sleep(10)
print("执行完成main")

注意: 线程池的Future可以转换成协程的Future

import asyncio
import requests
​
​
async def download_image(url):
    # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
    print("开始下载:", url)
​
    loop = asyncio.get_event_loop()
    # requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('下载完成')
    # 图片保存到本地文件
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)
​
​
if __name__ == '__main__':
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
​
    tasks = [download_image(url) for url in url_list]
​
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

​如果遇到不支持的协程的模块的时候,可以使用线程或进程池的Future转化协程的Future,底层实际上还是用线程或进程实现的并发操作

2.7 异步上下文

实现了__enter__()和__exit__()的类为上下文管理器,而实现了__aenter__()和__aexit__()为异步上下文管理器,可以使用async with 进行上下文管理

import asyncio
​
​
class AsyncContextManager:
    def __init__(self):
        self.conn = conn
        
    async def do_something(self):
        # 异步操作数据库
        return 666
​
    async def __aenter__(self):
        # 异步链接数据库
        self.conn = await asyncio.sleep(1)
        return self
​
    async def __aexit__(self, exc_type, exc, tb):
        # 异步关闭数据库链接
        await asyncio.sleep(1)
​
async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)
​
asyncio.run( func() )

2.8 uvloop

uvloop的协程性能强于asyncio的,可以用uvloop替代asyncio的事件循环

安装: pip install uvloop

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
​
# 编写asyncio的代码,与之前写的代码一致。
​
# 内部的事件循环自动化会变为uvloop
asyncio.run(...)

3 案例

3.1 异步redis

安装: pip install aioredis

import asyncio
import aioredis
import time
​
​
class RedisArray(object):
    def __init__(self, url="redis://192.168.1.200", kname="my_list"):
        """
        :param url: redis url
        :param kname: list name
        """
        self.url = url
        self.kname = kname
​
    async def create_pool(self):
        """
        创建连接池
        """
        self.aioClient = await aioredis.from_url(self.url)
​
    async def lpush(self, kvalue):
        """
        往列表左边, 也就是头部插入元素
        :param kvalue: element value
        """
        try:
            # 插入加入延时3秒
            await asyncio.sleep(3)
            return await self.aioClient.lpush(self.kname, kvalue)
        except Exception as e:
            print(f"lpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
            return None
​
    async def rpush(self, kvalue):
        """
        往列表右边, 也就是尾部插入元素
        :param kvalue: element value
        """
        try:
            # 插入加入延时3秒
            await asyncio.sleep(3)
            return await self.aioClient.rpush(self.kname, kvalue)
        except Exception as e:
            print(f"rpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
            return None
​
    async def get(self, start=0, end=-1):
        """
        获取元素
        :param start: element start index
        :param end: element end index
        """
        try:
            # 获取加入延时3秒
            await asyncio.sleep(3)
            result = await self.aioClient.lrange(self.kname, 0, -1)
            result = [str(item, 'utf-8') for item in result]
​
            print(result)
            return result
        except Exception as e:
            print(f"读取: {self.kname}/{start}/{end}错误: {str(e)}")
            return []
​
    async def close(self):
        """
        关闭redis连接
        """
        await self.aioClient.close()
​
​
async def main():
    start_time = time.time()
    # 使用异步事件循环执行插入操作
    RA = RedisArray()
    await RA.create_pool()
    await RA.lpush("test22")
    await RA.rpush("test23")
    task1 = asyncio.ensure_future(RA.get())
    task2 = asyncio.ensure_future(RA.get())
    await task1
    await task2
    await RA.close()
    print(f"操作耗时:{time.time()-start_time}")
​
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# 总耗时9秒

示例中lpush,rpush,get函数中分别都加了3秒的延迟,但是总的耗时只有9秒,说明get数据是异步的。

3.2 异步mysql

import time
import aiomysql
import asyncio
​
​
async def select():
    conn = await aiomysql.connect(host='192.168.1.200', port=3306, user='root', password='123456', db='test')
    sql = "select * from users"
    cursor = await conn.cursor()
    await asyncio.sleep(3)
    await cursor.execute(sql)
    result = await cursor.fetchall()
    await cursor.close()
    conn.close()
    return result
​
​
async def main():
    start_time = time.time()
    task1 = asyncio.ensure_future(select())
    task2 = asyncio.ensure_future(select())
    result1 = await task1
    result2 = await task2
    print(result1)
    print(result2)
    print(f"执行耗时:{time.time() - start_time}")
​
asyncio.run(main())
# 总耗时3秒

在select函数中增加耗时3秒。main函数中两次查询MySQL的数据总共耗时3秒, 说明两次查询时异步的。

3.3 Fastapi

import asyncio
​
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
​
app = FastAPI()
​
# 创建一个redis连接池
redis = aioredis.from_url('redis://192.168.1.200:6379')
​
​
@app.get("/")
def index():
    """ 普通操作接口 """
    return {"message": "Hello World"}
​
​
@app.get("/red")
async def red():
    """ 异步操作接口 """
​
    print("请求来了")
​
    await asyncio.sleep(3)
    # 连接池获取一个连接
​
    async with redis.client() as conn:
        await conn.set("my-key", "value")
        result = await conn.get("my-key")
    print(result)
    return result
​
​
if __name__ == '__main__':
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")

3.4 异步爬虫

import aiohttp
import asyncio
​
​
async def fetch(session, url):
    print("发送请求:", url)
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print("得到结果:", url, len(text))
        return text
​
​
async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com'
        ]
        tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list]
​
        done,pending = await asyncio.wait(tasks)
​
​
if __name__ == '__main__':
    asyncio.run( main() )

3.5 异步服务和socket对比

# socket 服务端

import socket

s = socket.socket()
port = 2324
s.bind(("127.0.0.1", port))

s.listen(5)
while True:
    c, addr = s.accept()
    print('连接地址:', addr)
    c.send("欢迎进入!".encode(encoding="gbk"))
    while True:
        # 当多个客户端来连接的时候将阻塞在这里
        data = c.recv(1024)
        print("收到:", data.decode("gbk"))
        c.send(data)
    c.close()

# socket 客户福

import socket

s = socket.socket()
port = 2324

s.connect(("127.0.0.1", port))
while True:
    msg = s.recv(1024)
    print(msg.decode(encoding="gbk"))
    data = input()
    s.send(data.encode(encoding="gbk"))

这个代码只适合一个客户端和一个服务端的通信,因为当服务端接收到客服端的请求执行accept到下一个while的循环里面时会一直阻塞状态,无法接收其他的客户端的请求,想要实现多个客户端通信,并且在服务端对每一个连接开启线程。而使用aysncio则无需如此

import asyncio


async def script_handle(reader, writer):
    while True:
        data = await asyncio.wait_for(reader.readline(), None)
        if not data:
            print('client disconnected')
            writer.close()  # 关闭套接字
            await writer.wait_closed()  # 等待套接字完全关闭
            return
        print("收到:", data.decode())
        writer.write(b'>'+data)
        await writer.drain()


async def main():
    server = await asyncio.start_server(script_handle, host='', port=2324)
    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')
    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(main())

在cmd窗口中输入telnent 127.0.0.1 2324

如图所示,当多个客户端去连接的时候,服务端会使用协程异步接收请求、处理消息

标签:异步,await,python,编程,func,print,async,def,asyncio
From: https://blog.csdn.net/weixin_43413871/article/details/136821416

相关文章

  • 异步消息队列Celery
    1.什么是Celery4.4.0|Celery中文手册(celerycn.io)1.1介绍Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。Celery的架构由三部分组成,消息中间件(messagebroker),任务执行单元(worker)和任务执行结果存储(tas......
  • 为什么说金融/财务人需要学Python??
    Python是财务人和金融人的多功能工具箱!它有非常多的细分应用,接下来我将给你详细介绍为什么财务、金融方向需要学Python以及有哪些方向的细分应用!一、为什么金融/财务人需要Python?想在行业中崭露头角?那你可不能忽视Python这个神器!首先,它是个数据处理高手,无论是股票......
  • 2020-7-28-并发编程
    概述、生产者消费者模型、锁对象、集合的线程安全问题、Callable的使用、计数器、队列、线程池、ForkJoin、异步回调、单例模式、CAS、锁概述1多线程下变量访问存在问题变量访问不可见性2JMM特点所有共享变量存于主内存中每个线程有自己的工作内存线程对变量的操作都必须在......
  • Python中常用模块有哪些?
    1.importosos模块提供很多方法用来处理文件和目录 2.importsyssys模块提供了一系列运行环境的变量和函数,例如argv变量,argv变量是一个包含命令行参数的列表 3.fromminioimportMiniominio是一个对象存储服务,可以用来存储大量的数据,比如图片,视频,文档等 4.frommysq......
  • python运算符
    运算符:算数运算符:赋值运算符:比较运算符: 逻辑运算符:  ......
  • python 使用 ffmpeg合成音视频
    moviepy太慢了,ffmpeg似乎快一点1.从github下载安装https://github.com//BtbN/FFmpeg-Builds/releases  下载了ffmpeg-master-latest-win64-gpl-shared.zip 直接解压到某个目录中,如:D:\ffmpeg  ,并添加环境变量,将  D:\ffmpeg 添加到path变量中(win10)在命令行运行 ffm......
  • 肖sir__python的安装2.1
     一、Python安装 python现在主要就是python2和python3,目前python3最新是3.8,考虑稳定性我们用3.6,3.7等第一步:下载Python安装包python现在主要就是python2和python3,目前python3最新是3.8,考虑稳定性我们用3.6,3.7等 1.python的官方网站下载python的安装包 地址:https://ww......
  • 【Python脚本随手笔记】 ---基于鸿蒙系统LiteOS实现差分编译脚本(下篇)
    ......
  • python中的多继承理解
    在python的多继承中,父类的初始化顺序遵循所谓方法解析顺序(MethodResolutionOrder,MRO)的机制。python使用C3线性化算法来确定多继承类的MRO:1.目标:创建一个一致的线性继承顺序,同时保持父类的相对顺序和子类优先原则。2.子类优先:子类总是在其父类之前出现。从而子类......
  • Python函数每日一讲12 - len()
    引言在Python编程中,经常会遇到需要获取对象的长度或者元素个数的情况。而len()函数就是用来返回对象的长度或者元素个数的。通过本文的介绍,你将学习到len()函数的基本用法以及在实际应用中的一些技巧,帮助你更好地利用这一函数解决问题。语句概览len()函数用于返回对象的长度或......