【一】引入
【1】什么是进程间通信
- 进程间通信(Inter-Process Communication, IPC)是指两个或多个进程之间进行信息交换的过程
【2】如何实现进程间通信
- 借助于消息队列,进程可以将消息放入队列中,然后由另一个进程从队列中取出
- 这种通信方式是非阻塞的,即发送进程不需要等待接收进程的响应即可继续执行
- multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
【3】什么是管道
- 管道是一种半双工的通信机制,即只能在一个方向上进行数据传输,子进程可以通过继承父进程的管道来实现通信
- stdin、stdout和stderr是Python中的三个内置文件对象,它们分别代表标准输入、标准输出和标准错误,这些对象也可以作为管道使用
- 当我们在一个进程中使用read方法读取管道内的消息后,其他进程将无法再获取该管道内的任何其他消息,因此我们需要使用锁或其他同步机制来确保多个进程能够正确地访问和修改共享资源
【4】什么是队列(管道 + 锁)
- 队列是一种线程安全的数据结构,它支持在多线程环境中高效地实现生产者-消费者模型
- 队列的特性是先进先出,即先插入队列的数据将先被取出
【5】进程间通信的目的
- 存是为了更好的取,千方百计的存,简单快捷的取
【二】队列介绍
【1】创建队列的类
# 创建共享的进程队列
import queue
left_pipe, right_pipe = Pipe()
# 默认参数是dumplex : 默认双通道的管道
【2】主要方法介绍
-
q.put:用于在队列中插入数据,参数有blocked和timeout
-
如果blocked为True,并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常
-
如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
-
-
q.get:用于从队列中获取数据,参数有blocked和timeout
-
如果blocked为True,并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常
-
如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常
-
-
q.get_nowait():同q.get(False)
-
q.put_nowait():同q.put(False)
-
q.empty():判断当前队列是否空了,空则返回True
-
q.full():判断当前队列是否满了,已满则返回True
-
q.qsize():返回队列中目前项目的正确数量
【3】方法使用
import queue
# 创建队列
# 括号内可以传参数,表示生成的队列最大可以同时存放的数据量
# 默认 为 0
q = queue.Queue(5)
# 向队列中放数据
q.put(111)
q.put(222)
q.put(333)
# 在队列中取数据
print(q.get())
print(q.get())
print(q.get())
# 111
# 222
# 333
print(q.qsize()) # 0
print(q.empty()) # True
print(q.full()) # False
print(q.get_nowait())
# raise Empty
# _queue.Empty
【4】总结
import queue
# 创建队列桶
q = queue.Queue(指定桶的容量)
# 向队列中添加数据
q.put(放入的数据类型)
# 判断当前队列是否满了,满了返回 True 否则为 False
q.full()
# 从队列中取出数据,队列中无数据不会报错,但是会夯住
data = q.get()
# 从队列中取出数据,队列中无数据会在指定延迟时间后抛出异常:raise Empty : _queue.Empty
data = q.get(timeout=秒数)
# 从队列中取出数据,队列中无数据会抛出异常:raise Empty : _queue.Empty
data = q.get_nowait()
# 判断当前队列是够空了,空了返回 True 否则为 False
q.empty()
【三】队列实现进程间通信
【1】什么是IPC机制
- IPC机制指进程间通信机制,它是指在不同进程间传输数据或者信息的一种机制
- 在多进程操作系统中,各个进程相互独立,不能直接访问对方的内存空间,所以必须通过特殊的通信方式实现进程之间的信息交换和协调
- 常见的IPC机制包括管道、队列、共享内存和信号量等方式。其中,管道、队列和共享内存都是用于进程之间的数据传输
【2】子进程与主进程之间通信
from multiprocessing import Process, Queue
# 创建一个子进程函数
def producer(queue):
# 子进程放入消息
queue.put('子进程中放入了一条消息')
print('这是主进程中的一个子进程的队列')
def main():
# 【一】创建队列对象
queue = Queue()
# 【二】创建子进程:目标函数和参数带进去
pro_cess = Process(target=producer, args=(queue,))
# 【三】启动子进程
pro_cess.start()
# 【四】主进程获取消息
msg_from_p = queue.get()
# 【五】查看消息
print(msg_from_p)
# 这是主进程中的一个子进程的队列
# 子进程中放入了一条消息
if __name__ == '__main__':
main()
【3】子进程与子进程之间通信
from multiprocessing import Process, Queue
import os
# 子进程和子进程之间进行通信
def producer(queue):
# 生产者
# 向队列中放数据
print(f'this is producer {os.getpid()} starting ... ')
# 向队列中添加数据
msg = 'this is a message from producer'
queue.put(msg)
print(f'this is producer {os.getpid()} ending ... ')
def customer(queue):
# 消费者
print(f'this is customer {os.getpid()} starting ... ')
msg = queue.get()
print(f'the message is {msg}')
print(f'this is customer {os.getpid()} ending ... ')
def main():
# 【一】得到一个队列对象
queue = Queue(5)
# 【二】创建两个多进程对象
# 【1】生产者子进程
producer_process = Process(target=producer, args=(queue,))
# 【2】消费者子进程
customer_process = Process(target=customer, args=(queue,))
# 【3】启动子进程
producer_process.start()
customer_process.start()
# 【4】阻塞并行
producer_process.join()
customer_process.join()
if __name__ == '__main__':
main()
'''
this is producer 83548 starting ...
this is producer 83548 ending ...
this is customer 82484 starting ...
the message is this is a message from producer
this is customer 82484 ending ...
'''
【四】生产者和消费者模型
【1】介绍
(1)生产者模型
- 生产者模型和消费者模型是指通过利用队列解耦生产者和消费者的一种并发编程模型
- 生产者负责将数据放入共享队列中,而消费者则从队列中取出数据进行处理,生产者和消费者之间通过共享这个队列来进行信息的交流
(2)消费者模型
- 消费者负责向队列中插入任务,而由线程池中的工作线程进行任务的处理
- 消费者和工作线程之间通过共享线程池中的任务队列来完成任务分发和执行
(3)小结
- 生产者:生产/制造东西
- 消费者:消费/处理东西
- 该模型还需要一个媒介
【2】场景引入
- 比如做包子是先将包子做好后放在蒸笼(媒介)里面,买包子的去蒸笼里面拿
- 厨师做菜之后用盘子(媒介)装着给消费者端过去
- 生产者与消费者之间不是直接做交互的,而是借助于媒介
- 生产者(做包子的) + 媒介(蒸包子) + 消费者(吃包子的)
【3】代码实现
(1)消费者大于生产者
import random
import time
from multiprocessing import Process, Queue
def producer(name, food, queue):
for i in range(1, 5):
data = f"大厨:{name}生产出第{i}道菜:{food}"
time.sleep(random.randint(1, 5))
queue.put(data)
def customer(name, queue):
while True:
food = queue.get()
time.sleep(random.randint(1, 5))
print(f"消费者:{name}消费了:{food}")
def main():
queue = Queue(4)
# 建立模型(生产者,厨师)
p_producer_tony = Process(target=producer, args=('tony', '辣子鸡', queue))
p_producer_scott = Process(target=producer, args=('scott', '辣椒炒肉', queue))
# 建立模型(消费者,顾客)
p_customer_uzi = Process(target=customer, args=('uzi', queue))
p_customer_drake = Process(target=customer, args=('drake', queue))
# 启动
p_producer_tony.start()
p_producer_scott.start()
p_customer_uzi.start()
p_customer_drake.start()
# 等待
p_producer_tony.join()
p_producer_scott.join()
p_customer_uzi.join()
p_customer_drake.join()
if __name__ == '__main__':
main()
'''
消费者:drake消费了:大厨:scott生产出第1道菜:辣椒炒肉
消费者:uzi消费了:大厨:tony生产出第1道菜:辣子鸡
消费者:drake消费了:大厨:scott生产出第2道菜:辣椒炒肉
消费者:uzi消费了:大厨:tony生产出第2道菜:辣子鸡
消费者:drake消费了:大厨:scott生产出第3道菜:辣椒炒肉
消费者:drake消费了:大厨:scott生产出第4道菜:辣椒炒肉
消费者:uzi消费了:大厨:tony生产出第3道菜:辣子鸡
消费者:drake消费了:大厨:tony生产出第4道菜:辣子鸡
'''
(2)给生产者添加结束标志
- 在队列的结尾添加结束标志的符号,作为结束的标志
import random
import time
from multiprocessing import Process, Queue
def producer(name, food, queue):
for i in range(1, 5):
data = f"大厨:{name}生产出第{i}道菜:{food}"
time.sleep(random.randint(1, 5))
queue.put(data)
# 消费完成,添加结束标志
queue.put(None)
def customer(name, queue):
while True:
food = queue.get()
if food is None:
print(f"消费者{name}消费完成!")
break
time.sleep(random.randint(1, 5))
print(f"消费者:{name}消费了:{food}")
def main():
queue = Queue(4)
# 建立模型(生产者,厨师)
p_producer_tony = Process(target=producer, args=('tony', '辣子鸡', queue))
p_producer_scott = Process(target=producer, args=('scott', '辣椒炒肉', queue))
# 建立模型(消费者,顾客)
p_customer_uzi = Process(target=customer, args=('uzi', queue))
p_customer_drake = Process(target=customer, args=('drake', queue))
# 启动
p_producer_tony.start()
p_producer_scott.start()
p_customer_uzi.start()
p_customer_drake.start()
# 等待
p_producer_tony.join()
p_producer_scott.join()
p_customer_uzi.join()
p_customer_drake.join()
if __name__ == '__main__':
main()
'''
消费者:drake消费了:大厨:tony生产出第1道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第1道菜:辣椒炒肉
消费者:drake消费了:大厨:tony生产出第2道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第2道菜:辣椒炒肉
消费者:drake消费了:大厨:scott生产出第3道菜:辣椒炒肉
消费者:uzi消费了:大厨:tony生产出第3道菜:辣子鸡
消费者uzi消费完成!
消费者:drake消费了:大厨:scott生产出第4道菜:辣椒炒肉
消费者:drake消费了:大厨:tony生产出第4道菜:辣子鸡
消费者drake消费完成!
'''
(3)JoinableQueue模块
- 每当向队列对象中存入数据的时候,队列对象内部会有一个计数器 +1
- 每当调用一次
task_done()
方法的时候,队列对象内部的计数器 -1 join()
当计数器为 0时,继续执行代码
import random
import time
from multiprocessing import Process, Queue,JoinableQueue
def producer(name, food, queue):
for i in range(1, 5):
data = f"大厨:{name}生产出第{i}道菜:{food}"
time.sleep(random.randint(1, 5))
queue.put(data)
# 消费完成,添加结束标志
queue.join()
def customer(name, queue):
while True:
food = queue.get()
if food is None:
print(f"消费者{name}消费完成!")
break
time.sleep(random.randint(1, 5))
print(f"消费者:{name}消费了:{food}")
# 告诉队列已经从队列中已经取出一个数据了
queue.task_done()
def main():
queue = JoinableQueue()
# 建立模型(生产者,厨师)
p_producer_tony = Process(target=producer, args=('tony', '辣子鸡', queue))
p_producer_scott = Process(target=producer, args=('scott', '辣椒炒肉', queue))
# 建立模型(消费者,顾客)
p_customer_uzi = Process(target=customer, args=('uzi', queue))
p_customer_drake = Process(target=customer, args=('drake', queue))
# 启动
p_producer_tony.start()
p_producer_scott.start()
# 将消费者设置成守护进程:主进程死亡,子进程跟着死亡
p_customer_uzi.daemon = True
p_customer_drake.daemon = True
p_customer_uzi.start()
p_customer_drake.start()
# 等待
p_producer_tony.join()
p_producer_scott.join()
# 等待消息队列中的所有数据被取完再往下执行代码
queue.join()
if __name__ == '__main__':
main()
'''
消费者:uzi消费了:大厨:tony生产出第1道菜:辣子鸡
消费者:drake消费了:大厨:tony生产出第2道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第1道菜:辣椒炒肉
消费者:drake消费了:大厨:tony生产出第3道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第2道菜:辣椒炒肉
消费者:drake消费了:大厨:tony生产出第4道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第3道菜:辣椒炒肉
消费者:drake消费了:大厨:scott生产出第4道菜:辣椒炒肉
'''
标签:customer,queue,producer,队列,模型,drake,间通信,大厨
From: https://www.cnblogs.com/ligo6/p/18218356