首页 > 编程语言 >异步编程(协程asyncio)

异步编程(协程asyncio)

时间:2023-02-05 23:12:49浏览次数:37  
标签:异步 task 协程 await print loop asyncio

协程

1.异步非阻塞,asyncio
2.异步框架: 提升性能
    tomado fastapi django3.x asgi aiohttp 

协程是什么

协程是不是计算机提供出来的,程序员自己创建的。
​
协程(coroutine) 被称为微线程,是一种用户动态上下文切换的技术,简而言之使用一个线程在代码中进行切换的过程
​
1.采用原来的同步指向(代码从上到下执行) 一共使用了3秒时间
​
def func1():
    print(1)
    time.sleep(1)
​
def func2():
    print(2)
    time.sleep(2)
​
print('开始秒数' + time.strftime('%S'))
func1()
func2()
print('开始秒数' + time.strftime('%S'))
​
​
​
2.实现线程的方式
​
1.greenlet 早期模块
2.yield 关键字控制
3.asyncio装饰器 py3.4
4.async await 关键字 py3.5 [推荐]

greenlet实现协程方式

1.pip install greenlet
​
2.案例
from greenlet import greenlet
​
​
def func1():
    print('func1-第1次打印')
    gr2.switch()  # 2.切换到func2 进行执行[会进行记录当前这个函数执行的位置,如果切换回来就从当前开始执行]
    print('func1-第2次打印')
    gr2.switch()  # 4.切换到func2 进行执行
​
​
def func2():
    print('func2-第1次打印')
    gr1.switch()  # 3.切换到func1函数执行
    print('func2-第2次打印')
​
​
# 注册到greenlet 对象
gr1 = greenlet(func1)
gr2 = greenlet(func2)
# print(gr2) # greenlet.greenlet object
gr1.switch()  # 1.启动gr1对象的进行执行

yield关键字实现

def func1():
    yield 1
    yield from func2() # 跳到 执行func2生成器
    yield 2
​
​
def func2():
    yield 3
    yield 4
​
f = func1()
for i in f: # 循环执行生成器
    print(i)

asynico实现

实际执行时间2秒
​
import asyncio
import time
​
@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(1)  # 模拟io请求
    print(2)
​
​
@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)
    print(4)
