首页 > 其他分享 >asyncio Queue和Semaphore的结合使用

asyncio Queue和Semaphore的结合使用

时间:2024-07-26 18:30:08浏览次数:13  
标签:task 协程 process Queue Semaphore sem data asyncio

import asyncio

# 假设这是你的大数据集
large_data_set = range(1000000)  # 用1到1000000的数字模拟大数据集

# 任务队列
task_queue = asyncio.Queue()

# 并发限制
sem = asyncio.Semaphore(10)

# 任务处理函数
async def process_data(sem, q):
    while True:
        # 从队列中获取任务
        data = await q.get()
        if data is None:
            # 收到None,表示没有更多任务了
            break
        # 处理任务
        await process_task(data, sem)
        q.task_done()

async def process_task(data, sem):
    async with sem:
        # 模拟异步处理数据
        await asyncio.sleep(0.01)
        print(f"Processed data: {data}")

async def main():
    # 启动工作协程
    workers = [asyncio.create_task(process_data(sem, task_queue)) for _ in range(10)]

    # 将数据分批添加到队列
    for data in large_data_set:
        await task_queue.put(data)

    # 等待队列被完全消费
    await task_queue.join()

    # 停止工作协程
    for _ in workers:
        task_queue.put_nowait(None)
    await asyncio.gather(*workers)

asyncio.run(main())

在这个示例中:

process_data函数是一个协程,它不断地从队列中取出数据并处理。
process_task函数是一个受Semaphore限制的协程,用于实际处理数据。
main函数负责将数据添加到队列,并启动工作协程。
这种方法允许你以可控的方式处理大量数据,而不会一次性创建大量任务。你可以根据系统资源和任务特性调整并发数量。此外,通过使用None作为结束信号,你可以优雅地关闭工作协程。

标签:task,协程,process,Queue,Semaphore,sem,data,asyncio
From: https://www.cnblogs.com/wangshx666/p/18326000

相关文章

  • 225. Implement Stack using Queues
    Implementalast-in-first-out(LIFO)stackusingonlytwoqueues.Theimplementedstackshouldsupportallthefunctionsofanormalstack(push,top,pop,andempty).ImplementtheMyStackclass:voidpush(intx)Pusheselementxtothetopofthestack.......
  • 手写Semaphore信号量
    publicclassMySemaphore{privateSyncsync;publicMySemaphore(intcount){sync=newSync(count);}publicvoidacquire(){sync.acquireShared(1);}publicvoidrelease(){sync.releaseShared(1);......
  • 队列(queue)与优先队列(priority_queue)
    队列在数据结构中也称为操作受限的线性表,是一种只允许在表的一端插入,在另一端删除的线性表。相关的几个概念1)队尾rear:插入端,线性表的表尾;2)队首front:删除端,线性表的表头;3)空队列:当队列中没有任何元素时,称为空队列。特点先进先出(FIFO,firstinfirstout),就像在食堂排队买......
  • [atcoder utpc2023_p] Priority Queue 3
    PriorityQueue3题意:有一个小根堆和\(1\)~\(n\)个数,以及一个操作序列,+表示\(push\),-表示\(pop\),\(pop\)有\(m\)次,问你有多少种插入顺序使得最后的pop集合与给出的的数字集合\(Y\)相同。首先有个浅显的发现:对于不在\(Y\)集合中的数,可选范围形如一个阶梯,换句话......
  • Java中的优先级队列(PriorityQueue)(如果想知道Java中有关优先级队列的知识点,那么只看这
        前言:优先级队列(PriorityQueue)是一种抽象数据类型,其中每个元素都关联有一个优先级,元素按照优先级顺序进行处理。✨✨✨这里是秋刀鱼不做梦的BLOG✨✨✨想要了解更多内容可以访问我的主页秋刀鱼不做梦-CSDN博客先让我们看一下本文大致的讲解内容:目录1.优......
  • 如何立即取消使用 Ollama Python 库生成答案的 Asyncio 任务?
    我正在使用Ollama通过OllamaPythonAPI从大型语言模型(LLM)生成答案。我想通过单击停止按钮取消响应生成。问题在于,只有当响应生成已经开始打印时,任务取消才会起作用。如果任务仍在处理并准备打印,则取消不起作用,并且无论如何都会打印响应。更具体地说,即使单击按钮后,此函数......
  • asyncio/trio fastdfs python client
    Codets.py#!/usr/bin/envpython"""FastDFS并发测试脚本Usage::$python<me>.py200--show"""importfunctoolsimportitertoolsimportjsonimportosimportpickleimportsysimporttimefrompathlibimportPathfr......
  • PriorityQueue
    PriorityQueue是Java中的一个基于优先级堆的优先队列实现,它能够在O(logn)的时间复杂度内实现元素的插入和删除操作,并且能够自动维护队列中元素的优先级顺序。//传入比较器PriorityQueue<String>priorityQueue=newPriorityQueue<>(Comparator.reverseOrder());add......
  • 数据结构(Java):队列&集合Queue&力扣面试OJ题
    1、队列1.1队列的概念队列是一个特殊的线性表,只允许在一端(队尾)进行插入数据操作,在另一端(对头)进行删除数据。队列具有先进先出FIFO(FirstInFirstOut)的特性。入队:数据只能从队尾进队列    出队:数据只能从对头出队列即:队尾进队头出我们可以把队列想象为一个排队......
  • day10-stack&Queue-part01-7.12
    tasksfortoday:1.理论基础2.232用栈实现队列3.225用队列实现栈4.20有效的括号5.1047删除字符串中所有相邻重复项--------------------------------------------------------------------------1.理论基础stack:firstinlastout     head    ......