锁
在multiprocessing
模块中,可以使用Lock
(锁)来实现进程间的同步。Lock
提供了一种机制,确保在任意时刻只有一个进程能够访问共享资源。
Lock
的工作原理
Lock
是一种互斥锁,用于保护共享资源的访问。当一个进程获得了锁之后,其他进程将被阻塞,直到锁被释放。只有释放锁的进程才能继续访问共享资源。
Lock
的最佳实践
-
创建
Lock
对象:在创建Lock
对象时,无需指定参数,使用默认值即可。 -
获取和释放锁:进程在需要访问受
Lock
保护的共享资源之前,必须先通过调用acquire()
方法获取锁。访问完成后,必须通过调用release()
方法释放锁。 -
处理异常情况:在使用
Lock
时,可能会出现超时或中断等异常情况。为了保证程序的稳定性,应该在适当的地方捕获并处理这些异常。 -
确保锁的释放:在编写代码时,需要确保每次获取到的锁都能被正确地释放。否则,可能会导致其他进程一直处于阻塞状态。
-
避免死锁:使用多个锁时,需要避免死锁的风险。死锁通常发生在多个进程相互等待对方释放资源的情况下。为了避免死锁,需要仔细设计和管理资源的获取和释放顺序,以及合理地使用锁。
Lock
的坑
-
死锁的风险:使用多个锁时,需要避免死锁的风险。死锁通常发生在多个进程相互等待对方释放资源的情况下。为了避免死锁,需要仔细设计和管理资源的获取和释放顺序,以及合理地使用锁。
-
锁的性能开销:获取和释放锁会带来一定的性能开销。在使用锁时,需要权衡并考虑性能问题,避免过度使用锁导致性能下降。
示例
''' multiprocessing 提供了锁(Lock)、信号量(Semaphore)等同步原语,用于实现进程间的互斥和同步。 可以使用这些同步原语来保护共享资源,防止多个进程同时访问造成数据竞争 ''' from multiprocessing import Process, Lock def worker(lock): with lock: # 上锁,with会自动解锁 print("This is a critical section") if __name__ == '__main__': lock = Lock() # 创建1个锁对象 processes = [] for _ in range(4): p = Process(target=worker, args=(lock,)) # 把锁传递给子进程们,让其上锁、解锁 p.start() processes.append(p) # 主进程等待子进程结束 for p in processes: p.join() ''' 输出: This is a critical section This is a critical section This is a critical section This is a critical section '''
1 from multiprocessing import Process, Lock 2 3 4 def worker(lock, id): 5 # 获取锁 6 lock.acquire() 7 try: 8 print(f"Worker {id} is working") 9 except: 10 print("捕获所有异常,避免上传传递") 11 finally: 12 # 释放锁 13 lock.release() 14 print(f"Worker {id} finished") 15 16 17 if __name__ == '__main__': 18 # 创建一个Lock对象 19 lock = Lock() 20 21 processes = [] 22 for i in range(5): 23 p = Process(target=worker, args=(lock, i)) # 创建进程 24 processes.append(p) 25 p.start() 26 # 主进程,等待子进程的结束(主进程阻塞) 27 for p in processes: 28 p.join() 29 30 ''' 31 输出: 32 Worker 0 is working 33 Worker 0 finished 34 Worker 1 is working 35 Worker 1 finished 36 Worker 2 is working 37 Worker 2 finished 38 Worker 3 is working 39 Worker 3 finished 40 Worker 4 is working 41 Worker 4 finished 42 43 '''
信号量
在multiprocessing
模块中,由于进程之间的内存空间是独立的,所以不能直接使用Python标准库中的threading.Semaphore
。不过可以通过使用multiprocessing.Manager
来创建一个进程间共享的Semaphore
对象
multiprocessing
中的Semaphore
工作原理
在multiprocessing
中,可以使用multiprocessing.Manager
创建一个进程间共享的Semaphore
对象。Semaphore
的工作原理与标准库中的相同,用于控制对共享资源的访问。
multiprocessing
中的Semaphore
最佳实践
-
使用
multiprocessing.Manager
:在使用Semaphore
时,需要使用multiprocessing.Manager
来创建一个进程间共享的Semaphore
对象。 -
获取和释放许可证:进程在需要访问受
Semaphore
保护的共享资源之前,必须先通过调用acquire()
方法获取一个许可证。访问完成后,必须通过调用release()
方法释放许可证。 -
处理异常情况:在使用
Semaphore
时,可能会出现超时或中断等异常情况。为了保证程序的稳定性,应该在适当的地方捕获并处理这些异常。 -
确保许可证被释放:在编写代码时,需要确保每个获取到的许可证都能被正确地释放。否则,可能会导致其他进程无法获取许可证而陷入阻塞状态。
-
关闭
Manager
:在程序结束后,应该显式地关闭Manager
来释放资源。
multiprocessing
中的Semaphore
坑
-
进程间通信开销:由于进程之间的内存空间是独立的,所以进程间通信会带来一定的开销。在使用
multiprocessing.Manager
创建的Semaphore
对象时,要注意控制进程间通信的频率和数据量,以避免性能问题。 -
异常处理:在使用
Semaphore
时,可能会出现超时或中断等异常情况。为了保证程序的稳定性,应该在适当的地方捕获并处理这些异常,避免阻塞或异常传播到整个系统。
示例
1 from multiprocessing import Process, Manager 2 3 4 def worker(semaphore, id): 5 # 获取许可证 6 semaphore.acquire() 7 try: 8 print(f"Worker {id} is working") 9 except: 10 print("处理所有的异常") 11 finally: 12 # 释放许可证 13 semaphore.release() 14 print(f"Worker {id} finished") 15 16 17 if __name__ == '__main__': 18 with Manager() as manager: 19 # 创建一个进程间共享的Semaphore对象 20 semaphore = manager.Semaphore(2) 21 22 processes = [] 23 for i in range(5): 24 p = Process(target=worker, args=(semaphore, i)) 25 processes.append(p) 26 p.start() 27 28 for p in processes: 29 p.join()
输出:
Worker 0 is working Worker 0 finished Worker 1 is working Worker 1 finished Worker 2 is working Worker 2 finished Worker 3 is working Worker 3 finished Worker 4 is working Worker 4 finished
使用multiprocessing.Manager
创建了一个进程间共享的Semaphore
对象 semaphore
。然后,我们定义了一个 worker()
函数作为进程的任务函数。在 worker()
函数内部,首先通过 semaphore.acquire()
获取一个许可证,表示该进程开始占用资源。然后,打印一条工作信息,并最终通过 semaphore.release()
释放许可证,表示该进程结束占用资源。
在主程序中,我们创建了5个进程并启动它们,每个进程都会执行 worker()
函数。由于初始许可证数量为2,所以最多只有两个进程可以同时执行工作。其他进程会在调用 semaphore.acquire()
处阻塞,直到有可用的许可证。最后,我们使用 p.join()
等待所有进程执行完毕。
通过Semaphore
的控制,我们可以限制同时访问共享资源的进程数量,从而避免资源的竞争和冲突。