​
print('开始秒数' + time.strftime('%S'))
tasks = [asyncio.ensure_future(func1()), asyncio.ensure_future(func2())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('结束秒数' + time.strftime('%S'))
​
​

async与await关键字实现

实际执行时间2秒
@asyncio.coroutine 替换为  async def 
yield from 替换为 await
​
​
import asyncio
import time
​
# 通过关键字将普通函数包装为协程函数(coroutine func)
async def func1():
    print(1)
    # await:1.将asyncio.sleep(1)协程对象 包装为task任务 告知event loop 2.将控制权限还给event loop
    await asyncio.sleep(1) # 模拟i/o操作
​
# 协程函数
async def func2():
    print(2)
    await asyncio.sleep(2)
​
​
async def main():
    print('开始秒数' + time.strftime('%S'))
    coroutine_list = [func1(), func2()] # coroutine func object list
    await asyncio.gather(*coroutine_list)
    print('结束秒数' + time.strftime('%S'))
​
if __name__ == '__main__':
    asyncio.run(main()) # main 获取event loop权限执行mian task任务执行 (协程函数对象)
    
当前的优势:
    遇到io操作,就会将线程进行切换执行其他的task进行执行,大大的节省了时间的损耗

协程的意义

在一个线程中如果遇到io等待时间,线程不会傻傻的等待,利用空闲的时间赶其他的事情,大大的提高了效率
​
​
1.实例代码:同步方式[排队执行] 使用3秒下载完成
import requests
​
def download_img(url,img_name):
    res = requests.get(url)
    with open(f'{img_name}.png', mode='wb', ) as file_obj:
        file_obj.write(res.content)
    print('本地下载完成' + f'{img_name}.png')
​
​
url_list = [
      'url2',
      'url1',
      'url3'
]
print('开始下载时间' + time.strftime('%X'))
for index,url in enumerate(url_list):
    print('url:' + url)
    download_img(url,index)
print('结束下载' + time.strftime('%X'))
​
# 如果图片下载时间为1分钟,需要花费3分钟[第一张下载完毕后,才会执行下一次下载]
​
​
​
​
2.实例代码:协程方式[不等带结果执行下一个] 使用1秒将3张图片下载完成
pip install aiohttp # 需要使用这个模块
​
import time
import asyncio
import aiohttp
​
async def download_img(session, url, img_name):
    async with session.get(url, verify_ssl=False) as response:  # 发送io请求
        content = await response.content.read()
        with open(f'{img_name}.png', mode='wb', ) as file_obj:
            file_obj.write(content)
            print('本地下载完成' + f'{img_name}.png')
​
​
async def main():
    print('开始秒数' + time.strftime('%S'))
    async with aiohttp.ClientSession() as session:
        url_list = [
            'url2',
            'url1',
            'url3'
        ]
        # 协程对象列表
        task_list = [download_img(session, url, index) for index, url in enumerate(url_list)]
        await asyncio.gather(*task_list)  # 将协程对象列表批量添加到event loop 中作为task任务
    print('结束秒数' + time.strftime('%S'))
​
​
if __name__ == '__main__':
    # asyncio.run(main()) # 使用这种方式会出现RuntimeError: Event loop is closed 异常
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
   
# 使用协程asyncio模块需要配合着能使用协程方式的模块进行操作,不然不会有所作用

异步编程

事件循环'event loop'

事件循环: 称为'event loop'
作用: 可以理解为死循环,去检测并执行某写代码[检测任务列表中得任务]
​
理解代码:
    
任务列表 = [任务1,任务2,任务3]
​
while True:
    可执行任务列表,已完成任务的任务列表 = 去任务列表中检测全部任务,将可执行和可完成的任务返回('相当于当前的任务列表中得任务存在多个状态,将每个状态进行分类进行循环,直到任务列表中任务完全没有为止')
    
    for 就绪任务 in 已经准备就绪的任务列表
        执行就绪的任务
    
    for 已完成的任务 in 已完成的任务列表
        从'任务列表'中进行移除已完成的任务
  
如果任务列表中得任务全部完成,那么这个死循环就会终止
​
'''
简单的理解事件循环的操作(大致的流程):
​
任务列表 = ['正在io请求的任务','已经完成的任务','可以执行的任务']
那么事件循环 event loop 就会先将 '正在io请求的任务无视'
将已完成的任务从任务列表中剔除
执行可以执行的任务
​
如果可以执行的任务出现【io请求的操作】,那么就会【放任这个任务进行io操作】,并【标记】当前任务时【请求io的任务】,执行下一个任务。
​
如果【当前任务】已经完成,那么就会添加到【已完成任务列表】中,进行循环从【任务列表】中进行剔除
'''
​
​
代码:
import asyncio
​
# 当前 生成或者获取一个事件循环
loop = asyncio.get_enent_loop()
# 将任务放到任务列表中,让可以理解为让当前的任务可以被事件循环监听
loop.run_until_complete('存放任务')

协程函数与协程对象

1.协程函数 coroutine func
async def 函数名: #  async关键字为前缀的函数被称为'协程函数'
    pass

2.协程对象 coroutine object
协程函数() # 协程函数+()得到的就是协程对象,当前函数并不会执行


3.协程函数与普通定义函数区别
最大的区别就是,普通函数()当前函数就会进行执行,协程函数()只能获取协程对象[内部代码不会执行]

协程函数的使用(async关键字)

async def func()
	print('我是协程函数')

asy = func() # 不会执行
print(asy) # 获取当前协程函数的对象[协程对象]


怎么才能让协程函数执行?
需要借助 asyncio 模块中得事件循环 event loop

import asyncio

async def func():
    print('123456')

# 执行方式
    1.python3.7之前的执行操作    
    # 事件循环
    loop = asyncio.get_event_loop()
    # 使用事件循环执行协程函数
    loop.run_until_complete(func())

    
    2.python3.7后执行的方式
    asyncio.run(func())


# 关于asyncio.run
内部帮助我们创建获取事件循环,并让事件循环执行传入的函数,当前run()作为一个异步编程的入口进行操作
run 的内部还是完成了原来版本的操作 执行创建或者获取event loop 将任务执行


# 注意:
	异步函数一定要不事件循环进行一起使用,没有事件循环异步函数是没办法执行的

协程函数的使用(await关键字)

# await 等待对象[需要等待对象执行完毕后才会执行下面的代码]
# await +  代表的可等待的对象(协程对象,future,task对象[这三个对象可以理解为io等待对象])

await最重要的最用: 会将协程对象包装为一个task任务,告知event loop。


import asyncio


案例1:

async def func():
    print('123456') 1.先执行当前打印
    # 模拟io等待
    res = await asyncio.sleep(1)  2.await会将当前协程对象对象包装为task任务,添加到event loop中,并告知当前task任务需要等待(会记录当前执行的位置),将权限还给event loop,去执行其他不需要等待(等待完成的任务)的task任务
    print(res) 3.获取io等待返回值,当前等待结束变为可执行任务(已经有了执行的结果),event loop在下次循环中 会将权限给func()函数,从当前记录位置开始执行(并接受返回值)

asyncio.run(func())




案例2:

async def show():
    print('正在下载中...')  4.打印
    await asyncio.sleep(2) 5.等待asyncio.sleep(2)协程方法
    print('下载完成...') 5. 打印
    return '文件.txt' 6. 返回值

async def func():
    print('我要开始下载了')  2.打印
    # 执行的协程函数的执行结果的返回值
    res = await show() 3.包装携程对象 并进去show()进行执行 7.接受返回值
    print(res)  8.打印返回值

asyncio.run(func()) # 1.执行func() 协程对象



案例3

import asyncio
async def show():
    print('正在下载中...')
    await asyncio.sleep(2)
    print('下载完成...')
    return '文件.txt'

async def func():
    print('我要开始下载了')
    res1 = await show()
    print(res1)
    res2 = await show() # 等待第一个await等待对象的值得到结果后才会执行
    print(res2)

asyncio.run(func())

只有第一个await协程对象(将这对象包装为一个event loop中得一个任务)执行完毕后,并且获取执行结果后,才会执行下一个协程函数
原因: 因为第二个协程对象并未被await包装为任务,告知event loop。代码是从上而下的执行,并没有执行到第二个await

Task对象

task作用:
    1.帮助我们在事件循环(event loop)中添加多个任务
    例如:
        任务列表 = [task1,task2,task3] # 添加多个任务到event loop 中
        当某个任务出现io操作就会就会进行任务切换
    
    2.tasks 用于并发的调度协程,通过asyncio.create_task(协程对象)[3.7上使用]的方式创建task对象,这样可以将协程对象添加到event loop(事件循环)。其他的方式(更低级) loop.create_task() 或者asyncio.evsur_future()[3.7下使用]不建议手动实例化task对象
    

# 重点:

import asyncio
import time


1.实例1通过create_task将协程对象包装为task任务[2个任务执行2秒]

# 使用的较少create_task创建任务 需要大量的await进行通知event loop调度

async def show(num):
    print('show函数正在执行...%s'%num)
    await asyncio.sleep(2) # 模拟io阻塞
    print('show函数执行完毕...%s'%num)
    return 'ok'

async def func():
    print('执行func函数')
    # 创建task对象,将当前执行show函数添加到事件循环中(event loop)
    task1 = asyncio.create_task(show(1)) 
    task2 = asyncio.create_task(show(2)) 
    #  当执行到某个遇到io操作会自动化切换执行其他任务
    #  此处的awati是等待响应的协程全都执行完毕后获取的结果

    task1_return = await task1 
    task2_return = await task2
    print(task1_return,task2_return)
   
print(time.strftime('%S'))
asyncio.run(func())
print(time.strftime('%S'))

执行过程说明:
1.现在event loop中存在3个任务: [func(),task1,task2]

2.执行过程模拟(未必详细,大概过程)
	2.1 执行脚本时,先执行asyncio.run()执行func()对象,并将任务添加到event loop中
    
    2.2 执行到asyncio.create_task() 添加任务到event loop中[将task1-task2添加],任务列表中已经存在了3个任务
    
    2.3 await task1 就会执行task1任务内部代码,遇到 asyncio.sleep(2) task1任务阻塞,'开始任务切换' 
    
    2.4 切换到task2任务(为什么不会切换到func任务中,因为当前的任务是在func任务内,task1在阻塞,那func也在阻塞),遇到 asyncio.sleep(2) task2任务阻塞,'开始任务切换' 
    
    2.5.切换到task1任务执行完成 获取返回值
    
    2.6.切换到task2任务执行完成 获取返回值
    
    2.7.func()任务执行完成,整段程序执行完毕
    
    

2.实例代码2

# 使用的较多,节省await编写调度

asyncio.gather(*任务列表)批量调度 # 需要通过*解构列表
asyncio.wait(任务列表)批量调度 # 直接添加列表



async def show(num):
    print('show函数正在执行...%s'%num)
    await asyncio.sleep(2)
    print('show函数执行完毕...%s'%num)
    return 'ok'

async def func():
    print('执行func函数')
    # 执行的协程函数的执行结果的返回值
    task_list = []
    
    # 循环将任务呢添加到任务列表中task_list
    for i in range(5):
        task = asyncio.create_task(show(i))
        task_list.append(task)
    
    # 方式1:通过gather方法批量通知event loop进行调度 asyncio.gather
     res = await asyncio.gather(*task_list) 
    print(res) # 接受任务执行返回的结果 [task1,task2] 存放结果为添加任务的顺序
    
    # 方式2:直接将列表添加 asyncio.wait
    done,pending = await asyncio.wait(task_list,timeout=1) # timeout=None
    print(done) # 返回的结果的集合 
    print(pending) # 返回timeout超时未完成任务集合 与参数timeout(设置后,timeout=2,任务最大执行时间为2秒,超出存入当前集合中)
    
   
print(time.strftime('%S'))
asyncio.run(func())
print(time.strftime('%S'))


注意:
# 不是用函数调用方式添加任务需要一下写法    
async def show(num):
    print('show函数正在执行...%s'%num)
    await asyncio.sleep(2)
    print('show函数执行完毕...%s'%num)
    return 'ok'

task_list = [show(1),show(2)]
'''
不能
task_list = [
asyncio.create_task(show(1)),
asyncio.create_task(show(2))
]
因为当前操作不直接将协程对象添加到事件循环中 event loop 但是当前还没有创建事件循环就会报错

async def func():
	pass
asyncio.run(func()) 会先创建事件循环将func()对象包装为任务进行添加并且执行
'''
asyncio.run(asyncio.wait(task_list)) # 当前内部代码如果发现并事件循环就会进行创建

asyncio.Future对象

future是底层对象,是task类的基类

Task继承Future,task对象内部await结果的处理基于future对象的

import asyncio
实例1:

# 在这种情况下创建future对象毫无意义因为没有用返回结果就处于等待
async def func():
    # asyncio.get_event_loop()  获取当前执行 run 创建的事件循环对象
    loop = asyncio.get_event_loop()
    print(loop)

    # 创建一个任务(future对象),这个任务什么都不干
    fut = loop.create_future()
    # 当前就会在等待任务(future对象) 等待结果出现,如果没有就一直等待
    await fut

asyncio.run(func())


案例2
# 如果有返回值那么future对象就会执行完毕,如果没有就会阻塞等待返回值
async def func1(fut):
    # 4.执行任务
    await asyncio.sleep(2)
    # 5.给future对象进行设置值
    fut.set_result('666')


async def func2():
    # 1.获取当前事件循环
    loop = asyncio.get_event_loop()
    # 2.创建一个future对象
    fut = loop.create_future()
    # 3.添加一个任务到携程对象中同时将future对象传入做参数
    await loop.create_task(func1(fut))

    # 6.等待fut对象如果向下执行,没有值阻塞等待值
    data = await fut
    print(data)

asyncio.run(func2())

####
await asyncio.create_task(函数对象) 创建任务,任务完成后自动执行set_result进行设置返回值 等待的状态不会等待

coucurrent.futures.future对象

使用线程池与进程池实现异步函数时使用的对象与asynio没有关系

# concurrent 项目中得第三方模块不支持异步,只支持线程异步或者进程异步使用当前模块

import time
import asyncio
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor  # 线程池
from concurrent.futures.process import ProcessPoolExecutor  # 进程池

1.基本使用了解
def func(value):
    time.sleep(1)
    print(value)
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
    # 返回值会赋值给fut
    fut = pool.submit(func,i)
    print(fut)
    

2. 使用场景: # 重点
	项目中进行异步时80%为异步编程,但是20%使用项目使用的模块不支持异步编程可以使用当前方式
 
def func1():
    print('我是一个普通函数')
    return '666'


async def main():
    # 获取事件循环
    loop = asyncio.get_event_loop()
    # 创建一个Future对象并且将函数包装为可以被async执行的协程对象
    # 默认使用的是线程模式 run_in_executor(线程池对象或者进程池对象[如果参数为none那么默认使用线程池],普通函数(第三方模块不支持异步方式))
    
    # 1.内部调用ThreadPoolExecutor的 submit 方法申请一个线程 去执行func1函数并返回一个concurrent.futures.Future对象
    
    # 2. asyncio.wrap_future 将 concurrent.futures.Future对象包装为 asyncio.Future的对象
    
    # 因为concurrent.futures.Future对象不支持 await方法所以需要包装
    fut = loop.run_in_executor(None, func1)
    res = await fut
    print(res)

    # 使用线程池: 可以传入max_workers 固定线程数量
    with ThreadPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, func1)
        print(res)

    # 使用进程池 可以传入max_workers 固定进程数量
    with ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, func1)
        print(res)


