首页 > 编程语言 >python并发与并行(十) ———— 结合线程与协程,将代码顺利迁移到asyncio

python并发与并行(十) ———— 结合线程与协程,将代码顺利迁移到asyncio

时间:2024-08-31 15:27:47浏览次数:6  
标签:paths handle 协程 python write handles 线程 path output


在前一篇中,我们用asyncio模块把通过线程来执行阻塞式I/O的TCP服务器迁移到了协程方案上面。当时我们一下子就完成了迁移,而没有分成多个步骤,这对于大型的项目来说,并不常见。如果项目比较大,那通常需要一点一点地迁移,也就是要边改边测,确保迁移过去的这一部分代码的效果跟原来相同。

为了能够分步骤地迁移,必须让采用线程做阻塞式I/O的那些代码能够与采用协程做异步I/O的代码相互兼容。具体来说,这要求我们既能够在线程里面执行协程,又能够在协程里面启动线程并等待运行结果。好在asyncio模块已经内置了相关的机制,让线程与协程可以顺利地操作对方。

例如,现在要写一个程序,把许多份日志文件合并成一条输出流,以便我们调试程序。给定日志文件的句柄之后,我们得想办法判断有没有新数据到来,如果有,就返回下一行输入内容。为了实现这项功能,可以调用文件句柄的tell方法,并判断当前读取到的这个位置是否为文件中的最后一个位置。若是,就说明没有新数据,这时应该抛出异常。

class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()

offset = handle.tell(): 使用 handle.tell() 获取当前文件句柄的读取位置(即当前的偏移量)

handle.seek(0, 2): 使用 handle.seek(0, 2) 将文件句柄的读取位置移动到文件末尾(2 表示从文件末尾开始的偏移量)

length = handle.tell(): 再次使用 handle.tell() 获取文件末尾的偏移量,即文件的总长度

if length == offset: 如果文件末尾的偏移量与之前的 offset 相同,这意味着从上次读取后没有新数据被写入文件

这个函数可以封装在while循环里,这样就能够打造一条工作线程。如果出现了新的数据行,那就通过用户传来的write_func回调函数把这行数据写到输出日志里面。如果没有新数据,那么线程就先睡眠一段时间,然后再执行下一轮while循环,而不是频繁地在这里查询是否有新数据出现。要是输入文件的句柄关闭了,那么工作线程就退出while循环。

import time

def tail_file(handle, interval, write_func):
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)

这个函数,执行try后,如果try没有抛出异常,就会执行else中的内容。这个 tail_file 函数的工作原理是持续监控文件句柄 handle,一旦有新数据写入(即 readline 函数能够读取到数据),就调用 write_func 来处理这些数据。如果在指定的时间间隔内没有新数据,函数会等待然后再次尝试读取。这种方式非常适合实时日志监控等场景,可以确保不错过任何更新。

现在,我们给每一份输入文件都启动一条工作线程,并把这些线程所输出的内容合起来放到一份输出文件里面。为此,要定义write这个辅助函数,并让它在给输出流写入数据之前,先使用lock实例加锁,这样才能使这些工作线程有秩序地输出,而不会出现某条线程写了一半就被另一条线程打断的情况。

from threading import Lock, Thread

