首页 > 系统相关 >并发编程之进程通信(生产消费模型)

并发编程之进程通信(生产消费模型)

时间:2024-01-21 22:04:36浏览次数:28  
标签:Process 队列 消费者 生产者 res 模型 编程 并发 name

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

(1)生产者模型

  • 生产者模型和消费者模型是指通过利用队列解耦生产者和消费者的一种并发编程模型。
  • 在生产者模型中,生产者负责将数据放入共享队列中,而消费者则从队列中取出数据进行处理。
  • 生产者和消费者之间通过共享这个队列来进行信息的交流。
  • 这种模型适用于生产者和消费者之间的处理速度不一致的情况,同时还能够保证数据传输的安全性和正确性。

(2)消费者模型

  • 在消费者模型中,消费者负责向队列中插入任务,而由线程池中的工作线程进行任务的处理。
  • 消费者和工作线程之间通过共享线程池中的任务队列来完成任务分发和执行。
  • 这种模型适用于任务处理需要一定时间的情况,能够充分利用多线程的优势提高系统的并发性能,提高效率。

(3)小结

  • 生产者:生产/制造东西
  • 消费者:消费/处理东西
  • 该模型还需要一个媒介

为什么要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。

同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

  • 比如做包子是先将包子做好后放在蒸笼(媒介)里面,买包子的去蒸笼里面拿
  • 厨师做菜之后用盘子(媒介)装着给消费者端过去
  • 生产者与消费者之间不是直接做交互的,而是借助于媒介
  • 生产者(做包子的) + 媒介(蒸包子) + 消费者(吃包子的)

如何实现生产者消费者模型

进程间引入队列可以实现生产者消费者模型,通过使用队列无需考虑锁的概念,因为进程间的通信是通过队列来实现的;

生产者生产的数据往队列里面写,消费者消费数据直接从队列里面取,这样就对实现了生产者和消费者之间的解耦。

生产者 -- >  队列  <--消费者

Queue实现生产者消费者模型

消费者生产者模型代码

from multiprocessing import Process, Queue
import time
 
# 消费者方法
def consumer(q, name):
    while True:
        res = q.get()
        # if res is None: break
        print("%s 吃了 %s" % (name, res))
 
# 生产者方法
def producer(q, name, food):
    for i in range(3):
        time.sleep(1)  # 模拟生产西瓜的时间延迟
        res = "%s %s" % (food, i)
        print("%s 生产了 %s" % (name, res))
        # 把生产的vegetable放入到队列中
        q.put(res)
 
if __name__ == "__main__":
    #创建队列
    q = Queue()
    # 创建生产者
    p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
    c1 = Process(target=consumer, args=(q, "peter",))
    p1.start()
    c1.start()
 
    # p1.join()
    # q.put(None)
    print("主进程")

直接执行会出现一个问题就是生产者生产完了,没有向消费者发送一个停止的信号,所以消费者一直会一直阻塞在q.get(),导致程序无法退出。

解决方案

方案一:为生产者添加结束标志

思路:向队列的结尾添加结束标志的符号,作为结束的标志

不足:但如果有n个消费者,就需要发送n个结束信号,这种方式就不是那么简洁

from multiprocessing import Process, Queue
import time


# 消费者方法
def consumer(q, name):
    while True:
        res = q.get()
        if res is None: break
        print("%s 吃了 %s" % (name, res))


# 生产者方法
def producer(q, name, food):
    for i in range(3):
        time.sleep(1)  # 模拟生产西瓜的时间延迟
        res = "%s %s" % (food, i)
        print("%s 生产了 %s" % (name, res))
        # 把生产的vegetable放入到队列中
        q.put(res)


if __name__ == "__main__":
    # 创建队列
    q = Queue()
    # 创建生产者
    p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
    p2 = Process(target=producer, args=(q, "kelly2", "香蕉"))
    c1 = Process(target=consumer, args=(q, "peter",))
    c2 = Process(target=consumer, args=(q, "peter2",))
    c3 = Process(target=consumer, args=(q, "peter3",))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    c3.start()

    p1.join()
    p2.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print("主进程")
    
    
# kelly 生产了 西瓜 0
# peter2 吃了 西瓜 0
# kelly2 生产了 香蕉 0
# peter 吃了 香蕉 0
# kelly 生产了 西瓜 1
# peter3 吃了 西瓜 1
# kelly2 生产了 香蕉 1
# peter2 吃了 香蕉 1
# kelly 生产了 西瓜 2
# peter 吃了 西瓜 2
# kelly2 生产了 香蕉 2
# peter3 吃了 香蕉 2
# 主进程

其实我们现在就是生产者生产完数据之后想往队列里面发送一个结束信号,python语言提供了另外一种队列JoinableQueue([maxsize])来解决这种问题

方案二:JoinableQueue实现生产者消费者模型

思路:每当向队列对象中存入数据的时候,队列对象内部会有一个计数器 +1,每当调用一次 task_done() 方法的时候,队列对象内部的计数器 -1,join() 当计数器为 0 时,执行退出代码。

from multiprocessing import Process, JoinableQueue
import time


# 消费者方法
def consumer(q, name):
    while True:
        res = q.get()
        if res is None: break
        print("%s 吃了 %s" % (name, res))
        q.task_done()  # 发送信号给q.join(),表示已经从队列中取走一个值并处理完毕了


