python 多处理新手。 我有一项任务,涉及访问网络服务数百万次并将响应保存在文件中(每个请求都有单独的文件)。 我已经得到了高级工作代码,但对一些事情没有感到困惑。
-
以下两种语法有什么区别?
pool = Pool(processes=4) pool.starmap(task, listOfInputParametersTuple)
和
with Pool(processes=4) as pool:
pool.starmap(task, listOfInputParametersTuple)
-
有没有办法避免在开始之前读取整个输入文件多处理池? 基本上,读取每一行并立即生成一个池任务。
with open (list_of_ids, 'r') as infile: for id in infile: listOfInputParametersTuple.append( tuple ((id, queue, requestBodyTemplate)) pool.starmap(task, listOfInputParametersTuple)
-
在该任务中,我使用相当大的 requestBodyTemplate 并传递 id 来创建 JSON 请求。此 requestBodyTemplate 变量在输入元组的每个元素中重复。有没有一种方法可以将它传递给输入元组之外的
task
函数。 -
如何确保在退出主程序之前完成所有生成的任务?
-
任何关于超时任务的指针,如果它释放池的时间太长?
请随意分享您可能有的任何其他建议。
谢谢
目标 :
- 从文件中读取 100 万个 ID 的列表| ||对于每个 id,制作一个 JSON。
- 使用上面创建的 JSON 请求一个 Web 服务。 (需要几秒钟才能得到响应)
- 将响应的输出保存到目录。
- 将状态保存到统一的“状态”文件
- 任务
- 主要方法
def task(i,queue):
# get the current process
process = current_process()
# generate some work
s = random.randint(1, 10)
# block to simulate work
print (f"TASK function : {process} - sleep for {s} sec")
sleep(s)
data = f"{process} - sleep {s} sec - {i} - {queue}"
print (f"TASK function: {data}")
# put it on the queue
queue.put(data)
- -
def main ():
set_start_method('spawn')
pool = Pool(processes=4)
requestBodyTemplate = getRequestBodyTemplateJSON()
with open (list_of_ids, 'r') as infile:
for line in infile:
listOfInputParametersTuple.append( tuple ((line, queue , requestBodyTemplate))
# set the fork start method
# create the manager
with Manager() as manager:
# create the shared queue
queue = manager.Queue()
Process(target=listener, args=(queue,)).start()
print ("back in main after starting listener ")
# execute the tasks_i in parallel # use starmap to have multiple params
pool.starmap(task, listOfInputParametersTuple)
# pool.starmap(task, zip ( args_i, itertools.repeat(queue)))
pool.close()
pool.join()
# wait for all tasks to get over
sleep(10)
print ("\n Sending None message to queue ")
queue.put(None)
当然,让我们分解一下如何在的 Python 多处理场景中优化文件处理和任务管理。
1.
pool.starmap
语法:带或不带
with
-
pool = Pool(); pool.starmap(...); pool.close(); pool.join()
:这种方法显式地创建了一个进程池,对其执行starmap
,然后在完成时仔细清理资源。它可以让更好地控制池的生命周期,但需要手动管理。 -
with Pool() as pool: pool.starmap(...)
:这种方法利用了上下文管理器 (with
)。它在进入with
块时自动创建一个进程池,并在退出时自动处理关闭和连接。这通常更简洁,不太容易出错。
在这两种情况下,结果都是一样的:
starmap
并行地将可迭代的参数元组映射到的
task
函数。
2. 避免预先加载整个输入文件
绝对可以避免在开始处理之前将整个百万行 ID 列表加载到内存中!这是 Python 生成器发挥作用的地方。生成器允许按需迭代数据,而无需将其全部存储在内存中。
def read_ids_in_chunks(filename, chunk_size=1000): # 调整块大小以获得最佳性能
with open(filename, 'r') as f:
chunk = []
for line in f:
chunk.append(line.strip()) # 假设每个 ID 一行
if len(chunk) == chunk_size:
yield chunk
chunk = []
if chunk: # 处理最后一个块
yield chunk
def task(id, queue, requestBodyTemplate): # 添加 requestBodyTemplate
# ... 的任务逻辑,使用提供的 requestBodyTemplate ...
def main():
set_start_method('spawn')
with Manager() as manager:
queue = manager.Queue()
Process(target=listener, args=(queue,)).start()
with Pool(processes=4) as pool:
for id_chunk in read_ids_in_chunks('list_of_ids.txt'):
args = [(id, queue, requestBodyTemplate) for id in id_chunk]
pool.starmap(task, args)
# ... 剩余的代码 ...
3. 有效传递
requestBodyTemplate
与其将大型
requestBodyTemplate
重复发送到每个进程(这会导致不必要的开销),不如利用多处理的共享资源:
-
Manager.dict()
用于只读访问 :如果不需要修改requestBodyTemplate
,这是一个很好的选择。
```python import json from multiprocessing import Process, Pool, Manager, set_start_method
def task(id, queue, shared_data): requestBodyTemplate = shared_data['template'] # ... 的任务逻辑 ...
def main(): set_start_method('spawn') with Manager() as manager: shared_data = manager.dict({'template': getRequestBodyTemplateJSON()}) # ... 的代码 ... with Pool(processes=4) as pool: for id_chunk in read_ids_in_chunks('list_of_ids.txt'): args = [(id, queue, shared_data) for id in id_chunk] pool.starmap(task, args) # ... 剩余的代码 ... ```
4. 确保任务完成
pool.close()
和
pool.join()
对于确保在主程序退出之前完成所有任务至关重要:
-
pool.close()
:通知池在完成当前任务队列后不再接受新任务。 -
pool.join()
:等待所有工作进程完成。 只有在调用close()
后才能调用它。
5. 超时任务
虽然 Python 的
multiprocessing
模块没有内置的任务级超时机制,但可以通过以下方式实现自己的机制:
-
Process.terminate()
作为最后的手段 :可以创建一个包装函数或类来启动一个新进程来运行的任务,并设置一个计时器来监控它。如果任务超过时间限制,请使用process.terminate()
强制终止它。请注意,这是一种强制终止,可能无法进行清理,因此请谨慎使用。
```python import time from multiprocessing import Process
def run_with_timeout(target, args, timeout=60): p = Process(target=target, args=args) p.start() p.join(timeout) # 等待超时时间 if p.is_alive(): p.terminate() print(f"Process {p.pid} timed out!") else: print(f"Process {p.pid} completed successfully.")
# 在的主函数中: run_with_timeout(target=some_function, args=(arg1, arg2), timeout=120) ```
其他建议
-
错误处理
:在
task
函数中包含稳健的错误处理,以便优雅地捕获和记录异常。可以使用try...except
块,并将错误记录到队列或文件中。 -
进度指示器
:对于涉及数百万个项目的长时间运行的任务,使用
tqdm
之类的库为用户提供进度反馈。 -
优化
chunk_size
:生成器中的chunk_size
参数会影响 I/O 与处理开销之间的平衡。尝试不同的值以找到适合用例的最佳值。
通过实现这些改进,将拥有一个更高效、更健壮的 Python 多处理管道来处理大型数据集! 如果有任何其他问题,请告诉我。
标签:python,python-multiprocessing From: 78809244