首页 > 其他分享 >27.并发编制【四】互斥锁与队列

27.并发编制【四】互斥锁与队列

时间:2024-05-22 16:55:12浏览次数:16  
标签:--- 27 name 队列 ST queue 互斥 进程 ticket

【一】互斥锁(进程间同步)

1)概念

  • 一种用于多线程编程中控制对方共享资源访问机制

  • 为当前进程或线程添加额外的限制,限制当前时间段只能由当前进程使用,当前进程使用完成后才能其他进程继续使用

  • 其可保证同一时间只有一个进程在执行关键代码段,从而保证了数据的安全性

2)多个进程共享一个打印终端

1.为加锁

并发运行,效率高,但竞争同一终端,有可能发送数据混乱

from multiprocessing import Process
import os, time
def work(name):
    print(f'{name} is running---{os.getpid()}')
    time.sleep(2)
    print(f'{name} is done---{os.getpid()}')
if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work, args=(f'ST_{i}',))
        p.start()
# ST_0 is running---18820
# ST_1 is running---17412
# ST_2 is running---15656
# ST_0 is done---18820ST_1 is done---17412
#
# ST_2 is done---15656

2.加锁

串行运行,牺牲了运行效率,但避免了混乱

from multiprocessing import Process, Lock
import os, time
def work(name, lock):
    # 加锁,限制只能一个进程使用
    lock.acquire()
    print(f'{name} is running---{os.getpid()}')
    time.sleep(2)
    print(f'{name} is done---{os.getpid()}')
    # 释放锁,解除限制
    lock.release()
if __name__ == '__main__':
    # 创建锁对象,
    lock = Lock()
    # 创建子进程的列表
    task_list = []
    for i in range(3):
        p = Process(target=work, args=(f'ST_{i}', lock))
        # 添加到子进程中
        task_list.append(p)
        p.start()
        # 使用另一个循环等待所有进程完成
        for task in task_list:
            task.join()
# ST_0 is running---17908
# ST_0 is done---17908
# ST_1 is running---4896
# ST_1 is done---4896
# ST_2 is running---20964
# ST_2 is done---20964

3)多个进程共享同一文件

1.未加锁

每个用户都能抢到票,哪怕票不够

from multiprocessing import Process
import time, random, json
ticket_data = {'number': 2}

# 初始化票数文件
def init_ticket():
    with open('ticket.json', 'w') as f:
        json.dump(ticket_data, f)

# 读取票数
def get_ticket():
    with open('ticket.json', 'r') as f:
        ticket_data = json.load(f)
    return ticket_data

# 数据更新
def save_ticket(new_data):
    with open('ticket.json', 'w') as f:
        json.dump(new_data, f)

# 买票
def buy_ticket(name):
    ticket_data = get_ticket()
    # 模拟不同用户抢票速度
    time.sleep(random.randint(1, 3))
    # 抢票
    if ticket_data['number'] > 0:
        ticket_data['number'] -= 1
        save_ticket(new_data=ticket_data)
        print(f'{name}抢票成功')
    else:
        print(f'{name}抢票失败')
        
# 模拟抢票
def grab_ticket(name):
    # 查看票数
    ticket_num = get_ticket()['number']
    print(f'{name}查看了票数:还剩{ticket_num}张')
    # 买票
    buy_ticket(name)
    
if __name__ == '__main__':
    # 初始化
    init_ticket()
    p_list = []
    # 模拟多人抢票
    for i in range(5):
        p = Process(target=grab_ticket, args=(f'ST_{i}',))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
# ST_0查看了票数:还剩2张
# ST_2查看了票数:还剩2张
# ST_1查看了票数:还剩2张
# ST_3查看了票数:还剩2张
# ST_4查看了票数:还剩2张
# ST_1抢票成功
# ST_3抢票成功
# ST_2抢票成功
# ST_4抢票成功
# ST_0抢票成功

2.加锁

虽然都是显示2张,但只有两人能抢到

  • 虽牺牲了运行效率,但保障了数据安全
from multiprocessing import Process, Lock
import time, random, json
ticket_data = {'number': 2}

# 初始化票数文件
def init_ticket():
    with open('ticket.json', 'w') as f:
        json.dump(ticket_data, f)