# 生产者方法
def producer(q, name, food):
    for i in range(3):
        time.sleep(1)  # 模拟生产西瓜的时间延迟
        res = "%s %s" % (food, i)
        print("%s 生产了 %s" % (name, res))
        # 把生产的vegetable放入到队列中
        q.put(res)
    q.join()  # 等消费者把自己放入队列的所有元素取完之后才结束


if __name__ == "__main__":
    # q = Queue()
    q = JoinableQueue()
    # 创建生产者
    p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
    p2 = Process(target=producer, args=(q, "kelly2", "蓝莓"))
    # 创建消费者
    c1 = Process(target=consumer, args=(q, "peter",))
    c2 = Process(target=consumer, args=(q, "peter2",))
    c3 = Process(target=consumer, args=(q, "peter3",))

    c1.daemon = True
    c2.daemon = True
    c3.daemon = True

    p_l = [p1, p2, c1, c2, c3]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    # 1.主进程等待p1,p2进程结束才继续执行
    # 2.由于q.join()的存在,生产者只有等队列中的元素被消费完才会结束
    # 3.生产者结束了,就代表消费者已经消费完了,也可以结束了,所以可以把消费者设置为守护进程(随着主进程的退出而退出)

    print("主进程")

# kelly2 生产了 蓝莓 0
# kelly 生产了 西瓜 0
# peter 吃了 蓝莓 0
# peter2 吃了 西瓜 0
# kelly2 生产了 蓝莓 1
# peter3 吃了 蓝莓 1
# kelly 生产了 西瓜 1
# peter 吃了 西瓜 1
# kelly2 生产了 蓝莓 2
# peter2 吃了 蓝莓 2
# kelly 生产了 西瓜 2
# peter3 吃了 西瓜 2
# 主进程

【补充】JoinableQueue模块

(1)介绍

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

(2)使用

JoinableQueue([maxsize])
maxsize是队列中允许最大项数,省略则无大小限制。

from multiprocessing import JoinableQueue
    
q = JoinableQueue()

# 方法介绍:JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

# q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

# q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

标签:Process,队列,消费者,生产者,res,模型,编程,并发,name
From: https://www.cnblogs.com/xiao01/p/17978439

相关文章

  • 并发编程之多线程理论篇
    什么叫线程线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。线程自己不用有系统资源,只拥有一点在运行中必不可少的资源,但它可与同属一个一个进程的其他线程共享其所拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个......
  • 并发编程之守护线程
    无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁需要强调的是:运行完毕并非终止运行#1.对主进程来说,运行完毕指的是主进程代码运行完毕#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕#1主进程在其代码结束后就......
  • 并发编程之多线程操作篇
    多线程简单介绍多线程,或者说多任务,指的是操作系统同时运行多个任务。例如,听歌、洗衣服、看视频可以同时进行,这种就是多任务。单核CPU执行多任务:操作系统轮流让各个任务交替执行,任务1执行t1时间,切换到任务2,任务2执行t2时间,再切换到任务3,执行t3时间...如此反复执行,表面上看,每个任......
  • 并发编程之操作系统引入
    一、引言顾名思义,进程就是正在执行的一个过程。进程是对正在运行程序的一个抽象说法。所谓进程,起源于操作系统最核心的概念,操作系统的其他所有内容都是围绕进程的概念展开的。所以想要真正了解进程,必须事先了解操作系统。二、为什么要有操作系统现代的计算机系统主要是由一个......
  • 并发编程之多进程理论篇
    引言进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是......
  • 阿里发布通义千问!1行代码,免费对话GPT大模型
    大家好,这里是程序员晚枫,今天给大家分享一个好用的东西......
  • 【译】大型语言模型的直观解释
    原作:史蒂夫·纽曼引子:我没有深入研究数学,而是解释了“为什么”它们被构建为“预测下一个单词”引擎,并提出了为什么它们会出现概念性错误的理论。 有很多文章解释了ChatGPT等大型语言模型(LLMs)的工作原理。然而,他们往往会深入研究那些与大多数用户无关的细枝末叶。了解“t......
  • 为大模型工程提效,基于阿里云 ACK 的云原生 AI 工程化实践
    作者:张凯背景以GPT(GenerativePre-trainedTransformer)和Diffusionmodel为代表的大语言模型(Largelanguagemodel,LLM)和生成式人工智能(Generativeartificialintelligence,GAI)在过往两年,将人们对AI的梦想与期待推向了一个新高峰。这一次,AI带来的“智能”效果和“涌现”能力,吸......
  • D6-OpenCompass 大模型评测
    测评结果使用OpenCompass评测InternLM2-Chat-7B模型在C-Eval数据集上的性能:pythonrun.py--datasetsceval_gen--hf-path/share/temp/model_repos/internlm-chat-7b/--tokenizer-path/share/temp/model_repos/internlm-chat-7b/--tokenizer-kwargspadding_side='left'......
  • 全赞GPT - 全赞工程师的知乎问答训练的GPT大模型
    之前在知乎问答上吹水说我训练了我国第81个大模型,想想该兑现了。第一步,准备问答数据,分为训练集和测试集两个部分,刚开始我准备用自己的知乎问答作为数据集,发现有困难,一是我的问答数量太少,还不够一千条,第二知乎不让导出,爬虫啥的我已经戒了,所以决定干脆找一个开源的中文问答数据......