if __name__ == '__main__':
    asyncio.run(main())

案例:asyncio+不支持异步模块

import asyncio
import requests
import random


# asyncio + 不支持异步的模块下载


async def download_img(url):
    print('开始下载图片', url)
    loop = asyncio.get_event_loop()
    # requests模块默认不支持一步操作,所以使用线程池方式配合
    # run_in_executor方式非异步转为asyncio.future进行异步执行
    # 内部默认是线程方式执行,3个人任务就是3个线程,那么资源上比普通的asyncio要大
    future = loop.run_in_executor(None, requests.get, url)

    ret = await future

    file_img_name = str(random.randint(1, 10)) + '.jpg'
    with open(file_img_name, mode='wb') as f:
        f.write(ret.content)


if __name__ == '__main__':
    url_list = [
        'https://ts1.cn.mm.bing.net/th/id/R-C.df4462fabf18edd07195679a5f8a37e5?rik=FnNvr9jWWjHCVQ&riu=http%3a%2f%2fseopic.699pic.com%2fphoto%2f50059%2f8720.jpg_wh1200.jpg&ehk=ofb4q76uCls2S07aIlc8%2bab3H5zwrmj%2bhqiZ%2fyw3Ghw%3d&risl=&pid=ImgRaw&r=0',
        'https://pic3.zhimg.com/v2-58d652598269710fa67ec8d1c88d8f03_r.jpg?source=1940ef5c',
        'https://tse1-mm.cn.bing.net/th/id/OIP-C.xq6cOv82ubIhJY9qkFd5AgHaEK?pid=ImgDet&rs=1',
    ]

    loop = asyncio.get_event_loop()

    task_list = [download_img(i) for i in url_list]
    loop.run_until_complete(asyncio.wait(task_list))