def run_threads(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        lock = Lock()
        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target=tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

只要输入文件的句柄处于开启状态,相应的工作线程就不会退出。反过来说,这条线程要是退出了,那就意味着有人把那份文件的句柄关了。于是,只需要等待所有的线程都完工,就可以确定这些文件的句柄已经全部关闭。

我们可以定义这样一个confirm_merge函数,让它判断刚才那个run_threads函数能不能把许多份输入文件(input_paths)正确地合并到同一份输出文件(output_path)里面。

import collections
import os
import random
import string
from tempfile import TemporaryDirectory

def write_random_data(path, write_count, interval):
    with open(path, 'wb') as f:
        for i in range(write_count):
            time.sleep(random.random() * interval)
            letters = random.choices(
                string.ascii_lowercase, k=10)
            data = f'{path}-{i:02}-{"".join(letters)}\n'
            f.write(data.encode())
            f.flush()

def start_write_threads(directory, file_count):
    paths = []
    for i in range(file_count):
        path = os.path.join(directory, str(i))
        with open(path, 'w'):
            # Make sure the file at this path will exist when
            # the reading thread tries to poll it.
            pass
        paths.append(path)
        args = (path, 10, 0.1)
        thread = Thread(target=write_random_data, args=args)
        thread.start()
    return paths

def close_all(handles):
    time.sleep(1)
    for handle in handles:
        handle.close()

def setup():
    tmpdir = TemporaryDirectory()
    input_paths = start_write_threads(tmpdir.name, 5)

    handles = []
    for path in input_paths:
        handle = open(path, 'rb')
        handles.append(handle)

    Thread(target=close_all, args=(handles,)).start()

    output_path = os.path.join(tmpdir.name, 'merged')
    return tmpdir, input_paths, handles, output_path


# Example 5
def confirm_merge(input_paths, output_path):
    found = collections.defaultdict(list)
    with open(output_path, 'rb') as f:
        for line in f:
            for path in input_paths:
                if line.find(path.encode()) == 0:
                    found[path].append(line)

    expected = collections.defaultdict(list)
    for path in input_paths:
        with open(path, 'rb') as f:
            expected[path].extend(f.readlines())

    for key, expected_lines in expected.items():
        found_lines = found[key]
        assert expected_lines == found_lines, \
            f'{expected_lines!r} == {found_lines!r}'

input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

run_threads(handles, 0.1, output_path)

confirm_merge(input_paths, output_path)

tmpdir.cleanup()

完整代码如下:

# Example 1
class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()


# Example 2
import time

def tail_file(handle, interval, write_func):
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)


# Example 3
from threading import Lock, Thread

def run_threads(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        lock = Lock()
        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target=tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()


# Example 4
# This is all code to simulate the writers to the handles
import collections
import os
import random
import string
from tempfile import TemporaryDirectory

def write_random_data(path, write_count, interval):
    with open(path, 'wb') as f:
        for i in range(write_count):
            time.sleep(random.random() * interval)
            letters = random.choices(
                string.ascii_lowercase, k=10)
            data = f'{path}-{i:02}-{"".join(letters)}\n'
            f.write(data.encode())
            f.flush()

def start_write_threads(directory, file_count):
    paths = []
    for i in range(file_count):
        path = os.path.join(directory, str(i))
        with open(path, 'w'):
            # Make sure the file at this path will exist when
            # the reading thread tries to poll it.
            pass
        paths.append(path)
        args = (path, 10, 0.1)
        thread = Thread(target=write_random_data, args=args)
        thread.start()
    return paths

def close_all(handles):
    time.sleep(1)
    for handle in handles:
        handle.close()

def setup():
    tmpdir = TemporaryDirectory()
    input_paths = start_write_threads(tmpdir.name, 5)

    handles = []
    for path in input_paths:
        handle = open(path, 'rb')
        handles.append(handle)

    Thread(target=close_all, args=(handles,)).start()

    output_path = os.path.join(tmpdir.name, 'merged')
    return tmpdir, input_paths, handles, output_path


# Example 5
def confirm_merge(input_paths, output_path):
    found = collections.defaultdict(list)
    with open(output_path, 'rb') as f:
        for line in f:
            for path in input_paths:
                if line.find(path.encode()) == 0:
                    found[path].append(line)

    expected = collections.defaultdict(list)
    for path in input_paths:
        with open(path, 'rb') as f:
            expected[path].extend(f.readlines())

    for key, expected_lines in expected.items():
        found_lines = found[key]
        if expected_lines == found_lines:
            print("pass")
        # assert expected_lines == found_lines, \
        #     f'{expected_lines!r} == {found_lines!r}'

input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

run_threads(handles, 0.1, output_path)

confirm_merge(input_paths, output_path)

tmpdir.cleanup()

这是一套用线程制作出来的方案,我们现在要从这里开始,把它逐渐转换为一套采用asyncio与协程实现的方案。转换思路有两个,要么从上往下转,要么从下往上转。

从上往下转,必须由整个项目的最高点(例如main入口点)开始。把这一部分改掉之后,再沿着调用体系向下,去转换它所调用的那些函数与类。如果项目维护的是一批常用模块,而且有许多程序都会用到这些模块,那么就比较适合从上往下转换。首先要把入口点转换过来,至于那些模块,可以等其他地方全都迁移到协程之后,再去移植。

具体步骤为:

1)修改当前的顶层函数,把声明方式从def改成async def。

