首页 > 编程语言 >Python Coroutine 池化实现

Python Coroutine 池化实现

时间:2024-01-29 11:45:12浏览次数:22  
标签:__ task 协程 Coroutine Python CoroutinePoolExecutor 池化

Python Coroutine 池化实现

池化介绍

在当今计算机科学和软件工程的领域中,池化技术如线程池、连接池和对象池等已经成为优化资源利用率和提高软件性能的重要工具。然而,在 Python 的协程领域,我们却很少见到类似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。为什么会这样呢?

首先,Python Coroutine 的特性使得池化技术在协程中的应用相对较少。与像 Golang 这样支持有栈协程的语言不同,Python Coroutine 是无栈的,无法跨核执行,从而限制了协程池发挥多核优势的可能性。

其次,Python Coroutine 的轻量级和快速创建销毁的特性,使得频繁创建和销毁协程并不会带来显著的性能损耗。这也解释了为什么 Python 官方一直没有引入 CoroutinePoolExecutor。

然而,作为开发者,我们仍然可以在特定场景下考虑协程的池化。虽然 Python Coroutine 轻量,但在一些需要大量协程协同工作的应用中,池化技术能够提供更方便、统一的调度子协程的方式。尤其是在涉及到异步操作的同时需要控制并发数量时,协程池的优势就显而易见了。

关于 Python 官方是否会在未来引入类似于 TaskGroup 的 CoroutinePoolExecutor,这或许是一个悬而未决的问题。考虑到 Python 在异步编程方面的快速发展,我们不能排除未来可能性的存在。或许有一天,我们会看到 TaskGroup 引入一个 max_workers 的形参,以更好地支持对协程池的需求。

在实际开发中,我们也可以尝试编写自己的 CoroutinePoolExecutor,以满足特定业务场景的需求。通过合理的设计架构和对数据流的全局考虑,我们可以最大程度地发挥协程池的优势,提高系统的性能和响应速度。

在接下来的文章中,我们将探讨如何设计和实现一个简单的 CoroutinePoolExecutor,以及在实际项目中的应用场景。通过深入理解协程池的工作原理,我们或许能更好地利用这一技术,使我们的异步应用更为高效。

如何开始编写

如何开始编写 CoroutinePoolExecutor,首先我们要明确出其适用范畴、考虑到使用方式和其潜在的风险点:

  • 它并不适用于 Mult Thread + Mult Event Loop 的场景,因此它并非线程安全的。
  • 应当保持和 ThreadPoolExecutor 相同的调用方式。
  • 不同于 Mult Thread 中子线程不依赖于主线程的运行,而在 Mult Coroutine 中子协程必须依赖于主协程,因此主协程在子协程没有全部运行完毕之前不能直接 done 掉。这也解释了为什么 TaskGroup 官方实现中没有提供类似于 shutdown 之类的方法,而是只提供上下文管理的运行方式。

有了上述 3 点的考量,我们决定将 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。这样的好处在于,作为学习者一方面可以了解 ThreadPoolExecutor 的内部实现机制,另一方面站在巨人肩膀上的编程借鉴往往会事半功倍,对于自我的提升也是较为明显的。

在考虑这些因素的同时,我们将继续深入研究协程池的设计和实现。通过对适用范围和使用方式的明确,我们能更好地把握 CoroutinePoolExecutor 的潜在优势,为异步应用的性能提升做出更有针对性的贡献。

具体代码实现

在这里我先贴出完整的代码实现,其中着重点已经用注释标明。

以下是 CoroutinePoolExecutor 的代码实现:

import os
import asyncio
import weakref
import logging
import itertools


async def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue):
    try:
        while True:
            work_item = await work_queue.get()

            if work_item is not None:
                await work_item.run()
                del work_item

                executor = executor_reference()
                if executor is not None:
                    # Notify available coroutines
                    executor._idle_semaphore.release()
                del executor
                continue

            # Notifies the next coroutine task that it is time to exit
            await work_queue.put(None)
            break

    except Exception as exc:
        logging.critical('Exception in worker', exc_info=True)


class _WorkItem:
    def __init__(self, future, coro):
        self.future = future
        self.coro = coro

    async def run(self):
        try:
            result = await self.coro
        except Exception as exc:
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)