# 读取票数
def get_ticket():
    with open('ticket.json', 'r') as f:
        ticket_data = json.load(f)
    return ticket_data

# 数据更新
def save_ticket(new_data):
    with open('ticket.json', 'w') as f:
        json.dump(new_data, f)

# 买票
def buy_ticket(name, lock):
    # 在操作之前加锁
    lock.acquire()
    ticket_data = get_ticket()
    # 模拟不同用户抢票速度
    time.sleep(random.randint(1, 3))
    # 抢票
    if ticket_data['number'] > 0:
        ticket_data['number'] -= 1
        save_ticket(new_data=ticket_data)
        print(f'{name}抢票成功')
    else:
        print(f'{name}抢票失败')
    # 解除限制
    lock.release()

# 模拟抢票
def grab_ticket(name, lock):
    # 查看票数
    ticket_num = get_ticket()['number']
    print(f'{name}查看了票数:还剩{ticket_num}张')
    # 买票
    buy_ticket(name, lock)

if __name__ == '__main__':
    lock = Lock()
    # 初始化
    init_ticket()
    p_list = []
    # 模拟多人抢票
    for i in range(5):
        p = Process(target=grab_ticket, args=(f'ST_{i}', lock))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
# ST_0查看了票数:还剩2张
# ST_2查看了票数:还剩2张
# ST_3查看了票数:还剩2张
# ST_1查看了票数:还剩2张
# ST_4查看了票数:还剩2张
# ST_0抢票成功
# ST_2抢票成功
# ST_3抢票失败
# ST_1抢票失败
# ST_4抢票失败

4)优缺点

1.优点

  • 加锁可以保证多个进程修改同一块数据时
    • 同一时间只能有一个任务可以进行修改,即串行的修改
    • 没错,速度是慢了,但牺牲了速度却保证了数据安全。

2.缺点

  • 虽然可以用文件共享数据实现进程间通信,但问题是:
    • 1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    • 2.需要自己加锁处理

3.优化方案

  • 因此我们最好找寻一种解决方案能够兼顾:
    • 效率高(多个进程共享一块内存的数据)
    • 帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道

4.特别提醒

锁不要轻易使用,容易造成死锁现象
锁只在处理数据的部分加,用来保证数据的安全(只在争抢数据的环节加锁)

【二】进程间通信(IPC)

1)进程间通信概念

  • 进程间通信(Inter-Process Communication, IPC)
  • 指两个或多个进程之间进行信息交换的过程
  • 是一种计算机编程技术,用于在不同进程之间共享数据和资源

2)实现方法

  • 借助于消息队列,进程可以将消息放入队列中,再由另一个进程从队列中取出
  • 这种通信方法是非阻塞的,即发送进程不需要等待接受进程的响应即可执行
  • multiprocessing模块支持两种形式:
    • 队列,管道
    • 这两种方式都是使用消息传递的

3)管道概念

  • 一种半双工的通信机制,即只能在一个方向上进行数据传输
  • 子进程可以通过继承父进程的管道来实现通信
  • stdin(标准输入)、stdout(标准输出)和stderr(标准错误)是Python中的三个内置文件对象,这些对象也可以作为管道使用。
  • 当我们在一个进程中使用read方法读取管道内的消息后,其他进程将无法再获取该管道内的任何其他消息。
  • 因此,我们需要使用锁或其他同步机制来确保多个进程能够正确地访问和修改共享资源。

4)队列(管道+锁)概念

  • 队列是一种线程安全的数据结构,它支持在多线程环境中高效地实现生产者-消费者模型。
  • 队列的特性是先进先出(First-In-First-Out, FIFO),即先插入队列的数据将先被取出。
  • 堆栈是一种后进先出(Last-In-First-Out, LIFO)的数据结构,与队列相反,最后插入的数据将首先被取出。

5)进程间通信目的

  • 千方百计的存是为了更简单快捷的取

  • 为了实现进程通信

  • 某一个子进程和其他子进程之间需要传输数据

【三】队列模块介绍

1)语法

import queue

q = queue.Queue(maxsize)
# maxsize:队列中允许的最大项数,默认无限大
  • 创建共享的进程队列,实现多进程之间的数据传递

