首页 > 编程语言 >基于multiprocessing map实现python并行化(全局变量共享 map机制实用向分析 常见问题 pandas存储数据)

基于multiprocessing map实现python并行化(全局变量共享 map机制实用向分析 常见问题 pandas存储数据)

时间:2023-05-31 17:47:16浏览次数:57  
标签:map 常见问题 df 进程 import ns pandas 全局变量

转载:(15条消息) 基于multiprocessing map实现python并行化(全局变量共享 map机制实用向分析 常见问题 pandas存储数据)_goto_past的博客-CSDN博客

基于multiprocessing map实现python并行化
之前从来没考虑python可以并行化,最近有一个项目需要计算100*100 次的遗传算法适应度,每次计算都要用到700000+的数据,每次计算不并行的话得用几十分钟,根本顶不住,因此调研并学习了一下并行化处理,还是很有效的,现在每次计算基本控制在2分钟以内。

首先看一个并行化实现for循环的博客,适合刚刚接触python并行化的伙伴。
[python]map方法与并行执行

我的并行化启蒙就是源自上面博客中的代码,果然Talk is cheap, show us the code!

之后在此基础上一步步解决了共享变量等问题,map相较于Process的一个好处是更容易获得返回值,这也是为啥之后见到的多是Process的教程我依然坚持用map。

话不多说,直接上代码,个人心得见注释。

"""
基于multiprocessing map
实现python并行化 的一些问题分享
环境:python 3.8.5 其他库也要进行相应的配套更新(很重要 不同的python版本并行机制有一定差异)
"""

import time
from tqdm import tqdm
import pickle
import numpy as np
import pandas as pd
import multiprocessing as mp
from multiprocessing import Array
# from read_data import get_distance_hav
from multiprocessing.dummy import freeze_support, Manager
from math import sin, asin, cos, radians, fabs, sqrt

"""
在引用自己定义的外部函数时要注意:
被引用文件的全局代码段会在引用时被执行! 造成大量的时间和内存浪费。
之前在引用'get_distance_hav'函数时没有注意,导致多进程开展前进行了大量的读取和运算。
"""

EARTH_RADIUS = 6371  # 地球平均半径,6371km
#plan_index = np.load('plan_index.npy', allow_pickle=True).item()  # 读入外部数据,作为全局变量,子进程会只读
plan_index = range(10)
W1, W2, W3 = 1, 1, 1

"""
multiprocessing map机制:
map函数对可迭代的参数列表里的每个参数执行所传入的func,并且创建Pool规定的进程数来并行实现前面的过程。
理论上来说,每个进程执行 "len(参数列表))/cpu_num" 次func。

***函数外部的代码(该注释上为例),每个进程都会且只会执行一次(进程第一次创建时),之后只会重复执行func的内容***
在函数外部声明的全局变量,每个进程都可以读到(使用global,原理上是子进程复制了该全局变量到自己的内存,子进程可以修改该值,但子进程不会影响该全局变量在主进程中的值。 另外,根据上面星号标注的内容所说,一旦子进程中更改过读进来的全局变量的值,子进程之后所有func的执行都以更改后的值为准,并不会恢复到声明时所赋的值。

(例如前面的EARTH_RADIUS,一旦子进程中修改该值为0,该子进程之后对func所有的执行读入EARTH_RADIUS就已经为0了)

对此有疑惑的可以见下一份代码示例
"""


# 下面是两个计算距离的功能函数 对本文没有帮助 可跳过
def hav(theta):
    s = sin(theta / 2)
    return s * s


def get_distance_hav(lat0, lng0, lat1, lng1):
    """用haversine公式计算球面两点间的距离。"""
    # do calculation
    return distance


def get_d(index1, index2, ns):
    # 这里同样可以读入ns里面的变量
    # 与本文关系不大 省略
    return 0


# 进程执行的func:
def find_min(args):
    index1, ns = args  # 读入了两个变量,需要计算的index下标,以及Manager Namespace
    global plan_index  # 读入全局变量 注意不要修改,否则子进程中下次func执行将会是修改后的值,而非最开始读入的值
    for i in range(1, len(ns.df)):  # ns.df 即为传入的df
        last, next = max(0, index1 - i), min(index1 + i, len(ns.df) - 1)
        d1 = get_d(index1, last, ns)
        d2 = get_d(index1, next, ns)
        d = min(d1, d2)
        index2 = (last if d == d1 else next)
        if d != np.inf:
            break
    v= d*W1+W2+W3
    #v = d * (W1 * ns.df[index1]['Q'] + W2 * ns.df.iloc[index1]['A'] + W3 * ns.df.iloc[index1]['T'])
    return v


if __name__ == '__main__':
    t1 = time.time()
    # with open('plan_able.pkl', 'rb') as f:
    #     plan_able = pickle.load(f)
    plan_able = list(range(10))
    # df = pd.read_pickle('TJ_UGS.pkl')

    code = np.zeros(385066)
    planed = np.random.choice(range(0, 385066), 120000)
    code[planed] = 1
    code = code.astype(int)

    # 通过Manger实现参数共享,节省内存占用。
    # Manger也可以实现不同进程对变量修改(需要锁),但本代码中 外部数据都是只读,没有用到。
    mgr = Manager()
    ns = mgr.Namespace()
    # ns.df = df
    ns.df = list(range(10))
    ns.code = code  # 为Namespace()存入df和code两个两边,func可访问

    p = mp.Pool(8)
    result = p.map(find_min, tqdm(list(zip(plan_able, [ns] * len(plan_able)))))
    # 由于map机制,要为每个共享的参数复制len份

    p.close()
    p.join()

    print(code)
    print(sum(result))
    t2 = time.time()
    print('time used:', t2 - t1)