2)把这个函数所要执行的I/O调用(也就是有可能阻塞事件循环的那些调用)全都用asyncio.run_in_executor封装起来。

3)确保run_in_executor所使用的资源以及它所触发的回调函数都经过适当的同步处理(这需要通过Lock或asyncio.run_coroutine_threadsafe函数来实现)。

4)沿着调用体系向下走,按照刚才那三个步骤将当前函数所调用到的其他函数与方法迁移到协程方案上面,看看这样迁移能不能把get_event_loop与run_in_executor从目前这一层里面拿掉。

下面,我们按照上面提到的第1至3步来修改run_threads函数.

import asyncio


async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)
            
        # 这个函数用于从同步执行环境调用异步写入函数。它使用 asyncio.run_coroutine_threadsafe 来安全地从另一个线程执行异步代码。
        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(
                coro, loop)
            future.result()

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            tasks.append(task)

        await asyncio.gather(*tasks)

在上面的代码中,loop = asyncio.get_event_loop()的解释

asyncio 模块中,get_event_loop() 函数用于获取当前线程的事件循环。事件循环是 asyncio 编程的核心,它负责处理所有异步操作和事件的调度。

以下是 loop = asyncio.get_event_loop() 这行代码的详细解释:

  • 获取当前线程的事件循环:
    get_event_loop() 函数检查当前线程是否已经有一个活动事件循环。如果有,它返回这个循环;如果没有,它将创建一个新的事件循环,并将其与当前线程关联。
  • 将事件循环赋值给变量 loop:
    通过将返回的事件循环赋值给变量 loop,你可以在代码中引用这个循环,执行各种操作,比如创建任务、调度回调、运行在执行器中的同步函数等。

这个函数的使用场景包括:

  • 在旧版本的 Python 中(3.6 及以下),你需要手动创建和关闭事件循环。在这种情况下,get_event_loop() 可以用来获取或创建循环,并在完成所有异步操作后使用 loop.close() 来关闭循环。
  • 在 Python 3.7+ 中,asyncio.run() 被引入,它创建了一个新的事件循环,运行传入的协程,然后关闭循环。在这种情况下,get_event_loop() 主要用于与旧代码兼容,或者在需要手动管理事件循环时使用。

请注意,在 Python 3.7+ 中,推荐使用 asyncio.run() 来执行异步程序,因为它会自动处理事件循环的创建和关闭。但在某些情况下,你可能仍然需要手动获取事件循环,比如在与使用旧的 asyncio API 的库进行交互时。

asyncio.run_coroutine_threadsafe 函数是 Python asyncio 模块提供的一个便利函数,它用于将一个协程(coroutine)安排到一个事件循环上,但是这个调用是线程安全的。这意味着你可以从任何线程调用这个函数,而不仅仅是事件循环所在的线程。

下面是对 asyncio.run_coroutine_threadsafe(coro, loop) 这行代码的详细解释:

  • coro: 这是要运行的协程对象。它应该是一个 async def 函数的实例。
  • loop: 这是要将协程安排到其上运行的事件循环。如果你不指定 looprun_coroutine_threadsafe 将默认使用当前线程的事件循环。

函数的行为如下:

  1. 线程安全地调度协程: 这个函数将协程安排到指定的事件循环上执行,同时确保这个过程是线程安全的。这意味着你可以从非事件循环所在的线程调用这个函数,而不会违反事件循环的使用规则。
  2. 返回一个 Future 对象: 函数返回一个 Future 对象,这个对象代表了协程的运行结果。你可以使用 Future 对象来查询协程的状态(比如,是否完成或被取消),以及等待协程的结果。
  3. 错误处理: 如果协程抛出异常,这个异常将被捕获并设置到返回的 Future 对象上。你可以使用 Futureresult() 方法来获取结果,如果协程抛出异常,这个方法将重新抛出那个异常。
  4. 使用场景: 这个函数通常用在需要从同步函数或线程中触发异步操作的情况。例如,你可能有一个在线程中运行的同步函数,需要启动一个协程,但是你不能直接在那个线程中创建事件循环。使用 run_coroutine_threadsafe 可以避免直接与事件循环交互的复杂性。

在上面的代码片段中,future = asyncio.run_coroutine_threadsafe(coro, loop) 这行代码的作用是将协程 coro 安全地调度到事件循环 loop 上执行,并存储返回的 Future 对象到变量 future 中。之后,你可以使用 future.result() 来获取协程的返回值或处理异常。

