首页 > 编程语言 >Python并发编程入门:使用concurrent.futures与asyncio

Python并发编程入门:使用concurrent.futures与asyncio

时间:2024-11-15 16:14:28浏览次数:3  
标签:并发 Python futures concurrent 任务 密集型 asyncio

Python并发编程入门:使用 concurrent.futuresasyncio

在现代应用中,并发编程已成为一种提升性能和效率的重要手段。Python提供了多种实现并发的方式,尤其是 concurrent.futuresasyncio,分别适用于不同的并发场景。本文将带你深入了解这两种并发编程方式,帮助你轻松上手并发编程。

一、并发编程简介

在Python中,并发编程的目的是在单个程序中同时执行多个任务,尤其是在遇到I/O密集型(如网络请求、文件读写)或计算密集型(如数据处理、图像处理)任务时,并发编程可以显著提高程序的效率。并发编程主要有以下几种方式:

  1. 多线程:通过多个线程执行任务,每个线程运行在独立的处理器时间片上。
  2. 多进程:通过多个进程执行任务,每个进程有自己的内存空间。
  3. 异步编程:通过事件循环在单个线程中调度和执行多个任务,主要适用于I/O密集型任务。

接下来,我们将介绍Python中两个主流的并发工具库:concurrent.futuresasyncio

二、使用 concurrent.futures 实现并发

concurrent.futures 是Python 3.2引入的一个标准库,提供了高层次的接口来实现多线程和多进程并发。它包含两个重要的类:

  • ThreadPoolExecutor:用于创建一个线程池,适合处理I/O密集型任务。
  • ProcessPoolExecutor:用于创建一个进程池,适合处理计算密集型任务。

1. 使用 ThreadPoolExecutor 处理 I/O 密集型任务

例如,我们有一个需要并发请求多个URL的场景,可以使用 ThreadPoolExecutor 来实现:

import concurrent.futures
import requests

urls = [
    'https://www.example.com',
    'https://www.python.org',
    'https://www.github.com',
    # 其他URL
]

def fetch(url):
    response = requests.get(url)
    return response.status_code

# 创建线程池并发请求
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(fetch, urls)

# 输出结果
for result in results:
    print(result)

在这里,ThreadPoolExecutor 创建了一个包含5个线程的线程池,通过 executor.map 将多个 fetch 函数并发执行。 ThreadPoolExecutorconcurrent.futures中一个非常简洁的接口,适合处理需要大量I/O操作的任务。

2. 使用 ProcessPoolExecutor 处理计算密集型任务

对于CPU密集型任务,如数据处理、图像处理等,我们可以用 ProcessPoolExecutor 来实现:

import concurrent.futures
import math

numbers = [1000000, 2000000, 3000000, 4000000, 5000000]

def compute(num):
    return math.sqrt(num)

# 创建进程池并发处理
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
    results = executor.map(compute, numbers)

# 输出结果
for result in results:
    print(result)

在这个例子中,我们创建了一个包含3个进程的进程池,通过 executor.map 将多个 compute 函数并发执行。ProcessPoolExecutor 将任务分配到不同的进程中运行,充分利用多核CPU的优势。

三、使用 asyncio 实现异步编程

asyncio 是Python 3.5引入的异步编程库,主要用于I/O密集型任务。通过 asyncio,我们可以在单个线程中调度多个异步任务,不同于传统的线程或进程,它在任务等待I/O操作时不会阻塞整个程序。

1. asyncio 基本语法

asyncio 中,异步函数用 async def 定义,异步调用需要使用 await。以下是一个基本示例:

import asyncio

async def hello():
    print("Hello,")
    await asyncio.sleep(1)
    print("World!")

# 运行异步任务
asyncio.run(hello())

在这个例子中,hello 函数会在 await asyncio.sleep(1) 时暂停执行1秒,而不是阻塞整个线程。

2. 使用 asyncio 并发执行多个任务

例如,我们可以使用 asyncio.gather 来并发执行多个异步任务:

import asyncio
import aiohttp

urls = [
    'https://www.example.com',
    'https://www.python.org',
    'https://www.github.com',
]

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status

async def main():
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(results)

# 运行异步任务
asyncio.run(main())

这里,我们创建了3个异步请求并同时发出,asyncio.gather 会等待所有任务完成后返回结果。aiohttp 是一个异步HTTP库,可以高效地处理网络请求。

3. asyncioawait 的优势

