首页 > 其他分享 >互斥锁,IPC机制,队列,生产者消费者模型

互斥锁,IPC机制,队列,生产者消费者模型

时间:2024-05-21 20:51:11浏览次数:12  
标签:PC机 Process name producer 队列 queue 互斥 print

Ⅰ 互斥锁

【一】什么是互斥锁

  • 互斥锁其实就是一种锁。为当前进程或线程添加额外的限制
  • 限制当前时间段只能由当前进程使用,当前进程使用完成后才能其他进程继续使用
    • 其作用是保证在同一时刻只有一个线程在访问共享资源,从而避免多个线程同时读写数据造成的问题。
    • 互斥锁的基本原理是在对共享资源进行访问前加锁,使得其他线程无法访问该资源,当访问完成后再解锁,使得其他线程可以进行访问。
    • 通过这种方式,可以保证同一时间只有一个线程在执行关键代码段,从而保证了数据的安全性。
    • 需要注意的是,互斥锁会带来一些额外的开销

【二】互斥锁引入

  • 应该用在多线程中
  • 进程和进程之间的资源是不共享的
  • 多进程中也可以使用互斥锁 :多个进程共享一个终端打印信息的时候就会导致大家一起打印

多个进程操作同一份数据的时候会造成数据的错乱!!!

这个时候需要加锁处理(互斥锁)

​ 将并行变成串行 牺牲了效率但是保证了数据的安全

【三】示例推导(模拟抢票)

# ticket_data.json

{"ticket_num": 2}
###############################################
当多个进程操作同一份数据的时候会造成数据的错乱!!!
这个时候需要加锁处理(互斥锁)
将并行变成串行  牺牲了效率但是保证了数据的安全
###############################################
# ---------错误示例如下--------
import json
from multiprocessing import Process
import time,random

# 查票
def check_ticket(name):
    with open(r'ticket_data.json', 'r', encoding='utf-8') as fp:
        data = json.load(fp)

    print(f"{name}查看当前余票{data.get('ticket_num')}")


# 买票
def buy_ticket(name):
    with open(r'ticket_data.json', 'r', encoding='utf-8') as fp:
        data = json.load(fp)
    # 模拟一下网速延迟
    time.sleep(random.randint(1, 4))
    # 判断是否有余票
    if data.get('ticket_num') > 0:
        data["ticket_num"] -= 1
        with open(r'ticket_data.json', 'w', encoding='utf-8') as fp:
            json.dump(data,fp)
        print(f'{name}抢票成功!')
    else:
        print(f'{name}抢票失败,没有多余的票了')


def sell_ticket(name):
    check_ticket(name)
    buy_ticket(name)

# 模拟多人同时抢票
if __name__ == '__main__':
    for i in range(10):
        p = Process(target=sell_ticket, args=(f'用户_{i}',))
        p.start()


# 用户_0查看当前余票2
# 用户_2查看当前余票2
# 用户_3查看当前余票2
# 用户_8查看当前余票2
# 用户_6查看当前余票2
# 用户_4查看当前余票2
# 用户_1查看当前余票2
# 用户_5查看当前余票2
# 用户_9查看当前余票2
# 用户_7查看当前余票2
# 用户_8抢票成功!
# 用户_5抢票成功!
# 用户_2抢票成功!
# 用户_6抢票成功!
# 用户_1抢票成功!
# 用户_9抢票成功!
# 用户_4抢票成功!
# 用户_0抢票成功!
# 用户_3抢票成功!
# 用户_7抢票成功!
# 加上互斥锁   将并行变成串行 
###################################
互斥锁不能轻易使用  容易造成死锁现象
互斥锁只在处理数据的部分加锁  不能什么地方都加  会严重影响程序的效率
###################################

# 查票可以一次性给所有人看 但是买票环节必须'排队'>>>  互斥锁
# ---------正确示例如下--------

import json
from multiprocessing import Process,Lock
import time,random

# 查票
def check_ticket(name):
    with open(r'ticket_data.json', 'r', encoding='utf-8') as fp:
        data = json.load(fp)

    print(f"{name}查看当前余票{data.get('ticket_num')}")


