转载:Python3.8多进程之共享内存 - 知乎 (zhihu.com)
最近发了个宏愿想写一个做企业金融研究的Python框架。拖出Python一看已经更新到了3.8,于是就发现了Python 3.8里新出现的模块:multiprocessing.shared_memory
。
随手写了个测试。生成一个240MB大小的pandas.DataFrame
,然后转换成numpy.recarray
。这个DataFarme
里包括了datetime
,整型和字符串类型的列。使用numpy.recarray
的目的是为了保存dtype
,这样才能在子进程中正确从共享内存里读数据。 我在子进程中简单地使用numpy.nansum
来做计算。第一种方法是使用共享内存,第二种方法是直接将numpy.recarray
作为参数传递给子进程。 下图为测试代码的输出。
可以看出,使用共享内存的第一种方法只使用了可以忽略不计的内存,并且2秒结束战斗。传参数的方法使用了1.8GB的内存,并且慢得要命,花费200多秒。当然这跟我使用的测试机是一台2017年的12寸MacBook 4-core i5 8G RAM(已停产)有可能,不过侧面也说明在数据足够大的时候,尽量避免没必要的复制和传递还是很有效的。
测试代码如下:
from multiprocessing.shared_memory import SharedMemory from multiprocessing.managers import SharedMemoryManager from concurrent.futures import ProcessPoolExecutor, as_completed from multiprocessing import current_process, cpu_count from datetime import datetime import numpy as np import pandas as pd import tracemalloc import time def work_with_shared_memory(shm_name, shape, dtype): print(f'With SharedMemory: {current_process()=}') # Locate the shared memory by its name shm = SharedMemory(shm_name) # Create the np.recarray from the buffer of the shared memory np_array = np.recarray(shape=shape, dtype=dtype, buf=shm.buf) return np.nansum(np_array.val) def work_no_shared_memory(np_array: np.recarray): print(f'No SharedMemory: {current_process()=}') # Without shared memory, the np_array is copied into the child process return np.nansum(np_array.val) if __name__ == "__main__": # Make a large data frame with date, float and character columns a = [ (datetime.today(), 1, 'string'), (datetime.today(), np.nan, 'abc'), ] * 5000000 df = pd.DataFrame(a, columns=['date', 'val', 'character_col']) # Convert into numpy recarray to preserve the dtypes np_array = df.to_records(index=False) del df shape, dtype = np_array.shape, np_array.dtype print(f"np_array's size={np_array.nbytes/1e6}MB") # With shared memory # Start tracking memory usage tracemalloc.start() start_time = time.time() with SharedMemoryManager() as smm: # Create a shared memory of size np_arry.nbytes shm = smm.SharedMemory(np_array.nbytes) # Create a np.recarray using the buffer of shm shm_np_array = np.recarray(shape=shape, dtype=dtype, buf=shm.buf) # Copy the data into the shared memory np.copyto(shm_np_array, np_array) # Spawn some processes to do some work with ProcessPoolExecutor(cpu_count()) as exe: fs = [exe.submit(work_with_shared_memory, shm.name, shape, dtype) for _ in range(cpu_count())] for _ in as_completed(fs): pass # Check memory usage current, peak = tracemalloc.get_traced_memory() print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB") print(f'Time elapsed: {time.time()-start_time:.2f}s') tracemalloc.stop() # Without shared memory tracemalloc.start() start_time = time.time() with ProcessPoolExecutor(cpu_count()) as exe: fs = [exe.submit(work_no_shared_memory, np_array) for _ in range(cpu_count())] for _ in as_completed(fs): pass # Check memory usage current, peak = tracemalloc.get_traced_memory() print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB") print(f'Time elapsed: {time.time()-start_time:.2f}s') tracemalloc.stop()
值得一提的是,numpy.ndarray
的dtype
一定不能是object
,不然子进程访问共享内存的时候一定segfault,但如果在主进程里访问共享内存就没事。
补充更新一下,上面的测试代码work_with_shared_memory
函数里不能解引用np_array
,比如print(np_array)
,不然会segfault。使用np_array.val
和np_array.date
则没有问题则是因为这两个column的dtype
不是object
。而np_array.character_col
的dtype
在这个代码里是object
。
解决这个问题的办法也很简单,(踩坑无数次后),在to_records()
里指定dtype。
np_array = df.to_records(index=False,column_dtypes={'character_col': 'S6'})
这里我们指定character_col
为长度为6的字符串。 如果是unicode的话,可以将S6
换成U6
。 超出指定长度的字符串则会被truncate。
这样就不会有segfault了。重点就是不能有object
的dtype
。
补充一点点
在不涉及str类型的情况下似乎是正确的。
涉及str类型的话,dtype会是object类型,从而导致无法正确的处置。
这是因为
np_array = df.to_records(index=False)
会把np_array的str类型转为object类型,
用事先给定dType,构造好dType,然后
npv = [ tuple(rx) for rx in df.values.tolist()]
nparray = np.array(npv, dtype=(dType))
这个方法构造np_array正确一些。
另外,
如果str不指定长度,似乎也是不正确的。
即np.dtype应该是'<U8',后边要给定一个数字,似乎要给定一个数字,
不然其他进程读出来的是空字符串。
在其他进程用:
shm = SharedMemory('psm_c76848bd')
np_array = np.recarray(shape=(10000,), dtype=rType, buf=shm.buf)
df = pd.DataFrame(np_array)
即可以重建dataframe
PS1:
How to use pandas DataFrame in shared memory during multiprocessing?这篇文章会有所帮助。
PS2:
其他进程退出的时候会自动删除共享内存,
为使得各个进程能够互不干扰的使用共享内存,还需要在进程退出的之前用unregister去掉共享内存的引用,避免进程退出时自动删掉共享内存,其他进程无法使用的尴尬情况。
标签:dtype,time,np,memory,进程,shared,array,共享内存,Python3.8 From: https://www.cnblogs.com/zhiminyu/p/17418430.html