asyncioawait 的主要优势在于,它只在需要等待的地方(如I/O操作)暂停任务,而不阻塞其他任务。相比多线程,它在I/O密集型场景中通常性能更优,因为在单线程中管理多个任务减少了线程切换的开销。

四、concurrent.futuresasyncio 的选择

在实际应用中,如何选择 concurrent.futuresasyncio?以下是一些建议:

  • I/O密集型任务asyncio 更适合这种场景,因为它能够在单个线程中并发处理多个任务,减少线程切换的开销。但如果代码中有阻塞式调用且无法改写为异步操作,ThreadPoolExecutor 可能更合适。

  • CPU密集型任务ProcessPoolExecutor 更适合计算密集型任务,因为它通过多进程利用多核CPU的优势,而 asyncio 则仅适合I/O密集型场景,无法充分利用CPU资源。

  • 兼容性:如果需要与同步代码或现有的库集成,concurrent.futures 更为灵活,而 asyncio 则可能需要重构代码来使用异步API。

五、建议

concurrent.futuresasyncio 是Python中两种强大且常用的并发工具,各有优劣。concurrent.futures 提供了线程池和进程池,更适合传统的同步I/O和CPU密集型任务;而 asyncio 则提供了一种高效的异步编程方式,非常适合处理I/O密集型任务。选择合适的并发工具,将显著提高Python应用程序的性能和响应速度。

六、进阶示例:混合使用 concurrent.futuresasyncio

在一些复杂的应用场景中,我们可能会需要同时处理I/O密集和CPU密集型任务。Python提供的 concurrent.futuresasyncio 可以结合使用,充分利用两者的优势。

例如,假设我们有一个数据处理任务,它需要先从多个API获取数据(I/O密集型任务),然后对数据进行大量计算(CPU密集型任务)。在这种情况下,可以使用 asyncio 来处理异步请求,用 concurrent.futures.ProcessPoolExecutor 进行数据的并行处理。

import asyncio
import aiohttp
import concurrent.futures

urls = [
    'https://www.example.com',
    'https://www.python.org',
    'https://www.github.com',
]

async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

def process_data(data):
    # 进行计算密集型操作,比如数据分析或模型训练
    result = len(data)  # 示例:计算数据长度
    return result

async def main():
    # Step 1: 使用asyncio并发获取数据
    tasks = [fetch_data(url) for url in urls]
    data_list = await asyncio.gather(*tasks)

    # Step 2: 使用ProcessPoolExecutor处理数据
    with concurrent.futures.ProcessPoolExecutor() as executor:
        loop = asyncio.get_running_loop()
        # 将计算密集型任务提交到进程池
        results = await asyncio.gather(
            *[loop.run_in_executor(executor, process_data, data) for data in data_list]
        )
    
    print(results)

# 运行主任务
asyncio.run(main())

在这个示例中,fetch_data 函数通过 asyncio 异步获取数据,而 process_data 函数则使用 ProcessPoolExecutor 并行处理数据。通过 loop.run_in_executor 将计算密集型任务提交到进程池,可以在单个 asyncio 事件循环中同时处理I/O和CPU密集型任务,从而提高程序的整体性能。

七、常见问题和调试技巧

1. asyncioconcurrent.futures 兼容性问题

asyncio 的事件循环中使用 concurrent.futures 的执行器时,需要通过 loop.run_in_executor 将阻塞任务移到线程池或进程池中,以免阻塞事件循环。此外,asyncio.run() 只能在主线程中运行,因此通常不推荐在多线程或多进程中直接使用。

2. asyncio 的嵌套任务

如果在asyncio中调用嵌套的 asyncio.run() 会报错,因此应使用 await 等待嵌套任务,而不是直接启动新的事件循环。嵌套任务可以通过 await asyncio.gather(...) 方式并发执行。

3. 数据共享与线程安全

concurrent.futures 中,尤其是多线程的 ThreadPoolExecutor,共享数据需要考虑线程安全性。可以使用 threading.Lock 或者 concurrent.futures 的回调机制来确保数据的同步。在 ProcessPoolExecutor 中,数据在进程间是独立的,需要通过共享内存或队列来通信。

八、总结与最佳实践

  • 选择合适的工具:对于I/O密集型任务,asyncio 是更好的选择;对于CPU密集型任务,concurrent.futures.ProcessPoolExecutor 更适合。
  • 避免阻塞事件循环:在 asyncio 中,如果有阻塞任务,可以使用 run_in_executor 将其交给 ThreadPoolExecutorProcessPoolExecutor 执行。
  • 合理控制并发数:无论是线程池、进程池还是异步任务,并发数过多可能会导致系统资源耗尽,因此设置合理的 max_workers 或控制并发任务数尤为重要。
  • 优先使用异步库:如果使用 asyncio,尽量选择异步兼容的库,如 aiohttp 替代 requestsaiomysql 替代 pymysql 等,以避免阻塞。