2)方法介绍

q.put()
# 放数据,向队列中插入数据
# put方法还有两个可选参数:blocked和timeout
# 如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常
# 如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
q.get()
# 取数据,从列表中获取数据
# get方法有两个可选参数:blocked和timeout
# 如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
# 如果blocked为False,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
q.get_nowait() 
# 同q.get(False)
q.put_nowait()
# 同q.put(False)
q.empty()
# 判断当前队列是否空了
q.full()
# 判断当前队列是否满了
q.qsize()
# 获取当前队列中存在的数据量

3)示例

from queue import Queue

# 创建队列对象
queue = Queue(maxsize=3)
# 放数据
queue.put(1)
queue.put(2)
queue.put(3)
queue.put(4)  # 超出容量就会阻塞,直至有一个数据被取出
queue.put_nowait(4)  # put 的时候如果超出队列最大容量就直接报错
# 判断当前队列是否填充满
print(queue.full())  # 满了:True
# 延迟时间,抛出异常
queue.put(4, timeout=1)  # queue.Full

# 取数据
print(queue.get())  # 1
print(queue.get())  # 2
print(queue.get())  # 3
print(queue.get())  # 没有数据会一直阻塞,直至出现新的数据
# 如果队列中没有数据的时候就报错 _queue.Empty
print(queue.get_nowait())
# 判断当前队列剩余数列数量
print(queue.qsize())
# 判断数量是否为空
print(queue.empty())  # 空:True
# 延迟时间,抛出异常()
print(queue.get(timeout=1))  # _queue.Empty

【四】队列实现进程间通信

1)子进程与主进程之间的通信

  • 主进程与子进程之间借助队列进行通信
import multiprocessing
from multiprocessing import Process, Queue

def process(name, queue):
    print(f'{name}---子进程开始')
    msg = queue.get()
    print(f'{name}接收到{msg}')
    print(f'{name}---子进程结束')

if __name__ == '__main__':
    # 创建一个队列
    queue = multiprocessing.Queue(3)
    msg = '*********'
    queue.put(msg)
    p = Process(target=process, args=('diva', queue))
    p.start()
    p.join()
# diva---子进程开始
# diva接收到*********
# diva---子进程结束

2)子进程与子进程之间的通信

  • 子进程与子进程之间借助队列进行通信
from multiprocessing import Process, Queue

def producer(name, queue):
    print(f'生产者{name}---子进程开始')
    msg = '*********'
    queue.put(msg)
    print(f'生产的消息:{msg}')
    print(f'生产者{name}---子进程结束')

def consumer(name, queue):
    print(f'消费者{name}---子进程开始')
    msg = queue.get()
    print(f'取到的消息:{msg}')
    print(f'消费者{name}---子进程结束')

if __name__ == '__main__':
    # 得到一个队列
    queue = Queue(5)
    # 生产者子进程
    p_producer = Process(target=producer, args=('PR', queue,))
    # 消费者子进程
    p_consumer = Process(target=consumer, args=('CO', queue,))
    # 启动
    p_producer.start()
    p_consumer.start()
    # 阻塞并行
    p_producer.join()
    p_consumer.join()
# 生产者PR---子进程开始
# 生产的消息:*********
# 生产者PR---子进程结束
# 消费者CO---子进程开始
# 取到的消息:*********
# 消费者CO---子进程结束

【五】生产者和消费者模型

1)概念

1.生产者模型

  • 生产者模型和消费者模型是指通过利用队列解耦生产者和消费者的一种并发编程模型。
  • 在生产者模型中,生产者负责将数据放入共享队列中,而消费者则从队列中取出数据进行处理。
  • 生产者和消费者之间通过共享这个队列来进行信息的交流。
  • 这种模型适用于生产者和消费者之间的处理速度不一致的情况,同时还能够保证数据传输的安全性和正确性。

2)消费者模型

  • 在消费者模型中,消费者负责向队列中插入任务,而由线程池中的工作线程进行任务的处理。
  • 消费者和工作线程之间通过共享线程池中的任务队列来完成任务分发和执行。
  • 这种模型适用于任务处理需要一定时间的情况,能够充分利用多线程的优势提高系统的并发性能,提高效率。

