【一】互斥锁
【1】什么是进程同步(互斥锁)
-
互斥锁(Mutex)是一种用于多线程编程中控制对共享资源访问的机制。
-
- 其作用是保证在同一时刻只有一个线程在访问共享资源,从而避免多个线程同时读写数据造成的问题。
- 互斥锁的基本原理是在对共享资源进行访问前加锁,使得其他线程无法访问该资源,当访问完成后再解锁,使得其他线程可以进行访问。
- 通过这种方式,可以保证同一时间只有一个线程在执行关键代码段,从而保证了数据的安全性。
- 需要注意的是,互斥锁会带来一些额外的开销
【2】多个进程共享同一打印终端
-
互斥锁应该用在多线程中
-
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
-
而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理
[1] 未加锁
# 并发运行,效率高,但竞争同一打印终端,带来了打印错乱
import multiprocessing
import random
import time
def work(name):
print(f'{name} is starting ...')
time.sleep(random.randint(1, 4))
print(f'{name} is finished ...')
def main():
task_list = []
for i in range(3):
task = multiprocessing.Process(
target=work,
args=(i,)
)
task.start()
task_list.append(task)
for task in task_list:
task.join()
if __name__ == '__main__':
main()
# 0 is starting ...
# 1 is starting ...
# 2 is starting ...
# 2 is finished ...
# 1 is finished ...
# 0 is finished ...
[2] 加锁
-
由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Lock
import multiprocessing
import random
import time
def work(name, lock):
# 当前子进程启动之前 加锁
# 例如 出门的时候,要给自己家的门上个锁
lock.acquire()
print(f'{name} is starting ...')
time.sleep(random.randint(1, 4))
print(f'{name} is finished ...')
# 回家了,就解锁进门
# 子进程结束释放锁
lock.release()
# 为了解决多个子进程共享终端打印导致粘连,于是加锁,为每一个子进程加锁
# 只有当前进程释放锁才能被下一个进程使用
# 加上互斥锁的效果就变成了 ---> 并行 ---> 串行
# 好处: 每一个进程之间的数据是唯一的
# 互斥锁的最终目的:牺牲效率保证数据安全
def main():
# 实例化得到一个锁的对象
lock = Lock()
task_list = []
for i in range(3):
task = multiprocessing.Process(
target=work,
args=(i, lock)
)
task.start()
task_list.append(task)
for task in task_list:
task.join()
if __name__ == '__main__':
main()
# 1 is starting ...
# 1 is finished ...
# 0 is starting ...
# 0 is finished ...
# 2 is starting ...
# 2 is finished ...
[3] 系统示例(火车票购票平台)
- 有一个数据(文件)里面存了几张火车票
- 用户可以每一个人都能看到有多少票
- 每一个人也能抢票
import time
import random
ticket_data = {'number': 3}
# 【一】 初始化票的张数
def init_ticket():
with open('ticket_data.json', 'w') as fp:
json.dump(fp=fp, obj=ticket_data)
# 【二】 读取当前票的张票
def read_ticket():
with open('ticket_data.json', 'r') as fp:
data = json.load(fp=fp)
return data
# 【三】 买完票更新票的数据
def save_data(data):
with open('ticket_data.json', 'w') as fp:
json.dump(obj=data, fp=fp)
# 【四】查看当前票的张数
def search_ticket(name):
ticket_number = read_ticket().get('number')
print(f'now name is {name} , now ticket number is {ticket_number}!')
# 【五】买票
def buy_ticket(name, lock):
# 在操作数据之前加锁
lock.acquire()
ticket_data = read_ticket()
sleep_time = random.randint(1, 4)
time.sleep(sleep_time)
# 【1】判断当前是否有票
if ticket_data.get('number') > 0:
# 【2】买票 在原来的数据上 -1
ticket_data['number'] -= 1
# 【3】将买完票的数据更新进原本的数据
save_data(ticket_data)
print(f'now name is {name} , success buy a ticket')
else:
print(f'now name is {name} , failed buy a ticket')
lock.release()
# 不模拟不买票这种情况
def main(name, lock):
# 让这个人查票
search_ticket(name)
# 让这个人买票
buy_ticket(name, lock)
# 原本是只有两张票
# 结果发现大家都抢到了票
# 并且每一个人看到的都是两张票
def main_process():
# 互斥锁# 大家看票的时候都可以看到总票数
# 买票的时候买不到了
# 锁应该加载买票的时候
# 买票之前加锁
# 付款之后解锁
# 【1】实例化得到锁对象
lock = multiprocessing.Lock()
task_list = []
for i in range(5):
task = multiprocessing.Process(
target=main,
args=(f'name_{i}', lock)
)
task.start()
task_list.append(task)
for task in task_list:
task.join()
if __name__ == '__main__':
print(f'Ticket rush starting .... ')
# 初始化票的数据
# init_ticket()
# 有了票才能抢票
main_process()
print(f'ending .... ')
# Ticket rush starting ....
# now name is name_0 , now ticket number is 3!
# now name is name_2 , now ticket number is 3!
# now name is name_1 , now ticket number is 3!
# now name is name_4 , now ticket number is 3!
# now name is name_3 , now ticket number is 3!
# now name is name_0 , success buy a ticket
# now name is name_2 , success buy a ticket
# now name is name_1 , success buy a ticket
# now name is name_4 , failed buy a ticket
# now name is name_3 , failed buy a ticket
# ending ....
[4] 互斥锁的优缺点
(1)加锁的优点
- 加锁可以保证多个进程修改同一块数据时
- 同一时间只能有一个任务可以进行修改,即串行的修改
- 虽然速度慢,但牺牲了速度却保证了数据安全。
(2)加锁的缺点
- 虽然可以用文件共享数据实现进程间通行,但问题是:
- 效率低(共享了数据基于文件,而文件是硬盘上的数据)
- 需要自己加锁处理,可能会造成死锁状态。
(3)优化方案
-
因此我们最好找一种解决方案能够兼顾:
- 效率高(多个进程共享一块内存的数据)
- 帮我们处理好锁问题。这就是
multiprocessing
模块为我们提供的基于消息的 IPC 通信机制:队列和管道。
-
队列和管道都是将数据存放于内存中
-
队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来。
-
应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且再进程数目增多时,往往可以获得更好的可获展性。
[5] 特别提醒
- 锁不要轻易使用,容易造成死锁现象
- 锁只在处理数据的部分加,用来保证数据的安全(只在争数据的环节加锁)
【二】进程间通信(IPC)
【1】什么是进程间通信(Inter-Process Communication, IPC)
- 进程间通信(Inter-Process Communication, IPC)是指两个或多个进程之间进行信息交换的过程。
- 它是一种计算机编程技术,用于在不同进程之间共享数据和资源。
【2】如何实现进程间通信
- 借助于消息队列,进程可以将消息放入队列中,然后由另一个进程从队列中取出。
- 这种通信方式是非阻塞的,即发送进程不需要等待接收进程的响应即可继续执行。
- multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
【3】什么是管道
- 管道是一种半双工的通信机制,即只能在一个方向上进行数据传输。
- 子进程可以通过继承父进程的管道来实现通信。
- stdin、stdout和stderr是Python中的三个内置文件对象,它们分别代表标准输入、标准输出和标准错误。
- 这些对象也可以作为管道使用。
- 当我们在一个进程中使用read方法读取管道内的消息后,其他进程将无法再获取该管道内的任何其他消息。
- 因此,我们需要使用锁或其他同步机制来确保多个进程能够正确地访问和修改共享资源。
【4】什么是队列(管道 + 锁)
- 队列是一种线程安全的数据结构,它支持在多线程环境中高效地实现生产者-消费者模型。
- 队列的特性是先进先出(First-In-First-Out, FIFO),即先插入队列的数据将先被取出。
- 堆栈是一种后进先出(Last-In-First-Out, LIFO)的数据结构,与队列相反,最后插入的数据将首先被取出。
【5】进程间通信的目的
- 存是为了更好的取
- 千方百计的存
- 简单快捷的取
【三】队列模块介绍
【1】语法
import queue
q = queue.Queue(maxsize)
-
Queue([maxsize]):
-
- 创建共享的进程队列
- Queue是多进程安全的队列
- 可以使用Queue实现多进程之间的数据传递。
-
maxsize是队列中允许最大项数,省略则无大小限制
【2】方法介绍
q.put()
- 放数据,向队列中插入数据
q.get()
- 取数据,从队列中获取数据
q.get_nowait()
- 和
q.get()
取数据
- 和
q.put_nowait()
- 和
q.put()
放数据
- 和
q.empty()
- 判断当前队列是否空了
q.full()
- 判断当前队列是否满了
q.qsize()
- 获取当前队列中存在的数据量
【3】使用
# 【1】模块介绍 queue
# 创建一个共享的队列,大家都通过一个队列操作数据
# 【2】导入模块
from queue import Queue
# 1.创建队列对象
queue = Queue(maxsize=3)
# 2.放数据
queue.put(1)
queue.put(2)
queue.put(3)
# print(queue.full()) # True 返回当前队列是否填充满,如果满了就会返回True
# queue.put(4, timeout=1) # timeout : 延迟时间,延迟时间抛出异常 queue.Full
# queue.put(4) # put 的时候如果超出队列最大容量就会阻塞,直至有一个人取出队列中的一个数据
# queue.put_nowait(4) # put 的时候如果超出队列最大容量就直接报错 queue.Full
# (3)取数据
print(queue.get()) # 1
print(queue.get()) # 2
print(queue.get()) # 3
print(queue.qsize()) # 0 # 判断当前队列中还存在多少数据
# print(queue.empty()) # 判断当前队列是够全部为空
# print(queue.get(timeout=1)) # timeout : 延迟时间,延迟时间抛出异常 _queue.Empty
# print(queue.get()) # 如果队列中没有数据的时候就会一直阻塞,直至队列中增加新的数据
# print(queue.get_nowait()) # 如果队列中没有数据的时候就报错 _queue.Empty
【四】队列实现进程间通信
【1】主子通信
import multiprocessing
import queue
from queue import Queue
def son_process(name,queue):
print(f'son_process {name} is starting ...')
msg = queue.get()
print(f'son_process {name} recv msg is {msg}')
print(f'son_process {name} is ending ...')
def main_process():
# 【一】创建一个队列对象 ---> 用多进程里面的队列模块
queue = multiprocessing.Queue(5)
msg = f'from son_process the message is you are handsome!!'
queue.put(msg)
task = multiprocessing.Process(
target=son_process,
kwargs={'name':"son",'queue':queue}
)
task.start()
task.join()
if __name__ == '__main__':
main_process()
# son_process son is starting ...
# son_process son recv msg is from son_process the message is you are handsome!!
# son_process son is ending ...
【2】子子通信
from multiprocessing import Process,Queue
import os
# 子进程和子进程之间进行通信:
def producer(queue):
# 生产者
# 向队列中添加数据
print(f'this is producer {os.getpid()} starting ...')
# 向队列中添加数据
msg = 'this is a message from producer'
queue.put(msg)
print(f'this is producer {os.getpid()} ending ...')
def customer(queue):
# 消费者
print(f'this is customer {os.getpid()} starting ...')
msg = queue.get()
print(f'the message is {msg}')
print(f'this is customer {os.getpid()} ending ...')
def main():
# 【一】得到一个队列对象
queue = Queue(5)
# 【二】创建两个多进程对象
# 【1】生产者子进程
producer_process = Process(
target=producer,
args=(queue,)
)
# 【2】消费者子进程
customer_process = Process(
target=customer,
args=(queue,)
)
# 【3】启动子进程
producer_process.start()
customer_process.start()
# 【4】阻塞并行
producer_process.join()
customer_process.join()
if __name__ == '__main__':
main()
# this is producer 20724 starting ...
# this is customer 21192 starting ...
# this is producer 20724 ending ...
# the message is this is a message from producer
# this is customer 21192 ending ...
【五】生产者和消费者模型
【1】生产者模型
- 生产者模型和消费者模型是指通过利用队列解耦生产者和消费者的一种并发编程模型。
- 在生产者模型中,生产者负责将数据放入共享队列中,而消费者则从队列中取出数据进行处理。
- 生产者和消费者之间通过共享这个队列来进行信息的交流。
- 这种模型适用于生产者和消费者之间的处理速度不一致的情况,同时还能够保证数据传输的安全性和正确性。
【2】消费者模型
- 在消费者模型中,消费者负责向队列中插入任务,而由线程池中的工作线程进行任务的处理。
- 消费者和工作线程之间通过共享线程池中的任务队列来完成任务分发和执行。
- 这种模型适用于任务处理需要一定时间的情况,能够充分利用多线程的优势提高系统的并发性能,提高效率。
【3】总结
- 生产者:生产/制造东西
- 消费者:消费/处理东西
- 该模型还需要一个媒介
# 如果是用户下单
# 同一时刻是不是有100个用户下单
# 100个订单其实就是生产者生产出来的数据
# 10个人去处理 10个人一起处理 -->
# 负责处理的10个人就是消费者
# 在多进程中实现生产者和消费者模型
# 【1】场景
# 生产者 ---> 厨子 --> 做菜 ---> 做包子
# 借助媒介 ---> 蒸笼 ---> 把生包子扔到蒸笼里面蒸熟,消费者再去蒸笼里面拿熟包子
# 消费者 --> 拿包子 ---> 吃包子
'''
import time
import random
from multiprocessing import Process, Queue
# 【2】生产者模型
def producer(name, food, queue):
# 【一】生产者的标志 : 名字
# 【二】生产出食物
# 【三】借助队列将食物扔到队列中
for i in range(5):
data = f'当前大厨{name} :>>> 生产出了 第{i}道{food}!'
sleep_time = random.randint(1, 3)
time.sleep(sleep_time)
queue.put(data)
def customer(name, queque):
while True:
food = queque.get()
sleep_time = random.randint(1, 3)
time.sleep(sleep_time)
data = f'当前顾客{name} :>>> 消费了 {food}!'
print(data)
# 产生问题:
# 生产者生产完所有数据
# 消费者消费完所有数据,但是消费者还在等待生产者生产数据
# 解决思路:
# 在生产者生产完最后一道菜的时候添加标志
# 消费者消费完最后一道菜的时候应该主动结束消费
def main():
# 【一】创建生产者对象
# 【二】创建队列对象
queue = Queue(5)
# 【1】生产者 dream
p_producer_dream = Process(
target=producer,
args=('dream', '鱼香肉丝', queue)
)
# 【2】生产者 hope
p_producer_hope = Process(
target=producer,
args=('hope', '宫保鸡丁', queue)
)
# 【3】消费者 opp
p_producer_opp = Process(
target=customer,
args=('opp', queue)
)
# 【4】消费者 ooo
p_producer_ooo = Process(
target=customer,
args=('ooo', queue)
)
p_producer_dream.start()
p_producer_hope.start()
p_producer_opp.start()
p_producer_ooo.start()
p_producer_dream.join()
p_producer_hope.join()
p_producer_opp.join()
p_producer_ooo.join()
'''
# 【2】给生产者和消费者添加标志
import time
import random
from multiprocessing import Process, Queue
# 【2】生产者模型
'''
def producer(name, food, queue):
# 【一】生产者的标志 : 名字
# 【二】生产出食物
# 【三】借助队列将食物扔到队列中
for i in range(5):
data = f'当前大厨{name} :>>> 生产出了 第{i}道{food}!'
sleep_time = random.randint(1, 3)
time.sleep(sleep_time)
queue.put(data)
queue.put('q')
def customer(name, queque):
while True:
food = queque.get()
if food == 'q':
data = f'当前顾客{name} :>>>店铺已打样!'
print(data)
break
sleep_time = random.randint(1, 3)
time.sleep(sleep_time)
data = f'当前顾客{name} :>>> 消费了 {food}!'
print(data)
# 产生问题:
# 生产者生产完所有数据
# 消费者消费完所有数据,但是消费者还在等待生产者生产数据
# 解决思路:
# 在生产者生产完最后一道菜的时候添加标志
# 消费者消费完最后一道菜的时候应该主动结束消费
def main():
# 【一】创建生产者对象
# 【二】创建队列对象
queue = Queue(5)
# 【1】生产者 dream
p_producer_dream = Process(
target=producer,
args=('dream', '鱼香肉丝', queue)
)
# 【2】生产者 hope
p_producer_hope = Process(
target=producer,
args=('hope', '宫保鸡丁', queue)
)
# 【3】消费者 opp
p_producer_opp = Process(
target=customer,
args=('opp', queue)
)
# 【4】消费者 ooo
p_producer_ooo = Process(
target=customer,
args=('ooo', queue)
)
p_producer_dream.start()
p_producer_hope.start()
p_producer_opp.start()
p_producer_ooo.start()
p_producer_dream.join()
p_producer_hope.join()
p_producer_opp.join()
p_producer_ooo.join()
'''
from multiprocessing import JoinableQueue, Process
import random
# 借助别人写好的模块完成标志的增加
def producer(name, food, queue):
# 【一】生产者的标志 : 名字
# 【二】生产出食物
# 【三】借助队列将食物扔到队列中
for i in range(1, 5):
data = f'当前大厨{name} :>>> 生产出了 第{i} 道 {food}!'
sleep_time = random.randint(1, 3)
time.sleep(sleep_time)
print(data)
queue.put(data)
# 结束标志自己主动加 q
# queue.put('q')
# 只需要借助模块的方法
queue.join() # 使用堵塞方法时程序没有出结果就挂起,不继续,直到结果出来
def customer(name, queue):
while True:
food = queue.get()
sleep_time = random.randint(1, 3)
time.sleep(sleep_time)
data = f'当前顾客{name} :>>> 消费了 {food}!'
print(data)
# 当生产者生产完所有数据后会增加 join 标志 ---> 阻塞
# task_done 这个方法接收到 自己主动结束消费
queue.task_done() # 0
# 生产者和消费者的子进程都要启动
# 生产者会等待所有生产的子进程结束后再结束
# 消费者子进程一直在跑 不给消费者加join
# 给消费者子进程增加守护进程 --> 会随着主进程死掉二死掉
def main():
# 创建生产者对象
# 创建队列对象
queue = JoinableQueue(3)
# 生产者 max
p_producer_max = Process(
target=producer,
args=('max', '鱼香肉丝', queue)
)
# 生产者 tom
p_producer_tom = Process(
target=producer,
args=('tom', '宫保鸡丁', queue)
)
# 消费者 opp
p_customer_opp = Process(
target=customer,
args=('opp', queue)
)
# 消费者 chosen
p_customer_chosen = Process(
target=customer,
args=('chosen', queue)
)
p_customer_opp.daemon = True
p_customer_chosen.daemon = True
# 只需要启动生产者生产数据
# 生产者生产结束 加了一个标志 告诉消费者 生产结束
# 消费者task_done标志知道生产者没有东西了,主动结束
p_producer_max.start()
p_producer_tom.start()
p_customer_opp.start()
p_customer_chosen.start()
p_producer_max.join()
p_producer_tom.join()
if __name__ == '__main__':
main()
输出结果:
# 当前大厨max :>>> 生产出了 第1 道 鱼香肉丝!
# 当前大厨tom :>>> 生产出了 第1 道 宫保鸡丁!
# 当前大厨tom :>>> 生产出了 第2 道 宫保鸡丁!
# 当前大厨max :>>> 生产出了 第2 道 鱼香肉丝!
# 当前顾客opp :>>> 消费了 当前大厨max :>>> 生产出了 第1 道 鱼香肉丝!!
# 当前大厨tom :>>> 生产出了 第3 道 宫保鸡丁!
# 当前顾客chosen :>>> 消费了 当前大厨tom :>>> 生产出了 第1 道 宫保鸡丁!!
# 当前顾客opp :>>> 消费了 当前大厨tom :>>> 生产出了 第2 道 宫保鸡丁!!
# 当前顾客chosen :>>> 消费了 当前大厨max :>>> 生产出了 第2 道 鱼香肉丝!!
# 当前大厨tom :>>> 生产出了 第4 道 宫保鸡丁!
# 当前大厨max :>>> 生产出了 第3 道 鱼香肉丝!
# 当前顾客opp :>>> 消费了 当前大厨tom :>>> 生产出了 第3 道 宫保鸡丁!!
# 当前大厨max :>>> 生产出了 第4 道 鱼香肉丝!
# 当前顾客chosen :>>> 消费了 当前大厨tom :>>> 生产出了 第4 道 宫保鸡丁!!
# 当前顾客opp :>>> 消费了 当前大厨max :>>> 生产出了 第3 道 鱼香肉丝!!
# 当前顾客chosen :>>> 消费了 当前大厨max :>>> 生产出了 第4 道 鱼香肉丝!!
标签:queue,name,producer,队列,间通信,print,进程,ticket
From: https://www.cnblogs.com/chosen-yn/p/18216199