首页 > 系统相关 >进程间通信(队列和生产消费模型)

进程间通信(队列和生产消费模型)

时间:2024-05-28 16:54:49浏览次数:33  
标签:customer queue producer 队列 模型 drake 间通信 大厨

【一】引入

【1】什么是进程间通信

  • 进程间通信(Inter-Process Communication, IPC)是指两个或多个进程之间进行信息交换的过程

【2】如何实现进程间通信

  • 借助于消息队列,进程可以将消息放入队列中,然后由另一个进程从队列中取出
  • 这种通信方式是非阻塞的,即发送进程不需要等待接收进程的响应即可继续执行
  • multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

【3】什么是管道

  • 管道是一种半双工的通信机制,即只能在一个方向上进行数据传输,子进程可以通过继承父进程的管道来实现通信
  • stdin、stdout和stderr是Python中的三个内置文件对象,它们分别代表标准输入、标准输出和标准错误,这些对象也可以作为管道使用
  • 当我们在一个进程中使用read方法读取管道内的消息后,其他进程将无法再获取该管道内的任何其他消息,因此我们需要使用锁或其他同步机制来确保多个进程能够正确地访问和修改共享资源

【4】什么是队列(管道 + 锁)

  • 队列是一种线程安全的数据结构,它支持在多线程环境中高效地实现生产者-消费者模型
  • 队列的特性是先进先出,即先插入队列的数据将先被取出

【5】进程间通信的目的

  • 存是为了更好的取,千方百计的存,简单快捷的取

【二】队列介绍

【1】创建队列的类

# 创建共享的进程队列
import queue

left_pipe, right_pipe = Pipe()  
# 默认参数是dumplex : 默认双通道的管道

【2】主要方法介绍

  • q.put:用于在队列中插入数据,参数有blocked和timeout

    • 如果blocked为True,并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常

    • 如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常

  • q.get:用于从队列中获取数据,参数有blocked和timeout

    • 如果blocked为True,并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常

    • 如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常

  • q.get_nowait():同q.get(False)

  • q.put_nowait():同q.put(False)

  • q.empty():判断当前队列是否空了,空则返回True

  • q.full():判断当前队列是否满了,已满则返回True

  • q.qsize():返回队列中目前项目的正确数量

【3】方法使用

import queue

# 创建队列
# 括号内可以传参数,表示生成的队列最大可以同时存放的数据量
# 默认 为 0
q = queue.Queue(5)

# 向队列中放数据
q.put(111)
q.put(222)
q.put(333)

# 在队列中取数据
print(q.get())
print(q.get())
print(q.get())
# 111
# 222
# 333
print(q.qsize())  # 0
print(q.empty())  # True
print(q.full())  # False
print(q.get_nowait())
#     raise Empty
# _queue.Empty

【4】总结

import queue

# 创建队列桶
q = queue.Queue(指定桶的容量)
# 向队列中添加数据
q.put(放入的数据类型)
# 判断当前队列是否满了,满了返回  True 否则为 False
q.full()
# 从队列中取出数据,队列中无数据不会报错,但是会夯住
data = q.get()
# 从队列中取出数据,队列中无数据会在指定延迟时间后抛出异常:raise Empty : _queue.Empty
data = q.get(timeout=秒数)
# 从队列中取出数据,队列中无数据会抛出异常:raise Empty : _queue.Empty
data = q.get_nowait()
# 判断当前队列是够空了,空了返回  True 否则为 False
q.empty()

【三】队列实现进程间通信

【1】什么是IPC机制

  • IPC机制指进程间通信机制,它是指在不同进程间传输数据或者信息的一种机制
  • 在多进程操作系统中,各个进程相互独立,不能直接访问对方的内存空间,所以必须通过特殊的通信方式实现进程之间的信息交换和协调
  • 常见的IPC机制包括管道、队列、共享内存和信号量等方式。其中,管道、队列和共享内存都是用于进程之间的数据传输

【2】子进程与主进程之间通信

from multiprocessing import Process, Queue


# 创建一个子进程函数
def producer(queue):
    # 子进程放入消息
    queue.put('子进程中放入了一条消息')
    print('这是主进程中的一个子进程的队列')


