【一】什么是互斥锁
-
互斥锁是一种用于多线程编程中控制对共享资源访问的机制
-
限制当前时间段只能由当前进程使用,当前进程使用完成后才能其他进程继续使用
-
基本原理是在对共享资源进行访问前加锁,使得其他线程无法访问该资源,当访问完成后再解锁,使得其他线程可以进行访问
【二】多个进程共享同一打印终端
- 进程间的数据不共享,要想共享一套文件系统,就会竞争,从而带来错乱,通过加锁来处理
- 未加锁
import multiprocessing
import os
import time
def work():
print(f"{os.getpid()} is starting")
time.sleep(3)
print(f"{os.getpid()} is ending")
def main():
for i in range(5):
p = multiprocessing.Process(target=work)
p.start()
if __name__ == '__main__':
main()
# 打印结果出现混乱
'''
74832 is starting
73188 is starting
67936 is starting
75732 is starting
68348 is starting
74832 is ending
73188 is ending
75732 is ending67936 is ending
68348 is ending
'''
- 加锁
import multiprocessing
from multiprocessing import Lock
import os
import time
# 为了解决多个子进程共享终端打印导致粘连,于是加锁,为每一个子进程加锁
def work(lock):
# 加锁,限制只能一个进程使用
lock.acquire()
print(f"{os.getpid()} is starting")
time.sleep(3)
print(f"{os.getpid()} is ending")
# 释放锁,将唯一限制解除,其他进程可进行操作
lock.release()
# 只有当前进程释放锁才能被下一个进程使用
# 加上互斥锁的效果就变成了 ---> 并行 ---> 串行
# 好处就是每一个进程之间的数据是唯一的
def main():
# 创建锁对象
lock = Lock()
# 子进程的列表
process_list = []
for i in range(5):
p = multiprocessing.Process(target=work, args=(lock,))
process_list.append(p)
p.start()
# 最后,我们使用另一个循环来等待所有的进程完成。
# 在每次迭代中,我们都检查是否有任何尚未完成的进程。
# 如果有,我们就调用其 join() 方法,等待它完成。
# 只有当所有的进程都已经完成之后,程序才会退出
for process in process_list:
process.join()
if __name__ == '__main__':
main()
'''
19464 is starting
19464 is ending
75068 is starting
75068 is ending
75108 is starting
75108 is ending
74596 is starting
74596 is ending
75204 is starting
75204 is ending
'''
【三】多个进程共享同一文件
-
文件当数据库,模拟抢票
-
未加锁
import json
import multiprocessing
import random
import time
# 初始化数据
def init_ticket():
with open('ticket.json', 'w', encoding="utf8") as fp:
json.dump(fp=fp, obj={'ticket_number': 2})
# 读取票的数据
def read_ticket():
with open('ticket.json', 'r', encoding="utf8") as fp:
data = json.load(fp=fp)
return data
# 保存票的数据
def save_ticket(data):
with open('ticket.json', 'w', encoding="utf8") as fp:
json.dump(fp=fp, obj=data)
# 查看当前票的数量
def search_ticket(name):
ticket_number = read_ticket().get('ticket_number')
print(f"用户:{name}正在查票,当前余票:{ticket_number}")
# 买票
def buy_ticket(name):
ticket_data = read_ticket()
time.sleep(random.randint(1, 5))
# 判断当前是否有票
if ticket_data.get('ticket_number') > 0:
# 有票再买票,在原来的数据上 -1
ticket_data['ticket_number'] -= 1
# 将买完票的数据更新进原本的数据
save_ticket(ticket_data)
print(f"用户:{name} 买票成功!")
else:
print(f"用户:{name} 买票失败!票已经售罄!")
# 整合成一个人的功能
def main(name):
# 所有人都去查票
search_ticket(name)
# 所有人都能随便买票
buy_ticket(name)
if __name__ == '__main__':
# 初始化票数
init_ticket()
process_list = []
# 多个人买票
for i in range(1, 5):
p = multiprocessing.Process(target=main, args=(i,))
process_list.append(p)
p.start()
for process in process_list:
process.join()
# 可以看到余票数量并未减少,所有人都能查票成功,所有人都能购票成功
'''
用户:1正在查票,当前余票:2
用户:2正在查票,当前余票:2
用户:3正在查票,当前余票:2
用户:4正在查票,当前余票:2
用户:4 买票成功!
用户:2 买票成功!
用户:1 买票成功!
用户:3 买票成功!
'''
- 加锁
- 将并发变成串行牺牲效率,但是保证了数据的安全
import json
import multiprocessing
import random
from multiprocessing import Lock
import time
# 初始化数据
def init_ticket():
with open('ticket.json', 'w', encoding="utf8") as fp:
json.dump(fp=fp, obj={'ticket_number': 2})
# 读取票的数据
def read_ticket():
with open('ticket.json', 'r', encoding="utf8") as fp:
data = json.load(fp=fp)
return data
# 保存票的数据
def save_ticket(data):
with open('ticket.json', 'w', encoding="utf8") as fp:
json.dump(fp=fp, obj=data)
# 查看当前票的数量
def search_ticket(name):
ticket_number = read_ticket().get('ticket_number')
print(f"用户:{name}正在查票,当前余票:{ticket_number}")
# 买票
def buy_ticket(name):
ticket_data = read_ticket()
time.sleep(random.randint(1, 5))
# 判断当前是否有票
if ticket_data.get('ticket_number') > 0:
# 有票再买票,在原来的数据上 -1
ticket_data['ticket_number'] -= 1
# 将买完票的数据更新进原本的数据
save_ticket(ticket_data)
print(f"用户:{name} 买票成功!")
else:
print(f"用户:{name} 买票失败!票已经售罄!")
# 整合成一个人的功能
def main(name,lock):
# 所有人都去查票
search_ticket(name)
# 所有人都能随便买票
# 再买票环节加锁
lock.acquire()
buy_ticket(name)
# 释放锁
lock.release()
if __name__ == '__main__':
# 在主进程中生成一把锁,让所有的子进程去抢,谁抢到谁就先买票
lock = Lock()
# 初始化票数
init_ticket()
process_list = []
# 多个人买票
for i in range(1, 5):
p = multiprocessing.Process(target=main, args=(i,lock))
process_list.append(p)
p.start()
for process in process_list:
process.join()
# 所有人都能查票成功,但是只有前两名购票成功
'''
用户:1正在查票,当前余票:2
用户:2正在查票,当前余票:2
用户:3正在查票,当前余票:2
用户:4正在查票,当前余票:2
用户:1 买票成功!
用户:2 买票成功!
用户:3 买票失败!票已经售罄!
用户:4 买票失败!票已经售罄!
'''
【四】互斥锁的优缺点
【1】加锁的优点
-
加锁可以保证多个进程修改同一块数据时,数据不错乱
-
- 同一时间只能有一个任务可以进行修改,即串行的修改
- 牺牲了速度却保证了数据安全。
【2】加锁的缺点
-
虽然可以用文件共享数据实现进程间通信,但问题是:
-
- 效率低(共享数据基于文件,而文件是硬盘上的数据),需要自己加锁处理,容易发生死锁现象
【3】优化方案
-
使用基于消息的IPC通信机制:队列和管道
-
- 效率高(多个进程共享一块内存的数据)
-
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
【五】特别提醒
- 锁不要轻易使用,容易造成死锁现象
- 锁只在处理数据的部分加,用来保证数据的安全(只在争抢数据的环节加锁)