异步迭代器(不重要)

异步迭代器:
实现了 __aiter__() 和 __anext__() 方法的对象。__anext__必须返回一个awaitable对象 async for会处理异步迭代器 __anext__() 方法返回可执行等待对象,直到引发 stopasynciteration异常

异步可迭代对象
可在 async for 语句中使用的对象,必须通过__aiter__()方法返回一个asynchronous.iterator


import asyncio


# 无太大的应用场景 
# 除非使用循环取每一个值,但是又不想使用 内部循环时可以使用
class Reader:
    # 自定义异步迭代器(同时可以异步迭代对象)
    def __init__(self):
        self.count = 0

    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val


# 正常的迭代器
# r = Reader()
# for i in r: print(i)

# 异步的迭代器
# async fro 必须写在异步函数内
async def func():
    r = Reader()
    async for i in r:
        print(i)


if __name__ == '__main__':
    asyncio.run(func())

异步上下文管理器(重要)

上下文管理器:with 上下文 自动帮打开文件写入信息并且在内容写入完毕后自动关闭文件

异步上下文管理器是一样的概念
使用对象定义 __aenter__() 和 __aexit__() 方法 对async with语句中的环境进行控制

# 在类中定义了:
	__aenter__() 和 __aexit__() 支持async with上下文方式 [在正常定义的类中如果使用这个方法也可以使用这种方式]
    
    
