首页 > 编程语言 >在 Python 中将 Tqdm 与 Asyncio 结合使用

在 Python 中将 Tqdm 与 Asyncio 结合使用

时间:2023-05-08 23:13:33浏览次数:47  
标签:Tqdm tqdm main Python completed __ total Asyncio asyncio

动动发财的小手,点个赞吧!

简介

困扰

在 Python 中使用并发编程来提高效率对于数据科学家来说并不罕见。在后台观察各种子进程或并发线程以保持我的计算或 IO 绑定任务的顺序总是令人满意的。

但是还有一点困扰我的是,当我在后台并发处理成百上千个文件或者执行成百上千个进程时,我总是担心会不会有几个任务偷偷挂了,整个代码永远跑不完。我也很难知道代码现在在哪里执行。

最糟糕的是,当我看着一个空白屏幕时,很难说出我的代码需要多长时间才能执行或 ETA 是多少。这对我安排工作日程的能力非常不利。

因此,我想要一种方法让我知道代码执行到了哪里。

已有方法

比较传统的做法是任务之间共享一块内存区域,在这块内存区域放一个计数器,当一个任务结束的时候让这个计数器+1,然后用一个线程不停的打印这个计数器的值。

这从来都不是一个好的解决方案:一方面,我需要在你现有的业务逻辑中添加一段用于计数的代码,这违反了“低耦合,高内聚”的原则。另一方面,由于线程安全问题,我必须非常小心锁定机制,这会导致不必要的性能问题。

tqdm

有一天,我发现了 tqdm 库,它使用进度条来可视化我的代码进度。我可以使用进度条来可视化我的 asyncio 任务的完成和预计到达时间吗?

那么本文我把这个方法分享给大家,让每个程序员都有机会监控自己并发任务的进度。

异步

在我们开始之前,我希望您了解一些 Python asyncio 的背景知识。我的文章描述了asyncio的一些常用API的用法,这将有助于我们更好地理解tqdm的设计:

tqdm 概述

如官方网站所述,tqdm 是一个显示循环进度条的工具。它使用简单、高度可定制并且占用资源少。

一个典型的用法是将一个可迭代对象传递给 tqdm 构造函数,然后你会得到一个如下所示的进度条:

from time import sleep
from tqdm import tqdm


def main():
    for _ in tqdm(range(100)):
        # do something in the loop
        sleep(0.1)


if __name__ == "__main__":
    main()

或者您可以在读取文件时手动浏览并更新进度条的进度:

import os
from tqdm import tqdm


def main():
    filename = "../data/large-dataset"
    with (tqdm(total=os.path.getsize(filename)) as bar,
            open(filename, "r", encoding="utf-8") as f):
        for line in f:
            bar.update(len(line))


if __name__ == "__main__":
    main()

将 tqdm 与异步集成

总体而言,tqdm 非常易于使用。但是,GitHub 上需要更多关于将 tqdm 与 asyncio 集成的信息。所以我深入研究了源代码,看看 tqdm 是否支持 asyncio。

幸运的是,最新版本的 tqdm 提供了包 tqdm.asyncio,它提供了类 tqdm_asyncio。

tqdm_asyncio 类有两个相关的方法。一个是 tqdm_asyncio.as_completed。从源码可以看出,它是对asyncio.as_completed的包装:

@classmethod
    def as_completed(cls, fs, *, loop=None, timeout=None, total=None, **tqdm_kwargs):
        """
        Wrapper for `asyncio.as_completed`.
        """
        if total is None:
            total = len(fs)
        kwargs = {}
        if version_info[:2] < (3, 10):
            kwargs['loop'] = loop
        yield from cls(asyncio.as_completed(fs, timeout=timeout, **kwargs),
                       total=total, **tqdm_kwargs)

另一个是 tqdm_asyncio.gather ,从源代码可以看出,它基于模拟 asyncio.gather 功能的 tqdm_asyncio.as_completed 的实现:

@classmethod
    async def gather(cls, *fs, loop=None, timeout=None, total=None, **tqdm_kwargs):
        """
        Wrapper for `asyncio.gather`.
        """
        async def wrap_awaitable(i, f):
            return i, await f

        ifs = [wrap_awaitable(i, f) for i, f in enumerate(fs)]
        res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout,
                                                 total=total, **tqdm_kwargs)]
        return [i for _, i in sorted(res)]

所以,接下来,我将描述这两个API的用法。在开始之前,我们还需要做一些准备工作。在这里,我写了一个简单的方法来模拟一个随机休眠时间的并发任务:

import asyncio
import random

from tqdm.asyncio import tqdm_asyncio


class AsyncException(Exception):
    def __int__(self, message):
        super.__init__(self, message)