class CoroutinePoolExecutor:
    """
    Coroutine pool implemented based on ThreadPoolExecutor
    Different from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutine
    So you must use the shutdown method to wait for all subtasks and wait for them to complete execution
    """

    # Used to assign unique thread names when coroutine_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers, coroutine_name_prefix=""):

        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = asyncio.Queue()
        self._idle_semaphore = asyncio.Semaphore(0)
        self._coroutines = set()
        self._shutdown = False
        self._shutdown_lock = asyncio.Lock()
        self._coroutine_name_prefix = (coroutine_name_prefix or (
            f"{__class__.__name__}-{self._counter()}"
        ))

    async def submit(self, coro):
        async with self._shutdown_lock:
            # When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.
            # This is because after shutdown, all sub-coroutines will end their work
            # one after another. Even if there are new coroutine tasks, they will not
            # be reactivated.
            if self._shutdown:
                raise RuntimeError('cannot schedule new coroutine task after shutdown')

            f = asyncio.Future()
            w = _WorkItem(
                f,
                coro
            )
            await self._work_queue.put(w)
            await self._adjust_coroutine_count()
            return f

    async def _adjust_coroutine_count(self):

        try:
            # 2 functions:
            # - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.
            # - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified
            # Since the Semaphore provided by asyncio does not have a timeout
            # parameter, you can choose to use it with wait_for.
            if await asyncio.wait_for(
                    self._idle_semaphore.acquire(),
                    0
            ):
                return
        except TimeoutError:
            pass

        num_coroutines = len(self._coroutines)
        if num_coroutines < self._max_workers:
            coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}"
            t = asyncio.create_task(
                coro=_worker(
                    weakref.ref(self),
                    self._work_queue
                ),
                name=coroutine_name
            )

            self._coroutines.add(t)

    async def shutdown(self, wait=True, *, cancel_futures=False):
        async with self._shutdown_lock:
            self._shutdown = True

            if cancel_futures:
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # None is an exit signal, given by the shutdown method, when the shutdown method is called
            # will notify the sub-coroutine to stop working and exit the loop
            await self._work_queue.put(None)

        if wait:
            for t in self._coroutines:
                await t

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.shutdown(wait=True)
        return False

以下是 CoroutinePoolExecutor 的使用方式:

import asyncio

from coroutinepoolexecutor import CoroutinePoolExecutor


async def task(i):
    await asyncio.sleep(1)
    print(f"task-{i}")


async def main():
    async with CoroutinePoolExecutor(2) as executor:
        for i in range(10):
            await executor.submit(task(i))

if __name__ == "__main__":
    asyncio.run(main())

我们知道,在线程池中,工作线程一旦创建会不断的领取新的任务并执行,除开 shutdown() 调用,否则对于静态的线程池来讲工作线程不会自己结束。

在上述协程池代码实现中,CoroutinePoolExecutor 类包含了主要的对外调用功能的接口、内部提供了存储 task 的 Queue、工作协程自动生成 name 的计数器、保障协程的信号量锁等等。

而 _worker 函数是工作协程的运行函数,其会在工作协程启动后,不断的从 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具体执行 coro task。

剩下的 _WorkItem 是一个 future 对象与 coro task 的封装器,其功能是解耦 future 对象和 coro task、并在 coro task 运行时和运行后设置 future 的结果。

对于异步循环的思考

在此 CoroutinePoolExecutor 实现后,我其实又有了一个新的思考。Python 的 EventLoop 相较于 Node.js 的 EventLoop 来说其实更加的底层,它有感的暴露了出来。

具体体现在当 Python Event Loop 启动后,如果 main coroutine 停止运行,那么所有的 subtask coroutine 也会停止运行,尤其是对于一些需要清理资源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都会在某些情况显得无措,说的更具体点就是不知道在什么时候调用。

对于这些问题,我们可以继承 BaseEventLoop 自己手动对 EventLoop 的功能进行扩展,如在事件循环关闭之前添加 hook function,甚至可以限制整个 EventLoop 的 max_workers 或者做成动态的可调节 coroutine 数量的 EventLoop 都行。

无论如何,只要心里有想法,就可以去将它实现 .. 学习本身就是一个不断挑战的过程。

