首页 > 编程语言 >python并发与并行(十一) ———— 让asyncio的事件循环保持畅通,以便进一步提升程序的响应能力

python并发与并行(十一) ———— 让asyncio的事件循环保持畅通,以便进一步提升程序的响应能力

时间:2024-08-31 15:27:56浏览次数:13  
标签:python self write 并发 output path loop asyncio


前一篇blog说明了怎样把采用线程所实现的项目逐步迁移到asyncio方案上面。迁移后的run_tasks协程,可以将多份输入文件通过tail_async协程正确地合并成一份输出文件。

import asyncio

# On Windows, a ProactorEventLoop can't be created within
# threads because it tries to register signal handlers. This
# is a work-around to always use the SelectorEventLoop policy
# instead. See: https://bugs.python.org/issue33792
policy = asyncio.get_event_loop_policy()
policy._loop_factory = asyncio.SelectorEventLoop

async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, write_async)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)

但这样写有个大问题,就是针对输出文件所做的open、close以及write操作,全都要放在主线程中执行,而这些操作又需要在程序所处的操作系统执行系统调用,这些调用可能会让事件循环阻塞很长一段时间,导致其他协程没办法推进。这会降低程序的总体响应能力,而且会增加延迟,对于高并发服务器来说,这个问题尤其严重。

调用asyncio.run函数时,把debug参数设为True,可以帮助我们发现这种问题。例如,下面这种写法就能显示出,slow_coroutine协程所执行的系统调用耗时比较长,这可以提醒我们注意,要读取的文件是否已经损坏,或者其中某一行是否读不出来。

import time

async def slow_coroutine():
    time.sleep(0.5)  # Simulating slow I/O

asyncio.run(slow_coroutine(), debug=True)

为了进一步提升程序的响应能力,我们可以想办法把那些有可能会执行系统调用的操作从程序本身的事件循环里面拿走。例如,新建这样一个Thread子类,让它把那种给输出文件写入数据的操作封装到自己的事件循环里面,这样就不会阻塞程序本身的事件循环了。

其他线程中的协程,可以直接调用这个线程类的write方法,并对该方法做await。其实这个write方法,只不过是把真正负责执行写入操作的那个real_write封装了起来。这种封装方式能够确保线程安全,因此不需要再通过Lock加锁.

然后,我们按照相似的思路,编写真正负责停止本线程的real_stop方法,并把它封装到stop里面,这样的话,其他协程就可以通过stop方法告知本线程应该结束工作。这项操作同样是线程安全的。

另外,还可以定义__aenter__与__aexit__方法,让我们的线程能够用在异步版本的with语句之中,以确保该线程的启动与关闭会安排在适当的时机执行,而不拖慢主事件循环所在的那条线程。

from threading import Thread

class WriteThread(Thread):
    def __init__(self, output_path):
        super().__init__()
        self.output_path = output_path
        self.output = None
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        with open(self.output_path, 'wb') as self.output:
            self.loop.run_forever()

        # Run one final round of callbacks so the await on
        # stop() in another event loop will be resolved.
        self.loop.run_until_complete(asyncio.sleep(0))


# Example 4
    async def real_write(self, data):
        self.output.write(data)

    async def write(self, data):
        coro = self.real_write(data)
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)


# Example 5
    async def real_stop(self):
        self.loop.stop()

    async def stop(self):
        coro = self.real_stop()
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)


# Example 6
    async def __aenter__(self):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.start)
        return self

    async def __aexit__(self, *_):
        await self.stop()

这段代码演示了如何结合使用 Python 的 asyncio 模块和线程 (threading.Thread) 来创建一个在单独线程中运行的异步事件循环。这在需要在异步环境中执行 I/O 操作,但又不想阻塞主事件循环时非常有用。下面是对代码中关键部分的解释:

WriteThread 类 (继承自 Thread)

  1. 初始化 (__init__ 方法):
  • 构造函数设置了 output_path,这是输出文件的路径。
  • self.loop 创建了一个新的异步事件循环。
  1. run 方法:
  • 这个方法是线程的入口点,它设置当前线程的事件循环,并打开输出文件。
  • self.loop.run_forever() 使事件循环持续运行,直到调用 stop 方法。
  • 最后,通过 self.loop.run_until_complete(asyncio.sleep(0)) 确保事件循环能够完成所有挂起的协程。

异步写入方法

  1. real_write 协程:
  • 这是一个普通的协程,用于执行实际的写入操作。
  1. write 协程:
  • 这个方法使用 asyncio.run_coroutine_threadsafe 安全地从另一个线程运行 real_write 协程。
  • 它等待 real_write 完成,并使用 await asyncio.wrap_future(future)Future 对象包装为协程。

停止事件循环

  1. real_stop 协程:
  • 这个方法简单地调用 self.loop.stop() 来停止事件循环。
  1. stop 协程:
  • 类似于 write 方法,它使用 asyncio.run_coroutine_threadsafe 来安排 real_stop 在事件循环中运行。

上下文管理器协议

  1. __aenter__ 协程:
  • 这个方法实现了上下文管理器协议的 __enter__ 方法,允许使用 with 语句来管理 WriteThread 对象的生命周期。
  • 它使用 await 来启动线程。
  1. __aexit__ 协程:
  • 这个方法实现了上下文管理器协议的 __exit__ 方法,用于清理操作,比如停止事件循环。

关键点

  • 事件循环在新线程中运行:通过在 Thread 的子类中创建和运行自己的事件循环,可以在不阻塞主线程的情况下执行异步操作。
  • 线程安全地运行协程:使用 asyncio.run_coroutine_threadsafe 可以在不同的线程中安排协程的执行。
  • 上下文管理器:通过实现 __aenter____aexit__ 方法,WriteThread 对象可以在 with 语句中使用,这提供了一种优雅的资源管理方式。

这种模式允许你将异步 I/O 操作与线程结合使用,充分利用 asyncio 的优势,同时避免在 I/O 密集型操作中阻塞主事件循环。

写好了新的线程类之后,我们可以重构run_tasks,把它变成纯粹的异步版本。这个版本更易读懂,而且完全避免了那些耗时较长的系统调用把主事件循环所在的线程拖慢。

class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()

async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)

async def run_fully_async(handles, interval, output_path):
    async with WriteThread(output_path) as output:
        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, output.write)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)

现在验证这样写是否正确。我们把一批输入文件所对应的句柄放在handles里面,交给run_fully_async去合并,然后调用confirm_merge函数,以确认这些文件之中的内容,已经合并到了输出文件里面。

import collections
import os
import random
import string
from tempfile import TemporaryDirectory

def write_random_data(path, write_count, interval):
    with open(path, 'wb') as f:
        for i in range(write_count):
            time.sleep(random.random() * interval)
            letters = random.choices(
                string.ascii_lowercase, k=10)
            data = f'{path}-{i:02}-{"".join(letters)}\n'
            f.write(data.encode())
            f.flush()

def start_write_threads(directory, file_count):
    paths = []
    for i in range(file_count):
        path = os.path.join(directory, str(i))
        with open(path, 'w'):
            # Make sure the file at this path will exist when
            # the reading thread tries to poll it.
            pass
        paths.append(path)
        args = (path, 10, 0.1)
        thread = Thread(target=write_random_data, args=args)
        thread.start()
    return paths

def close_all(handles):
    time.sleep(1)
    for handle in handles:
        handle.close()

def setup():
    tmpdir = TemporaryDirectory()
    input_paths = start_write_threads(tmpdir.name, 5)

    handles = []
    for path in input_paths:
        handle = open(path, 'rb')
        handles.append(handle)

    Thread(target=close_all, args=(handles,)).start()

    output_path = os.path.join(tmpdir.name, 'merged')
    return tmpdir, input_paths, handles, output_path


