首页 > 编程问答 >Python多重处理,如何避免创建具有百万个对象的元组

Python多重处理,如何避免创建具有百万个对象的元组

时间:2024-07-30 06:51:26浏览次数:15  
标签:python python-multiprocessing

python 多处理新手。 我有一项任务,涉及访问网络服务数百万次并将响应保存在文件中(每个请求都有单独的文件)。 我已经得到了高级工作代码,但对一些事情没有感到困惑。

  1. 以下两种语法有什么区别?

    pool = Pool(processes=4) 
    pool.starmap(task, listOfInputParametersTuple) 
    

    with Pool(processes=4) as pool:
        pool.starmap(task, listOfInputParametersTuple)
  1. 有没有办法避免在开始之前读取整个输入文件多处理池? 基本上,读取每一行并立即生成一个池任务。

    with open (list_of_ids, 'r') as infile:
    for id in infile:
       listOfInputParametersTuple.append( tuple ((id, queue, requestBodyTemplate))
    pool.starmap(task, listOfInputParametersTuple)
    
  2. 在该任务中,我使用相当大的 requestBodyTemplate 并传递 id 来创建 JSON 请求。此 requestBodyTemplate 变量在输入元组的每个元素中重复。有没有一种方法可以将它传递给输入元组之外的 task 函数。

  3. 如何确保在退出主程序之前完成所有生成的任务?

  4. 任何关于超时任务的指针,如果它释放池的时间太长?

请随意分享您可能有的任何其他建议。

谢谢

目标

  • 从文件中读取 100 万个 ID 的列表| ||对于每个 id,制作一个 JSON。
  • 使用上面创建的 JSON 请求一个 Web 服务。 (需要几秒钟才能得到响应)
  • 将响应的输出保存到目录。
  • 将状态保存到统一的“状态”文件
  • 任务

- 主要方法

def task(i,queue):

    # get the current process
    process = current_process()

    # generate some work
    s = random.randint(1, 10)

    # block to simulate work
    print (f"TASK function : {process} - sleep for {s} sec")
    sleep(s)

    data = f"{process} - sleep {s} sec - {i} - {queue}"

    print (f"TASK function: {data}")

    # put it on the queue
    queue.put(data)

- -

def main ():

set_start_method('spawn')
pool = Pool(processes=4)

requestBodyTemplate = getRequestBodyTemplateJSON()

with open (list_of_ids, 'r') as infile:
    for line in infile:
        listOfInputParametersTuple.append( tuple ((line, queue , requestBodyTemplate))

# set the fork start method
# create the manager
with Manager() as  manager:
    # create the shared queue
    queue = manager.Queue()

    Process(target=listener, args=(queue,)).start()
    print ("back in main after starting listener ")

    # execute the tasks_i in parallel # use starmap to have multiple params 
    pool.starmap(task, listOfInputParametersTuple)
    # pool.starmap(task, zip ( args_i, itertools.repeat(queue)))

    pool.close()
    pool.join()

    # wait for all tasks to get over
    sleep(10)

    print ("\n Sending None message to queue ")
    queue.put(None)

当然,让我们分解一下如何在的 Python 多处理场景中优化文件处理和任务管理。

1. pool.starmap 语法:带或不带 with

  • pool = Pool(); pool.starmap(...); pool.close(); pool.join() :这种方法显式地创建了一个进程池,对其执行 starmap ,然后在完成时仔细清理资源。它可以让更好地控制池的生命周期,但需要手动管理。

  • with Pool() as pool: pool.starmap(...) :这种方法利用了上下文管理器 ( with )。它在进入 with 块时自动创建一个进程池,并在退出时自动处理关闭和连接。这通常更简洁,不太容易出错。

在这两种情况下,结果都是一样的: starmap 并行地将可迭代的参数元组映射到的 task 函数。

2. 避免预先加载整个输入文件

绝对可以避免在开始处理之前将整个百万行 ID 列表加载到内存中!这是 Python 生成器发挥作用的地方。生成器允许按需迭代数据,而无需将其全部存储在内存中。

def read_ids_in_chunks(filename, chunk_size=1000):  # 调整块大小以获得最佳性能
    with open(filename, 'r') as f:
        chunk = []
        for line in f:
            chunk.append(line.strip())  # 假设每个 ID 一行
            if len(chunk) == chunk_size:
                yield chunk
                chunk = []
        if chunk:  # 处理最后一个块
            yield chunk

def task(id, queue, requestBodyTemplate):  # 添加 requestBodyTemplate
    # ... 的任务逻辑,使用提供的 requestBodyTemplate ...

def main():
    set_start_method('spawn')
    with Manager() as manager:
        queue = manager.Queue()
        Process(target=listener, args=(queue,)).start()

        with Pool(processes=4) as pool:
            for id_chunk in read_ids_in_chunks('list_of_ids.txt'):
                args = [(id, queue, requestBodyTemplate) for id in id_chunk]
                pool.starmap(task, args)

    # ... 剩余的代码 ...

3. 有效传递 requestBodyTemplate

与其将大型 requestBodyTemplate 重复发送到每个进程(这会导致不必要的开销),不如利用多处理的共享资源:

  • Manager.dict() 用于只读访问 :如果不需要修改 requestBodyTemplate ,这是一个很好的选择。

```python import json from multiprocessing import Process, Pool, Manager, set_start_method

def task(id, queue, shared_data): requestBodyTemplate = shared_data['template'] # ... 的任务逻辑 ...

def main(): set_start_method('spawn') with Manager() as manager: shared_data = manager.dict({'template': getRequestBodyTemplateJSON()}) # ... 的代码 ... with Pool(processes=4) as pool: for id_chunk in read_ids_in_chunks('list_of_ids.txt'): args = [(id, queue, shared_data) for id in id_chunk] pool.starmap(task, args) # ... 剩余的代码 ... ```

4. 确保任务完成

pool.close() pool.join() 对于确保在主程序退出之前完成所有任务至关重要:

  • pool.close() :通知池在完成当前任务队列后不再接受新任务。
  • pool.join() :等待所有工作进程完成。 只有在调用 close() 后才能调用它。

5. 超时任务

虽然 Python 的 multiprocessing 模块没有内置的任务级超时机制,但可以通过以下方式实现自己的机制:

  • Process.terminate() 作为最后的手段 :可以创建一个包装函数或类来启动一个新进程来运行的任务,并设置一个计时器来监控它。如果任务超过时间限制,请使用 process.terminate() 强制终止它。请注意,这是一种强制终止,可能无法进行清理,因此请谨慎使用。

```python import time from multiprocessing import Process

def run_with_timeout(target, args, timeout=60): p = Process(target=target, args=args) p.start() p.join(timeout) # 等待超时时间 if p.is_alive(): p.terminate() print(f"Process {p.pid} timed out!") else: print(f"Process {p.pid} completed successfully.")

# 在的主函数中: run_with_timeout(target=some_function, args=(arg1, arg2), timeout=120) ```

其他建议

  • 错误处理 :在 task 函数中包含稳健的错误处理,以便优雅地捕获和记录异常。可以使用 try...except 块,并将错误记录到队列或文件中。
  • 进度指示器 :对于涉及数百万个项目的长时间运行的任务,使用 tqdm 之类的库为用户提供进度反馈。
  • 优化 chunk_size :生成器中的 chunk_size 参数会影响 I/O 与处理开销之间的平衡。尝试不同的值以找到适合用例的最佳值。

通过实现这些改进,将拥有一个更高效、更健壮的 Python 多处理管道来处理大型数据集! 如果有任何其他问题,请告诉我。

标签:python,python-multiprocessing
From: 78809244

相关文章

  • Python OpenCV - 显示坏像素检查测试
    我想找到显示器中存在的每个坏像素。坏像素可能是颜色不正确的像素,或者像素只是黑色。显示屏的尺寸为160x320像素。所以如果显示效果好的话,必须有160*320=51200像素。如果显示器没有51200像素,那就是坏的。另外,我想知道每个坏像素的位置。一旦拍摄的图像太大,我将共享一个......
  • 在python日志输出的每一行前面添加变量缩进
    我正在将日志记录构建到一个Python应用程序中,我希望它是人类可读的。目前,调试日志记录了调用的每个函数以及参数和返回值。这意味着,实际上,嵌套函数调用的调试日志可能如下所示:2024-07-2916:52:26,641:DEBUG:MainController.initialize_componentscalledwithargs<control......
  • 使用 DQN 实现 pong,使用 python 中的特征向量而不是像素。我的 DQNA 实现代码正确吗,因
    我正在致力于使用OpenAI的Gym为Pong游戏实现强化学习(RL)环境。目标是训练人工智能代理通过控制球拍来打乒乓球。代理收到太多负面奖励,即使它看起来移动正确。具体来说,奖励函数会惩罚远离球的智能体,但这种情况发生得太频繁,即使球朝球拍移动时似乎也会发生。观察......
  • Python CDLL 无法加载两次
    我正在尝试用python创建一个密码管理器,但遇到了一个问题,一旦加载了一种类型的dll,我就无法加载不同的dll,在这个示例中,我加载了一个dll,并尝试解密加密的密码数据,它工作正常,直到我加载另一个不同的nss3.dll文件,此时它给我一个错误:“过程入口点HeapAlloc无法位于动态链......
  • 你能将 HTTPS 功能添加到 python Flask Web 服务器吗?
    我正在尝试构建一个Web界面来模拟网络设备上的静态接口,该网络设备使用摘要式身份验证和HTTPS。我想出了如何将摘要式身份验证集成到Web服务器中,但我似乎无法找到如何使用FLASK获取https,如果您可以向我展示如何实现,请评论我需要使用下面的代码做什么来实现这一点。from......
  • Python:比较 csv 文件并打印相似之处
    我需要比较两个csv文件并打印出它们的相似之处。第一个文件有名称和浓度,第二个文件就像只有名称的“最佳”列表,我需要绘制相似性图表。例如,这就是我的列表的样子:file1-old_file.csvname_id,conc_test1,conc_test2name1,####,####name2,###......
  • Python 类交叉引用
    我用Python创建了一个数独游戏。我有一个:单元格类-“保存”数字可能性单元格组-保存单元格类实例我使用这些组在数独中运行行、列和正方形功能。每个单元格包含所有组,他属于classCell:def__init__(groups):self.groups=groupscla......
  • 如何修复我的 Python Azure Function DevOps Pipeline 上的“找到 1 个函数(自定义)加载
    我正在尝试使用AzureDevOps构建管道将PythonAzureFunction部署到Azure门户。由于某种原因,代码被部署到服务器,但我在尝试访问端点时收到404错误。我收到一个错误,显示1functionsfound(Custom)0functionsloaded,以及在服务器上显示ModuleNotFound......
  • 使用 kivy 从 python 脚本的 buildozer 构建 android apk 时出错
    我想从使用kivy包构建的Python脚本构建apk为此,我使用googlecollab.这里是main.py脚本:importyoutube_dlfromkivy.appimportAppfromkivy.uix.boxlayoutimportBoxLayoutfromkivy.uix.buttonimportButtonfromkivy.uix.tex......
  • 自动解码并检索 S/MIME 加密电子邮件的正文 (python)
    我如何:用python代码连接我的邮件收件箱以自动获取未读电子邮件的加密内容;解码S/MIME加密电子邮件(我有密钥);检索电子邮件正文纯文本;检查正文(或主题)是否与某个关键字(现在为“test”)匹配,并在匹配时打印一些内容;在树莓派上使用此代码,无需手动......