# 买票
def buy_ticket(name):
    with open(r'ticket_data.json', 'r', encoding='utf-8') as fp:
        data = json.load(fp)
    # 模拟一下网速延迟
    time.sleep(random.randint(1, 4))
    # 判断是否有余票
    if data.get('ticket_num') > 0:
        data["ticket_num"] -= 1
        with open(r'ticket_data.json', 'w', encoding='utf-8') as fp:
            json.dump(data,fp)
        print(f'{name}抢票成功!')
    else:
        print(f'{name}抢票失败,没有多余的票了')


def sell_ticket(name, mutex):
    check_ticket(name)
    # 只需要把买票变成串行
    mutex.acquire()   # 枪锁
    buy_ticket(name)
    mutex.release()   # 放锁


# 模拟多人同时抢票
if __name__ == '__main__':
    #  互斥锁在主进程中产生一把  交给多个子进程使用
    mutex = Lock()
    for i in range(10):
        p = Process(target=sell_ticket, args=(f'用户_{i}', mutex))
        p.start()

# 用户_1查看当前余票2
# 用户_0查看当前余票2
# 用户_4查看当前余票2
# 用户_2查看当前余票2
# 用户_5查看当前余票2
# 用户_3查看当前余票2
# 用户_9查看当前余票2
# 用户_6查看当前余票2
# 用户_7查看当前余票2
# 用户_8查看当前余票2
# 用户_1抢票成功!
# 用户_0抢票成功!
# 用户_7抢票失败,没有多余的票了
# 用户_9抢票失败,没有多余的票了
# 用户_6抢票失败,没有多余的票了
# 用户_8抢票失败,没有多余的票了
# 用户_3抢票失败,没有多余的票了
# 用户_5抢票失败,没有多余的票了
# 用户_2抢票失败,没有多余的票了
# 用户_4抢票失败,没有多余的票了

'''
锁相关知识:
	行锁:针对行数据加锁 同一时间只能一个人操作
	表锁:针对表数据加锁 同一时间只能一个人操作
'''

【四】互斥锁的优缺点

【1】加锁的优点

  • 加锁可以保证多个进程修改同一块数据时

    • 同一时间只能有一个任务可以进行修改,即串行的修改
    • 没错,速度是慢了,但牺牲了速度却保证了数据安全。

【2】加锁的缺点

  • 虽然可以用文件共享数据实现进程间通信,但问题是:

    • 1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    • 2.需要自己加锁处理

【3】优化方案

  • 因此我们最好找寻一种解决方案能够兼顾:

    • 1、效率高(多个进程共享一块内存的数据)
    • 2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
  • 1 队列和管道都是将数据存放于内存中

  • 2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,

  • 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

【五】行锁和表锁

行锁和表锁是数据库中常用的锁定机制。

【1】行锁

  • 行锁是在对数据库表中的某个数据行进行修改时

    • 一次只允许一个用户操作该行
    • 其他用户如果需要修改该行数据就必须等待。
  • 通过行锁定可以避免多个用户同时修改同一行数据所导致的数据不一致问题。

【2】表锁

  • 表锁则是当一个用户操作某个数据库表时

    • 会锁定整个表,其他用户同时不能操作该表。
  • 这在一些特殊场景下比如表维护、备份等是非常有用的。

【3】小结

  • 总的来说

    • 行锁定是比较细粒度的锁定
    • 而表锁定则是更为粗粒度的锁定方法。

【六】特别提醒

  • 锁不要轻易使用,容易造成死锁现象
  • 锁只在处理数据的部分加,用来保证数据的安全

Ⅱ IPC机制(进程间通信)

【一】什么是进程间通信(Inter-Process Communication, IPC)

  • 进程间通信(Inter-Process Communication, IPC)是指两个或多个进程之间进行信息交换的过程
  • 它是一种计算机编程技术,用于在不同进程之间共享数据和资源

【二】如何实现进程间通信

  • 借助于消息队列,进程可以将消息放入队列中,然后由另一个进程从队列中取出。
  • 这种通信方式是非阻塞的,即发送进程不需要等待接收进程的响应即可继续执行。
  • multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

【三】什么是管道

  • 管道是一种半双工的通信机制,即只能在一个方向上进行数据传输
  • 子进程可以通过继承父进程的管道来实现通信
  • stdin、stdout和stderr是Python中的三个内置文件对象,它们分别代表标准输入、标准输出和标准错误。
  • 这些对象也可以作为管道使用。
  • 当我们在一个进程中使用read方法读取管道内的消息后,其他进程将无法再获取该管道内的任何其他消息。
  • 因此,我们需要使用锁或其他同步机制来确保多个进程能够正确地访问和修改共享资源。