# Example 9
def confirm_merge(input_paths, output_path):
    found = collections.defaultdict(list)
    with open(output_path, 'rb') as f:
        for line in f:
            for path in input_paths:
                if line.find(path.encode()) == 0:
                    found[path].append(line)

    expected = collections.defaultdict(list)
    for path in input_paths:
        with open(path, 'rb') as f:
            expected[path].extend(f.readlines())

    for key, expected_lines in expected.items():
        found_lines = found[key]
        assert expected_lines == found_lines

input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

asyncio.run(run_fully_async(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()

完整代码:

# Example 1
import asyncio

# On Windows, a ProactorEventLoop can't be created within
# threads because it tries to register signal handlers. This
# is a work-around to always use the SelectorEventLoop policy
# instead. See: https://bugs.python.org/issue33792
policy = asyncio.get_event_loop_policy()
policy._loop_factory = asyncio.SelectorEventLoop

async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, write_async)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)


# Example 2
import time

async def slow_coroutine():
    time.sleep(0.5)  # Simulating slow I/O

asyncio.run(slow_coroutine(), debug=True)


# Example 3
from threading import Thread

class WriteThread(Thread):
    def __init__(self, output_path):
        super().__init__()
        self.output_path = output_path
        self.output = None
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        with open(self.output_path, 'wb') as self.output:
            self.loop.run_forever()

        # Run one final round of callbacks so the await on
        # stop() in another event loop will be resolved.
        self.loop.run_until_complete(asyncio.sleep(0))

    async def real_write(self, data):
        self.output.write(data)

    async def write(self, data):
        coro = self.real_write(data)
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)


    async def real_stop(self):
        self.loop.stop()

    async def stop(self):
        coro = self.real_stop()
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)


    async def __aenter__(self):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.start)
        return self

    async def __aexit__(self, *_):
        await self.stop()


class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()

async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)

async def run_fully_async(handles, interval, output_path):
    async with WriteThread(output_path) as output:
        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, output.write)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)


# This is all code to simulate the writers to the handles
import collections
import os
import random
import string
from tempfile import TemporaryDirectory

def write_random_data(path, write_count, interval):
    with open(path, 'wb') as f:
        for i in range(write_count):
            time.sleep(random.random() * interval)
            letters = random.choices(
                string.ascii_lowercase, k=10)
            data = f'{path}-{i:02}-{"".join(letters)}\n'
            f.write(data.encode())
            f.flush()

def start_write_threads(directory, file_count):
    paths = []
    for i in range(file_count):
        path = os.path.join(directory, str(i))
        with open(path, 'w'):
            # Make sure the file at this path will exist when
            # the reading thread tries to poll it.
            pass
        paths.append(path)
        args = (path, 10, 0.1)
        thread = Thread(target=write_random_data, args=args)
        thread.start()
    return paths

def close_all(handles):
    time.sleep(1)
    for handle in handles:
        handle.close()

def setup():
    tmpdir = TemporaryDirectory()
    input_paths = start_write_threads(tmpdir.name, 5)

    handles = []
    for path in input_paths:
        handle = open(path, 'rb')
        handles.append(handle)

    Thread(target=close_all, args=(handles,)).start()

    output_path = os.path.join(tmpdir.name, 'merged')
    return tmpdir, input_paths, handles, output_path


def confirm_merge(input_paths, output_path):
    found = collections.defaultdict(list)
    with open(output_path, 'rb') as f:
        for line in f:
            for path in input_paths:
                if line.find(path.encode()) == 0:
                    found[path].append(line)

    expected = collections.defaultdict(list)
    for path in input_paths:
        with open(path, 'rb') as f:
            expected[path].extend(f.readlines())

    for key, expected_lines in expected.items():
        found_lines = found[key]
        assert expected_lines == found_lines

input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

