协程
1.异步非阻塞,asyncio
2.异步框架: 提升性能
tomado fastapi django3.x asgi aiohttp
协程是什么
协程是不是计算机提供出来的,程序员自己创建的。
协程(coroutine) 被称为微线程,是一种用户动态上下文切换的技术,简而言之使用一个线程在代码中进行切换的过程
1.采用原来的同步指向(代码从上到下执行) 一共使用了3秒时间
def func1():
print(1)
time.sleep(1)
def func2():
print(2)
time.sleep(2)
print('开始秒数' + time.strftime('%S'))
func1()
func2()
print('开始秒数' + time.strftime('%S'))
2.实现线程的方式
1.greenlet 早期模块
2.yield 关键字控制
3.asyncio装饰器 py3.4
4.async await 关键字 py3.5 [推荐]
greenlet实现协程方式
1.pip install greenlet
2.案例
from greenlet import greenlet
def func1():
print('func1-第1次打印')
gr2.switch() # 2.切换到func2 进行执行[会进行记录当前这个函数执行的位置,如果切换回来就从当前开始执行]
print('func1-第2次打印')
gr2.switch() # 4.切换到func2 进行执行
def func2():
print('func2-第1次打印')
gr1.switch() # 3.切换到func1函数执行
print('func2-第2次打印')
# 注册到greenlet 对象
gr1 = greenlet(func1)
gr2 = greenlet(func2)
# print(gr2) # greenlet.greenlet object
gr1.switch() # 1.启动gr1对象的进行执行
yield关键字实现
def func1():
yield 1
yield from func2() # 跳到 执行func2生成器
yield 2
def func2():
yield 3
yield 4
f = func1()
for i in f: # 循环执行生成器
print(i)
asynico实现
实际执行时间2秒
import asyncio
import time
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(1) # 模拟io请求
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2)
print(4)
print('开始秒数' + time.strftime('%S'))
tasks = [asyncio.ensure_future(func1()), asyncio.ensure_future(func2())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('结束秒数' + time.strftime('%S'))
async与await关键字实现
实际执行时间2秒
@asyncio.coroutine 替换为 async def
yield from 替换为 await
import asyncio
import time
# 通过关键字将普通函数包装为协程函数(coroutine func)
async def func1():
print(1)
# await:1.将asyncio.sleep(1)协程对象 包装为task任务 告知event loop 2.将控制权限还给event loop
await asyncio.sleep(1) # 模拟i/o操作
# 协程函数
async def func2():
print(2)
await asyncio.sleep(2)
async def main():
print('开始秒数' + time.strftime('%S'))
coroutine_list = [func1(), func2()] # coroutine func object list
await asyncio.gather(*coroutine_list)
print('结束秒数' + time.strftime('%S'))
if __name__ == '__main__':
asyncio.run(main()) # main 获取event loop权限执行mian task任务执行 (协程函数对象)
当前的优势:
遇到io操作,就会将线程进行切换执行其他的task进行执行,大大的节省了时间的损耗
协程的意义
在一个线程中如果遇到io等待时间,线程不会傻傻的等待,利用空闲的时间赶其他的事情,大大的提高了效率
1.实例代码:同步方式[排队执行] 使用3秒下载完成
import requests
def download_img(url,img_name):
res = requests.get(url)
with open(f'{img_name}.png', mode='wb', ) as file_obj:
file_obj.write(res.content)
print('本地下载完成' + f'{img_name}.png')
url_list = [
'url2',
'url1',
'url3'
]
print('开始下载时间' + time.strftime('%X'))
for index,url in enumerate(url_list):
print('url:' + url)
download_img(url,index)
print('结束下载' + time.strftime('%X'))
# 如果图片下载时间为1分钟,需要花费3分钟[第一张下载完毕后,才会执行下一次下载]
2.实例代码:协程方式[不等带结果执行下一个] 使用1秒将3张图片下载完成
pip install aiohttp # 需要使用这个模块
import time
import asyncio
import aiohttp
async def download_img(session, url, img_name):
async with session.get(url, verify_ssl=False) as response: # 发送io请求
content = await response.content.read()
with open(f'{img_name}.png', mode='wb', ) as file_obj:
file_obj.write(content)
print('本地下载完成' + f'{img_name}.png')
async def main():
print('开始秒数' + time.strftime('%S'))
async with aiohttp.ClientSession() as session:
url_list = [
'url2',
'url1',
'url3'
]
# 协程对象列表
task_list = [download_img(session, url, index) for index, url in enumerate(url_list)]
await asyncio.gather(*task_list) # 将协程对象列表批量添加到event loop 中作为task任务
print('结束秒数' + time.strftime('%S'))
if __name__ == '__main__':
# asyncio.run(main()) # 使用这种方式会出现RuntimeError: Event loop is closed 异常
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# 使用协程asyncio模块需要配合着能使用协程方式的模块进行操作,不然不会有所作用
异步编程
事件循环'event loop'
事件循环: 称为'event loop'
作用: 可以理解为死循环,去检测并执行某写代码[检测任务列表中得任务]
理解代码:
任务列表 = [任务1,任务2,任务3]
while True:
可执行任务列表,已完成任务的任务列表 = 去任务列表中检测全部任务,将可执行和可完成的任务返回('相当于当前的任务列表中得任务存在多个状态,将每个状态进行分类进行循环,直到任务列表中任务完全没有为止')
for 就绪任务 in 已经准备就绪的任务列表
执行就绪的任务
for 已完成的任务 in 已完成的任务列表
从'任务列表'中进行移除已完成的任务
如果任务列表中得任务全部完成,那么这个死循环就会终止
'''
简单的理解事件循环的操作(大致的流程):
任务列表 = ['正在io请求的任务','已经完成的任务','可以执行的任务']
那么事件循环 event loop 就会先将 '正在io请求的任务无视'
将已完成的任务从任务列表中剔除
执行可以执行的任务
如果可以执行的任务出现【io请求的操作】,那么就会【放任这个任务进行io操作】,并【标记】当前任务时【请求io的任务】,执行下一个任务。
如果【当前任务】已经完成,那么就会添加到【已完成任务列表】中,进行循环从【任务列表】中进行剔除
'''
代码:
import asyncio
# 当前 生成或者获取一个事件循环
loop = asyncio.get_enent_loop()
# 将任务放到任务列表中,让可以理解为让当前的任务可以被事件循环监听
loop.run_until_complete('存放任务')
协程函数与协程对象
1.协程函数 coroutine func
async def 函数名: # async关键字为前缀的函数被称为'协程函数'
pass
2.协程对象 coroutine object
协程函数() # 协程函数+()得到的就是协程对象,当前函数并不会执行
3.协程函数与普通定义函数区别
最大的区别就是,普通函数()当前函数就会进行执行,协程函数()只能获取协程对象[内部代码不会执行]
协程函数的使用(async关键字)
async def func()
print('我是协程函数')
asy = func() # 不会执行
print(asy) # 获取当前协程函数的对象[协程对象]
怎么才能让协程函数执行?
需要借助 asyncio 模块中得事件循环 event loop
import asyncio
async def func():
print('123456')
# 执行方式
1.python3.7之前的执行操作
# 事件循环
loop = asyncio.get_event_loop()
# 使用事件循环执行协程函数
loop.run_until_complete(func())
2.python3.7后执行的方式
asyncio.run(func())
# 关于asyncio.run
内部帮助我们创建获取事件循环,并让事件循环执行传入的函数,当前run()作为一个异步编程的入口进行操作
run 的内部还是完成了原来版本的操作 执行创建或者获取event loop 将任务执行
# 注意:
异步函数一定要不事件循环进行一起使用,没有事件循环异步函数是没办法执行的
协程函数的使用(await关键字)
# await 等待对象[需要等待对象执行完毕后才会执行下面的代码]
# await + 代表的可等待的对象(协程对象,future,task对象[这三个对象可以理解为io等待对象])
await最重要的最用: 会将协程对象包装为一个task任务,告知event loop。
import asyncio
案例1:
async def func():
print('123456') 1.先执行当前打印
# 模拟io等待
res = await asyncio.sleep(1) 2.await会将当前协程对象对象包装为task任务,添加到event loop中,并告知当前task任务需要等待(会记录当前执行的位置),将权限还给event loop,去执行其他不需要等待(等待完成的任务)的task任务
print(res) 3.获取io等待返回值,当前等待结束变为可执行任务(已经有了执行的结果),event loop在下次循环中 会将权限给func()函数,从当前记录位置开始执行(并接受返回值)
asyncio.run(func())
案例2:
async def show():
print('正在下载中...') 4.打印
await asyncio.sleep(2) 5.等待asyncio.sleep(2)协程方法
print('下载完成...') 5. 打印
return '文件.txt' 6. 返回值
async def func():
print('我要开始下载了') 2.打印
# 执行的协程函数的执行结果的返回值
res = await show() 3.包装携程对象 并进去show()进行执行 7.接受返回值
print(res) 8.打印返回值
asyncio.run(func()) # 1.执行func() 协程对象
案例3
import asyncio
async def show():
print('正在下载中...')
await asyncio.sleep(2)
print('下载完成...')
return '文件.txt'
async def func():
print('我要开始下载了')
res1 = await show()
print(res1)
res2 = await show() # 等待第一个await等待对象的值得到结果后才会执行
print(res2)
asyncio.run(func())
只有第一个await协程对象(将这对象包装为一个event loop中得一个任务)执行完毕后,并且获取执行结果后,才会执行下一个协程函数
原因: 因为第二个协程对象并未被await包装为任务,告知event loop。代码是从上而下的执行,并没有执行到第二个await
Task对象
task作用:
1.帮助我们在事件循环(event loop)中添加多个任务
例如:
任务列表 = [task1,task2,task3] # 添加多个任务到event loop 中
当某个任务出现io操作就会就会进行任务切换
2.tasks 用于并发的调度协程,通过asyncio.create_task(协程对象)[3.7上使用]的方式创建task对象,这样可以将协程对象添加到event loop(事件循环)。其他的方式(更低级) loop.create_task() 或者asyncio.evsur_future()[3.7下使用]不建议手动实例化task对象
# 重点:
import asyncio
import time
1.实例1通过create_task将协程对象包装为task任务[2个任务执行2秒]
# 使用的较少create_task创建任务 需要大量的await进行通知event loop调度
async def show(num):
print('show函数正在执行...%s'%num)
await asyncio.sleep(2) # 模拟io阻塞
print('show函数执行完毕...%s'%num)
return 'ok'
async def func():
print('执行func函数')
# 创建task对象,将当前执行show函数添加到事件循环中(event loop)
task1 = asyncio.create_task(show(1))
task2 = asyncio.create_task(show(2))
# 当执行到某个遇到io操作会自动化切换执行其他任务
# 此处的awati是等待响应的协程全都执行完毕后获取的结果
task1_return = await task1
task2_return = await task2
print(task1_return,task2_return)
print(time.strftime('%S'))
asyncio.run(func())
print(time.strftime('%S'))
执行过程说明:
1.现在event loop中存在3个任务: [func(),task1,task2]
2.执行过程模拟(未必详细,大概过程)
2.1 执行脚本时,先执行asyncio.run()执行func()对象,并将任务添加到event loop中
2.2 执行到asyncio.create_task() 添加任务到event loop中[将task1-task2添加],任务列表中已经存在了3个任务
2.3 await task1 就会执行task1任务内部代码,遇到 asyncio.sleep(2) task1任务阻塞,'开始任务切换'
2.4 切换到task2任务(为什么不会切换到func任务中,因为当前的任务是在func任务内,task1在阻塞,那func也在阻塞),遇到 asyncio.sleep(2) task2任务阻塞,'开始任务切换'
2.5.切换到task1任务执行完成 获取返回值
2.6.切换到task2任务执行完成 获取返回值
2.7.func()任务执行完成,整段程序执行完毕
2.实例代码2
# 使用的较多,节省await编写调度
asyncio.gather(*任务列表)批量调度 # 需要通过*解构列表
asyncio.wait(任务列表)批量调度 # 直接添加列表
async def show(num):
print('show函数正在执行...%s'%num)
await asyncio.sleep(2)
print('show函数执行完毕...%s'%num)
return 'ok'
async def func():
print('执行func函数')
# 执行的协程函数的执行结果的返回值
task_list = []
# 循环将任务呢添加到任务列表中task_list
for i in range(5):
task = asyncio.create_task(show(i))
task_list.append(task)
# 方式1:通过gather方法批量通知event loop进行调度 asyncio.gather
res = await asyncio.gather(*task_list)
print(res) # 接受任务执行返回的结果 [task1,task2] 存放结果为添加任务的顺序
# 方式2:直接将列表添加 asyncio.wait
done,pending = await asyncio.wait(task_list,timeout=1) # timeout=None
print(done) # 返回的结果的集合
print(pending) # 返回timeout超时未完成任务集合 与参数timeout(设置后,timeout=2,任务最大执行时间为2秒,超出存入当前集合中)
print(time.strftime('%S'))
asyncio.run(func())
print(time.strftime('%S'))
注意:
# 不是用函数调用方式添加任务需要一下写法
async def show(num):
print('show函数正在执行...%s'%num)
await asyncio.sleep(2)
print('show函数执行完毕...%s'%num)
return 'ok'
task_list = [show(1),show(2)]
'''
不能
task_list = [
asyncio.create_task(show(1)),
asyncio.create_task(show(2))
]
因为当前操作不直接将协程对象添加到事件循环中 event loop 但是当前还没有创建事件循环就会报错
async def func():
pass
asyncio.run(func()) 会先创建事件循环将func()对象包装为任务进行添加并且执行
'''
asyncio.run(asyncio.wait(task_list)) # 当前内部代码如果发现并事件循环就会进行创建
asyncio.Future对象
future是底层对象,是task类的基类
Task继承Future,task对象内部await结果的处理基于future对象的
import asyncio
实例1:
# 在这种情况下创建future对象毫无意义因为没有用返回结果就处于等待
async def func():
# asyncio.get_event_loop() 获取当前执行 run 创建的事件循环对象
loop = asyncio.get_event_loop()
print(loop)
# 创建一个任务(future对象),这个任务什么都不干
fut = loop.create_future()
# 当前就会在等待任务(future对象) 等待结果出现,如果没有就一直等待
await fut
asyncio.run(func())
案例2
# 如果有返回值那么future对象就会执行完毕,如果没有就会阻塞等待返回值
async def func1(fut):
# 4.执行任务
await asyncio.sleep(2)
# 5.给future对象进行设置值
fut.set_result('666')
async def func2():
# 1.获取当前事件循环
loop = asyncio.get_event_loop()
# 2.创建一个future对象
fut = loop.create_future()
# 3.添加一个任务到携程对象中同时将future对象传入做参数
await loop.create_task(func1(fut))
# 6.等待fut对象如果向下执行,没有值阻塞等待值
data = await fut
print(data)
asyncio.run(func2())
####
await asyncio.create_task(函数对象) 创建任务,任务完成后自动执行set_result进行设置返回值 等待的状态不会等待
coucurrent.futures.future对象
使用线程池与进程池实现异步函数时使用的对象与asynio没有关系
# concurrent 项目中得第三方模块不支持异步,只支持线程异步或者进程异步使用当前模块
import time
import asyncio
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor # 线程池
from concurrent.futures.process import ProcessPoolExecutor # 进程池
1.基本使用了解
def func(value):
time.sleep(1)
print(value)
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
# 返回值会赋值给fut
fut = pool.submit(func,i)
print(fut)
2. 使用场景: # 重点
项目中进行异步时80%为异步编程,但是20%使用项目使用的模块不支持异步编程可以使用当前方式
def func1():
print('我是一个普通函数')
return '666'
async def main():
# 获取事件循环
loop = asyncio.get_event_loop()
# 创建一个Future对象并且将函数包装为可以被async执行的协程对象
# 默认使用的是线程模式 run_in_executor(线程池对象或者进程池对象[如果参数为none那么默认使用线程池],普通函数(第三方模块不支持异步方式))
# 1.内部调用ThreadPoolExecutor的 submit 方法申请一个线程 去执行func1函数并返回一个concurrent.futures.Future对象
# 2. asyncio.wrap_future 将 concurrent.futures.Future对象包装为 asyncio.Future的对象
# 因为concurrent.futures.Future对象不支持 await方法所以需要包装
fut = loop.run_in_executor(None, func1)
res = await fut
print(res)
# 使用线程池: 可以传入max_workers 固定线程数量
with ThreadPoolExecutor() as pool:
res = await loop.run_in_executor(pool, func1)
print(res)
# 使用进程池 可以传入max_workers 固定进程数量
with ProcessPoolExecutor() as pool:
res = await loop.run_in_executor(pool, func1)
print(res)
if __name__ == '__main__':
asyncio.run(main())
案例:asyncio+不支持异步模块
import asyncio
import requests
import random
# asyncio + 不支持异步的模块下载
async def download_img(url):
print('开始下载图片', url)
loop = asyncio.get_event_loop()
# requests模块默认不支持一步操作,所以使用线程池方式配合
# run_in_executor方式非异步转为asyncio.future进行异步执行
# 内部默认是线程方式执行,3个人任务就是3个线程,那么资源上比普通的asyncio要大
future = loop.run_in_executor(None, requests.get, url)
ret = await future
file_img_name = str(random.randint(1, 10)) + '.jpg'
with open(file_img_name, mode='wb') as f:
f.write(ret.content)
if __name__ == '__main__':
url_list = [
'https://ts1.cn.mm.bing.net/th/id/R-C.df4462fabf18edd07195679a5f8a37e5?rik=FnNvr9jWWjHCVQ&riu=http%3a%2f%2fseopic.699pic.com%2fphoto%2f50059%2f8720.jpg_wh1200.jpg&ehk=ofb4q76uCls2S07aIlc8%2bab3H5zwrmj%2bhqiZ%2fyw3Ghw%3d&risl=&pid=ImgRaw&r=0',
'https://pic3.zhimg.com/v2-58d652598269710fa67ec8d1c88d8f03_r.jpg?source=1940ef5c',
'https://tse1-mm.cn.bing.net/th/id/OIP-C.xq6cOv82ubIhJY9qkFd5AgHaEK?pid=ImgDet&rs=1',
]
loop = asyncio.get_event_loop()
task_list = [download_img(i) for i in url_list]
loop.run_until_complete(asyncio.wait(task_list))
异步迭代器(不重要)
异步迭代器:
实现了 __aiter__() 和 __anext__() 方法的对象。__anext__必须返回一个awaitable对象 async for会处理异步迭代器 __anext__() 方法返回可执行等待对象,直到引发 stopasynciteration异常
异步可迭代对象
可在 async for 语句中使用的对象,必须通过__aiter__()方法返回一个asynchronous.iterator
import asyncio
# 无太大的应用场景
# 除非使用循环取每一个值,但是又不想使用 内部循环时可以使用
class Reader:
# 自定义异步迭代器(同时可以异步迭代对象)
def __init__(self):
self.count = 0
async def readline(self):
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val is None:
raise StopAsyncIteration
return val
# 正常的迭代器
# r = Reader()
# for i in r: print(i)
# 异步的迭代器
# async fro 必须写在异步函数内
async def func():
r = Reader()
async for i in r:
print(i)
if __name__ == '__main__':
asyncio.run(func())
异步上下文管理器(重要)
上下文管理器:with 上下文 自动帮打开文件写入信息并且在内容写入完毕后自动关闭文件
异步上下文管理器是一样的概念
使用对象定义 __aenter__() 和 __aexit__() 方法 对async with语句中的环境进行控制
# 在类中定义了:
__aenter__() 和 __aexit__() 支持async with上下文方式 [在正常定义的类中如果使用这个方法也可以使用这种方式]
import asyncio
# 作用例如: 链接数据库 关闭数据
# 正常的非异步 只要类中定义了这类方法都可以进行上下文
class AsyncioM:
def __init__(self):
self.count = 0
async def func(self):
return 666
async def __aenter__(self):
# 异步链接数据库操作
self.conn = asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步关闭数据库链接
await asyncio.sleep(1)
# async with 方法需要在异步函数内进行使用
# 当使用是会执行当前类中得__aenter__ 这个方法返回什么那么f就是什么[可以进行设置数据库链接]
# 当上下文完成后 就会自动使用__aexit__方法[关闭数据库链接]
async def func1():
async with AsyncioM() as f:
res = await f.func()
print(res)
asyncio.run(func1())
uvloop事件循环
事件循环的替换方案
效率: 默认event loop < uvloop(接近go语言效率)
1.安装
pip3 install uvloop # 当前模块暂时不支持win系统(无法安装)
2.需要将默认事件循环替换为 uvloop
import asyncio
import uvloop
# 将以前的event loop替换为uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 代码与原来的一样
asyncio.run(..)
框架内部使用了asgi -> uviorn(内部使用了unloop)
案例
异步Redis操作
假设:
后端服务 A服务器
redis数据库 B服务器
那么后端代码与redis进行交互式的时候,就会产生网络请求(网络io)链接/操作/断开都是网络io请求。
# 语法都是相近的只不过 同步变异步
# 文档;https://aioredis.readthedocs.io/en/latest/migration/
1.安装
pip install aioredis
2.使用
import asyncio
import aioredis
'''
aioredis模块基础了原来的redis的类使用方式是相同
'''
async def execute(address, password=None):
print('开始执行链接操作', address)
# 1.创建链接
# decode_responses 获取参数时解码
# encoding 用于响应解码的编解码器。
# 链接方式1: 默认的生成方式需要 传入指定的ip端口链接[io等待]
redis = await aioredis.Redis(host='127.0.0.1', port=6379, db=0, decode_responses=True, encoding='utf-8')
# 链接方式2: 通过url进行生成redis 链接对象 cdn链接[io等待]
redis1 = await aioredis.from_url(address, encoding="utf-8", decode_responses=True)
# 2.链接对象
# print(redis)
# print(redis1)
# 3.设置值[io等待]
await redis.set('66789', 'wkx')
await redis1.set('8899', 'wkx')
# 4.获取值[io等待]
key = await redis.get('66789')
print(key)
key1 = await redis1.get('8899')
print(key1)
# 5.关闭链接[io等待]
await redis.close()
await redis1.close()
await redis.wait_closed()
await redis1.wait_closed()
asyncio.run(execute('redis://127.0.0.1:6379/1'))
异步Mysql操作
io操作:链接/设置数据/获取数据/关闭mysql
# 语法都是相近的只不过 同步变异步
# 文档地址:https://aiomysql.readthedocs.io/en/latest/
1.安装
pip install aiomysql
2.普通的mysql链接方式
import pymysql
pymysql.install_as_MySQLdb()
# 1.创建服务链接
serve = pymysql.connect(host='127.0.0.1',port=3306,user='root',password='123456',db='db21')
# 2.创建控制mysql权柄光标
conn = serve.cursor()
# 3.执行sql
sql = conn.execute('show tables')
# 4.获取数据
data = conn.fetchall()
print(data)
# 5.关闭链接
conn.close()
serve.close()
3.使用aiomysql链接与普通的方式相同
内部处理语法相同的
import asyncio
import aiomysql
async def execute():
# 创建mysql服务链接
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123456', db='db21')
# 创建控制mysql权柄光标
cur = await conn.cursor()
# 执行命令
await cur.execute('show tables')
# 获取数据
res = await cur.fetchall()
print(res)
# 关闭链接
await cur.close() # 关闭光标
conn.close() # 关闭链接
asyncio.run(execute())
异步mongodb操作
# mongodb模块使用的异步模块是motor模块
# 语法都是相近的只不过 同步变异步
# 文档地址:https://motor.readthedocs.io/en/stable/
普通的链接方式:
import pymongo
# 设置了密码域账户
url = 'mongodb://root:[email protected]:27017/admin'
mon = pymongo.MongoClient(url)
print(mon.list_database_names())
1.安装
pip install motor
2.使用
import asyncio
from motor import motor_asyncio
url = 'mongodb://root:[email protected]:27017/admin'
async def func():
# 创建链接对象
conn = motor_asyncio.AsyncIOMotorClient(url)
# 获取数据库的全部名称
db = await conn.list_database_names()
print(db)
# 断开链接
conn.close()
asyncio.run(func())
FastAPI框架(为例)
# 文档:https://fastapi.tiangolo.com/zh/
性能比较高的框架(使用的事件循环:uvloop)
fastapi: 是一个api的接口异步框架
uvicorn:基于 uvloop 和 httptools 构建的非常快速的 ASGI '服务器'
1.安装fastapi框架
pip install fastapi
pip install uvicorn(asgi是一个支持异步的 uwsgi web服务器,内部基于uvloop)用来启动异步框架的服务器
2.基本使用[无异步功能,排队使用]
import asyncio
import uvicorn
from fastapi import FastAPI
app = FastAPI()
@app.get('/')
def index():
'''普通的接口程序'''
print(1234)
return {'code': 200, 'msg': '你好,欢迎使用', 'err': ''}
if __name__ == '__main__':
# uvicorn.run(内部的参数)
# app: 运行出口文件的函数 所在文件.py:fastaspi实例对象 例如:测试13:app[测试13.py文件下的app=FastAPI()]
# host: 服务启动的url 默认127.0.0.1
# port: 端口 默认为 8080
# reload : 热更新,如果内容修改重启服务器相当于falsk debug=True
# debug: 同reload
# reload_dirs:设置需要 reload 的目录,List[str] 类型
# log_level: 设置日志等级 默认为等级为info
uvicorn.run('测试13:app')
# 第二种启动方式: 在命令行启动
# uvicorn main:app 命令含义如下:
# main:main.py 文件(一个 Python "模块")。
# app:在 main.py 文件中通过 app = FastAPI() 创建的对象。
# --reload:让服务器在更新代码后重新启动。仅在开发时使用该选项。
3.异步使用
import asyncio
import aioredis
import uvicorn
from fastapi import FastAPI
app = FastAPI()
# 创建连接池redis对象,添加了一个max_connections限制链接
# 方式1
conn = aioredis.from_url('redis://127.0.0.1:6379',max_connections=10)
# 方式2
conn1 = aioredis.ConnectionPool.from_url('redis://127.0.0.1:6379',max_connections=10)
redis = aioredis.Redis(connection_pool=conn1)
@app.get('/')
def index():
'''普通的接口程序'''
print(1234)
return {'code': 200, 'msg': '你好,欢迎使用首页接口', 'err': ''}
@app.get('/red')
async def red():
'''异步接口,用户访问出现io操作,不会进行等待,去接待新的访问用户。'''
print(456777)
await redis.execute_command("set", "my-key", "value")
return {'code': 200, 'msg': '你好,欢迎使用red接口', 'err': ''}
if __name__ == '__main__':
uvicorn.run('测试13:app')
其他的异步框架写法也是相同的
爬虫
1.安装
pip install aiohttp
2.实例
import asyncio
import aiohttp
async def download_img(session, url, img_name):
async with session.get(url, verify_ssl=False) as response: # 发送io请求
content = await response.content.read()
with open(f'{img_name}.png', mode='wb', ) as file_obj:
file_obj.write(content)
print('本地下载完成' + f'{img_name}.png')
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://ts1.cn.mm.bing.net/th/id/R-C.df4462fabf18edd07195679a5f8a37e5?rik=FnNvr9jWWjHCVQ&riu=http%3a%2f%2fseopic.699pic.com%2fphoto%2f50059%2f8720.jpg_wh1200.jpg&ehk=ofb4q76uCls2S07aIlc8%2bab3H5zwrmj%2bhqiZ%2fyw3Ghw%3d&risl=&pid=ImgRaw&r=0',
'https://pic3.zhimg.com/v2-58d652598269710fa67ec8d1c88d8f03_r.jpg?source=1940ef5c',
'https://tse1-mm.cn.bing.net/th/id/OIP-C.xq6cOv82ubIhJY9qkFd5AgHaEK?pid=ImgDet&rs=1',
]
# 协程对象列表
task_list = [download_img(session, url, index) for index, url in enumerate(url_list)]
await asyncio.gather(*task_list) # 将协程对象列表批量添加到event loop 中作为task任务
if __name__ == '__main__':
asyncio.run(main())
asyncio模块
asyncio
1.单进程单线程的程序,不能提高程序的运算速度
2.作用: 比较适合处理等待的任务,网络通信,不存在系统级的上下文切换
3.async分为: coroutine function(协程函数) coroutine object(协程对象)
例如:
import asyncio [3.7版本以上]
# 1.当前这个函数被称为coroutine function(协程函数)[async开头函数的都叫协程函数]
async def main():
print('hello')
await asyncio.sleep(2)
print('world')
# 2.正常的调用main函数[只会获取到当前协程对象,不会执行]
coro = main()
# coroutine object(协程对象) 打印出来的函数是携程的对象<coroutine object ..>
print(coro) # 不会进行运行 提示:运行错误,协程函数main主进程未等待
如果想启动async函数需要进入 event loop(事件循环) 控制程序的状态
1.需要导入asyncio
2.使用asyncio.run()接管整个程序[那么当前的入口程序就需要一个入口函数]
3.asyncio.run()函数: 1.建立起 event loop [事件循环] 2. 会将当前的main函数[coroutine function(协程函数)]作为当前event loop[事件循环]当其中的task[任务]
4.asyncio.run('参数是一个async关键字函数') 函数将程序从 '同步模式' 改变为 '异步模式' 的入口
asyncio使用说明
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return '666'
async def main():
print(f'当前时间{time.strftime("%X")}')
a = await say_after(1, 'hello') # 接受返回值
print(a)
await say_after(2, 'word')
print(f'结束时间{time.strftime("%X")}')
asyncio.run(main())
1.await:
'''
await会将 say_after() 时,变为一个task(任务)
发生:
1. 当前这个 协程函数对象 被包装为一个task(任务),告诉了event loop (循环事件) 当前这个新任务的存在
2. 告诉了event loop (循环事件) 这个task(任务)需要等待 say_after(async函数)执行完毕后 才能接着执行
3.yield出去,告诉event loop(循环事件) 当前这个任务暂时执行不了 请执行其他task(任务)
4. 当前event loop(循环事件) 再次安排当前task(任务)执行时 会将say_after(async函数)中得真正返回值拿出来进行保存(赋值给变量)
'''
2.整段程序执行过程:
'''
1.asyncio.run(main()) 当main函数作为一个task(任务) 给到了event loop(循环事件)中
event loop 在寻找task(任务)时,发现只有一个main task(任务),就运行了main函数
2.main任务再执行先打印了:print(f'当前时间{time.strftime("%X")}')打印
3.执行了 await say_after(1, 'hello') 函数得到了 协程对象(coroutine object)
4.await say_after(1, 'hello') await 将整这个协程对象(coroutine object) 变为一个 task(任务) 放回了event loop 里面 同时告诉event loop需要等待他,将控制权给到了event loop
5.现在event loop 中存在两个任务[main,say_after],main运行不了需要等待say_after运行后才能运行,但是say_after函数内部有await asyncio.sleep(delay)[将当前的sleep作为一个task(任务)添加到event loop中]需要等待,又将控制权转给了event loop
6.say_after需要等待 await asyncio.sleep(delay)任务执行完毕。await asyncio.sleep(delay) 执行完毕后 event loop 让say_after 执行 打印了 print(what)执行完毕 ,将控制权交给了event loop
7.event loop 就会将控制权给main函数执行[main就执行完毕第一个 say_after] 在执行第二个say_after(与上面第一个执行一样)
'''
3. 注意
关于event loop控制权交回的两种方式:1.await 2.函数执行完毕后自动交回控制权 。如果当前任务中是一个死循环那么 event loop就直接卡死了
4.发现问题:当前两个async执行实际时间为3秒钟
因为await做的事情太多,需要将对象变为一个任务,告诉event loop 将控制权交还给event loop还需要等待,那么后面需要执行的协程函数也就需要等待前面的任务(协程函数已经通过awati转为任务后)完成后,后面的协程函数才转为一个任务,才被event loop 调用
问题解决: create_task函数
asyncio中得create_task函数
create_task('coroutine object(协程函数对象)')
create_taskd的作用:将协程对象对象转变为task(任务)注册到event loop中,分担了await一部分功能将协程函数对象包装为一个task(任务)
import asyncio
import time
1.使用create_task
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
# 分担await一部分功能,将协程函数对象包装为task(任务)告诉event loop 这个任务可以执行
# 但是没办执行,执行权在main手中 需要await将event loop控制权拿到并且执行
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'word'))
print(f'当前时间{time.strftime("%X")}')
# 将event loop控制权拿到并且执行
# 在之前await 协程对象时,需要将整个对象包装为task(任务)在进行执行 在将event loop 控制权拿到执行
# await task(任务) 直接获取event loop控制权进行执行(我需要这个task任务完成,将控制权交还给event loop) 在控制权回来的时候将当前任务的返回值保存
await task1
await task2
print(f'结束时间{time.strftime("%X")}')
asyncio.run(main())
2.create_task拿返回值
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return f'{what} - {delay}'
async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'word'))
print(f'当前时间{time.strftime("%X")}')
# 获取 任务(协程对象)返回值(返回值会在任务执行完毕后赋值给变量)
ret1 = await task1
ret2 = await task2
# 打印 协程对象返回值
print(ret1)
print(ret2)
print(f'结束时间{time.strftime("%X")}')
asyncio.run(main())
3.批量将协程函数转变为task(任务)交给event loop
使用gather('接受多个协程函数对象')
会将多个协程函数对象包装成为任务,给到event loop 进行执行
返回值处理: 会将全部的任务的返回值添加到一个列表中(返回值顺序与gather(task任务顺序一致))
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return f'{what} - {delay}'
async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'word'))
print(f'当前时间{time.strftime("%X")}')
task_list = [task1, task2] # 任务列表
ret = await asyncio.gather(*task_list)
print(ret) # 全部的返回值['hello - 1', 'word - 2']
print(f'结束时间{time.strftime("%X")}')
asyncio.run(main())
4.gather的另一个好处:将全部的协程对象包装为task(任务)
# 省去 cerate_task 的操作
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return f'{what} - {delay}'
async def main():
print(f'当前时间{time.strftime("%X")}')
task_list = [say_after(1, 'hello'), say_after(2, 'word')] # 协程函数对象列表
ret = await asyncio.gather(*task_list)
print(ret) # 全部的返回值['hello - 1', 'word - 2']
print(f'结束时间{time.strftime("%X")}')
asyncio.run(main())
async理解
event loop 当做大脑,若干个可以执行的task任务,task任务没办控制event loop去到那个task执行的,只能告诉event loop 我在等待那个task执行完毕,由event loop 进行控制权的分配到那个task
将event loop权限给出去的方式: 1.await 2.函数执行完毕
其实可以理解为,只是当前的代码在执行,只不过将等待的事件给利用了起来执行了另外的内容,节约了时间,如果代码中存在等待的操作,那么使用async最好,如果没有等待那么没什么用
要理解:
1.coroutine function(协程函数)
2.coroutine object(协程对象[协程函数加()执行的结果])
3.task(任务[是由协程对象被asyncio方法包装为任务给到event loop中,变为task才能被执行])
4.拿到task返回值需要使用await方法,await方法会获取到event loop控制权(告诉它这个任务需要执行)
标签:异步,task,协程,await,print,loop,asyncio From: https://www.cnblogs.com/wkxz/p/17094146.html