总结

Python 的 concurrent.futuresasyncio 为开发者提供了简单且强大的并发工具,帮助我们更好地应对不同的并发需求。无论是多线程、多进程还是异步编程,理解其核心概念并合理应用,将使你的应用程序更加高效、快速。通过掌握这两种并发方式的基础和进阶应用,相信你能在不同的项目中灵活应用并发编程,使你的代码性能和用户体验得到显著提升。

标签:并发,Python,futures,concurrent,任务,密集型,asyncio
From: https://blog.csdn.net/liaoqingjian/article/details/143736451

相关文章

  • 【Python】将同一目录下的多个doc文件批量转为docx文件
    同一目录有多个doc文件: importwin32com.clientaswcimportos#设置.doc文件所在的目录docs_directory=r'F:\xxx\PycharmProjects\Python学习项目\doc文件'defgetpath(docs_directory):#输出docx文件的路径和名称[路径,名称]#遍历目录中的所有.doc文件......
  • 【Python】将同一目录下的多个docx文件内容写入同一个txt文件
    同一目录下有多个docx文件 importdocximportoslist_w=[]forfilenameinos.listdir(r"F:\xxxx\PycharmProjects\Python学习项目\docx文件"):iffilename.endswith('.docx'):#如果文件以.docx结尾document=docx.Document("F:\\xxxx\......
  • python2.7安装pip
    我的python版本号具体信息如下:Python2.7(r27:82525,Jul42010,07:43:08)[MSCv.150064bit(AMD64)]onwin32Type"help","copyright","credits"or"license"formoreinformation.python2.7没有自带pip工具,需要安装一个setuptools包,我使用的py......
  • Python文件操作
    七、文件操作7.1文件打开和关闭打开文件open(file,mode='r',buffering=-1,encoding=None,errors=None,newline=None,closefd=True,opener=None)file:文件路径mode:文件打开模式文本模式'r':只读模式(默认),文件必须存在。'w':写入模式,若文件存在则清空内容,若不存......
  • 基于yolov10的柿子成熟度检测系统,支持图像、视频和摄像实时检测【pytorch框架、python
     更多目标检测和图像分类识别项目可看我主页其他文章功能演示:yolov10,柿子成熟度检测系统,支持图像、视频和摄像实时检测【pytorch框架、python】_哔哩哔哩_bilibili(一)简介基于yolov10的柿子成熟度检测系统是在pytorch框架下实现的,这是一个完整的项目,包括代码,数据集,训练好的......
  • python的decimal默认精度为28
    python的decimal的精度可以修改的计算pi使用莱布尼茨级数计算圆周率fromdecimalimportDecimal,getcontext#设置全局精度为100位getcontext().prec=100#使用莱布尼茨级数计算圆周率pi=Decimal(0)forkinrange(1000000):pi+=(Decimal(-1)**k)/(......
  • Python异常处理
    六、异常处理基本语法try:#尝试运行的代码块result=10/0exceptZeroDivisionErrorase:#捕获ZeroDivisionError异常print("不能除以零!",e)exceptTypeErrorase:print("类型错误!",e)exceptExceptionase:#这里可以捕获所有的......
  • python实现的扫雷游戏的AI解法(启发式算法)
    相关:python编写的扫雷游戏如何使用计算机程序求解扫雷游戏本文中实现的《扫雷》游戏的AI解法的项目地址:https://openi.pcl.ac.cn/devilmaycry812839668/AI_mine_game该项目的解法效果:之前介绍了网上的一些解决《扫雷》游戏的一些解法,包括DQN和启发式等AI算法,看着这......
  • python调用百度通用翻译API
    文章目录1.简介2.使用步骤3.api调用实现4.编码实现1.简介前段时间在做视频语音识别生成多语种字幕时,使用了百度翻译通用翻译api进行翻译。百度翻译平台经过个人认证之后,每月有200万字符的免费翻译额度。还是比较舒服的。百度翻译开放平台是百度翻译面向广大......
  • Python类
    五、类5.1定义类使用class关键字定义一个类,类名通常采用首字母大写的驼峰命名法classPerson:pass5.2构造函数基本语法classPerson:def__init__(self,name,age):#定义构造函数self.name=name#初始化name属性s......