由于GIL(global interpreter lock)的存在, 使得CPython并不能充分发挥多核CPU的优势,对于计算密集型任务CPython的多线程性能可能比串行执行还要低,但是在IO密集的任务中多线程还是有可取之处的,可以更方便的管理多线程。
这次想聊聊Python中的并行,而Python多线程基本上属于并发的范畴,那就从multiprocessing这个工具开始,希望从零开始讲明白。首先讲一下概念,进程是资源分配的最小单位, 线程是任务调度的最小单位。Python基于进程的并行没有线程的概念了,资源分配和调度的最小单位都是进程。 可以使用下面代码新建一个简单进程:
import multiprocessing as mp def x(msg): print(msg, "parent id ->", os.getppid(), "current pid ->", os.getpid()) if __name__ == '__main__': print("main:->", os.getpid()) p = mp.Process(target=x, args=("hello",)) p.start() output: main:-> 43436 hello parent id -> 43436 current pid -> 43437
multiprocessing.Process的使用说明可以查看文档, 一般常用的target和参数即可。这里打印出了一些pid 可以对进程有个直观的感觉, 可以看到main函数的pid为43436 进程p执行时打印的父进程也是43436。
关于父进程和子进程的启动方式有三种:
spawn:由父进程启动一个Python解释器进程,子进程只会继承运行对象的run()方法这些资源可以由父进程分配。父进程中的非必须的文件描述符和句柄不会被继承,这种启动方式最慢;
fork:父进程使用os.fork()来产生的解释器分叉,这种状态下父进程和子进程是相同的,父进程的所有资源都有子进程继承,安全分叉多线程进程管理起来会很困难;
forkserver: 程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用os.fork()是安全的。没有不必要的资源被继承。
需要注意一点与一个上线文相关的对象可能与不同上下文的进程不兼容,例如当使用fork上下文创建的锁不能传递给使用spawn或forkserver启动方法启动的进程。想要使用特定启动方法的库应该使用 get_context() 以避免干扰库用户的选择。
mp.set_start_method('spawn') ctx = mp.get_context() print(ctx) lock = ctx.Lock() ctx.Process(target=f, args=(lock, 1)).start() outputs: <multiprocessing.context.SpawnContext object at 0x102c42860> Traceback (most recent call last): File "<string>", line 1, in <module> File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/spawn.py", line 105, in spawn_main exitcode = _main(fd) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/spawn.py", line 115, in _main self = reduction.pickle.load(from_parent) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/synchronize.py", line 111, in __setstate__ self._semlock = _multiprocessing.SemLock._rebuild(*state) FileNotFoundError: [Errno 2] No such file or directory
这里报错是因为mac默认的fork方式启动进程, 而代码里配置了"spawn" 方式这里就出错了。
把”spawn“改成”fork“即可通过。
进程间交换对象
multiprocessing 支持进程之间的两种通信通道
队列
Queue 队列是线程和进程安全的
def product(q: Queue, msg: str): q.put(msg) if __name__ == '__main__': ctx = mp.get_context() queue = ctx.Queue() p1 = ctx.Process(target=product(queue, "hello")) p2 = ctx.Process(target=product(queue, "world")) p1.start() p2.start() while not queue.empty(): print(queue.get())
管道
Pip()每个连接对象都有send()和recv()方法 默认情况下是双工的,如果两个进程同时对管道的同一端进行操作有可能发生损坏,同时对管道的不同端进行操作则不会有任何问题。
def f(conn): conn.send([110, 'hi', 'nice']) conn.close() if __name__ == '__main__': ctx = mp.get_context() parent_conn, child_conn = mp.Pipe() p = ctx.Process(target=f, args=(parent_conn,)) p.start() print(child_conn.recv()) p.join() p = ctx.Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join() outputs: [110, 'hi', 'nice'] [110, 'hi', 'nice']
可以看到Pipe() 返回的两个连接 只要不同时操作 得出的结果是一样的。
进程同步
锁没什么可讲的,需要注意的是产生锁的上下文启动方式和进程启动的方式要保持一致否则就会挂掉
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
进程间共享状态
multiprocessing有两个数据结构:Value、Array. 这两个类型可以和python原生数据类型转换不过也要花些开销。
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])
创建 num 和 arr 时使用的 'd' 和 'i' 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。
为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。
————————————————
版权声明:本文为CSDN博主「weixin_42519772」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_42519772/article/details/112581612