import asyncio
# 假设这是你的大数据集
large_data_set = range(1000000) # 用1到1000000的数字模拟大数据集
# 任务队列
task_queue = asyncio.Queue()
# 并发限制
sem = asyncio.Semaphore(10)
# 任务处理函数
async def process_data(sem, q):
while True:
# 从队列中获取任务
data = await q.get()
if data is None:
# 收到None,表示没有更多任务了
break
# 处理任务
await process_task(data, sem)
q.task_done()
async def process_task(data, sem):
async with sem:
# 模拟异步处理数据
await asyncio.sleep(0.01)
print(f"Processed data: {data}")
async def main():
# 启动工作协程
workers = [asyncio.create_task(process_data(sem, task_queue)) for _ in range(10)]
# 将数据分批添加到队列
for data in large_data_set:
await task_queue.put(data)
# 等待队列被完全消费
await task_queue.join()
# 停止工作协程
for _ in workers:
task_queue.put_nowait(None)
await asyncio.gather(*workers)
asyncio.run(main())
在这个示例中:
process_data函数是一个协程,它不断地从队列中取出数据并处理。
process_task函数是一个受Semaphore限制的协程,用于实际处理数据。
main函数负责将数据添加到队列,并启动工作协程。
这种方法允许你以可控的方式处理大量数据,而不会一次性创建大量任务。你可以根据系统资源和任务特性调整并发数量。此外,通过使用None作为结束信号,你可以优雅地关闭工作协程。