import asyncio


# 作用例如: 链接数据库 关闭数据
# 正常的非异步 只要类中定义了这类方法都可以进行上下文
class AsyncioM:

    def __init__(self):
        self.count = 0

    async def func(self):
        return 666

    async def __aenter__(self):
        # 异步链接数据库操作
        self.conn = asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 异步关闭数据库链接
        await asyncio.sleep(1)


# async with 方法需要在异步函数内进行使用
# 当使用是会执行当前类中得__aenter__ 这个方法返回什么那么f就是什么[可以进行设置数据库链接]
# 当上下文完成后 就会自动使用__aexit__方法[关闭数据库链接]
async def func1():
    async with AsyncioM() as f:
        res = await f.func()
        print(res)


asyncio.run(func1())

uvloop事件循环

事件循环的替换方案
效率: 默认event loop < uvloop(接近go语言效率)


1.安装
pip3 install uvloop # 当前模块暂时不支持win系统(无法安装)

2.需要将默认事件循环替换为 uvloop

import asyncio
import uvloop
# 将以前的event loop替换为uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 代码与原来的一样

asyncio.run(..)


框架内部使用了asgi -> uviorn(内部使用了unloop)

案例

异步Redis操作

假设:
	后端服务 A服务器
	redis数据库 B服务器
那么后端代码与redis进行交互式的时候,就会产生网络请求(网络io)链接/操作/断开都是网络io请求。
# 语法都是相近的只不过 同步变异步
# 文档;https://aioredis.readthedocs.io/en/latest/migration/



1.安装
pip install aioredis

2.使用
import asyncio
import aioredis

'''
aioredis模块基础了原来的redis的类使用方式是相同
'''


async def execute(address, password=None):
    print('开始执行链接操作', address)

    # 1.创建链接
    # decode_responses 获取参数时解码
    # encoding 用于响应解码的编解码器。
    # 链接方式1: 默认的生成方式需要 传入指定的ip端口链接[io等待]
    redis = await aioredis.Redis(host='127.0.0.1', port=6379, db=0, decode_responses=True, encoding='utf-8')
    # 链接方式2: 通过url进行生成redis 链接对象 cdn链接[io等待]
    redis1 = await aioredis.from_url(address, encoding="utf-8", decode_responses=True)

    # 2.链接对象
    # print(redis)
    # print(redis1)

    # 3.设置值[io等待]
    await redis.set('66789', 'wkx')
    await redis1.set('8899', 'wkx')

    # 4.获取值[io等待]
    key = await redis.get('66789')
    print(key)
    key1 = await redis1.get('8899')
    print(key1)

    # 5.关闭链接[io等待]
    await redis.close()
    await redis1.close()

	await redis.wait_closed()
    await redis1.wait_closed()


asyncio.run(execute('redis://127.0.0.1:6379/1'))

异步Mysql操作

io操作:链接/设置数据/获取数据/关闭mysql
# 语法都是相近的只不过 同步变异步
# 文档地址:https://aiomysql.readthedocs.io/en/latest/
1.安装
pip install aiomysql


2.普通的mysql链接方式
import pymysql
pymysql.install_as_MySQLdb()

# 1.创建服务链接
serve = pymysql.connect(host='127.0.0.1',port=3306,user='root',password='123456',db='db21')

# 2.创建控制mysql权柄光标
conn = serve.cursor() 

# 3.执行sql
sql = conn.execute('show tables')

# 4.获取数据
data = conn.fetchall()
print(data)

# 5.关闭链接
conn.close()
serve.close()


3.使用aiomysql链接与普通的方式相同
内部处理语法相同的
import asyncio
import aiomysql


async def execute():
    # 创建mysql服务链接
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123456', db='db21')

    # 创建控制mysql权柄光标
    cur = await conn.cursor()

    # 执行命令
    await cur.execute('show tables')

    # 获取数据
    res = await cur.fetchall()
    print(res)

    # 关闭链接
    await cur.close() # 关闭光标
    conn.close() # 关闭链接


