首页 > 编程语言 >并发编程补充

并发编程补充

时间:2023-09-03 10:23:01浏览次数:37  
标签:__ task 协程 补充 编程 并发 print await asyncio

目录

并发编程补充

一、asyncio 模块

  • asyncio是Python 3.4版本引入的标准库,用于实现异步编程。它基于事件循环(Event Loop)模型,通过协程(Coroutines)来实现任务的切换和调度(也叫协程模块)。在asyncio中,我们可以使用async关键字定义协程函数,并使用await关键字挂起协程的执行
  • 异步编程的核心思想是避免阻塞,即当一个任务在等待某个操作完成时,可以让出CPU执行权,让其他任务继续执行。这样可以充分利用CPU的时间片,提高程序的整体效率
  • 一旦决定使用异步,则系统每一层都必须是异步,“开弓没有回头箭”

1. asyncio中的几个重要概念

  • 事件循环

    • 事件循环是异步编程的核心机制,它负责协调和调度协程的执行,以及处理IO操作和定时器等事件。它会循环监听事件的发生,并根据事件的类型选择适当的协程进行调度
    • asyncio库中,可以通过asyncio.get_event_loop()方法获取默认的事件循环对象,也可以使用asyncio.new_event_loop()方法创建新的事件循环对象
  • Future

    • future表示还没有完成的工作结果。事件循环可以通过监视一个future对象的状态来指示它已经完成。future对象有几个状态:Pending、Running、Done、Cancelled
    • 创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消,状态为cancel
  • Task

    • task是Future的一个子类,它知道如何包装和管理一个协程的执行。任务所需的资源

      可用时,事件循环会调度任务,并生成一个结果,从而可以由其他协程消费

    • asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:

      asyncio.async() 、loop.create_task() 或 asyncio.ensure_future()

  • coroutine

    • 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用

2. asyncio模块的常用方法

(1)run_until_complete()

  • 阻塞调用,直到协程运行结束才返回。参数是future、协程对象,传入协程对象时内部会自动变为future
  • 在最早的Python 3.4中,协程函数是通过@asyncio.coroutine 和 yeild from 实现的
 import asyncio
 
 @asyncio.coroutine
 def func1(i):
     print("协程函数{}马上开始执行。".format(i))
     yield from asyncio.sleep(2)
     print("协程函数{}执行完毕!".format(i))
 
 if __name__ == '__main__':
     # 获取事件循环
     loop = asyncio.get_event_loop()
 
     # 执行协程任务
     loop.run_until_complete(func1(1))
 
     # 关闭事件循环
     loop.close()

(2)asyncio.run()

  • 这是异步程序的主入口,相当于C语言中的main函数。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次
  • Python 3.7之前执行协程任务都是分三步进行的,代码有点冗余。Python 3.7提供了一个更简便的asyncio.run方法,下面代码为asyncio.run出现前后的对比:
# asyncio.run出现之前
import asyncio
 
 # 这是一个协程函数
 async def func1(i):
     print("协程函数{}马上开始执行。".format(i))
     await asyncio.sleep(2)
     print("协程函数{}执行完毕!".format(i))
 
 if __name__ == '__main__':
     # 获取事件循环
     loop = asyncio.get_event_loop()
 
     # 执行协程任务
     loop.run_until_complete(func1(1))
 
     # 关闭事件循环


# asyncio.run方法出现之后
 import asyncio
 
 async def func1(i):
     print(f"协程函数{i}马上开始执行。")
     await asyncio.sleep(2)
     print(f"协程函数{i}执行完毕!")
 
 if __name__ == '__main__':

(3)asyncio.sleep()

  • 模拟IO操作,这样的休眠不会阻塞事件循环,前面加上await后会把控制权交给主事件循环,在休眠(IO操作)结束后恢复这个协程
  • 注意:若在协程中需要有延时操作,应该使用 await asyncio.sleep(),而不是使用time.sleep(),因为使用time.sleep()后会释放GIL,阻塞整个主线程,从而阻塞整个事件循环

