说明
进程间除了主子进程共享数据,也可以通过进程间通信实现交互、数据共享
multiprocessing 提供了多种方式来实现进程间通信,如管道(Pipe)、队列(Queue)和共享内存(Value 和 Array)等。
通过这些机制,不同进程之间可以安全地共享数据或进行通信
队列(Queue)
from multiprocessing import Process, Queue def worker(q): data = q.get() # 子进程处理/消费,主进程的生产的数据 print("Received data:", data) # ['data from main process', 'data from child process'] if __name__ == '__main__': q = Queue() p = Process(target=worker, args=(q,)) p.start() # 主进程塞进去/生产一个数据 q.put("Hello from main process")
管道(Pipe)
1 ''' 2 1. 创建管道对象 3 使用 Pipe() 函数创建了一个管道对象,返回两个连接对象 parent_conn 和 child_conn。parent_conn 用于从父进程向子进程发送消息,child_conn 用于从子进程接收消息。 4 5 2. 创建发送和接收进程,并传递管道连接对象 6 分别创建了发送进程 sender_process 和接收进程 receiver_process,并通过参数传递了相应的管道连接对象。这样,这两个进程就可以使用各自的连接对象进行通信。 7 8 3. 启动进程 9 使用 start() 方法启动发送和接收进程,它们会并行地执行任务。 10 11 4. 等待进程结束 12 使用 join() 方法阻塞等待发送和接收进程结束。这样,主进程会等待子进程完成任务后再继续执行。 13 14 在具体的任务函数中,sender() 函数使用 conn.send() 方法向管道发送一条消息 "Hello from sender",然后关闭连接。receiver() 函数使用 conn.recv() 方法接收来自管道的消息,并打印出来,然后关闭连接。 15 ''' 16 from multiprocessing import Process, Pipe 17 18 19 def sender(conn): 20 message = "Hello from sender" 21 conn.send(message) 22 conn.close() 23 24 25 def receiver(conn): 26 message = conn.recv() 27 print("Received message:", message) # Received message: Hello from sender
28 conn.close() 29 30 31 if __name__ == '__main__': 32 # 1. 创建管道对象 33 parent_conn, child_conn = Pipe() 34 35 # 2. 创建发送和接收进程,并传递管道连接对象 36 sender_process = Process(target=sender, args=(parent_conn,)) 37 receiver_process = Process(target=receiver, args=(child_conn,)) 38 39 # 3. 启动进程 40 sender_process.start() 41 receiver_process.start() 42 43 # 4. 等待进程结束 44 sender_process.join() 45 receiver_process.join()
进程间通信共享内存(Value 和 Array)
Value
1 ''' 2 在多进程编程中,可以使用 Value 和 Array 来实现进程间共享内存的通信 3 Value 类用于在进程间共享单个值,它是基于 ctypes 模块实现的。Value 可以通过指定的数据类型来创建共享变量,支持整型、浮点型和自定义的 ctypes 数据类型。 4 ''' 5 from multiprocessing import Process, Value 6 7 8 def worker(shared_value): 9 shared_value.value = 100 10 11 12 if __name__ == '__main__': 13 # 创建共享值对象,初始值为0 14 shared_value = Value('i', 0) 15 16 # 创建子进程,并传递共享值对象 17 p = Process(target=worker, args=(shared_value,)) 18 p.start() 19 p.join() 20 21 # 打印共享值对象的值 22 print(shared_value.value) # 100
使用 Value('i', 0)
创建了一个共享的整型值对象 shared_value
,初始值为0。然后,我们创建了一个子进程 p
,并将 shared_value
作为参数传递给子进程的任务函数 worker
。在子进程中,我们将 shared_value.value
设置为100。
最后,在主进程中,我们打印了 shared_value.value
的值,可以看到它已经被子进程修改为100。
Array
1 ''' 2 Array 类用于在进程间共享可变数组,它也是基于 ctypes 模块实现的。Array 可以通过指定的数据类型和大小来创建共享数组,支持整型、浮点型和自定义的 ctypes 数据类型。 3 ''' 4 5 from multiprocessing import Process, Array 6 7 8 def worker(shared_array): 9 for i in range(len(shared_array)): 10 shared_array[i] = i * 2 # 给array添加元素 11 12 13 if __name__ == '__main__': 14 # 创建共享数组对象,大小为5 15 shared_array = Array('i', 5) 16 17 # 创建子进程,并传递共享数组对象 18 p = Process(target=worker, args=(shared_array,)) 19 p.start() 20 p.join() 21 22 # 打印共享数组对象的值 23 print(list(shared_array)) # [0, 2, 4, 6, 8],需要转换为list 24 print(shared_array) # <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_long_Array_5 object at 0x000001E86912FB50>>
使用 Array('i', 5)
创建了一个包含5个整型元素的共享数组对象 shared_array
。然后,我们创建了一个子进程 p
,并将 shared_array
作为参数传递给子进程的任务函数 worker
。在子进程中,我们将 shared_array
的每个元素设置为索引乘以2的值。