【四】什么是队列(管道 + 锁)

  • 队列是一种线程安全的数据结构,它支持在多线程环境中高效地实现生产者-消费者模型。
  • 队列的特性是先进先出(First-In-First-Out, FIFO),即先插入队列的数据将先被取出。
  • 堆栈是一种后进先出(Last-In-First-Out, LIFO)的数据结构,与队列相反,最后插入的数据将首先被取出。

【五】进程间通信的目的

  • 为了实现进程通信
  • 某一个子进程和其他子进程之间需要传输数据

Ⅲ 队列

'''
队列:先进先出(使用频率很高)
堆栈:先进后出(特定情况下使用)
'''

【一】创建队列的类(底层就是以管道和锁定的方式实现)

【1】语法

import queue

q = queue.Queue(maxsize)
  • Queue([maxsize]):

    • 创建共享的进程队列
    • Queue是多进程安全的队列
    • 可以使用Queue实现多进程之间的数据传递。

【2】参数介绍

  • maxsize是队列中允许最大项数,省略则无大小限制

【二】方法介绍

【1】主要方法

  • q.put() 想队列中插入数据

    • 用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。
    • 如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常
    • 如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
  • q.get() 从队列中获取数据

    • 可以从队列读取并且删除一个元素,同样,get方法有两个可选参数:blocked和timeout。
    • 如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
    • 如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
  • q.get_nowait()

    • 同q.get(False)
  • q.put_nowait()

    • 同q.put(False)
  • q.empty() 判断当前队列是否空了

    • 调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
  • q.full() 判断当前队列是否满了

    • 调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
  • q.qsize() 获取当前队列中存在的数据量

    • 返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

【三】使用


from multiprocessing import Queue

q = Queue(5) # 自定义队列的长度

# 朝队列里放数据
q.put(111)
q.put(222)
q.put(333)
print(q.full())   # False  判断队列是否满了
q.put(444)
q.put(555)
print(q.full())   # True
# q.put(666)  # 超出最大长度 原地阻塞等待队列中出现空位

print(q.get())
print(q.get())
print(q.get())
print(q.empty())  # False  判断队列是否为空
print(q.get())
print(q.get())
print(q.empty())  # True
# print(q.get()) # 队列中没有值 继续获取则阻塞等待队列中给值
print(q.get_nowait()) # 队列中如果没有值 直接报错

'''
    full()
    empty()
    get_nowait()
上述方法能否在并发的场景下精准使用?
    不能用!!!!

'''

# (1)创建队列对象
queue = Queue(maxsize=3)
# (2)放数据
queue.put(1)
queue.put(2)
queue.put(3)
# print(queue.full()) # True 返回当前队列是否填充满,如果满了就会返回True
# queue.put(4, timeout=1)  # timeout : 延迟时间,延迟时间抛出异常 queue.Full
# queue.put(4)  # put 的时候如果超出队列最大容量就会阻塞,直至有一个人取出队列中的一个数据
# queue.put_nowait(4)  # put 的时候如果超出队列最大容量就直接报错 queue.Full
# (3)取数据
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.qsize())  # 判断当前队列中还存在多少数据
# print(queue.empty()) # 判断当前队列是够全部为空
# print(queue.get(timeout=1)) # timeout : 延迟时间,延迟时间抛出异常 _queue.Empty
# print(queue.get()) # 如果队列中没有数据的时候就会一直阻塞,直至队列中增加新的数据
# print(queue.get_nowait()) # 如果队列中没有数据的时候就报错 _queue.Empty

【四】队列实现进程间通信个人

【1】主子通信

from multiprocessing import Process, Queue

def producer(q):
    print('子进程producer从队列中取值>>> ',q.get())


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    q.put(741)  # 主程序往队列中存放数据741
    print('主程序')

# 主程序
# 子进程producer从队列中取值>>>  741

【2】子子通信

from multiprocessing import Process, Queue

def producer(q):
    # print('子进程producer从队列中取值>>> ',q.get())
    q.put('子进程producer往队列中添加值')


def consumer(q):
    print('子进程consumer从队列中取值>>> ',q.get())


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p1 = Process(target=consumer, args=(q,))
    p.start()
    p1.start()
    # q.put(741)  # 主程序往队列中存放数据741
    print('主程序')

# 主程序
# 子进程consumer从队列中取值>>>  子进程producer往队列中添加值

