进程间通信(队列和生产者消费者模型)
一、关于进程间通信
[1]什么是进程间通信(Inter-Process Communication, IPC)
- 进程间通信(Inter-Process Communication, IPC)是指两个或多个进程之间进行信息交换的过程。
- 它是一种计算机编程技术,用于在不同进程之间共享数据和资源。
[2]如何实现进程间通信
- 借助于消息队列,进程可以将消息放入队列中,然后由另一个进程从队列中取出。
- 这种通信方式是非阻塞的,即发送进程不需要等待接收进程的响应即可继续执行。
- multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
[3]什么是管道
- 管道是一种半双工的通信机制,即只能在一个方向上进行数据传输。
- 子进程可以通过继承父进程的管道来实现通信。
stdin
、stdout
和stderr
是Python中的三个内置文件对象,它们分别代表标准输入、标准输出和标准错误。- 这些对象也可以作为管道使用。
- 当我们在一个进程中使用read方法读取管道内的消息后,其他进程将无法再获取该管道内的任何其他消息。
- 因此,我们需要使用锁或其他同步机制来确保多个进程能够正确地访问和修改共享资源。
[4]什么是队列(管道 + 锁)
- 队列是一种线程安全的数据结构,它支持在多线程环境中高效地实现生产者-消费者模型。
- 队列的特性是先进先出(First-In-First-Out, FIFO),即先插入队列的数据将先被取出。
- 堆栈是一种先进后出(First-In-Last-Out, FILO)的数据结构,与队列相反,最后插入的数据将首先被取出。
[5]进程间通信的目的
- 存是为了更好的取
- 千方百计的存
- 简单快捷的取
二、队列的介绍(推荐使用)
[1]创建队列的类(底层以管道和锁定的方式实现)
(1)语法
import queue
q = queue.Queue(maxsize)
Queue([maxsize])
:- 创建共享的进程队列
- Queue是多进程安全的队列
- 可以使用Queue实现多进程之间的数据传递。
(2)参数介绍
maxsize
是队列中允许最大项数,省略则无大小限制。
[2]方法介绍
(1)主要方法
-
p.put(item)
-
用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。
-
如果blocked为True(默认值)
- 如果没有设置timeout值(默认值为None),当队列中的数据达到设定的数据上限时,会一直阻塞,无法运行下去
- 如果设置了timeout值(必须为正数,非正数报错),当队列中的数据达到设定的数据上限时,会阻塞设定的秒数(timeout值),然后报错
-
如果blocked为False,当队列中的数据达到设定的数据上限时,会立即抛出
queue.Full
异常。
-
import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
# 没有设置timeout值且队列已满,程序会一直阻塞,不会停止运行
start_time = time.time()
q.put(1, timeout=1)
q.put(2, timeout=1)
try:
q.put(3, timeout=1)
except queue.Full:
print("queue.Full")
print(time.time() - start_time)
# 设置了timeout 且对队列满了,程序会阻塞设置的时间后抛出异常
"""
queue.Full
1.012343406677246
"""
# 将block的值设置为False,且对队列满了,会立即抛出异常
q.put(1, block=False)
q.put(2, block=False)
q.put(3, block=False) # queue.Full
-
q.get
- 可以从队列读取并且删除一个元素,同样,get方法有两个可选参数:blocked和timeout。
- 如果blocked为True(默认值)
- 如果没有设置timeout值(默认值为None),当读取不到数据时,会一直阻塞,无法运行下去
- 如果设置了timeout值(必须为正数,非正数报错),当读取不到数据时,会阻塞设定的秒数(timeout值),然后抛出
queue.Empty
异常
- 如果blocked为False,当读取不到数据时,会立即抛出
queue.Empty
异常。
q.get() q.get() q.get() # 没有设置timeout值且读取不到数据,程序会一直阻塞,不会停止运行 start_time = time.time() q.get(timeout=1) q.get(timeout=1) try: q.get(timeout=1) except queue.Empty: print("queue.Empty") print(time.time() - start_time) """ queue.Empty 1.0107645988464355 """ # 设置了timeout 且读取不到数据,程序会阻塞设置的时间后抛出异常 q.get(block=False) q.get(block=False) q.get(block=False) # _queue.Empty # 将block的值设置为False,且读取不到数据,会立即抛出异常
-
q.get_nowait()
- 同
q.get(False)
- 同
-
q.put_nowait()
- 同
q.put(False)
- 同
-
q.empty()
- 调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
import queue
q = queue.Queue(5)
# 列表为空时返回True
print(q.empty()) # True
q.put(1)
# 列表不是空的则返回False
print(q.empty()) # False
q.full()
- 调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
import queue
q = queue.Queue(4)
q.put(1)
q.put(2)
q.put(3)
# 队列还没满时返回 False
print(q.full()) # False
q.put(4)
# 队列满了则返回True
print(q.full()) # True
q.qsize()
- 返回队列中目前项目的正确数量,结果也不可靠,理由同
q.empty()
和q.full()
一样
- 返回队列中目前项目的正确数量,结果也不可靠,理由同
(2)其他方法(了解)
q.cancel_join_thread()
- 不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close()
- 关闭队列,防止队列中加入更多数据。
- 调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。
- 如果q被垃圾收集,将调用此方法。
- 关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。
- 例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread()
- 连接队列的后台线程。
- 此方法用于在调用
q.close()
方法之后,等待所有队列项被消耗。 - 默认情况下,此方法由不是q的原始创建者的所有进程调用。调用
q.cancel_join_thread
方法可以禁止这种行为
注意:
- 以上方法在多进程 的情况下不精确
- 多进程的队列切换时间是非常短的,存在可能判断失误
三、进程间通信(IPC机制)
[1]什么是IPC机制
- IPC机制指进程间通信机制(Inter-Process Communication),它是指在不同进程间传输数据或者信息的一种机制。
- 在多进程操作系统中,各个进程相互独立,不能直接访问对方的内存空间,所以必须通过特殊的通信方式实现进程之间的信息交换和协调。
- 常见的IPC机制包括管道、消息队列、共享内存和信号量等方式。
- 其中,管道、消息队列和共享内存都是用于进程之间的数据传输
- 而信号量则是用于进程之间的同步与互斥。
- 通过使用IPC机制,可以使得不同进程能够有效地协同工作,在复杂的程序设计中起到十分重要的作用。
[2]子进程与主进程之间通过队列进行通信
- 主进程与子进程之间借助于队列进行通信
from multiprocessing import Process, Queue
import time
def producer(queue_obj):
print('子进程开始!')
# 在子进程中在队列中放入信息
queue_obj.put('这是发给主进程的消息')
time.sleep(1)
print('子进程结束!')
if __name__ == '__main__':
# 先在主进程实例化队列对象
q = Queue(5)
# 在通过实例化子进程对象时将队列对象传入
p = Process(target=producer, args=(q,))
#启动子进程
p.start()
p.join()
# 等子进程结束然后主进程就可以拿到子进程在对对象中放入的消息
print(f'这是子进程发来的消息:{q.get()}')
"""
子进程开始!
子进程结束!
这是子进程发来的消息:这是发给主进程的消息
"""
[2]子进程与子进程之间借助队列进行通信
from multiprocessing import Process, Queue, Lock
import time
def producer(queue_obj, lock):
lock.acquire()
print('生产者进程开始!')
queue_obj.put('这是发送给消费者的信息!')
time.sleep(1)
print('生产者进程结束')
lock.release()
def customer(queue_obj, lock):
lock.acquire()
print('消费者进程开始!')
time.sleep(1)
print(f'这是从生产者传来的消息{queue_obj.get()}')
print('消费者进程结束!')
lock.release()
if __name__ == '__main__':
q = Queue(5)
lock = Lock()
pro = Process(target=producer, args=(q, lock))
cus = Process(target=customer, args=(q, lock))
pro.start()
cus.start()
pro.join()
cus.join()
"""
生产者进程开始!
生产者进程结束
消费者进程开始!
这是从生产者传来的消息这是发送给消费者的信息!
消费者进程结束!
"""
四、生产者和消费者模式
[1]理论
(1)生产者模型
- 生产者模型和消费者模型是指通过利用队列解耦生产者和消费者的一种并发编程模型。
- 在生产者模型中,生产者负责将数据放入共享队列中,而消费者则从队列中取出数据进行处理。
- 生产者和消费者之间通过共享这个队列来进行信息的交流。
- 这种模型适用于生产者和消费者之间的处理速度不一致的情况,同时还能够保证数据传输的安全性和正确性。
(2)消费者模型
- 在消费者模型中,消费者负责向队列中插入任务,而由线程池中的工作线程进行任务的处理。
- 消费者和工作线程之间通过共享线程池中的任务队列来完成任务分发和执行。
- 这种模型适用于任务处理需要一定时间的情况,能够充分利用多线程的优势提高系统的并发性能,提高效率。
[2]通过示例了解
- 比如做包子是先将包子做好后放在蒸笼(媒介)里面,买包子的去蒸笼里面拿
- 厨师做菜之后用盘子(媒介)装着给消费者端过去
- 生产者与消费者之间不是直接做交互的,而是借助于媒介
- 生产者(做包子的) + 媒介(蒸包子) + 消费者(吃包子的)
(1)消费者大于生产者
from multiprocessing import Process, Queue
import time
def producer(name, food, queue_obj):
for i in range(1, 3):
data = f'厨师{name}烹饪了{food}{i}'
queue_obj.put(data)
def customer(name, queue_obj):
for i in range(1, 3):
print(f'{name}品尝了{queue_obj.get()}')
if __name__ == '__main__':
q = Queue(5)
p_list = []
Xanadu = Process(target=producer, args=('Xanadu', '宫保鸡丁', q))
p_list.append(Xanadu)
bridge = Process(target=producer, args=('bridge', '宫保鸡丁', q))
p_list.append(bridge)
bridge = Process(target=customer, args=('water', q))
p_list.append(bridge)
farm = Process(target=customer, args=('farm', q))
p_list.append(farm)
[p.start() for p in p_list]
[p.join() for p in p_list]
"""
water品尝了厨师Xanadu烹饪了宫保鸡丁1
water品尝了厨师Xanadu烹饪了宫保鸡丁2
farm品尝了厨师bridge烹饪了宫保鸡丁1
farm品尝了厨师bridge烹饪了宫保鸡丁2
"""
(2)为生产者添加结束标志
- 向队列的结尾添加结束标志的符号,作为结束的标志
- 但是消费者的个数决定了添加标志的个数
from multiprocessing import Process, Queue, Lock
import time
def producer(name, food, queue_obj):
for i in range(3):
data = f'厨师{name}烹饪的{food}{i + 1}'
time.sleep(1)
queue_obj.put(data)
queue_obj.put('q')
def customer(queue_obj):
while True:
data = queue_obj.get()
if data == 'q':
print('品尝结束!')
break
print(f'消费者品尝了{data}')
time.sleep(1)
def main():
q = Queue(5)
p_list = []
p_list_new = []
xanadu = Process(target=producer, args=('Xanadu', '宫保鸡丁', q))
p_list.append(xanadu)
bridge = Process(target=producer, args=('bridge', '鱼香肉丝', q))
p_list.append(bridge)
customers = Process(target=customer, args=(q,))
p_list.append(customers)
for p in p_list:
p.start()
p_list_new.append(p)
for p in p_list_new:
p.join()
if __name__ == '__main__':
main()
"""
消费者品尝了厨师bridge烹饪的鱼香肉丝1
消费者品尝了厨师Xanadu烹饪的宫保鸡丁1
消费者品尝了厨师bridge烹饪的鱼香肉丝2
消费者品尝了厨师Xanadu烹饪的宫保鸡丁2
消费者品尝了厨师bridge烹饪的鱼香肉丝3
消费者品尝了厨师Xanadu烹饪的宫保鸡丁3
品尝结束!
"""
(3)JoinableQueue
模块
- 每当向队列对象中存入数据的时候,队列对象内部会有一个计数器 +1
- 每当调用一次 task_done() 方法的时候,队列对象内部的计数器 -1
- join() 当计数器为 0 时,继续执行代码
from multiprocessing import Process, JoinableQueue
import time
def producer(name, food, queue_obj):
for i in range(3):
data = f'厨师{name}烹饪的{food}{i + 1}'
time.sleep(1)
queue_obj.put(data)
def customer(queue_obj):
while True:
data = queue_obj.get()
time.sleep(1)
print(f'消费者品尝了{data}')
queue_obj.task_done()
def main():
q = JoinableQueue()
p_list = []
p_list_new = []
xanadu = Process(target=producer, args=('Xanadu', '宫保鸡丁', q))
p_list.append(xanadu)
bridge = Process(target=producer, args=('bridge', '鱼香肉丝', q))
p_list.append(bridge)
customers = Process(target=customer, args=(q,))
for p in p_list:
p.start()
p_list_new.append(p)
customers.daemon = True
customers.start()
for p in p_list_new:
p.join()
q.join()
if __name__ == '__main__':
main()
"""
消费者品尝了厨师Xanadu烹饪的宫保鸡丁1
消费者品尝了厨师bridge烹饪的鱼香肉丝1
消费者品尝了厨师Xanadu烹饪的宫保鸡丁2
消费者品尝了厨师bridge烹饪的鱼香肉丝2
消费者品尝了厨师Xanadu烹饪的宫保鸡丁3
消费者品尝了厨师bridge烹饪的鱼香肉丝3
"""
[补充]
一、JoinableQueue
模块
【1】介绍
- JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。
- 通知进程是使用共享的信号和条件变量来实现的。
【2】使用
- JoinableQueue([maxsize])
- maxsize是队列中允许最大项数,省略则无大小限制。
from multiprocessing import JoinableQueue
q = JoinableQueue()
# 方法介绍:JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
# q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
# q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
二、PriorityQueue
设置队列优先级
- 数字越小优先级越高
import queue
q = queue.PriorityQueue()
q.put((1, 'a'))
q.put((2, 'b'))
q.put((3, 'c'))
q.put((0, 'd'))
count = 4
while count > 0:
print(f'从队列中取出{q.get()}')
count -= 1
"""
从队列中取出(0, 'd')
从队列中取出(1, 'a')
从队列中取出(2, 'b')
从队列中取出(3, 'c')
"""
- 拓展(了解)
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
构造一个优先级队列,其中maxsize是一个整数,用于设置可以放入队列的项目数量的上限.一旦达到这个上限,插入就会阻塞,直到队列中有项目被消耗。如果maxsize小于或等于0,则队列长度为无穷大。
The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
首先检索最低值的条目(最低值的条目是指列表经过排序后取到的索引为0的那个元素,一般条目是(优先级数字,数据)这种元组的形式
exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
当表示非阻塞的get()或get_nowait()在一个空的队列对象中被调用时,会抛出异常
exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
当表示非阻塞的put()或put_nowait()在一个满的队列对象中被调用时,会抛出异常
Queue.qsize()
Queue.empty() #return True if empty
当队列为空返回True
Queue.full() # return True if full
当队列为满返回True
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
将一个项放入队列。如果可选参数block为true并且timeout为None(默认值),则在必要时阻塞,直到有空闲槽可用。如果参数timeout是一个正数,它最多阻塞timeout秒,如果在这段时间内没有可用的空闲槽,则会引发Full异常。否则(block为false),如果有空闲槽可用,则将一个项目放入队列中,否则引发Full异常(在这种情况下,timeout被忽略)。
Queue.put_nowait(item)
Equivalent to put(item, False).
Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
从队列中移除并返回一个项。如果可选参数block为true并且timeout为None(默认值),则在必要时阻塞,直到有可用的项。如果timeout为正数,则最多阻塞timeout秒,如果在该时间内没有可用项,则抛出Empty异常。否则(block为false),如果一个项目可用,则返回那个项目,否则引发Empty异常(在这种情况下,timeout被忽略)。
Queue.get_nowait()
Equivalent to get(False).
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
提供了两种方法来支持追踪进入队列的任务是否已被生产者的守护线程完全处理。
Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
假定先前进入队列的任务已完成。并且被队列生产者使用。对于每个用于获取任务的get(),后续对task_done()的调用都会告诉队列任务的处理已经完成。
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
如果join()当前正被阻塞,它将在所有项都被处理完时恢复(这意味着对于每个已经put()到队列中的项都接收到task_done()调用)。
Raises a ValueError if called more times than there were items placed in the queue.
如果调用次数超过放入队列的项数,将引发ValueError。
Queue.join()
阻塞,直到queue被消费完毕
标签:queue,get,队列,间通信,生产者,timeout,put,进程
From: https://www.cnblogs.com/taoyuanshi/p/18124733