def main():
    # 【一】创建队列对象
    queue = Queue()
    # 【二】创建子进程:目标函数和参数带进去
    pro_cess = Process(target=producer, args=(queue,))
    # 【三】启动子进程
    pro_cess.start()
    # 【四】主进程获取消息
    msg_from_p = queue.get()

    # 【五】查看消息
    print(msg_from_p)
    # 这是主进程中的一个子进程的队列
    # 子进程中放入了一条消息


if __name__ == '__main__':
    main()

【3】子进程与子进程之间通信

from multiprocessing import Process, Queue
import os


# 子进程和子进程之间进行通信
def producer(queue):
    # 生产者
    # 向队列中放数据
    print(f'this is producer {os.getpid()} starting ... ')
    # 向队列中添加数据
    msg = 'this is a message from producer'
    queue.put(msg)
    print(f'this is producer {os.getpid()} ending ... ')


def customer(queue):
    # 消费者
    print(f'this is customer {os.getpid()} starting ... ')
    msg = queue.get()
    print(f'the message is {msg}')
    print(f'this is customer {os.getpid()} ending ... ')


def main():
    # 【一】得到一个队列对象
    queue = Queue(5)
    # 【二】创建两个多进程对象
    # 【1】生产者子进程
    producer_process = Process(target=producer, args=(queue,))
    # 【2】消费者子进程
    customer_process = Process(target=customer, args=(queue,))
    # 【3】启动子进程
    producer_process.start()
    customer_process.start()
    # 【4】阻塞并行
    producer_process.join()
    customer_process.join()


if __name__ == '__main__':
    main()
'''
this is producer 83548 starting ... 
this is producer 83548 ending ... 
this is customer 82484 starting ... 
the message is this is a message from producer
this is customer 82484 ending ... 
'''

【四】生产者和消费者模型

【1】介绍

(1)生产者模型

  • 生产者模型和消费者模型是指通过利用队列解耦生产者和消费者的一种并发编程模型
  • 生产者负责将数据放入共享队列中,而消费者则从队列中取出数据进行处理,生产者和消费者之间通过共享这个队列来进行信息的交流

(2)消费者模型

  • 消费者负责向队列中插入任务,而由线程池中的工作线程进行任务的处理
  • 消费者和工作线程之间通过共享线程池中的任务队列来完成任务分发和执行

(3)小结

  • 生产者:生产/制造东西
  • 消费者:消费/处理东西
  • 该模型还需要一个媒介

【2】场景引入

  • 比如做包子是先将包子做好后放在蒸笼(媒介)里面,买包子的去蒸笼里面拿
  • 厨师做菜之后用盘子(媒介)装着给消费者端过去
  • 生产者与消费者之间不是直接做交互的,而是借助于媒介
  • 生产者(做包子的) + 媒介(蒸包子) + 消费者(吃包子的)

【3】代码实现

(1)消费者大于生产者

import random
import time
from multiprocessing import Process, Queue


def producer(name, food, queue):
    for i in range(1, 5):
        data = f"大厨:{name}生产出第{i}道菜:{food}"
        time.sleep(random.randint(1, 5))
        queue.put(data)


def customer(name, queue):
    while True:
        food = queue.get()
        time.sleep(random.randint(1, 5))
        print(f"消费者:{name}消费了:{food}")


def main():
    queue = Queue(4)
    # 建立模型(生产者,厨师)
    p_producer_tony = Process(target=producer, args=('tony', '辣子鸡', queue))
    p_producer_scott = Process(target=producer, args=('scott', '辣椒炒肉', queue))
    # 建立模型(消费者,顾客)
    p_customer_uzi = Process(target=customer, args=('uzi', queue))
    p_customer_drake = Process(target=customer, args=('drake', queue))
    # 启动
    p_producer_tony.start()
    p_producer_scott.start()
    p_customer_uzi.start()
    p_customer_drake.start()
    # 等待
    p_producer_tony.join()
    p_producer_scott.join()
    p_customer_uzi.join()
    p_customer_drake.join()


if __name__ == '__main__':
    main()

'''
消费者:drake消费了:大厨:scott生产出第1道菜:辣椒炒肉
消费者:uzi消费了:大厨:tony生产出第1道菜:辣子鸡
消费者:drake消费了:大厨:scott生产出第2道菜:辣椒炒肉
消费者:uzi消费了:大厨:tony生产出第2道菜:辣子鸡
消费者:drake消费了:大厨:scott生产出第3道菜:辣椒炒肉
消费者:drake消费了:大厨:scott生产出第4道菜:辣椒炒肉
消费者:uzi消费了:大厨:tony生产出第3道菜:辣子鸡
消费者:drake消费了:大厨:tony生产出第4道菜:辣子鸡
'''