【五】队列实现进程间通信模板

【1】主子通信

import multiprocessing
import time
import random


def son_process(name, queue):
    print(f'son_process {name} is starting ... ')
    msg = queue.get()
    print(f'son_process {name} recv msg is {msg}')
    print(f'son_process {name} is ending ... ')


def main_process():
    # 【一】创建一个队列对象 --- 用多进程里面的队列模块
    queue = multiprocessing.Queue(5)
    # from 来自
    # main_process 主进程
    # message 消息
    # handsome 帅!
    msg = f'from son_process the message is you are handsome!'
    queue.put(msg)
    task = multiprocessing.Process(target=son_process,
                                   kwargs={'name': "son", "queue": queue})
    task.start()
    task.join()


if __name__ == '__main__':
    main_process()

【2】子子通信

from multiprocessing import Process, Queue
import os


# 子进程和子进程之间进行通信
def producer(queue):
    # 生产者
    # 向队列中放数据
    print(f'this is producer {os.getpid()} starting ... ')
    # 向队列中添加数据
    msg = 'this is a message from producer'
    queue.put(msg)
    print(f'this is producer {os.getpid()} ending ... ')
    ...


def customer(queue):
    # 消费者
    print(f'this is customer {os.getpid()} starting ... ')
    msg = queue.get()
    print(f'the message is {msg}')
    print(f'this is customer {os.getpid()} ending ... ')


def main():
    # 【一】得到一个队列对象
    queue = Queue(5)
    # 【二】创建两个多进程对象
    # 【1】生产者子进程
    producer_process = Process(target=producer, args=(queue,))
    # 【2】消费者子进程
    customer_process = Process(target=customer, args=(queue,))
    # 【3】启动子进程
    producer_process.start()
    customer_process.start()
    # 【4】阻塞并行
    producer_process.join()
    customer_process.join()

    # this is customer 302192 starting ...
    # this is producer 303140 starting ...
    # this is producer 303140 ending ...
    # the message is this is a message from producer
    # this is customer 302192 ending ...


if __name__ == '__main__':
    main()

Ⅳ 生产者消费者模型

【一】理论

【1】生产者模型

  • 生产者模型和消费者模型是指通过利用队列解耦生产者和消费者的一种并发编程模型。
  • 在生产者模型中,生产者负责将数据放入共享队列中,而消费者则从队列中取出数据进行处理
  • 生产者和消费者之间通过共享这个队列来进行信息的交流
  • 这种模型适用于生产者和消费者之间的处理速度不一致的情况,同时还能够保证数据传输的安全性和正确性。

【2】消费者模型

  • 在消费者模型中,消费者负责向队列中插入任务,而由线程池中的工作线程进行任务的处理。
  • 消费者和工作线程之间通过共享线程池中的任务队列来完成任务分发和执行。
  • 这种模型适用于任务处理需要一定时间的情况,能够充分利用多线程的优势提高系统的并发性能,提高效率。

【3】小结

  • 生产者:负责生产数据
  • 消费者:消费数据
  • 该模型还需要一个媒介:队列
# 举例
如果是用户下单
同一时刻是不是有100个用户下单
100个订单其实就是生产者生产出来的数据
10个人去处理 10个人一起处理 -->
负责处理的10个人就是消费者

【二】场景引入

  • 比如做包子是先将包子做好后放在蒸笼(媒介)里面,买包子的去蒸笼里面拿
  • 厨师做菜之后用盘子(媒介)装着给消费者端过去
  • 生产者与消费者之间不是直接做交互的,而是借助于媒介
  • 生产者(做包子的) + 媒介(蒸包子) + 消费者(吃包子的)

【1】生产者消费者相等

  • 消费者得不到数据 运行会卡住
from multiprocessing import Process, Queue
import time
import random


def producer(name, food, q):
    for i in range(10):
       data = f'{name} 生产了{food}{i}'
       print(data)
       time.sleep(random.randint(1,3)) # 模拟生产过程
       q.put(data)


def consumer(name, q):
    while True:
        food = q.get()
        time.sleep(random.random())
        print(f'{name}吃了{food}')



if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('小福贵', '十全大补汤', q))
    p2 = Process(target=producer, args=('老八', '秘制小汉堡', q))

    c1 = Process(target=consumer, args=('白琉璃', q))
    c2 = Process(target=consumer, args=('紫月', q))
    p1.start()
    p2.start()
    c1.start()
    c2.start()