(4)async 和 await

  • python在3.5以后引入asyncawait,代替了asyncio.coroutine 装饰器,基于他编写的协程代码其实就是上一示例的加强版,让代码可以更加简便可读
  • async用在定义函数之前,声明当前定义的函数时一个异步函数
    • 异步函数不能直接调用,直接调用异步函数不会返回结果,而是返回一个coroutine对象。需要放在异步框架中调用
  • await的作用是挂起一个函数,在挂起时,该挂起函数仍在执行,而当前线程会去执行其他异步函数,而不是阻塞住等其全部执行完;当挂起函数执行完成后,线程会从其它异步函数返回继续执行挂起函数之后的内容
    • await只可对异步函数使用(本质是实现了 __await__ 方法的类),且只能在一个异步函数中使用

(5)asyncio.create_task()

  • asyncio.create_task()其是对loop.create_task()的封装,相当于创建并行的任务

  • 接收一个协程,返回一个asyncio.Task的实例,也是asyncio.Future的实例,毕竟Task是Future的子类。返回值可直接传入run_until_complete()

  • 返回的Task对象可以看到协程的运行情况

  • 我们只执行了单个协程任务(函数)。实际应用中,我们先由协程函数创建协程任务,然后把它们加入协程任务列表,最后一起交由事件循环执行。

    根据协程函数创建协程任务有多种方法,其中最新的是Python 3.7版本提供的asyncio.create_task方法,如下所示:

 # 方法1:使用ensure_future方法。future代表一个对象,未执行的任务。
 task1 = asyncio.ensure_future(func1(1))
 task2 = asyncio.ensure_future(func1(2))
 
 # 方法2:使用loop.create_task方法
 task1 = loop.create_task(func1(1))
 task2 = loop.create_task(func1(2))
 
 # 方法3:使用Python 3.7提供的asyncio.create_task方法
 task1 = asyncio.create_task(func1(1))
 task2 = asyncio.create_task(func1(2))
  • 简单实例
import asyncio

async def coroutine_example():
    await asyncio.sleep(1)
    print('zhihu ID: Zarten')

coro = coroutine_example()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('运行情况:', task)

loop.run_until_complete(task)
print('再看下运行情况:', task)
loop.close()

# 输出结果:
'''
运行情况:<Tosk pending coro=<coroutine_example() running at E:/Profile/pyproject/test.py>>
zhihu ID:Zarten
再看下运行情况:<Task finished coro=<coroutine_example() done,definde at E:/Profile/pyproject/test.py>result=None>  

从输出结果可看到,当task为finished状态时,有个result()的方法,我们可以通过这个方法来获取协程的返回值
'''

(6)asyncio.wait() 和 asyncio.gather()

  • wait()方法将task任务列表加入到当前的事件循环中,等待多个异步任务完成等;(注意:必须先创建事件循环,后加入任务列表,否则会报错)

  • asyncio.wait()是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环

  • 相同

    • 从功能上看,asyncio.waitasyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来
  • 不同:

    • asyncio.wait 使用一个set保存它创建的Task实例,因为set是无序的所以这也就是我们的任务不是顺序执行的原因。会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;

      asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果,如果列表中传入的不是create_task方法创建的协程任务,它会自动将函数封装成协程任务

i. asyncio.wait实例

  • 最常见的写法是:await asyncio.wait(task_list)
import asyncio
import arrow
 
 
def current_time():
    '''
    获取当前时间
    :return:
    '''
    cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
    return cur_time
 
 
async def func(sleep_time):
    func_name_suffix = sleep_time        # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
    print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
    await asyncio.sleep(sleep_time)
    print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
    return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
 
 
async def run():
    task_list = []
    for i in range(5):
        task = asyncio.create_task(func(i))
        task_list.append(task)
    #会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task
    done, pending = await asyncio.wait(task_list, timeout=None)
    for done_task in done:
        print((f"[{current_time()}] 得到执行结果 {done_task.result()}"))
 
def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
 
 
if __name__ == '__main__':
    main()

ii. asyncio.gather 实例

  • *注意await asyncio.gather(*task_list),注意这里 task_list 前面有一个 (把list解构)
import asyncio
import arrow
 
 
def current_time():
    '''
    获取当前时间
    :return:
    '''
    cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
    return cur_time
 
 
async def func(sleep_time):
    func_name_suffix = sleep_time     # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
    print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
    await asyncio.sleep(sleep_time)
    print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
    return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
 
 