asyncio.run(execute())

异步mongodb操作

# mongodb模块使用的异步模块是motor模块
# 语法都是相近的只不过 同步变异步
# 文档地址:https://motor.readthedocs.io/en/stable/

普通的链接方式:
import pymongo
# 设置了密码域账户
url = 'mongodb://root:[email protected]:27017/admin'
mon = pymongo.MongoClient(url)
print(mon.list_database_names())


1.安装
pip install motor

2.使用
import asyncio
from motor import motor_asyncio

url = 'mongodb://root:[email protected]:27017/admin'


async def func():
    # 创建链接对象
    conn = motor_asyncio.AsyncIOMotorClient(url)
    # 获取数据库的全部名称
    db = await conn.list_database_names()
    print(db)
    # 断开链接
    conn.close()


asyncio.run(func())

FastAPI框架(为例)

# 文档:https://fastapi.tiangolo.com/zh/

性能比较高的框架(使用的事件循环:uvloop)
fastapi: 是一个api的接口异步框架
uvicorn:基于 uvloop 和 httptools 构建的非常快速的 ASGI '服务器'

1.安装fastapi框架
pip install fastapi
pip install uvicorn(asgi是一个支持异步的 uwsgi web服务器,内部基于uvloop)用来启动异步框架的服务器


2.基本使用[无异步功能,排队使用]
import asyncio
import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get('/')
def index():
    '''普通的接口程序'''
    print(1234)
    return {'code': 200, 'msg': '你好,欢迎使用', 'err': ''}


if __name__ == '__main__':
    # uvicorn.run(内部的参数)
    # app: 运行出口文件的函数  所在文件.py:fastaspi实例对象 例如:测试13:app[测试13.py文件下的app=FastAPI()]
    # host: 服务启动的url 默认127.0.0.1
    # port: 端口 默认为 8080
    # reload : 热更新,如果内容修改重启服务器相当于falsk debug=True
    # debug: 同reload
    # reload_dirs:设置需要 reload 的目录,List[str] 类型
    # log_level: 设置日志等级 默认为等级为info
    uvicorn.run('测试13:app')


# 第二种启动方式: 在命令行启动
# uvicorn main:app 命令含义如下:
# main:main.py 文件(一个 Python "模块")。
# app:在 main.py 文件中通过 app = FastAPI() 创建的对象。
# --reload:让服务器在更新代码后重新启动。仅在开发时使用该选项。




3.异步使用

import asyncio
import aioredis
import uvicorn

from fastapi import FastAPI

app = FastAPI()
# 创建连接池redis对象,添加了一个max_connections限制链接
# 方式1
conn = aioredis.from_url('redis://127.0.0.1:6379',max_connections=10)

# 方式2
conn1 = aioredis.ConnectionPool.from_url('redis://127.0.0.1:6379',max_connections=10)
redis = aioredis.Redis(connection_pool=conn1)

@app.get('/')
def index():
    '''普通的接口程序'''
    print(1234)
    return {'code': 200, 'msg': '你好,欢迎使用首页接口', 'err': ''}


@app.get('/red')
async def red():
    '''异步接口,用户访问出现io操作,不会进行等待,去接待新的访问用户。'''
    print(456777)

    await redis.execute_command("set", "my-key", "value")
    return {'code': 200, 'msg': '你好,欢迎使用red接口', 'err': ''}


if __name__ == '__main__':
    uvicorn.run('测试13:app')

    
其他的异步框架写法也是相同的  

爬虫

1.安装
pip install aiohttp

2.实例

import asyncio
import aiohttp

async def download_img(session, url, img_name):
    async with session.get(url, verify_ssl=False) as response:  # 发送io请求
        content = await response.content.read()
        with open(f'{img_name}.png', mode='wb', ) as file_obj:
            file_obj.write(content)
            print('本地下载完成' + f'{img_name}.png')


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://ts1.cn.mm.bing.net/th/id/R-C.df4462fabf18edd07195679a5f8a37e5?rik=FnNvr9jWWjHCVQ&riu=http%3a%2f%2fseopic.699pic.com%2fphoto%2f50059%2f8720.jpg_wh1200.jpg&ehk=ofb4q76uCls2S07aIlc8%2bab3H5zwrmj%2bhqiZ%2fyw3Ghw%3d&risl=&pid=ImgRaw&r=0',
            'https://pic3.zhimg.com/v2-58d652598269710fa67ec8d1c88d8f03_r.jpg?source=1940ef5c',
            'https://tse1-mm.cn.bing.net/th/id/OIP-C.xq6cOv82ubIhJY9qkFd5AgHaEK?pid=ImgDet&rs=1',
        ]
        # 协程对象列表
        task_list = [download_img(session, url, index) for index, url in enumerate(url_list)]
        await asyncio.gather(*task_list)  # 将协程对象列表批量添加到event loop 中作为task任务


if __name__ == '__main__':
    asyncio.run(main())

 

 

asyncio模块

asyncio