# 产生问题:
# 生产者生产完所有数据
# 消费者消费完所有数据,但是消费者还在等待生产者生产数据
# 解决思路:
# 在生产者生产完最后一道菜的时候添加标志
# 消费者消费完最后一道菜的时候应该主动结束消费

【2】为生产者添加结束标志

from multiprocessing import Process, Queue
import time
import random


def producer(name, food, q):
    for i in range(10):
       data = f'{name} 生产了{food}{i}'
       print(data)
       time.sleep(random.randint(1,3)) # 模拟生产过程
       q.put(data)


def consumer(name, q):
    while True:
        food = q.get()
        if food == None:
            break
        time.sleep(random.random())
        print(f'{name}吃了{food}')



if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('小福贵', '十全大补汤', q))
    p2 = Process(target=producer, args=('老八', '秘制小汉堡', q))

    c1 = Process(target=consumer, args=('白琉璃', q))
    c2 = Process(target=consumer, args=('紫月', q))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    # 生产者生产完所有数据之后 往队列里添加结束的信号
    p1.join()
    p2.join()
    q.put(None)  # 结束信号的个数要跟消费者个数一致才可以
    q.put(None)
    '''队列中其实已经自己加了锁 所以多进程取值也不会冲突 并且取走了就没了'''
    
'''
这种要有几个生产者就要有几个结束信号  很麻烦
'''

【3】优化添加结束标志

from multiprocessing import Process, Queue,JoinableQueue
import time
import random


def producer(name, food, q):
    for i in range(10):
       data = f'{name} 生产了{food}{i}'
       print(data)
       time.sleep(random.randint(1,3)) # 模拟生产过程
       q.put(data)


def consumer(name, q):
    while True:
        food = q.get()
        # if food == None:
        #     break
        time.sleep(random.random())
        print(f'{name}吃了{food}')
        q.task_done()  # 每次取完数据必须给队列一个反馈


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()
    p1 = Process(target=producer, args=('小福贵', '十全大补汤', q))
    p2 = Process(target=producer, args=('老八', '秘制小汉堡', q))

    c1 = Process(target=consumer, args=('白琉璃', q))
    c2 = Process(target=consumer, args=('紫月', q))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    # 生产者生产完所有数据之后 往队列里添加结束的信号
    p1.join()
    p2.join()
    # q.put(None)  # 结束信号的个数要跟消费者个数一致才可以
    # q.put(None)
    q.join()   # 等待队列中数据全部被取出(一定要让生产者全部结束才能判断正确)
    '''执行完上述的join方法表示消费者也已经消费完数据了
    
    解决办法就是把c1,c2设置为守护进程 数据消费完之后 自动结束
    '''


【4】模板引入到结束

# 【1】场景
# 生产者 ---> 厨子 --> 做菜 ---> 做包子
# 借助媒介 ---> 蒸笼 ---> 把生包子扔到蒸笼里面蒸熟,消费者再去蒸笼里面拿熟包子
# 消费者 --> 拿包子 ---> 吃包子
'''
import time
import random
from multiprocessing import Process, Queue


# 【2】生产者模型
def producer(name, food, queue):
    # 【一】生产者的标志 : 名字
    # 【二】生产出食物
    # 【三】借助队列将食物扔到队列中
    for i in range(5):
        data = f'当前大厨{name} :>>> 生产出了 第{i}道{food}!'
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        queue.put(data)


def customer(name, queque):
    while True:
        food = queque.get()
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        data = f'当前顾客{name} :>>> 消费了 {food}!'
        print(data)

# 产生问题:
# 生产者生产完所有数据
# 消费者消费完所有数据,但是消费者还在等待生产者生产数据
# 解决思路:
# 在生产者生产完最后一道菜的时候添加标志
# 消费者消费完最后一道菜的时候应该主动结束消费
def main():
    # 【一】创建生产者对象
    # 【二】创建队列对象
    queue = Queue(5)
    # 【1】生产者 dream
    p_producer_dream = Process(
        target=producer,
        args=('dream', '鱼香肉丝', queue)
    )
    # 【2】生产者 hope
    p_producer_hope = Process(
        target=producer,
        args=('hope', '宫保鸡丁', queue)
    )
    # 【3】消费者 opp
    p_producer_opp = Process(
        target=customer,
        args=('opp', queue)
    )
    # 【4】消费者 ooo
    p_producer_ooo = Process(
        target=customer,
        args=('ooo', queue)
    )

    p_producer_dream.start()
    p_producer_hope.start()
    p_producer_opp.start()
    p_producer_ooo.start()

    p_producer_dream.join()
    p_producer_hope.join()
    p_producer_opp.join()
    p_producer_ooo.join()
'''
# 【2】给生产者和消费者添加标志
import time
import random
from multiprocessing import Process, Queue