asyncio.run(run_fully_async(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()


标签:python,self,write,并发,output,path,loop,asyncio
From: https://blog.51cto.com/u_15302822/11882933

相关文章

  • python并发与并行(十) ———— 结合线程与协程,将代码顺利迁移到asyncio
    在前一篇中,我们用asyncio模块把通过线程来执行阻塞式I/O的TCP服务器迁移到了协程方案上面。当时我们一下子就完成了迁移,而没有分成多个步骤,这对于大型的项目来说,并不常见。如果项目比较大,那通常需要一点一点地迁移,也就是要边改边测,确保迁移过去的这一部分代码的效果跟原来相同。为......
  • python并发与并行(九) ———— 用asyncio改写通过线程实现的IO
    知道了协程的好处之后,我们可能就想把现有项目之中的代码全都改用协程来写,于是有人就担心,这样修改起来,工作量会不会比较大呢?所幸Python已经将异步执行功能很好地集成到语言里面了,所以我们很容易就能把采用线程实现的阻塞式I/O操作转化为采用协程实现的异步I/O操作。在这里我们要补充......
  • python并发与并行(八) ———— 用协程实现高并发的I/O
    在前面几条里,我们以生命游戏为例,试着用各种方案解决I/O并行问题,这些方案在某些情况下确实可行,但如果同时需要执行的I/O任务有成千上万个,那么这些方案的效率就不太理想了像这种在并发方面要求比较高的I/O需求,可以用Python的协程(coroutine)来解决。协程能够制造出一种效果,让我们觉得Py......
  • python并发与并行(五.1) ———— 不要在每次fan-out时都新建一批Thread实例
    我们使用康威生命游戏的例子来解释这个专题。首先我们要实现一个康威生命游戏。这是个经典的有限状态自动机。它的规则很简单:在任意长宽的二维网格中,每个单元格都必须处于ALIVE或EMPTY状态,前者表示这个单元格里有生命存在,后者表示这里没有生物(或者原有生物已经死亡)。时钟每走一格......
  • Selenium+Python自动化测试环境搭建
    1.什么是Selenium?        Selenium主要用于web应用程序的自动化测试,但并不局限于此,它还支持所有基于web的管理任务自动化。2、selenium自动化流程如下:自动化程序调用Selenium客户端库函数(比如点击按钮元素)客户端库会发送Selenium命令给浏览器的驱动程序浏览......
  • 【Python-办公自动化】1秒解决海量查找替换难题
    欢迎来到"花花ShowPython",一名热爱编程和分享知识的技术博主。在这里,我将与您一同探索Python的奥秘,分享编程技巧、项目实践和学习心得。无论您是编程新手还是资深开发者,都能在这里找到有价值的信息和灵感。自我介绍:我热衷于将复杂的技术概念以简单易懂的方式呈现给大家,......
  • 02python
    1.布尔类型和比较运算符1.1布尔(bool)类型布尔(bool)表达现实生活中的逻辑,即真和假:True表示真;False表示假。True本质上是一个数字记作1,False记作01.1.1布尔类型字面量True表示真(是、肯定)False表示假(否、否定)1.1.2定义变量存储布尔类型数据变量名称=布尔类型字面量布尔类型不......
  • A-计算机毕业设计定制:10508民大校园美食推荐系统的设计与实现(免费领源码)可做计算机毕
    摘要 随着数字化时代的到来,校园美食推荐系统的设计与实现具有重要意义。针对民大校园中商家、普通用户和管理员之间的信息交互和服务需求,开发这样一个系统能够有效促进校园内美食资源的共享和利用,提供美食介绍和美食推荐的渠道,提高校园内美食行业的服务水平,增强校园内外用户......
  • A-计算机毕业设计定制:18099居家养老服务系统(免费领源码)可做计算机毕业设计JAVA、PHP
    摘  要1绪论1.1研究背景1.2研究意义1.3主要研究内容1.4论文章节安排2 相关技术介绍2.1Node.JS编程语言2.2MySQL数据库3 系统分析3.1可行性分析3.1.1技术可行性分析3.1.2经济可行性分析3.1.3操作可行性分析3.2系统流程分析3.2.1 ......
  • [开题报告]flask框架的殡仪馆信息管理系统设计与实现(python+程序+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着社会的进步和人口老龄化的加剧,殡葬服务行业面临着前所未有的挑战与机遇。传统的手工记录与管理方式已难以满足现代殡仪馆高效、规范、......