async def run():
    task_list = []
    for i in range(5):
        task = asyncio.create_task(func(i))
        task_list.append(task)
    results = await asyncio.gather(*task_list)
    for result in results:
        print((f"[{current_time()}] 得到执行结果 {result}"))
 
 
def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
 
 
if __name__ == '__main__':
    main()
 

(7)add_done_callback回调

  • 我们还可以给每个协程任务通过add_done_callback的方法给单个协程任务添加回调函数
 #-*- coding:utf-8 -*-
 import asyncio
 
 async def func1(i):
     print(f"协程函数{i}马上开始执行。")
     await asyncio.sleep(2)
     return i
 
 # 回调函数
 def callback(future):
     print(f"执行结果:{future.result()}")
 
 async def main():
     tasks = []
     for i in range(1, 5):
         task = asyncio.create_task(func1(i))
         
         # 注意这里,增加回调函数
         task.add_done_callback(callback)
         tasks.append(task)
 
     await asyncio.wait(tasks)
 
 if __name__ == '__main__':
     asyncio.run(main())

(8)其他

很多协程任务都是很耗时的,当你使用wait方法收集协程任务时,可通过timeout选项设置任务切换前单个任务最大等待时间长度,如下所示:

# 获取任务执行结果,如下所示: done,pending = await asyncio.wait(tasks, timeout=10)

asyncio.current_task: 返回当前运行的Task实例,如果没有正在运行的任务则返回 None。如果 loop 为 None 则会使用 get_running_loop()获取当前事件循环。
asyncio.all_tasks: 返回事件循环所运行的未完成的Task对象的集合

3. 简单的协程实例

  • 基于python3.5及之后版本
1. 
import asyncio

async def hsw_test():
    print('---你好')
    await asyncio.sleep(2)
    print('---我的朋友')

asyncio.run(hsw_test())


2. 同步运行多个协程
import asyncio,time

async def hsw_test(msg,delay):
    await asyncio.sleep(delay)
    print(msg)


async def main():
    print(f"start {time.strftime('%H:%M:%S')}")
    await hsw_test('你好',2)
    await hsw_test('我的朋友', 4)
    print(f"end {time.strftime('%H:%M:%S')}")

asyncio.run(main())

'''
start 15:13:09
你好
我的朋友
end 15:13:15

从起止时间可以看出,两个协程是 顺序 执行的,总共耗时2+4=6秒
'''

3. 并发运行多个协程

import asyncio,time

async def hsw_test(msg,delay):
    await asyncio.sleep(delay)
    print(msg)
    
async def main():
    task1 = asyncio.create_task(hsw_test('你好',2))
    task2 = asyncio.create_task(hsw_test('我的朋友', 4))
    print(f"start {time.strftime('%H:%M:%S')}")
    await task1
    await task2
    print(f"end {time.strftime('%H:%M:%S')}")  
    
asyncio.run(main())    

"""
start 15:17:23
你好
我的朋友
end 15:17:27

从起止时间可以看出,两个协程是 并发 执行的,总共耗时等于携程中的最大耗时4秒
asyncio.create_task() 在后面的异步编程中非常常用,相当于创建了多条生产线,让协程在不同生产线之间切换执行
"""

4. asyncio.create_task()并发运行多个协程
import asyncio

# 定义异步函数
async def foo2():
    print('----start foo')
    await asyncio.sleep(6)
    print('foo  foo')

    await asyncio.sleep(3)
    print('----end foo')

# 定义异步函数
async def bar2():
    print('----start bar')
    await asyncio.sleep(3)

    print('bar  bar')
    await asyncio.sleep(6)
    print('----end bar')

async def sdf():
    tasks = []
    tasks.append(asyncio.create_task(foo2()))  # 创建两个异步任务
    tasks.append(asyncio.create_task(bar2()))
    await asyncio.wait(tasks)  # asyncio.wait()可以控制多任务,asyncio.wait()是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环;run_until_complete()方法作用是阻塞调用,直到协程运行结束才返回。参数是future,传入协程对象时内部会自动变为future。Future对象表示尚未完成的计算,还未完成的结果
    
if __name__ == '__main__':

    asyncio.run(sdf())    
    

# 打印结果
'''
----start foo
----start bar
bar  bar
foo  foo
----end foo
----end bar

'''