# 【2】生产者模型
'''
def producer(name, food, queue):
    # 【一】生产者的标志 : 名字
    # 【二】生产出食物
    # 【三】借助队列将食物扔到队列中
    for i in range(5):
        data = f'当前大厨{name} :>>> 生产出了 第{i}道{food}!'
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        queue.put(data)
    queue.put('q')


def customer(name, queque):
    while True:
        food = queque.get()
        if food == 'q':
            data = f'当前顾客{name} :>>>店铺已打样!'
            print(data)
            break
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        data = f'当前顾客{name} :>>> 消费了 {food}!'
        print(data)


# 产生问题:
# 生产者生产完所有数据
# 消费者消费完所有数据,但是消费者还在等待生产者生产数据
# 解决思路:
# 在生产者生产完最后一道菜的时候添加标志
# 消费者消费完最后一道菜的时候应该主动结束消费
def main():
    # 【一】创建生产者对象
    # 【二】创建队列对象
    queue = Queue(5)
    # 【1】生产者 dream
    p_producer_dream = Process(
        target=producer,
        args=('dream', '鱼香肉丝', queue)
    )
    # 【2】生产者 hope
    p_producer_hope = Process(
        target=producer,
        args=('hope', '宫保鸡丁', queue)
    )
    # 【3】消费者 opp
    p_producer_opp = Process(
        target=customer,
        args=('opp', queue)
    )
    # 【4】消费者 ooo
    p_producer_ooo = Process(
        target=customer,
        args=('ooo', queue)
    )

    p_producer_dream.start()
    p_producer_hope.start()
    p_producer_opp.start()
    p_producer_ooo.start()

    p_producer_dream.join()
    p_producer_hope.join()
    p_producer_opp.join()
    p_producer_ooo.join()
'''
from multiprocessing import JoinableQueue


# 【3】借助别人写好的模块完成标志的增加
def producer(name, food, queue):
    # 【一】生产者的标志 : 名字
    # 【二】生产出食物
    # 【三】借助队列将食物扔到队列中
    for i in range(5):
        data = f'当前大厨{name} :>>> 生产出了 第{i}道{food}!'
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        print(data)
        queue.put(data)
    # 结束标志自己主动加 q
    # queue.put('q')
    # 只需要借助模块的方法
    queue.join()


def customer(name, queue):
    while True:
        food = queue.get()
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        data = f'当前顾客{name} :>>> 消费了 {food}!'
        print(data)

        # 当生产者生产完所有数据后会增加 join 标志 ---> 阻塞
        # task_done 这个方法接收到 自己主动结束消费
        queue.task_done()  # 0


# 生产者和消费者的子进程都要启动
# 生产者会等待所有生产的子进程结束后再结束
# 消费者子进程一直在跑 不给消费者加join
# 给消费者子进程增加守护进程 --> 会随着主进程死掉二死掉
def main():
    # 【一】创建生产者对象
    # 【二】创建队列对象
    queue = JoinableQueue(5)
    # 【1】生产者 dream
    p_producer_dream = Process(
        target=producer,
        args=('dream', '鱼香肉丝', queue)
    )
    # 【2】生产者 hope
    p_producer_hope = Process(
        target=producer,
        args=('hope', '宫保鸡丁', queue)
    )
    # 【3】消费者 opp
    p_customer_opp = Process(
        target=customer,
        args=('opp', queue)
    )
    # 【4】消费者 ooo
    p_customer_ooo = Process(
        target=customer,
        args=('ooo', queue)
    )

    p_customer_opp.daemon = True
    p_customer_ooo.daemon = True

    # 只需要启动生产者生产数据
    # 生产者生产结束 加了一个标志 告诉消费者 生产结束
    # 消费者task_done标志知道生产者没有东西了,主动结束
    p_producer_dream.start()
    p_producer_hope.start()
    p_customer_opp.start()
    p_customer_ooo.start()

    p_producer_dream.join()
    p_producer_hope.join()