1.单进程单线程的程序,不能提高程序的运算速度
2.作用: 比较适合处理等待的任务,网络通信,不存在系统级的上下文切换
3.async分为: coroutine function(协程函数) coroutine object(协程对象)




例如:
import asyncio [3.7版本以上]

# 1.当前这个函数被称为coroutine function(协程函数)[async开头函数的都叫协程函数]
async def main(): 
    print('hello')
    await asyncio.sleep(2)
    print('world')
    
# 2.正常的调用main函数[只会获取到当前协程对象,不会执行]
coro = main()
# coroutine object(协程对象) 打印出来的函数是携程的对象<coroutine object ..>
print(coro)   # 不会进行运行 提示:运行错误,协程函数main主进程未等待


如果想启动async函数需要进入 event loop(事件循环) 控制程序的状态
1.需要导入asyncio

2.使用asyncio.run()接管整个程序[那么当前的入口程序就需要一个入口函数]

3.asyncio.run()函数: 1.建立起 event loop [事件循环] 2. 会将当前的main函数[coroutine function(协程函数)]作为当前event loop[事件循环]当其中的task[任务]

4.asyncio.run('参数是一个async关键字函数') 函数将程序从 '同步模式' 改变为 '异步模式' 的入口

asyncio使用说明

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return '666'


async def main():
    print(f'当前时间{time.strftime("%X")}')
    a = await say_after(1, 'hello')  # 接受返回值
    print(a)
    await say_after(2, 'word')
    print(f'结束时间{time.strftime("%X")}')


asyncio.run(main())


1.await:
'''
await会将 say_after() 时,变为一个task(任务)
发生:
    1. 当前这个 协程函数对象 被包装为一个task(任务),告诉了event loop (循环事件) 当前这个新任务的存在
    2. 告诉了event loop (循环事件) 这个task(任务)需要等待 say_after(async函数)执行完毕后 才能接着执行
    3.yield出去,告诉event loop(循环事件) 当前这个任务暂时执行不了 请执行其他task(任务)
    4. 当前event loop(循环事件) 再次安排当前task(任务)执行时 会将say_after(async函数)中得真正返回值拿出来进行保存(赋值给变量)
'''

2.整段程序执行过程:
'''

1.asyncio.run(main())  当main函数作为一个task(任务) 给到了event loop(循环事件)中
event loop 在寻找task(任务)时,发现只有一个main task(任务),就运行了main函数
2.main任务再执行先打印了:print(f'当前时间{time.strftime("%X")}')打印
3.执行了 await say_after(1, 'hello')  函数得到了 协程对象(coroutine object)
4.await say_after(1, 'hello') await 将整这个协程对象(coroutine object) 变为一个 task(任务) 放回了event loop 里面 同时告诉event loop需要等待他,将控制权给到了event loop
5.现在event loop 中存在两个任务[main,say_after],main运行不了需要等待say_after运行后才能运行,但是say_after函数内部有await asyncio.sleep(delay)[将当前的sleep作为一个task(任务)添加到event loop中]需要等待,又将控制权转给了event loop
6.say_after需要等待 await asyncio.sleep(delay)任务执行完毕。await asyncio.sleep(delay) 执行完毕后 event loop 让say_after 执行 打印了 print(what)执行完毕 ,将控制权交给了event loop
7.event loop 就会将控制权给main函数执行[main就执行完毕第一个 say_after] 在执行第二个say_after(与上面第一个执行一样)
'''

3. 注意
关于event loop控制权交回的两种方式:1.await 2.函数执行完毕后自动交回控制权 。如果当前任务中是一个死循环那么 event loop就直接卡死了


4.发现问题:当前两个async执行实际时间为3秒钟
因为await做的事情太多,需要将对象变为一个任务,告诉event loop 将控制权交还给event loop还需要等待,那么后面需要执行的协程函数也就需要等待前面的任务(协程函数已经通过awati转为任务后)完成后,后面的协程函数才转为一个任务,才被event loop 调用
问题解决: create_task函数

asyncio中得create_task函数

create_task('coroutine object(协程函数对象)')
create_taskd的作用:将协程对象对象转变为task(任务)注册到event loop中,分担了await一部分功能将协程函数对象包装为一个task(任务)

import asyncio
import time


1.使用create_task

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    # 分担await一部分功能,将协程函数对象包装为task(任务)告诉event loop 这个任务可以执行
    # 但是没办执行,执行权在main手中 需要await将event loop控制权拿到并且执行
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'word'))
    print(f'当前时间{time.strftime("%X")}')
    # 将event loop控制权拿到并且执行
    # 在之前await 协程对象时,需要将整个对象包装为task(任务)在进行执行 在将event loop 控制权拿到执行
    # await task(任务) 直接获取event loop控制权进行执行(我需要这个task任务完成,将控制权交还给event loop) 在控制权回来的时候将当前任务的返回值保存
    await task1
    await task2
    print(f'结束时间{time.strftime("%X")}')


asyncio.run(main())