这段代码通过run_in_executor方法命令事件循环采用特定的ThreadPoolExecutor来执行特定的函数,在本例中,这个函数指tail_file。ThreadPoolExecutor要通过方法的第一个参数来指定,如果第一个参数为None,就采用默认的executor实例。run_tasks_mixed协程针对每一份输入文件都调用一次run_in_executor方法以执行相应的任务,而且run_in_executor前面不加await,这样能够形成许多条并发的执行路径,从而实现任务fan-out(分派)。然后,该方法通过asyncio.gather函数收集这些任务的执行结果,这个函数会等待所有的tail_file线程都完工,从而实现成果fan-in(归集),这次函数前面要加上await。

这段代码在实现辅助函数write时,不需要再通过Lock实例加锁了,因为它是用asyncio.run_coroutine_threadsafe函数来提交写入操作的。这个负责执行写入操作的write_async协程,无论由哪一条工作线程提交,最终都会安排到事件循环(即loop)里面执行(这个事件循环一般位于主线程之中,如果有必要,也可以放在其他某条线程里面)。这些协程全都是放在同一条线程里面执行的,因此,这实际上意味着,它们对输出文件所做的写入操作本身就会有秩序地执行,而不会出现相互重叠的情况。只要asyncio.gather有了执行结果,我们就可以认定,对输出文件所做的那些写入操作全都已经执行完毕了,于是,我们可以放心地让with结构把表示输出文件的那个output句柄关掉,而不用担心其中是不是有写入操作还没执行完。

现在看看修改之后的代码,能不能实现预期的效果。我们通过asyncio.run启动run_tasks_mixed协程,把事件循环放在主线程之中运行。

input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

asyncio.run(run_tasks_mixed(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()

完整代码:

# Example 1
class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()


# Example 2
import time

def tail_file(handle, interval, write_func):
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)


# Example 3
from threading import Lock, Thread

def run_threads(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        lock = Lock()
        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target=tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()


# Example 4
# This is all code to simulate the writers to the handles
import collections
import os
import random
import string
from tempfile import TemporaryDirectory

def write_random_data(path, write_count, interval):
    with open(path, 'wb') as f:
        for i in range(write_count):
            time.sleep(random.random() * interval)
            letters = random.choices(
                string.ascii_lowercase, k=10)
            data = f'{path}-{i:02}-{"".join(letters)}\n'
            f.write(data.encode())
            f.flush()

def start_write_threads(directory, file_count):
    paths = []
    for i in range(file_count):
        path = os.path.join(directory, str(i))
        with open(path, 'w'):
            # Make sure the file at this path will exist when
            # the reading thread tries to poll it.
            pass
        paths.append(path)
        args = (path, 10, 0.1)
        thread = Thread(target=write_random_data, args=args)
        thread.start()
    return paths

def close_all(handles):
    time.sleep(1)
    for handle in handles:
        handle.close()

def setup():
    tmpdir = TemporaryDirectory()
    input_paths = start_write_threads(tmpdir.name, 5)

    handles = []
    for path in input_paths:
        handle = open(path, 'rb')
        handles.append(handle)

    Thread(target=close_all, args=(handles,)).start()

    output_path = os.path.join(tmpdir.name, 'merged')
    return tmpdir, input_paths, handles, output_path


# Example 5
def confirm_merge(input_paths, output_path):
    found = collections.defaultdict(list)
    with open(output_path, 'rb') as f:
        for line in f:
            for path in input_paths:
                if line.find(path.encode()) == 0:
                    found[path].append(line)

    expected = collections.defaultdict(list)
    for path in input_paths:
        with open(path, 'rb') as f:
            expected[path].extend(f.readlines())

    for key, expected_lines in expected.items():
        found_lines = found[key]
        if expected_lines == found_lines:
            print("pass")
        # assert expected_lines == found_lines, \
        #     f'{expected_lines!r} == {found_lines!r}'

# input_paths = ...
# handles = ...
# output_path = ...
#
# tmpdir, input_paths, handles, output_path = setup()
#
# run_threads(handles, 0.1, output_path)
#
# confirm_merge(input_paths, output_path)
#
# tmpdir.cleanup()

import asyncio

async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(
                coro, loop)
            future.result()

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            tasks.append(task)

        await asyncio.gather(*tasks)


# Example 7
input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

asyncio.run(run_tasks_mixed(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()

output:

pass
pass
pass
pass
pass

按照前面三步修改之后,我们遵循第4步继续修改。这一步要求我们沿着调用栈向下走,把本函数所调用的其他函数,也按照前面三步来改写。于是,我们就针对run_tasks_mixed所依赖的tail_file,运用第1至第3步,把它由一个执行阻塞式I/O的普通函数变成一个异步的协程。

async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)

将tail_file函数改为tail_async协程之后,就可以把原来为了执行该操作而使用的get_event_loop与run_in_executor从顶层函数run_tasks里面往下推,把它们放到调用栈的下一层。在这个例子之中,它们下沉到了tail_async里面,于是我们可以把顶层函数中的那两条对应语句删掉。现在的顶层函数,变得好懂多了。

async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, write_async)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)

大家还可以继续重构,也就是在tail_async里面继续往下探查,把它所依赖的readline函数也按照早前讲的那三步转换为协程。但是,那个函数的任务本身就是执行许多项阻塞式的文件I/O操作,所以好像没必要移植,因为我们在判断是否需要移植时,应该考虑到,这样做会不会让代码变得难懂,会不会降低程序的效率。有的时候,所有代码都应该迁移到asyncio,但另一些场合则没必要这么做。

我们看看这次的run_tasks改得对不对。

完整代码为:

# Example 1
class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()


# Example 2
import time

def tail_file(handle, interval, write_func):
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)