(2)给生产者添加结束标志

  • 在队列的结尾添加结束标志的符号,作为结束的标志
import random
import time
from multiprocessing import Process, Queue


def producer(name, food, queue):
    for i in range(1, 5):
        data = f"大厨:{name}生产出第{i}道菜:{food}"
        time.sleep(random.randint(1, 5))
        queue.put(data)
    # 消费完成,添加结束标志
    queue.put(None)


def customer(name, queue):
    while True:
        food = queue.get()
        if food is None:
            print(f"消费者{name}消费完成!")
            break
        time.sleep(random.randint(1, 5))
        print(f"消费者:{name}消费了:{food}")


def main():
    queue = Queue(4)
    # 建立模型(生产者,厨师)
    p_producer_tony = Process(target=producer, args=('tony', '辣子鸡', queue))
    p_producer_scott = Process(target=producer, args=('scott', '辣椒炒肉', queue))
    # 建立模型(消费者,顾客)
    p_customer_uzi = Process(target=customer, args=('uzi', queue))
    p_customer_drake = Process(target=customer, args=('drake', queue))
    # 启动
    p_producer_tony.start()
    p_producer_scott.start()
    p_customer_uzi.start()
    p_customer_drake.start()
    # 等待
    p_producer_tony.join()
    p_producer_scott.join()
    p_customer_uzi.join()
    p_customer_drake.join()


if __name__ == '__main__':
    main()

'''
消费者:drake消费了:大厨:tony生产出第1道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第1道菜:辣椒炒肉
消费者:drake消费了:大厨:tony生产出第2道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第2道菜:辣椒炒肉
消费者:drake消费了:大厨:scott生产出第3道菜:辣椒炒肉
消费者:uzi消费了:大厨:tony生产出第3道菜:辣子鸡
消费者uzi消费完成!
消费者:drake消费了:大厨:scott生产出第4道菜:辣椒炒肉
消费者:drake消费了:大厨:tony生产出第4道菜:辣子鸡
消费者drake消费完成!
'''

(3)JoinableQueue模块

  • 每当向队列对象中存入数据的时候,队列对象内部会有一个计数器 +1
  • 每当调用一次task_done()方法的时候,队列对象内部的计数器 -1
  • join() 当计数器为 0时,继续执行代码
import random
import time
from multiprocessing import Process, Queue,JoinableQueue


def producer(name, food, queue):
    for i in range(1, 5):
        data = f"大厨:{name}生产出第{i}道菜:{food}"
        time.sleep(random.randint(1, 5))
        queue.put(data)
    # 消费完成,添加结束标志
    queue.join()


def customer(name, queue):
    while True:
        food = queue.get()
        if food is None:
            print(f"消费者{name}消费完成!")
            break
        time.sleep(random.randint(1, 5))
        print(f"消费者:{name}消费了:{food}")
        # 告诉队列已经从队列中已经取出一个数据了
        queue.task_done()


def main():
    queue = JoinableQueue()
    # 建立模型(生产者,厨师)
    p_producer_tony = Process(target=producer, args=('tony', '辣子鸡', queue))
    p_producer_scott = Process(target=producer, args=('scott', '辣椒炒肉', queue))
    # 建立模型(消费者,顾客)
    p_customer_uzi = Process(target=customer, args=('uzi', queue))
    p_customer_drake = Process(target=customer, args=('drake', queue))
    # 启动
    p_producer_tony.start()
    p_producer_scott.start()

    # 将消费者设置成守护进程:主进程死亡,子进程跟着死亡
    p_customer_uzi.daemon = True
    p_customer_drake.daemon = True

    p_customer_uzi.start()
    p_customer_drake.start()
    # 等待
    p_producer_tony.join()
    p_producer_scott.join()

    # 等待消息队列中的所有数据被取完再往下执行代码
    queue.join()

if __name__ == '__main__':
    main()

'''
消费者:uzi消费了:大厨:tony生产出第1道菜:辣子鸡
消费者:drake消费了:大厨:tony生产出第2道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第1道菜:辣椒炒肉
消费者:drake消费了:大厨:tony生产出第3道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第2道菜:辣椒炒肉
消费者:drake消费了:大厨:tony生产出第4道菜:辣子鸡
消费者:uzi消费了:大厨:scott生产出第3道菜:辣椒炒肉
消费者:drake消费了:大厨:scott生产出第4道菜:辣椒炒肉
'''