2.create_task拿返回值
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return f'{what} - {delay}'

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'word'))
    print(f'当前时间{time.strftime("%X")}')
    # 获取 任务(协程对象)返回值(返回值会在任务执行完毕后赋值给变量)
    ret1 = await task1
    ret2 = await task2
    # 打印 协程对象返回值
    print(ret1)
    print(ret2)
    print(f'结束时间{time.strftime("%X")}')


asyncio.run(main())



3.批量将协程函数转变为task(任务)交给event loop

使用gather('接受多个协程函数对象')
会将多个协程函数对象包装成为任务,给到event loop 进行执行
返回值处理: 会将全部的任务的返回值添加到一个列表中(返回值顺序与gather(task任务顺序一致))


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return f'{what} - {delay}'


async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'word'))
    print(f'当前时间{time.strftime("%X")}')

    task_list = [task1, task2] # 任务列表
    ret = await asyncio.gather(*task_list)
    print(ret)  # 全部的返回值['hello - 1', 'word - 2']
    print(f'结束时间{time.strftime("%X")}')


asyncio.run(main())


4.gather的另一个好处:将全部的协程对象包装为task(任务)
# 省去 cerate_task 的操作

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return f'{what} - {delay}'


async def main():
    print(f'当前时间{time.strftime("%X")}')
    task_list = [say_after(1, 'hello'), say_after(2, 'word')]  # 协程函数对象列表
    ret = await asyncio.gather(*task_list)
    print(ret)  # 全部的返回值['hello - 1', 'word - 2']
    print(f'结束时间{time.strftime("%X")}')

asyncio.run(main())

async理解

event loop 当做大脑,若干个可以执行的task任务,task任务没办控制event loop去到那个task执行的,只能告诉event loop 我在等待那个task执行完毕,由event loop 进行控制权的分配到那个task

将event loop权限给出去的方式: 1.await 2.函数执行完毕

其实可以理解为,只是当前的代码在执行,只不过将等待的事件给利用了起来执行了另外的内容,节约了时间,如果代码中存在等待的操作,那么使用async最好,如果没有等待那么没什么用

要理解:
    1.coroutine function(协程函数) 
    2.coroutine object(协程对象[协程函数加()执行的结果])
    3.task(任务[是由协程对象被asyncio方法包装为任务给到event loop中,变为task才能被执行])
    4.拿到task返回值需要使用await方法,await方法会获取到event loop控制权(告诉它这个任务需要执行)

 

标签:异步,task,协程,await,print,loop,asyncio
From: https://www.cnblogs.com/wkxz/p/17094146.html

相关文章

  • Spring异步Async和事务Transactional注解
    Spring开发中我们我们常常用到@Transaction和@Async,但这2个注解加在一起很多的开发者不敢用,担心事务不生效。下面我们就仔细讲解一下这2个注解同时运用,文章用3个场景讲述它......
  • 帮你快速理解同步 ,异步,并发/并行,串行
    同步:多个任务情况下,一个任务A执行结束,才可以执行另一个任务B。只存在一个线程。异步:多个任务情况下,一个任务A正在执行,同时可以执行另一个任务B。任务B不用等待任务A结束才......
  • 探秘多线程-闭锁、栅栏与异步编排
    无论是项目开发还是开源代码阅读,多线程都是不可或缺的一个重要知识点,基于这个考量,于是总结出本篇文章,讨论闭锁(CountDownLatch)、栅栏(CyclicBarrier)与异步编排(CompletableF......
  • 通过一个示例形象地理解C# async await 非并行异步、并行异步、并行异步的并发量控制
    前言接上一篇通过一个示例形象地理解C#asyncawait异步我在.NET与大数据中吐槽前同事在双层循环体中(肯定是单线程了)频繁请求es,导致接口的总耗时很长。这不能怪前同......
  • Python 异步: 什么是事件循环 ?(6)
    asyncio程序的核心是事件循环。在本节中,我们将花点时间看一下asyncio事件循环。1.什么是Asyncio事件循环事件循环是用于在单个线程中执行协程的环境。事件循环是异......
  • Python 异步集群使用
    目前在做Python项目用到同步和异步的方法使用Redis单机,现在要增加兼容Redis集群。也就说当前项目用到中以下4种Python使用Redis的方法都用到了。-同步异步单机......
  • Java并发编程——CompletebaleFuture 异步回调的原理和使用
    CompletebaleFuture的底层原理是:Fork/joinPoll+Treiberstack(异步任务栈)+CAS,可以实现:创建较少的线程(减少线程上下文切换)执行较多的任务(不耗时的任务) 结论:当任务......
  • JavaScript之异步编程
    什么是异步异步:Asynchronous,async是与同步synchronous,sync相对的概念。传统单线程编程中,程序的运行是同步的,指程序运行在一个控制流之中运行。而异步的概念就是不保证同......
  • 使用@Async实现异步调用
    什么是“异步调用”?“异步调用”对应的是“同步调用”,同步调用指程序按照定义顺序依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;异步调用指程序在顺序执行......
  • Spring开启@Async异步方法(javaconfig配置)
    在Spring中,基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。应用场景:某些耗时较长的......