什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
(1)生产者模型
- 生产者模型和消费者模型是指通过利用队列解耦生产者和消费者的一种并发编程模型。
- 在生产者模型中,生产者负责将数据放入共享队列中,而消费者则从队列中取出数据进行处理。
- 生产者和消费者之间通过共享这个队列来进行信息的交流。
- 这种模型适用于生产者和消费者之间的处理速度不一致的情况,同时还能够保证数据传输的安全性和正确性。
(2)消费者模型
- 在消费者模型中,消费者负责向队列中插入任务,而由线程池中的工作线程进行任务的处理。
- 消费者和工作线程之间通过共享线程池中的任务队列来完成任务分发和执行。
- 这种模型适用于任务处理需要一定时间的情况,能够充分利用多线程的优势提高系统的并发性能,提高效率。
(3)小结
- 生产者:生产/制造东西
- 消费者:消费/处理东西
- 该模型还需要一个媒介
为什么要使用生产者消费者模型
生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。
同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
- 比如做包子是先将包子做好后放在蒸笼(媒介)里面,买包子的去蒸笼里面拿
- 厨师做菜之后用盘子(媒介)装着给消费者端过去
- 生产者与消费者之间不是直接做交互的,而是借助于媒介
- 生产者(做包子的) + 媒介(蒸包子) + 消费者(吃包子的)
如何实现生产者消费者模型
进程间引入队列可以实现生产者消费者模型,通过使用队列无需考虑锁的概念,因为进程间的通信是通过队列来实现的;
生产者生产的数据往队列里面写,消费者消费数据直接从队列里面取,这样就对实现了生产者和消费者之间的解耦。
生产者 -- > 队列 <--消费者
Queue实现生产者消费者模型
消费者生产者模型代码
from multiprocessing import Process, Queue
import time
# 消费者方法
def consumer(q, name):
while True:
res = q.get()
# if res is None: break
print("%s 吃了 %s" % (name, res))
# 生产者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模拟生产西瓜的时间延迟
res = "%s %s" % (food, i)
print("%s 生产了 %s" % (name, res))
# 把生产的vegetable放入到队列中
q.put(res)
if __name__ == "__main__":
#创建队列
q = Queue()
# 创建生产者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
c1 = Process(target=consumer, args=(q, "peter",))
p1.start()
c1.start()
# p1.join()
# q.put(None)
print("主进程")
直接执行会出现一个问题就是生产者生产完了,没有向消费者发送一个停止的信号,所以消费者一直会一直阻塞在q.get(),导致程序无法退出。
解决方案
方案一:为生产者添加结束标志
思路:向队列的结尾添加结束标志的符号,作为结束的标志
不足:但如果有n个消费者,就需要发送n个结束信号,这种方式就不是那么简洁
from multiprocessing import Process, Queue
import time
# 消费者方法
def consumer(q, name):
while True:
res = q.get()
if res is None: break
print("%s 吃了 %s" % (name, res))
# 生产者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模拟生产西瓜的时间延迟
res = "%s %s" % (food, i)
print("%s 生产了 %s" % (name, res))
# 把生产的vegetable放入到队列中
q.put(res)
if __name__ == "__main__":
# 创建队列
q = Queue()
# 创建生产者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
p2 = Process(target=producer, args=(q, "kelly2", "香蕉"))
c1 = Process(target=consumer, args=(q, "peter",))
c2 = Process(target=consumer, args=(q, "peter2",))
c3 = Process(target=consumer, args=(q, "peter3",))
p1.start()
p2.start()
c1.start()
c2.start()
c3.start()
p1.join()
p2.join()
q.put(None)
q.put(None)
q.put(None)
print("主进程")
# kelly 生产了 西瓜 0
# peter2 吃了 西瓜 0
# kelly2 生产了 香蕉 0
# peter 吃了 香蕉 0
# kelly 生产了 西瓜 1
# peter3 吃了 西瓜 1
# kelly2 生产了 香蕉 1
# peter2 吃了 香蕉 1
# kelly 生产了 西瓜 2
# peter 吃了 西瓜 2
# kelly2 生产了 香蕉 2
# peter3 吃了 香蕉 2
# 主进程
其实我们现在就是生产者生产完数据之后想往队列里面发送一个结束信号,python语言提供了另外一种队列JoinableQueue([maxsize])来解决这种问题
方案二:JoinableQueue实现生产者消费者模型
思路:每当向队列对象中存入数据的时候,队列对象内部会有一个计数器 +1,每当调用一次 task_done() 方法的时候,队列对象内部的计数器 -1,join() 当计数器为 0 时,执行退出代码。
from multiprocessing import Process, JoinableQueue
import time
# 消费者方法
def consumer(q, name):
while True:
res = q.get()
if res is None: break
print("%s 吃了 %s" % (name, res))
q.task_done() # 发送信号给q.join(),表示已经从队列中取走一个值并处理完毕了
# 生产者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模拟生产西瓜的时间延迟
res = "%s %s" % (food, i)
print("%s 生产了 %s" % (name, res))
# 把生产的vegetable放入到队列中
q.put(res)
q.join() # 等消费者把自己放入队列的所有元素取完之后才结束
if __name__ == "__main__":
# q = Queue()
q = JoinableQueue()
# 创建生产者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
p2 = Process(target=producer, args=(q, "kelly2", "蓝莓"))
# 创建消费者
c1 = Process(target=consumer, args=(q, "peter",))
c2 = Process(target=consumer, args=(q, "peter2",))
c3 = Process(target=consumer, args=(q, "peter3",))
c1.daemon = True
c2.daemon = True
c3.daemon = True
p_l = [p1, p2, c1, c2, c3]
for p in p_l:
p.start()
p1.join()
p2.join()
# 1.主进程等待p1,p2进程结束才继续执行
# 2.由于q.join()的存在,生产者只有等队列中的元素被消费完才会结束
# 3.生产者结束了,就代表消费者已经消费完了,也可以结束了,所以可以把消费者设置为守护进程(随着主进程的退出而退出)
print("主进程")
# kelly2 生产了 蓝莓 0
# kelly 生产了 西瓜 0
# peter 吃了 蓝莓 0
# peter2 吃了 西瓜 0
# kelly2 生产了 蓝莓 1
# peter3 吃了 蓝莓 1
# kelly 生产了 西瓜 1
# peter 吃了 西瓜 1
# kelly2 生产了 蓝莓 2
# peter2 吃了 蓝莓 2
# kelly 生产了 西瓜 2
# peter3 吃了 西瓜 2
# 主进程
【补充】JoinableQueue模块
(1)介绍
JoinableQueue([maxsize])
:这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
(2)使用
JoinableQueue([maxsize])
maxsize是队列中允许最大项数,省略则无大小限制。
from multiprocessing import JoinableQueue
q = JoinableQueue()
# 方法介绍:JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
# q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
# q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
标签:Process,队列,消费者,生产者,res,模型,编程,并发,name
From: https://www.cnblogs.com/xiao01/p/17978439