转载:(15条消息) 基于multiprocessing map实现python并行化(全局变量共享 map机制实用向分析 常见问题 pandas存储数据)_goto_past的博客-CSDN博客
基于multiprocessing map实现python并行化
之前从来没考虑python可以并行化,最近有一个项目需要计算100*100 次的遗传算法适应度,每次计算都要用到700000+的数据,每次计算不并行的话得用几十分钟,根本顶不住,因此调研并学习了一下并行化处理,还是很有效的,现在每次计算基本控制在2分钟以内。
首先看一个并行化实现for循环的博客,适合刚刚接触python并行化的伙伴。
[python]map方法与并行执行
我的并行化启蒙就是源自上面博客中的代码,果然Talk is cheap, show us the code!
之后在此基础上一步步解决了共享变量等问题,map相较于Process的一个好处是更容易获得返回值,这也是为啥之后见到的多是Process的教程我依然坚持用map。
话不多说,直接上代码,个人心得见注释。
""" 基于multiprocessing map 实现python并行化 的一些问题分享 环境:python 3.8.5 其他库也要进行相应的配套更新(很重要 不同的python版本并行机制有一定差异) """ import time from tqdm import tqdm import pickle import numpy as np import pandas as pd import multiprocessing as mp from multiprocessing import Array # from read_data import get_distance_hav from multiprocessing.dummy import freeze_support, Manager from math import sin, asin, cos, radians, fabs, sqrt """ 在引用自己定义的外部函数时要注意: 被引用文件的全局代码段会在引用时被执行! 造成大量的时间和内存浪费。 之前在引用'get_distance_hav'函数时没有注意,导致多进程开展前进行了大量的读取和运算。 """ EARTH_RADIUS = 6371 # 地球平均半径,6371km #plan_index = np.load('plan_index.npy', allow_pickle=True).item() # 读入外部数据,作为全局变量,子进程会只读 plan_index = range(10) W1, W2, W3 = 1, 1, 1 """ multiprocessing map机制: map函数对可迭代的参数列表里的每个参数执行所传入的func,并且创建Pool规定的进程数来并行实现前面的过程。 理论上来说,每个进程执行 "len(参数列表))/cpu_num" 次func。 ***函数外部的代码(该注释上为例),每个进程都会且只会执行一次(进程第一次创建时),之后只会重复执行func的内容*** 在函数外部声明的全局变量,每个进程都可以读到(使用global,原理上是子进程复制了该全局变量到自己的内存,子进程可以修改该值,但子进程不会影响该全局变量在主进程中的值。 另外,根据上面星号标注的内容所说,一旦子进程中更改过读进来的全局变量的值,子进程之后所有func的执行都以更改后的值为准,并不会恢复到声明时所赋的值。 (例如前面的EARTH_RADIUS,一旦子进程中修改该值为0,该子进程之后对func所有的执行读入EARTH_RADIUS就已经为0了) 对此有疑惑的可以见下一份代码示例 """ # 下面是两个计算距离的功能函数 对本文没有帮助 可跳过 def hav(theta): s = sin(theta / 2) return s * s def get_distance_hav(lat0, lng0, lat1, lng1): """用haversine公式计算球面两点间的距离。""" # do calculation return distance def get_d(index1, index2, ns): # 这里同样可以读入ns里面的变量 # 与本文关系不大 省略 return 0 # 进程执行的func: def find_min(args): index1, ns = args # 读入了两个变量,需要计算的index下标,以及Manager Namespace global plan_index # 读入全局变量 注意不要修改,否则子进程中下次func执行将会是修改后的值,而非最开始读入的值 for i in range(1, len(ns.df)): # ns.df 即为传入的df last, next = max(0, index1 - i), min(index1 + i, len(ns.df) - 1) d1 = get_d(index1, last, ns) d2 = get_d(index1, next, ns) d = min(d1, d2) index2 = (last if d == d1 else next) if d != np.inf: break v= d*W1+W2+W3 #v = d * (W1 * ns.df[index1]['Q'] + W2 * ns.df.iloc[index1]['A'] + W3 * ns.df.iloc[index1]['T']) return v if __name__ == '__main__': t1 = time.time() # with open('plan_able.pkl', 'rb') as f: # plan_able = pickle.load(f) plan_able = list(range(10)) # df = pd.read_pickle('TJ_UGS.pkl') code = np.zeros(385066) planed = np.random.choice(range(0, 385066), 120000) code[planed] = 1 code = code.astype(int) # 通过Manger实现参数共享,节省内存占用。 # Manger也可以实现不同进程对变量修改(需要锁),但本代码中 外部数据都是只读,没有用到。 mgr = Manager() ns = mgr.Namespace() # ns.df = df ns.df = list(range(10)) ns.code = code # 为Namespace()存入df和code两个两边,func可访问 p = mp.Pool(8) result = p.map(find_min, tqdm(list(zip(plan_able, [ns] * len(plan_able))))) # 由于map机制,要为每个共享的参数复制len份 p.close() p.join() print(code) print(sum(result)) t2 = time.time() print('time used:', t2 - t1)
下面这份代码示例主要用于说明map子进程中变量更改的一些机制
在函数外部声明的全局变量,每个进程都可以读到(使用global,原理上是子进程复制了该全局变量到自己的内存,子进程可以修改该值,但子进程不会影响该全局变量在主进程中的值。) 一旦子进程中更改过读进来的全局变量的值,子进程之后所有func的执行都以更改后的值为准,并不会恢复到声明时所赋的值。
import time, os from tqdm import tqdm import pickle import numpy as np import pandas as pd import os import multiprocessing as mp from multiprocessing.dummy import freeze_support, Manager pid = os.getpid() #用于辨别父进程和两个子进程 ppid = os.getppid() print('pid:', pid, 'ppid:', ppid) EARTH_RADIUS = 6371 # 地球平均半径,6371km plan_index = np.load('plan_index.npy', allow_pickle=True).item() W1, W2, W3 = 1, 1, 1 def find_min(arg): global EARTH_RADIUS if EARTH_RADIUS == 6371: #在运行时,只会输出两次,因为子进程在之后修改了自己的EARTH_RADIUS为0 print('************************************') pid = os.getpid() EARTH_RADIUS = 0 print('in pid:{}, value:{}'.format(pid, EARTH_RADIUS)) if __name__ == '__main__': print('main before change:', EARTH_RADIUS) p = mp.Pool(2) p.map(find_min, list(range(1000))) #range的值不能太小,否则map只会分配给一个进程执行 p.close() p.join() # find_min(0) print('main after change:', EARTH_RADIUS) # 仍为 6371
另外,推荐一篇主动实现sharedmem (共享内存)的博文,亲测有效:
知乎 Python3.8 多进程之共享内存
标签:map,常见问题,df,进程,import,ns,pandas,全局变量 From: https://www.cnblogs.com/zhiminyu/p/17446864.html