生产者消费者模型要点
主要使用 JoinableQueue, Process 类
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。
通知进程是使用共享的信号和条件变量来实现的。
参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。
方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。
如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。
阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
-*- coding: utf-8 -*-
import os
import random
import time
from multiprocessing import JoinableQueue, Process
def producers(name, q):
for i in range(10):
time.sleep(random.randint(1, 3))
food = '%s-%s' % (name, i)
q.put(food)
print(f"{os.getpid()} produce {food}")
# what is the q.join() meaning? producer call the function to make sure all the food was token by consumers.
q.join()
def consumers(q):
while True:
food = q.get()
time.sleep(random.randint(1, 3))
print(f"{os.getpid()} eat {food}")
# what is the q.task_done() meaning? consumers eat all the food that it get from q(combine with the q.join).
q.task_done()
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producers, args=('hamburgers', q,))
p2 = Process(target=producers, args=('apple', q,))
p3 = Process(target=producers, args=('banana', q,))
c1 = Process(target=consumers, args=(q,), daemon=True)
c2 = Process(target=consumers, args=(q,), daemon=True)
# consumer process will be shut down when the main process was done. because it is daemon subprocess
# when the main process will be done ? when the 3 subprocess of producers finished ,all the food was made.
# why the consumer will consume all the time?
process_tuple = (p1, p2, p3, c1, c2)
for i in process_tuple:
i.start()
for i in process_tuple[:3]:
i.join()
print('main process end')
标签:Process,join,consumers,Consumers,food,Producers,process,done,Model
From: https://www.cnblogs.com/mrsphere/p/17270273.html