3)小结

  • 生产者:生产/制造东西
  • 消费者:消费/处理东西
  • 该模型还需要一个媒介

2)场景

  • 生产者 ---> 厨子 --> 做菜 ---> 做包子
  • 借助媒介 ---> 蒸笼 ---> 把生包子扔到蒸笼里面蒸熟,消费者再去蒸笼里面拿熟包子
  • 消费者 --> 拿包子 ---> 吃包子

3)示例

import time, random
from multiprocessing import Process, JoinableQueue


def producer(name, food, queue):
    for i in range(5):
        i += 1
        data = f'大厨{name}生产出了第{i}道{food}!'
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        print(data)
        queue.put(data)
    queue.join()


def customer(name, queue):
    while True:
        food = queue.get()
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        data = f'当前顾客{name}消费了>>>{food}!'
        print(data)
        # task_done 这个方法接收到 自己主动结束消费
        queue.task_done()  # 0


# 生产者和消费者的子进程都要启动
# 生产者会等待所有生产的子进程结束后再结束
# 消费者子进程一直在跑 不给消费者加join
# 给消费者子进程增加守护进程 --> 会随着主进程死掉二死掉
def main():
    # 【一】创建队列对象
    queue = JoinableQueue(5)
    # 【1】生产者
    p_producer_1 = Process(target=producer, args=('PR1', '鱼香肉丝', queue))
    p_producer_2 = Process(target=producer, args=('PR2', '宫保鸡丁', queue))
    # 【2】消费者
    p_customer_1 = Process(target=customer, args=('CU1', queue))
    p_customer_2 = Process(target=customer, args=('CU2', queue))
	# 守护进程
    p_customer_1.daemon = True
    p_customer_2.daemon = True

    # 只需要启动生产者生产数据
    # 生产者生产结束 加了一个标志 告诉消费者 生产结束
    # 消费者task_done标志知道生产者没有东西了,主动结束
    p_producer_1.start()
    p_producer_2.start()
    p_customer_1.start()
    p_customer_2.start()

    p_producer_1.join()
    p_producer_2.join()


if __name__ == '__main__':
    main()
# 大厨PR2生产出了第1道宫保鸡丁!
# 大厨PR1生产出了第1道鱼香肉丝!
# 大厨PR2生产出了第2道宫保鸡丁!
# 大厨PR1生产出了第2道鱼香肉丝!
# 当前顾客CU2消费了>>>大厨PR1生产出了第1道鱼香肉丝!!
# 大厨PR2生产出了第3道宫保鸡丁!
# 当前顾客CU1消费了>>>大厨PR2生产出了第1道宫保鸡丁!!
# 当前顾客CU2消费了>>>大厨PR2生产出了第2道宫保鸡丁!!
# 大厨PR1生产出了第3道鱼香肉丝!
# 大厨PR2生产出了第4道宫保鸡丁!
# 大厨PR1生产出了第4道鱼香肉丝!
# 当前顾客CU2消费了>>>大厨PR2生产出了第3道宫保鸡丁!!
# 当前顾客CU1消费了>>>大厨PR1生产出了第2道鱼香肉丝!!
# 大厨PR1生产出了第5道鱼香肉丝!
# 当前顾客CU2消费了>>>大厨PR1生产出了第3道鱼香肉丝!!
# 大厨PR2生产出了第5道宫保鸡丁!
# 当前顾客CU1消费了>>>大厨PR2生产出了第4道宫保鸡丁!!
# 当前顾客CU2消费了>>>大厨PR1生产出了第4道鱼香肉丝!!
# 当前顾客CU1消费了>>>大厨PR1生产出了第5道鱼香肉丝!!
# 当前顾客CU2消费了>>>大厨PR2生产出了第5道宫保鸡丁!!

标签:---,27,name,队列,ST,queue,互斥,进程,ticket
From: https://www.cnblogs.com/Mist-/p/18206625