async def some_coro(simu_exception=False):
    delay = round(random.uniform(1.0, 5.0), 2)

    # We will simulate throwing an exception if simu_exception is True
    if delay > 4 and simu_exception:
        raise AsyncException("something wrong!")

    await asyncio.sleep(delay)

    return delay

紧接着,我们将创建 2000 个并发任务,然后使用 tqdm_asyncio.gather 而不是熟悉的 asyncio.gather 方法来查看进度条是否正常工作:

async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())
    await tqdm_asyncio.gather(*tasks)

    print(f"All tasks done.")


if __name__ == "__main__":
    asyncio.run(main())

或者让我们用 tqdm_asyncio.as_completed 替换 tqdm_asyncio.gather 并重试:

async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())

    for done in tqdm_asyncio.as_completed(tasks):
        await done

    print(f"The tqdm_asyncio.as_completed also works fine.")


if __name__ == "__main__":
    asyncio.run(main())

本文由mdnice多平台发布

标签:Tqdm,tqdm,main,Python,completed,__,total,Asyncio,asyncio
From: https://www.cnblogs.com/swindler/p/17383455.html

相关文章

  • Python - 并发模型
    importitertoolsimporttimefromthreadingimportThread,Eventdefspin(msg:str,done:Event)->None:forcharinitertools.cycle(r'\|/-'):#1status=f'\r{char}{msg}'#2print(status,end=''......
  • python14:递归函数
    概念:在一个函数内部不调用其他函数,而是自己本身的话,这个函数就是递归函数。优点:看起来简单缺点:容易导致栈溢出,占内存。例子:defjiecheng(n):#result=1#foriteminrange(1,n+1):#result*=item#pass#returnresult##print('结果为:{}'.form......
  • Python + Selenium,分分钟搭建 Web 自动化测试框架!
    在程序员的世界中,一切重复性的工作,都应该通过程序自动执行。「自动化测试」就是一个最好的例子。随着互联网应用开发周期越来越短,迭代速度越来越快,只会点点点,不懂开发的手工测试,已经无法满足如今的业务要求,只能被企业逐步裁员淘汰。「自动化测试和持续测试」就成为了业界主流。......
  • python GUI(beeware) + uiautomator2 实现root后的安卓手机自动执行脚本
    python环境:python3.81:安装beewarebeeware教程:https://docs.beeware.org/en/latest/tutorial/tutorial-2.html2:安装python模块uiautomator23:测试代码代码结构 app.py"""Myfirstapplication"""fromtoga.styleimportPackfrom.dy_dianzanim......
  • 深入理解 python 虚拟机:描述器的王炸应用-property、staticmethod 和 classmehtod
    深入理解python虚拟机:描述器的王炸应用-property、staticmethod和classmehtod在本篇文章当中主要给大家介绍描述器在python语言当中有哪些应用,主要介绍如何使用python语言实现python内置的proterty、staticmethod和classmethod。property当你在编写Python代码......
  • Python实操面试题
    1、一行代码实现1--100之和#利用sum()函数求和sum(range(1,101))2、如何在一个函数内部修改全局变量#利用global在函数声明修改全局变量a=5deffunc(): globalaa=10func()print(a)#结果:103、列出5个python标准库'''os:提供了不少与操作系统......
  • python-Queue队列
    队列Queue提供同步的、线程安全的队列类,可以用于线程之间的线程通信。queue模块实现了多生产者、多消费者队列。这特别适用于消息必须安全地在多线程交换的线程编程。该模块实现了三种类型的队列,它们的区别是任务取回的顺序。在FIFO队列中,先添加任务的先取回。在LIFO队列中,最......
  • 时间序列的STL分解Python代码——以验潮站数据为例
    1.时间序列分解的作用和意义时间序列通常包括如下几种成分:一个时间序列包含三种影响因素: 长期趋势:在一个相当长的时间内表现为一种近似直线的持续向上、向下或平稳的趋势。季节变动:受季节变化影响所形成的一种长度和幅度固定的短期周期波动周期变动:与季节变动类似,但是波动......
  • python-手动借助google翻译来翻译文档
    1importos2importre3'''4读取指定的html文件5去掉所有的换行符6正则匹配特定项目:(?<=<divclass="block">).+?(?=</div>)7然后替换掉:</code>|<code>|<i>|</i>==>""8......
  • Python基础面试题
    1、Python和Java、PHP、C、C#、C++等其他语言的对比?'''1.C语言,它既有高级语言的特点,又具有汇编语言的特点,它是结构式语言。C语言应用指针:可以直接进行靠近硬件的操作,但是C的指针操作不做保护,也给它带来了很多不安全的因素。C++在这方面做了改进,在保留了指针操作的同时又增强......