下面这份代码示例主要用于说明map子进程中变量更改的一些机制
在函数外部声明的全局变量,每个进程都可以读到(使用global,原理上是子进程复制了该全局变量到自己的内存,子进程可以修改该值,但子进程不会影响该全局变量在主进程中的值。) 一旦子进程中更改过读进来的全局变量的值,子进程之后所有func的执行都以更改后的值为准,并不会恢复到声明时所赋的值。

import time, os
from tqdm import tqdm
import pickle
import numpy as np
import pandas as pd
import os
import multiprocessing as mp
from multiprocessing.dummy import freeze_support, Manager

pid = os.getpid()  #用于辨别父进程和两个子进程
ppid = os.getppid()
print('pid:', pid, 'ppid:', ppid)

EARTH_RADIUS = 6371  # 地球平均半径,6371km
plan_index = np.load('plan_index.npy', allow_pickle=True).item()
W1, W2, W3 = 1, 1, 1


def find_min(arg):
    global EARTH_RADIUS
    if EARTH_RADIUS == 6371:  #在运行时,只会输出两次,因为子进程在之后修改了自己的EARTH_RADIUS为0
        print('************************************')
    pid = os.getpid()
    EARTH_RADIUS = 0
    print('in pid:{}, value:{}'.format(pid, EARTH_RADIUS))


if __name__ == '__main__':
    print('main before change:', EARTH_RADIUS)
    p = mp.Pool(2)
    p.map(find_min, list(range(1000)))  #range的值不能太小,否则map只会分配给一个进程执行
    p.close() 
    p.join()
    # find_min(0)

    print('main after change:', EARTH_RADIUS)  # 仍为 6371

另外,推荐一篇主动实现sharedmem (共享内存)的博文,亲测有效:
知乎 Python3.8 多进程之共享内存

 

标签:map,常见问题,df,进程,import,ns,pandas,全局变量
From: https://www.cnblogs.com/zhiminyu/p/17446864.html

相关文章

  • HashMap 源码解毒
    PUT方法解毒:hashcode高低16进行异或运算,尽量降低哈希冲突的概率如果数组很小,hashcode的高位就不能被很好利用。finalVputVal(inthash,Kkey,Vvalue,booleanonlyIfAbsent,booleanevict){Node<K,V>[]tab;Node<K,V>p;intn,i;......
  • pandas dataframe 过滤——apply最灵活!!!
    按照某特定string字段长度过滤:importpandasaspddf=pd.read_csv('filex.csv')df['A']=df['A'].astype('str')df['B']=df['B'].astype('str')mask=(df['A'].str.len()==10)&(df......
  • Java中常见转换-数组与list互转、驼峰下划线互转、Map转Map、List转Map、进制转换的多
    场景Java中数组与List互转的几种方式数组转List1、最简单的方式,Arrays.asList(array);创建的是不可变列表,不能删除和新增元素String[]array=newString[]{"a","b"};List<String>stringList=Arrays.asList(array);System.out.println(strin......
  • sqlmap的使用 ----常用tamper模块,TODO,绕过WAF的测试
    sqlmap的使用----自带绕过脚本tamperwkend2018-09-1520:23:39sqlmap在默认的的情况下除了使用char()函数防止出现单引号,没有对注入的数据进行修改,还可以使用–tamper参数对数据做修改来绕过waf等设备。0x01命令如下sqlmap-u[url]--tamper[模块名]sqlmap的绕过脚本在目录u......
  • nmap -A 启用操作系统和版本检测,脚本扫描和路由跟踪功能
    nmap-Axx.xx.IP.xxStartingNmap7.91(https://nmap.org)at2021-08-1810:13CSTNmapscanreportfor39.108.15.161Hostisup(0.075slatency).Notshown:989filteredportsPORTSTATESERVICEVERSION22/tcpopensshOpenSSH......
  • pandas groupby 分组操作
    最一般化的groupby方法是apply.tips=pd.read_csv('tips.csv')tips[:5]新生成一列tips['tip_pct']=tips['tip']/tips['total_bill']tips[:6]根据分组选出最高的5个tip_pct值deftop(df,n=5,column='tip_pct'):returndf.sort_index(by=colum......
  • 【python教程】map、多进程与进度条
    转载:【python教程】map、多进程与进度条-知乎(zhihu.com)今天讲讲我在实习中学到的一点python知识,核心内容是多进程,也即我们常说的并行计算。map首先提个问题,给出一个列表,对列表中的每个元素都平方,代码怎么写?最简单直观的方法自然就是for循环。alist=[1,2,3,4,5,6,......
  • 办公自动化pandas
    需求如下:实现代码:未优化代码importpandasaspddefrep_huan():#读取excel文件df1=pd.read_excel('data1.xlsx')#表一df2=pd.read_excel('data2.xlsx')#表二replace_dict=[]forxm,ncinzip(df1['姓名'],df1[�......
  • 常见问题
    常见问题思源适合我吗?或者说我应该如何选择笔记软件?这个问题因人而异,很难统一回答。如果你不太确定思源是否适合你,请看如下建议:如果你需要经常分享笔记或者与别人协作编辑,并且需要数据表格功能,推荐使用:FlowUs息流-新一代知识管理与协作平台我来wolai-不仅仅是未来的......
  • 3.4. Java集合框架(List、Set、Map等)
    Java集合框架是Java提供的一套用于存储和操作数据的接口和类。它包括以下几个主要部分:接口:集合框架定义了一系列接口,如Collection、List、Set、Map等。实现类:集合框架提供了一些实现这些接口的类,如ArrayList、LinkedList、HashSet、LinkedHashSet、HashMap、LinkedHashMap等。......