相关文章

  • Java RMI遇到的Connection refused to Host: 127.x.x.x/192.x.x.x/10.x.x.x问题解决方
    问题故障解决记录--JavaRMIConnectionrefusedtohost:x.x.x.x....在学习JavaRMI时,我遇到了以下情况问题原因:可能大家的host是10或者192的私有地址,我估计都是和我一样的一个原因:/etc/hosts文件的配置问题(我是ubuntu系统下的实验环境),也就是主机名称和IP地址的映射关系......
  • P3270 [JLOI2016] 成绩比较
    记\(a_i=N-R_i,n=N-1\)。先不考虑有多少人被碾压,计算出符合排名限制的方案数。枚举每门课和B神在每门课中的得分,选出\(a_i\)个人得分小于等于他,即:\[\prod\limits_{i=1}^m\dbinom{n}{a_i}\sum\limits_{j=1}^{U_i}j^{a_i}(U_i-j)^{n-a_i}\]设\(s(x)=\sum\limits_{j=1}^{......
  • 代码随想录算法训练营第一天|704,34,35(二分查找),27(双指针)
    二分查找1.使用条件:数组,升序,值不唯一。2.时间复杂度O(logn)可分为左闭右闭,左闭右开两种区间类型来求解。左闭右闭:left=0,right=nums.Length-1,while(left<=right),right=middle-1.左闭右开:left=0,right=nums.Length,while(left<right),right=middle.......
  • 算法随想录打卡第一天|704. 二分查找、27. 移除元素
    704.二分查找-力扣(LeetCode)自己的解法是这样的,超出了时间限制,现在觉得应该是在mid的计算中出了问题。然后在mid的转换中没有right减去1或者left加上1。这两点的问题。自己很习惯的方式是左闭合加上右闭合。可以省去很多对于临界值忘记考虑的麻烦。超时代码贴出:publicin......
  • CF1278F Cards(斯特林数+二项式定理)
    题意简述有\(m\)张牌,其中有一张是王牌。你现在可以洗\(n\)次牌(每种牌堆序列等概率出现),然后查看牌堆顶的第一张牌。设\(x\)为\(n\)次洗牌中第一张牌是王牌的次数,则得分为\(x^k\)。求得分的期望值对\(998244353\)取模的值。\(1\len,m<998244353,k\le5000\)分析将......
  • 互斥锁,IPC机制,队列,生产者消费者模型
    Ⅰ互斥锁【一】什么是互斥锁互斥锁其实就是一种锁。为当前进程或线程添加额外的限制限制当前时间段只能由当前进程使用,当前进程使用完成后才能其他进程继续使用其作用是保证在同一时刻只有一个线程在访问共享资源,从而避免多个线程同时读写数据造成的问题。互斥锁的基本原......
  • 【UR #27】红场阅兵
    题意给定一个积性函数\(S\),保证\(S(p^k)\)是关于\(p^k\)的二次函数,求\(\sum_{i=1}^n\sum_{j=1}^nS(ij)\)。数据范围:\(n\le10^9\)。算法一为了解决上面的问题,我们需要把积性函数推广到二维。首先二维数论函数\(g(x,y)\)还是没啥性质的普通函数。我们称二维数......
  • 风险控制1、如果你的项目发布后失败,主要的原因会是什么?2、每个团队列出自己项目中目前
    项目发布失败的主要原因项目发布后失败可能由多种原因导致,这里列出几个主要的:需求不符合市场实际:产品没有满足目标市场的真实需求或者未能准确捕捉到用户的痛点。用户体验不佳:产品界面复杂难用,用户操作困难,导致用户流失。技术问题:产品存在缺陷或技术故障,影响功能实现或性能稳......
  • hdu4027(线段树区间操作)
    Problem-4027(hdu.edu.cn)许多邪恶的战舰在战斗前排成一排。我们的指挥官决定使用我们的秘密武器来消灭战列舰。每艘战列舰都可以标记为耐力值。对于我们秘密武器的每一次攻击,它都可能降低连续部分战列舰的续航能力,使它们的续航力达到其原始续航力值的平方根。在我们的秘密武......
  • 【HZERO】事件以及消息队列
    事件以及消息队列服务事件:https://open.hand-china.com/document-center/doc/component/157/18147?doc_id=408820&_back=%2Fdocument-center%2Fsearch%3Fs%3D%25E4%25BA%258B%25E4%25BB%25B6%25E6%259C%258D%25E5%258A%25A1&doc_code=197230事件服务:https://open.hand-china.c......