1、协程
1.1 协程是什么
协程不是计算机提供,程序员人为创造。协程(Coroutine)也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。线程是通过时间片抢占来执行程序的,相比与线程的抢占式调度协程de 协作式调度是程序自身控制的切换的,协程的核心思想式主动让出和恢复。协程对比于线程减少了资源消耗。
1.2 协程的发展
python中的tornado、fastapi、django3都支持协程的并发
fastapi->uvicorn->awsgi->uvloop
django3->awsgi
1.2.1 greenlet
greenlet是一个python早期使用协程的第三方的模块
安装
pip install greenlet
from greenlet import greenlet
def func1():
print(1) # 第1步:输出 1
gr2.switch() # 第3步:切换到 func2 函数
print(2) # 第6步:输出 2
gr2.switch() # 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行
def func2():
print(3) # 第4步:输出 3
gr1.switch() # 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
print(4) # 第8步:输出 4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步:去执行 func1 函数
1.2.2 yield
基于python生成器yield和yield from 关键词也能实现协程代码
ef func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
1.2.3 asyncio
在python3.4 出现了aysncio之后,python官方正式支持协程,ayncio比greenlet更加强大,遇到IO操作会自动切换。
import asyncio
@asyncio.coroutine
def func1():
print("func1开始.....")
yield from asyncio.sleep(2)
print("func1完成")
@asyncio.coroutine
def func2():
print("func2开始.....")
yield from asyncio.sleep(2)
print("func2完成")
# 为协程对象创建task
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
# 获取事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1.2.4 async & await
为了让asyncio的操作更加简洁,python3.5引入了async和await关键词,
其中async修饰的函数就是一个协程的对象,而await将会等待协程对象\task对象\future对象的执行
import asyncio
async def func1():
print("func1开始.....")
await asyncio.sleep(2)
print("func1完成")
async def func2():
print("func2开始.....")
await asyncio.sleep(2)
print("func2完成")
# 为协程对象创建task
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
# 获取事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
2、协程中的定义
2.1 事件循环
事件循环可以把它当成一个while,这个循环一直在检查任务的状态,将未执行完成的任务执行,剔除已经执行,遇到IO任务进行任务切换。
任务列表 = [ 任务1, 任务2, 任务3,... ]
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
for 就绪任务 in 已准备就绪的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除 已完成的任务
如果 任务列表 中的任务都已完成,则终止循环
2.2 协程对象
使用async定义的是一个协程函数,加()是并不会执行而是生成一个协程对象,并且放入到事件循环中才能执行。
放入到事件循环中有两种方式
直接加入事件循环中
import asyncio
async def func():
print("正在执行....")
# loop = asyncio.get_event_loop()
# loop.run_until_complete( func() )
asyncio.run(func())
构造成一个task放入
import asyncio
async def func():
print("正在执行....1")
tasks = [asyncio.ensure_future(func())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
2.3 await
await + 可等待的对象(协程对象, Future, Task)
协程对象
# 协程对象1
import asyncio
async def func():
print("开始执行func1")
await asyncio.sleep(2)
print("结束执行func1")
asyncio.run(func())
# 协程对象2
import asyncio
async def func1():
print("开始func1")
await asyncio.sleep(2)
print('结束func1')
return 'ok'
async def func2():
print("开始func2")
response = await func1()
print(f"结束func2:{response}")
asyncio.run(func2())
asyncio.run( func() )
Futrue对象
# Future对象
import asyncio
async def execute_fut(fut):
print("开始执行fut")
await asyncio.sleep(2)
fut.set_result(222)
print("结束执行fut")
async def main():
print("开始执行main")
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务Future对象
fut = loop.create_future()
await execute_fut(fut)
# Future没有结果则会一直等下去。
response = await fut
print(response)
print("结束执行main")
asyncio.run(main())
Task对象
import asyncio
async def func():
print("开始执行func")
await asyncio.sleep(2)
print("结束执行func")
return "ok"
async def main():
print("开始执行main")
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
response1 = await task1
response2 = await task2
print(response1, response2)
print("结束执行main")
asyncio.run(main())
等同于
import asyncio
async def func():
print("开始执行func")
await asyncio.sleep(2)
print("结束执行func")
return "ok"
tasks = [
asyncio.ensure_future(func()),
asyncio.ensure_future(func())
]
print("开始执行main")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print("结束执行main")
2.4 Task对象
在协程中可以将多个协程对象封装成一个Task去执行,Task中的协程对象会并发执行,可以通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task()
函数以外,还可以用低层级的 loop.create_task()
或 ensure_future()
函数。
# 协程对象串行执行
import asyncio
async def func():
print("开始执行func")
await asyncio.sleep(2)
print("结束执行func")
return "ok"
async def main():
print("开始执行main")
response1 = await func()
response2 = await func()
print(response1, response2)
print("结束执行main")
asyncio.run(main())
# task中的协程会并发执行
import asyncio
async def func():
print("开始执行func")
await asyncio.sleep(2)
print("结束执行func")
return "ok"
async def main():
print("开始执行main")
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
response1 = await task1
response2 = await task2
print(response1, response2)
print("结束执行main")
asyncio.run(main())
对比上面两个例子,1中会串行打印开始执行func1
,而2中会同时打印开始执行func
1`
import asyncio
async def func():
print("开始执行func")
await asyncio.sleep(2)
print("结束执行func")
return "ok"
async def main():
print("开始执行main")
task1 = asyncio.create_task(func(), name="func1")
task2 = asyncio.create_task(func(), name="func2")
done, pending = await asyncio.wait([task1, task2], timeout=3)
print(done)
print(pending)
print("结束执行main")
asyncio.run(main())
上面的代码中可以加多个Task加入到一个task_list中,并使用asyncio.wait等待结果,Task对象在超时时间未完成将出现在pending中,已经完成的将出现在done中。注意:asyncio.wait传入的是一个列表,列表中元素的Task对象。
import asyncio
async def func():
print("开始执行func")
await asyncio.sleep(2)
print("结束执行func")
return "ok"
print("开始执行main")
asyncio.run(asyncio.wait([func(), func()]))
print("结束执行main")
代码可以简写成上面的形式直接在run中执行协程对象的列表
2.5 asyncio.Future对象
Task继承Future,Task对象内部await结果的处理基于Future对象来的。
实例1:
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),这个任务什么都不干。
fut = loop.create_future()
# 等待任务最终结果(Future对象),没有结果则会一直等下去。
await fut
asyncio.run(main())
实例2:
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result("ok")
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
fut = loop.create_future()
# 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
# 即手动设置future任务的最终结果,那么fut就可以结束了。
await loop.create_task( set_after(fut) )
# 等待 Future对象获取, 等到结果返回
data = await fut
print(data)
asyncio.run( main() )
2.6 concurrent 中的Future对象
此Future是线程池或进程池中创建的Future对象,跟协程的Future无关联
import time
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
def func(value):
print(f"开始执行func:{value}")
time.sleep(3)
print(f"执行完成func:{value}")
return 123
print("开始执行main")
for i in range(10):
print(f"创建Future:{i + 1}")
fut = pool.submit(func, i+1)
time.sleep(10)
print("执行完成main")
注意: 线程池的Future可以转换成协程的Future
import asyncio
import requests
async def download_image(url):
# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
print("开始下载:", url)
loop = asyncio.get_event_loop()
# requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
future = loop.run_in_executor(None, requests.get, url)
response = await future
print('下载完成')
# 图片保存到本地文件
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
如果遇到不支持的协程的模块的时候,可以使用线程或进程池的Future转化协程的Future,底层实际上还是用线程或进程实现的并发操作
2.7 异步上下文
实现了__enter__()和__exit__()的类为上下文管理器,而实现了__aenter__()和__aexit__()为异步上下文管理器,可以使用async with 进行上下文管理
import asyncio
class AsyncContextManager:
def __init__(self):
self.conn = conn
async def do_something(self):
# 异步操作数据库
return 666
async def __aenter__(self):
# 异步链接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
# 异步关闭数据库链接
await asyncio.sleep(1)
async def func():
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
asyncio.run( func() )
2.8 uvloop
uvloop的协程性能强于asyncio的,可以用uvloop替代asyncio的事件循环
安装: pip install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写asyncio的代码,与之前写的代码一致。
# 内部的事件循环自动化会变为uvloop
asyncio.run(...)
3 案例
3.1 异步redis
安装: pip install aioredis
import asyncio
import aioredis
import time
class RedisArray(object):
def __init__(self, url="redis://192.168.1.200", kname="my_list"):
"""
:param url: redis url
:param kname: list name
"""
self.url = url
self.kname = kname
async def create_pool(self):
"""
创建连接池
"""
self.aioClient = await aioredis.from_url(self.url)
async def lpush(self, kvalue):
"""
往列表左边, 也就是头部插入元素
:param kvalue: element value
"""
try:
# 插入加入延时3秒
await asyncio.sleep(3)
return await self.aioClient.lpush(self.kname, kvalue)
except Exception as e:
print(f"lpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
return None
async def rpush(self, kvalue):
"""
往列表右边, 也就是尾部插入元素
:param kvalue: element value
"""
try:
# 插入加入延时3秒
await asyncio.sleep(3)
return await self.aioClient.rpush(self.kname, kvalue)
except Exception as e:
print(f"rpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
return None
async def get(self, start=0, end=-1):
"""
获取元素
:param start: element start index
:param end: element end index
"""
try:
# 获取加入延时3秒
await asyncio.sleep(3)
result = await self.aioClient.lrange(self.kname, 0, -1)
result = [str(item, 'utf-8') for item in result]
print(result)
return result
except Exception as e:
print(f"读取: {self.kname}/{start}/{end}错误: {str(e)}")
return []
async def close(self):
"""
关闭redis连接
"""
await self.aioClient.close()
async def main():
start_time = time.time()
# 使用异步事件循环执行插入操作
RA = RedisArray()
await RA.create_pool()
await RA.lpush("test22")
await RA.rpush("test23")
task1 = asyncio.ensure_future(RA.get())
task2 = asyncio.ensure_future(RA.get())
await task1
await task2
await RA.close()
print(f"操作耗时:{time.time()-start_time}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# 总耗时9秒
示例中lpush,rpush,get函数中分别都加了3秒的延迟,但是总的耗时只有9秒,说明get数据是异步的。
3.2 异步mysql
import time
import aiomysql
import asyncio
async def select():
conn = await aiomysql.connect(host='192.168.1.200', port=3306, user='root', password='123456', db='test')
sql = "select * from users"
cursor = await conn.cursor()
await asyncio.sleep(3)
await cursor.execute(sql)
result = await cursor.fetchall()
await cursor.close()
conn.close()
return result
async def main():
start_time = time.time()
task1 = asyncio.ensure_future(select())
task2 = asyncio.ensure_future(select())
result1 = await task1
result2 = await task2
print(result1)
print(result2)
print(f"执行耗时:{time.time() - start_time}")
asyncio.run(main())
# 总耗时3秒
在select函数中增加耗时3秒。main函数中两次查询MySQL的数据总共耗时3秒, 说明两次查询时异步的。
3.3 Fastapi
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
# 创建一个redis连接池
redis = aioredis.from_url('redis://192.168.1.200:6379')
@app.get("/")
def index():
""" 普通操作接口 """
return {"message": "Hello World"}
@app.get("/red")
async def red():
""" 异步操作接口 """
print("请求来了")
await asyncio.sleep(3)
# 连接池获取一个连接
async with redis.client() as conn:
await conn.set("my-key", "value")
result = await conn.get("my-key")
print(result)
return result
if __name__ == '__main__':
uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")
3.4 异步爬虫
import aiohttp
import asyncio
async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
text = await response.text()
print("得到结果:", url, len(text))
return text
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com'
]
tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list]
done,pending = await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run( main() )
3.5 异步服务和socket对比
# socket 服务端
import socket
s = socket.socket()
port = 2324
s.bind(("127.0.0.1", port))
s.listen(5)
while True:
c, addr = s.accept()
print('连接地址:', addr)
c.send("欢迎进入!".encode(encoding="gbk"))
while True:
# 当多个客户端来连接的时候将阻塞在这里
data = c.recv(1024)
print("收到:", data.decode("gbk"))
c.send(data)
c.close()
# socket 客户福
import socket
s = socket.socket()
port = 2324
s.connect(("127.0.0.1", port))
while True:
msg = s.recv(1024)
print(msg.decode(encoding="gbk"))
data = input()
s.send(data.encode(encoding="gbk"))
这个代码只适合一个客户端和一个服务端的通信,因为当服务端接收到客服端的请求执行accept到下一个while的循环里面时会一直阻塞状态,无法接收其他的客户端的请求,想要实现多个客户端通信,并且在服务端对每一个连接开启线程。而使用aysncio则无需如此
import asyncio
async def script_handle(reader, writer):
while True:
data = await asyncio.wait_for(reader.readline(), None)
if not data:
print('client disconnected')
writer.close() # 关闭套接字
await writer.wait_closed() # 等待套接字完全关闭
return
print("收到:", data.decode())
writer.write(b'>'+data)
await writer.drain()
async def main():
server = await asyncio.start_server(script_handle, host='', port=2324)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())
在cmd窗口中输入telnent 127.0.0.1 2324
如图所示,当多个客户端去连接的时候,服务端会使用协程异步接收请求、处理消息
标签:异步,await,python,编程,func,print,async,def,asyncio From: https://blog.csdn.net/weixin_43413871/article/details/136821416