if __name__ == '__main__':
    main()

标签:PC机,Process,name,producer,队列,queue,互斥,print
From: https://www.cnblogs.com/zyb123/p/18204901

相关文章

  • 风险控制1、如果你的项目发布后失败,主要的原因会是什么?2、每个团队列出自己项目中目前
    项目发布失败的主要原因项目发布后失败可能由多种原因导致,这里列出几个主要的:需求不符合市场实际:产品没有满足目标市场的真实需求或者未能准确捕捉到用户的痛点。用户体验不佳:产品界面复杂难用,用户操作困难,导致用户流失。技术问题:产品存在缺陷或技术故障,影响功能实现或性能稳......
  • 【HZERO】事件以及消息队列
    事件以及消息队列服务事件:https://open.hand-china.com/document-center/doc/component/157/18147?doc_id=408820&_back=%2Fdocument-center%2Fsearch%3Fs%3D%25E4%25BA%258B%25E4%25BB%25B6%25E6%259C%258D%25E5%258A%25A1&doc_code=197230事件服务:https://open.hand-china.c......
  • .NET 中 Channel 类(内存级消息队列)简单使用
    Channel是干什么的#TheSystem.Threading.Channelsnamespaceprovidesasetofsynchronizationdatastructuresforpassingdatabetweenproducersandconsumersasynchronously.Thelibrarytargets.NETStandardandworksonall.NETimplementations.Channelsa......
  • 单调队列
    单调队列考虑在一个序列中维护一个类似于窗口的东西。以下不妨设求得是窗口最大值。首先根据贪心,如果当前数整个窗口中最大的,并且是最靠前的,那么这个数前面的所有数都不会对答案产生一点贡献。于是考虑维护一个单调递增的序列,需要从中找出答案。设置一个首指针,未指针代表这个窗......
  • 一个Java基于阻塞的定时消费内存队列
     @Getter@AllArgsConstructorpublicenumInsertQueueEnum{A(30000,10,TimeUnit.SECONDS,2,1000),;privatefinalintcapacity;//队列长度privatefinalinttime;//最长阻塞时间privatefinalTimeUnittimeUnit;//最长阻塞时间单位privatefi......
  • 【CodeChef】3out1in(优先队列)
    题目大意:给出数组a,问对于所有满足\(1\lek\len\)的奇数\(k\),\(f([a_1,a_2,...,a_k])\)的值。\(f([a_1,a_2,...,a_n])\)的值为对数组\([a_1,a_2,...,a_n]\)进行\(\frac{n+1}{2}\)次操作(选择数组中的三个元素,将其中一个取相反数,然后让它们合并成一个元素)后,数组最后剩下元素的最大......
  • kombu & celery:如何在Python中舒适地使用消息队列
    Kombu和Celery是Python中的两个库,它们可分开或结合起来使用,以实现基于分布式消息传递的异步任务队列。KombuKombu是一个Python消息库,它为多种消息队列提供了抽象和统一的使用方式。它支持AMQP协议的消息队列服务,如RabbitMQ和Redis,以及其他一些通过插件实现的传输方......
  • 线程安全队列(使用互斥锁进行实现)
    线程安全队列(使用互斥锁进行实现)没有设置队列上限的线程安全队列只需要采取一个std::condition_variable变量,用于处理队列为空的情况以下是示例代码,涉及了std::mutex和std::condition_variable、std::unique_lock、std::lockguard等多线程交互的类。测试方式采取的是3个生成者......
  • PikaScript - 面向嵌入式的超轻量级python引擎+Ring-Buffer - 仅80行代码的超简洁环形
    1、PikaScript-面向嵌入式的超轻量级python引擎PikaScript(前称mimiscript)是一个完全重写的超轻量级python引擎,零依赖,零配置,可以在少于4KB的RAM下运行(如stm32g030c8和stm32f103c8),极易部署和扩展。项目地址:https://github.com/pikasTech/pikascriptPikaScript是使用c语言写......
  • 力扣-232. 用栈实现队列
    1.题目信息2.题解2.1双栈的使用(用栈实现队列)思路我们一开始可能会想到对于每次入栈操作——由于我们可能希望他是加在队尾(栈底)而不是队头(栈首),所以我们就进行一次首尾互换,将instack中的数据倒腾到outstack,由于栈先进后出的特性,所以这时候原来的栈底在头部,我们直接将元素pus......