标签:__,task,协程,补充,编程,并发,print,await,asyncio
From: https://www.cnblogs.com/Mcoming/p/17674668.html

相关文章

  • 编程破解软件集合(自用合集)(侵删)
    编程软件破解集合(自用合集)(侵删)nagivate:https://www.cnblogs.com/marchxd/p/15580739.html#5201423(来自https://www.cnblogs.com/marchxd/p/15580739.html#5201423)jetbrain:https://www.aliyundrive.com/s/iqNgQAGtn6q(这是破解软件,主软件自己下)教程:http......
  • 大模型和人一样需要 提高对 编程语言认知
    今天在ChatGLM2-6B的仓库里看到了这么一个issue:https://github.com/THUDM/ChatGLM2-6B/issues/122:这位兄弟说的挺好,其中有点小错误:三星Tizen架构其实不是架构,是属于arm架构,Tizen是三星的一个操作系统。由此我想到了ChatGLM2是国人开源的LLM,因此我去国内的几个大模型应用上......
  • go并发编程系列五:线程分组及控制线程的合作执行
    背景:线程的合作执行,体现的是团结协作,应该是比较理想的状态,如果人人都能够少一些算计、多一点互帮互助,那该有多好啊?班主任不是资本家,班级更应该提倡团队精神,学生之间不应该竞争,应该互相协作!这篇文章以团结协作为出发点,讲解线程的合作执行。 TRANSLATEwithxEnglish......
  • go并发编程系列四:线程分组及控制线程的执行
    背景:在上一篇中,作为班主任的你,对班级的管理初见成效,但理想和现实总有差距,理想情况下,从接手一个调皮的班级到班级的管理井井有条,是从0到1的跨越,没有中间的过渡阶段,然而,现实是:班级里少不了调皮的学生,对于这样的情况,应该怎么办呢?这篇文章讲解的正是现实中从0到1的过渡阶段,本文仅以讲......
  • 【ChatGPT答】编程方式(编写计算机程序的方法和范式)
    不同的编程方式,每种都有其特定的语法、结构和应用领域,根据任务需求和个人喜好选择一种或多种结合使用。命令式编程(ImperativeProgramming):最常见的方式之一。通过一系列的命令和状态改变来描述程序的执行过程,需要明确指定每个步骤的执行。常见语言:C、C++、Java和Python。声......
  • go并发编程系列三:线程的顺序执行
    背景:在go并发编程系列二,你作为班主任,接手了有三个学生的班级,现状是这三个学生很调皮,看起来乱糟糟的,三个学生就是三个线程,怎么变的有序呢?答案是:给线程加锁!加锁需要使用go标准包的sync.Mutex,这是一个互斥锁。给线程加锁的代码:packageconcurrentimport("fmt""sync"......
  • go并发编程系列二:线程的并发执行
    新建concurrent文件夹,在该文件夹下创建concurrent.go,代码如下:packageconcurrentimport("fmt""time")funcThreadZhangSan(){fori:=1;i<=3;i++{fmt.Println("张三:",i)time.Sleep(time.Millisecond*50......
  • go并发编程系列:一、建立线程
    在go语言中,使用线程的格式是:go函数名当然,你也可以使用匿名函数,我们建立三个线程,代码如下:gotest1()gotest2()gofunc(){}是不是很简单?在下一系列中,我们将扩展这三个线程,进一步讲解go的并发编程!TRANSLATEwithxEnglishArabicHebrewPolishBulgar......
  • SpringBoot 最大连接数及最大并发数是多少?
    来源:laker.blog.csdn.net/article/details/130957301每个SpringBoot版本和内置容器不同,结果也不同,这里以SpringBoot2.7.10版本+内置Tomcat容器举例。概序在SpringBoot2.7.10版本中内置Tomcat版本是9.0.73,SpringBoot内置Tomcat的默认设置如下:Tomcat的连接等待队列长度,默认是100......
  • 系统编程-线程池
    工程背景考虑这么一个情况:为应对某场景的实际需求,要在程序中创建大量线程,并且这些线程的数量和生命周期均不确定,可能方生方死,也可能常驻内存,如何在满足其要求的同时,尽可能降低系统负载?一个基本事实是,线程的创建和销毁都是需要额外的系统资源的,如果线程的生命周期很短,那么相对于......