标签:__,task,协程,Coroutine,Python,CoroutinePoolExecutor,池化
From: https://www.cnblogs.com/hanabi-cnblogs/p/17994172

相关文章

  • 虚拟环境python3.8安装GDAL包
    网上的方法直接是:pipinstallGDAL‑3.4.1‑cp38‑cp38‑win_amd64.whl但是这个方法不适用于我,因为我的pycharm上面的anaconda是python3.7,但是我创建了一个python3.8的虚拟环境所以需要:1.切换虚拟环境2.导入离线包python3.8对应着的GDAL为:GDAL-3.4.3-cp38-cp38-win_amd6......
  • ubuntu18容器内安装python3.9
    ubuntu18容器内编辑安装python3.9后,pip安装失败问题参考文档https://www.jianshu.com/p/8a17267caf5fhttps://blog.csdn.net/youxijishu/article/details/128885291https://blog.csdn.net/Beyond_F4/article/details/104004626https://zhuanlan.zhihu.com/p/598697953?utm_id=0......
  • 《最新出炉》系列初窥篇-Python+Playwright自动化测试-37-如何截图-上篇
    1.简介这个系列的文章也讲解和分享了差不多三分之一吧,突然有小伙伴或者童鞋们问道playwright有没有截图的方法。答案当然是:肯定有的。宏哥回过头来看看确实这个非常基础的知识点还没有讲解和分享。那么在这个契机下就把它插队分享和讲解一下。Playwright提供了一个截屏的API:page.......
  • 自动化测试神器:Python之Pytest库入门使用
    (自动化测试神器:Python之Pytest库入门使用)本文写作思路和建议:1、本文从Python的Pytest库基本安装使用开始讲起,详细说明pytest如何操作,如何快速入门?2、同时说明了pytest的常用特性,如何在自动化测试中进行应用,达到应有的测试效率?3、建议:阅读本文基本可以掌握Pytest的用法,另外学......
  • python之常用标准库-sys/os
    1.syssys常用的方法sys.path.append/sys.path.insert1#!/usr/bin/python2importos,sys3sys.path.insert(0,os.path.dirname(os.path.dirname(__file__)))#将路径插入第1个位置4sys.path.append(os.path.dirname(os.path.dirname(__file__)))#将路径追加到末尾View......
  • Python 基于pymongo操作Mongodb学习总结
    实践环境Python3.6.4pymongo4.1.1pymongo-3.12.3-cp36-cp36m-win_amd64.whl下载地址:https://pypi.org/simple/pymongo/代码实践#!/usr/bin/envpython#-*-coding:utf-8-*-importdatetimeimportrandomimportpymongofrompymongoimportMongoClientfrombson.......
  • Python笔记四之协程
    本文首发于公众号:Hunter后端原文链接:Python笔记四之协程协程是一种运行在单线程下的并发编程模型,它的特点是能够在一个线程内实现多个任务的并发操作,通过在执行任务时主动让出执行权,让其他任务继续执行,从而实现并发。以下所有的代码都是在Python3.8版本中运行。本篇笔记......
  • Python导入(import)模块的方法
    ​ Python中,导入(import)模块是一种常见的操作,用于加载模块或库的功能以便在当前的程序中使用。Python提供了几种不同的方式来导入模块,可以根据自己的需求选择合适的方式来导入所需的模块或函数。参考文档:Python导入(import)模块的方法-CJavaPy1、导入整个模块模块是扩展名......
  • Python Seaborn 基本数据排名分析
    ​ Python中使用Seaborn进行基本的数据排名分析通常涉及到可视化数据的分布和排名。Seaborn是一个基于Matplotlib的数据可视化库,提供了丰富的图表类型,使得数据分析更加直观。可以对数据进行初步的排名分析,了解数据的基本分布情况,从而为更深入的数据分析打下基础。1、条......
  • 卷积神经网络详解+Python实现卷积神经网络Cifar10彩色图片分类
    原文链接:https://blog.csdn.net/master_hunter/article/details/133156758卷积神经网络相对于普通神经网络在于以下四个特点:局部感知域:CNN的神经元只与输入数据的一小部分区域相连接,这使得CNN对数据的局部结构具有强大的敏感性,可以自动学习到图像的特征。参数共享:在CNN中,同一个......