# Example 3
from threading import Lock, Thread

def run_threads(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        lock = Lock()
        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target=tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()


# Example 4
# This is all code to simulate the writers to the handles
import collections
import os
import random
import string
from tempfile import TemporaryDirectory

def write_random_data(path, write_count, interval):
    with open(path, 'wb') as f:
        for i in range(write_count):
            time.sleep(random.random() * interval)
            letters = random.choices(
                string.ascii_lowercase, k=10)
            data = f'{path}-{i:02}-{"".join(letters)}\n'
            f.write(data.encode())
            f.flush()

def start_write_threads(directory, file_count):
    paths = []
    for i in range(file_count):
        path = os.path.join(directory, str(i))
        with open(path, 'w'):
            # Make sure the file at this path will exist when
            # the reading thread tries to poll it.
            pass
        paths.append(path)
        args = (path, 10, 0.1)
        thread = Thread(target=write_random_data, args=args)
        thread.start()
    return paths

def close_all(handles):
    time.sleep(1)
    for handle in handles:
        handle.close()

def setup():
    tmpdir = TemporaryDirectory()
    input_paths = start_write_threads(tmpdir.name, 5)

    handles = []
    for path in input_paths:
        handle = open(path, 'rb')
        handles.append(handle)

    Thread(target=close_all, args=(handles,)).start()

    output_path = os.path.join(tmpdir.name, 'merged')
    return tmpdir, input_paths, handles, output_path


# Example 5
def confirm_merge(input_paths, output_path):
    found = collections.defaultdict(list)
    with open(output_path, 'rb') as f:
        for line in f:
            for path in input_paths:
                if line.find(path.encode()) == 0:
                    found[path].append(line)

    expected = collections.defaultdict(list)
    for path in input_paths:
        with open(path, 'rb') as f:
            expected[path].extend(f.readlines())

    for key, expected_lines in expected.items():
        found_lines = found[key]
        if expected_lines == found_lines:
            print("pass")
        # assert expected_lines == found_lines, \
        #     f'{expected_lines!r} == {found_lines!r}'



import asyncio

# Example 8
async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)


# Example 9
async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, write_async)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)


# Example 10
input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