标签:customer,queue,producer,队列,模型,drake,间通信,大厨
From: https://www.cnblogs.com/ligo6/p/18218356

相关文章

  • 如何使用Python和大模型进行数据分析和文本生成
    如何使用Python和大模型进行数据分析和文本生成Python语言以其简洁和强大的特性,成为了数据科学、机器学习和人工智能开发的首选语言之一。随着大模型(LargeLanguageModels,LLMs)如GPT-4的崛起,我们能够利用这些模型实现诸多复杂任务,从文本生成到智能对话、数据分析等等。在......
  • 消息队列
    进程A:#include<stdio.h>#include<stdlib.h>#include<signal.h>#include<unistd.h>#include<sys/types.h>#include<sys/ipc.h>#include<sys/msg.h>#include<errno.h>#include<string.h>intmsgid;......
  • 数据结构:队列
    目录队列的概念和结构队列的实现结构定义初始化判空入队列出队列返回队头元素返回队尾元素返回size销毁 队列的概念和结构队列:只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有先进先出FIFO(FirstInFirstOut)入队列:进行插入操......
  • 检测一切!Grounding DINO 1.5:最强开集目标检测模型
    前言 目标检测领域,迎来了新进展——GroundingDINO1.5,IDEA研究院团队出品,在端侧就可实现实时识别。欢迎关注公众号CV技术指南,专注于计算机视觉的技术总结、最新技术跟踪、经典论文解读、CV招聘信息。本文转载自量子位仅用于学术分享,若侵权请联系删除CV方向的准研究生们,未来......
  • 开源大模型与闭源大模型,你更看好哪一方?
    目录前言 一、数据隐私(一)数据隐私保护(二)用户数据安全二、商业应用(一)开源大模型的优势(二)闭源大模型的优势三、社区参与(一)开源大模型的社区参与与合作(二)闭源大模型的社区参与与合作(三)对行业发展的推动作用小结前言 简介:评价一个AI模型“好不好”“有没有发展......
  • 开源大模型与闭源大模型那个更好?
            开源大模型和闭源大模型各有其优势和劣势,究竟哪个更好取决于具体的应用场景、组织目标、资源和能力等因素。以下是两种模型的一些优势对比: 开源大模型的优势:1.社区支持与合作:开源大模型能够借助全球开发者社区的力量,形成跨地域、跨学科的协作网络,加速问题......
  • 关于Vearch在大模型中使用的一些实践
    背景这两年来大模型及其热门,不仅各大厂家的模型层出不穷,各类RGA、Agent应用也花样繁多。这也带火了一批基础设施,比如Langchain、向量数据库(也叫矢量数据库-VectorDatabase)等。现在市场上的向量库种类特别繁多,但主要还是分为两类,一类是在原有数据库基础上增加了向量相似性检索的......
  • AI绘画整合包最新Stable Diffusion安装包+教程+模型+插件+动作来了(纯教学)
    首先了解一下AI绘画工具,介绍一下什么是StableDiffusion,模型的主要功能和作用StableDiffusion(简称SD),是一种先进的人工智能技术。这项技术的核心能力在于,它能够根据用户提供的文字描述,生成丰富且细致的图像内容。不仅如此,SD还能够处理图像修补、扩展以及基于文本指导的图像转......
  • 核间通信:Linux中RPMsg和OpenAMP详解
    1.核间通信组件简介 目前针对不同级别的操作系统,存在几种核间通信组件,分别是以Linux内嵌组件RPMsg、支持跨平台移植的OpenAMP,短小精简的RPMsg-Lite,这三个组件在代码细节、收发策略、移植性上各有优劣,用户可根据需要选择。它们起初都来源于Linux的RPMsg,遵循统一的协议标准(交互过......
  • 边缘计算——介绍:是一种分布式计算模型
    边缘计算是一种分布式计算模型,它将数据处理和计算资源放置在接近数据产生源头的边缘设备、传感器或用户设备上,以提供更快速、实时的计算和数据分析能力。以下是关于边缘计算的详细解释:定义:边缘计算,如同其名字所示,指的是在网络的“边缘”进行数据处理和计算。这里的“边缘”指的......