asyncio.run(run_tasks(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()

output:

pass
pass
pass
pass
pass

asyncio模块的事件循环提供了一个返回awaitable对象的run_in_executor方法,它能够使协程把同步函数放在线程池执行器(ThreadPoolExecutor)里面执行,让我们可以顺利地将采用线程方案所实现的项目,从上至下地迁移到asyncio方案。

asyncio模块的事件循环还提供了一个可以在同步代码里面调用的run_until_complete方法,用来运行协程并等待其结束。它的功能跟asyncio.run_coroutine_threadsafe类似,只是后者面对的是跨线程的场合,而前者是为同一个线程设计的。这些都有助于将采用线程方案所实现的项目从下至上地迁移到asyncio方案。


标签:paths,handle,协程,python,write,handles,线程,path,output
From: https://blog.51cto.com/u_15302822/11882934

相关文章

  • python并发与并行(九) ———— 用asyncio改写通过线程实现的IO
    知道了协程的好处之后,我们可能就想把现有项目之中的代码全都改用协程来写,于是有人就担心,这样修改起来,工作量会不会比较大呢?所幸Python已经将异步执行功能很好地集成到语言里面了,所以我们很容易就能把采用线程实现的阻塞式I/O操作转化为采用协程实现的异步I/O操作。在这里我们要补充......
  • python并发与并行(八) ———— 用协程实现高并发的I/O
    在前面几条里,我们以生命游戏为例,试着用各种方案解决I/O并行问题,这些方案在某些情况下确实可行,但如果同时需要执行的I/O任务有成千上万个,那么这些方案的效率就不太理想了像这种在并发方面要求比较高的I/O需求,可以用Python的协程(coroutine)来解决。协程能够制造出一种效果,让我们觉得Py......
  • python并发与并行(五.1) ———— 不要在每次fan-out时都新建一批Thread实例
    我们使用康威生命游戏的例子来解释这个专题。首先我们要实现一个康威生命游戏。这是个经典的有限状态自动机。它的规则很简单:在任意长宽的二维网格中,每个单元格都必须处于ALIVE或EMPTY状态,前者表示这个单元格里有生命存在,后者表示这里没有生物(或者原有生物已经死亡)。时钟每走一格......
  • Selenium+Python自动化测试环境搭建
    1.什么是Selenium?        Selenium主要用于web应用程序的自动化测试,但并不局限于此,它还支持所有基于web的管理任务自动化。2、selenium自动化流程如下:自动化程序调用Selenium客户端库函数(比如点击按钮元素)客户端库会发送Selenium命令给浏览器的驱动程序浏览......
  • 【Python-办公自动化】1秒解决海量查找替换难题
    欢迎来到"花花ShowPython",一名热爱编程和分享知识的技术博主。在这里,我将与您一同探索Python的奥秘,分享编程技巧、项目实践和学习心得。无论您是编程新手还是资深开发者,都能在这里找到有价值的信息和灵感。自我介绍:我热衷于将复杂的技术概念以简单易懂的方式呈现给大家,......
  • 02python
    1.布尔类型和比较运算符1.1布尔(bool)类型布尔(bool)表达现实生活中的逻辑,即真和假:True表示真;False表示假。True本质上是一个数字记作1,False记作01.1.1布尔类型字面量True表示真(是、肯定)False表示假(否、否定)1.1.2定义变量存储布尔类型数据变量名称=布尔类型字面量布尔类型不......
  • A-计算机毕业设计定制:10508民大校园美食推荐系统的设计与实现(免费领源码)可做计算机毕
    摘要 随着数字化时代的到来,校园美食推荐系统的设计与实现具有重要意义。针对民大校园中商家、普通用户和管理员之间的信息交互和服务需求,开发这样一个系统能够有效促进校园内美食资源的共享和利用,提供美食介绍和美食推荐的渠道,提高校园内美食行业的服务水平,增强校园内外用户......
  • A-计算机毕业设计定制:18099居家养老服务系统(免费领源码)可做计算机毕业设计JAVA、PHP
    摘  要1绪论1.1研究背景1.2研究意义1.3主要研究内容1.4论文章节安排2 相关技术介绍2.1Node.JS编程语言2.2MySQL数据库3 系统分析3.1可行性分析3.1.1技术可行性分析3.1.2经济可行性分析3.1.3操作可行性分析3.2系统流程分析3.2.1 ......
  • [开题报告]flask框架的殡仪馆信息管理系统设计与实现(python+程序+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着社会的进步和人口老龄化的加剧,殡葬服务行业面临着前所未有的挑战与机遇。传统的手工记录与管理方式已难以满足现代殡仪馆高效、规范、......
  • [开题报告]flask框架的毕业生求职系统的设计与实现k2r16(程序+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着高等教育的普及和就业市场的日益竞争,毕业生在求职过程中面临着信息获取不对称、求职渠道有限、面试流程繁琐等挑战。传统的求职方式往......