首页 > 系统相关 >Python3.8多进程之共享内存

Python3.8多进程之共享内存

时间:2023-05-21 12:37:08浏览次数:68  
标签:dtype time np memory 进程 shared array 共享内存 Python3.8

转载:Python3.8多进程之共享内存 - 知乎 (zhihu.com)

最近发了个宏愿想写一个做企业金融研究的Python框架。拖出Python一看已经更新到了3.8,于是就发现了Python 3.8里新出现的模块:multiprocessing.shared_memory

随手写了个测试。生成一个240MB大小的pandas.DataFrame,然后转换成numpy.recarray。这个DataFarme里包括了datetime,整型和字符串类型的列。使用numpy.recarray的目的是为了保存dtype,这样才能在子进程中正确从共享内存里读数据。 我在子进程中简单地使用numpy.nansum来做计算。第一种方法是使用共享内存,第二种方法是直接将numpy.recarray作为参数传递给子进程。 下图为测试代码的输出。

 

可以看出,使用共享内存的第一种方法只使用了可以忽略不计的内存,并且2秒结束战斗。传参数的方法使用了1.8GB的内存,并且慢得要命,花费200多秒。当然这跟我使用的测试机是一台2017年的12寸MacBook 4-core i5 8G RAM(已停产)有可能,不过侧面也说明在数据足够大的时候,尽量避免没必要的复制和传递还是很有效的。

测试代码如下:

from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import current_process, cpu_count
from datetime import datetime
import numpy as np
import pandas as pd
import tracemalloc
import time


def work_with_shared_memory(shm_name, shape, dtype):
    print(f'With SharedMemory: {current_process()=}')
    # Locate the shared memory by its name
    shm = SharedMemory(shm_name)
    # Create the np.recarray from the buffer of the shared memory
    np_array = np.recarray(shape=shape, dtype=dtype, buf=shm.buf)
    return np.nansum(np_array.val)


def work_no_shared_memory(np_array: np.recarray):
    print(f'No SharedMemory: {current_process()=}')
    # Without shared memory, the np_array is copied into the child process
    return np.nansum(np_array.val)


if __name__ == "__main__":
    # Make a large data frame with date, float and character columns
    a = [
        (datetime.today(), 1, 'string'),
        (datetime.today(), np.nan, 'abc'),
    ] * 5000000
    df = pd.DataFrame(a, columns=['date', 'val', 'character_col'])
    # Convert into numpy recarray to preserve the dtypes
    np_array = df.to_records(index=False)
    del df
    shape, dtype = np_array.shape, np_array.dtype
    print(f"np_array's size={np_array.nbytes/1e6}MB")

    # With shared memory
    # Start tracking memory usage
    tracemalloc.start()
    start_time = time.time()
    with SharedMemoryManager() as smm:
        # Create a shared memory of size np_arry.nbytes
        shm = smm.SharedMemory(np_array.nbytes)
        # Create a np.recarray using the buffer of shm
        shm_np_array = np.recarray(shape=shape, dtype=dtype, buf=shm.buf)
        # Copy the data into the shared memory
        np.copyto(shm_np_array, np_array)
        # Spawn some processes to do some work
        with ProcessPoolExecutor(cpu_count()) as exe:
            fs = [exe.submit(work_with_shared_memory, shm.name, shape, dtype)
                  for _ in range(cpu_count())]
            for _ in as_completed(fs):
                pass
    # Check memory usage
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB")
    print(f'Time elapsed: {time.time()-start_time:.2f}s')
    tracemalloc.stop()

    # Without shared memory
    tracemalloc.start()
    start_time = time.time()
    with ProcessPoolExecutor(cpu_count()) as exe:
        fs = [exe.submit(work_no_shared_memory, np_array)
              for _ in range(cpu_count())]
        for _ in as_completed(fs):
            pass
    # Check memory usage
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB")
    print(f'Time elapsed: {time.time()-start_time:.2f}s')
    tracemalloc.stop()

值得一提的是,numpy.ndarraydtype一定不能是object,不然子进程访问共享内存的时候一定segfault,但如果在主进程里访问共享内存就没事。

补充更新一下,上面的测试代码work_with_shared_memory 函数里不能解引用np_array,比如print(np_array),不然会segfault。使用np_array.valnp_array.date则没有问题则是因为这两个column的dtype不是object。而np_array.character_coldtype在这个代码里是object

解决这个问题的办法也很简单,(踩坑无数次后),在to_records()里指定dtype。

 np_array = df.to_records(index=False,column_dtypes={'character_col': 'S6'}) 

这里我们指定character_col为长度为6的字符串。 如果是unicode的话,可以将S6换成U6。 超出指定长度的字符串则会被truncate。

这样就不会有segfault了。重点就是不能有objectdtype

 

补充一点点

 

在不涉及str类型的情况下似乎是正确的。

涉及str类型的话,dtype会是object类型,从而导致无法正确的处置。

 

这是因为

np_array = df.to_records(index=False)

会把np_array的str类型转为object类型,

 

用事先给定dType,构造好dType,然后

npv = [ tuple(rx) for rx in df.values.tolist()]
nparray = np.array(npv, dtype=(dType))
这个方法构造np_array正确一些。

 

另外,

如果str不指定长度,似乎也是不正确的。

即np.dtype应该是'<U8',后边要给定一个数字,似乎要给定一个数字,

不然其他进程读出来的是空字符串。

 

在其他进程用:

shm = SharedMemory('psm_c76848bd')
np_array = np.recarray(shape=(10000,), dtype=rType, buf=shm.buf)
df = pd.DataFrame(np_array)
即可以重建dataframe

 

PS1:

How to use pandas DataFrame in shared memory during multiprocessing?这篇文章会有所帮助。
PS2:

其他进程退出的时候会自动删除共享内存,

为使得各个进程能够互不干扰的使用共享内存,还需要在进程退出的之前用unregister去掉共享内存的引用,避免进程退出时自动删掉共享内存,其他进程无法使用的尴尬情况。

标签:dtype,time,np,memory,进程,shared,array,共享内存,Python3.8
From: https://www.cnblogs.com/zhiminyu/p/17418430.html

相关文章

  • python进程池ProcessPoolExecutor的用法与实现分析
    转载:(14条消息)【Python随笔】python进程池ProcessPoolExecutor的用法与实现分析_utmhikari的博客-CSDN博客concurrent.futures—Launchingparalleltasks—Python3.11.3documentation在python开发期间,由于GIL的原因,不能直接采用并行的方式处理代码逻辑。在multiprocess......
  • Linux进程命令
    proc系统查看进程占用的物理内存top-p{pid}查看进程占用的内存cat/proc/{pid}/statusVmRSS是进程占用的物理内存fuser查看使用的文件的进程fuser-umv/proc查看启动指定端口的进程fuser-v2379/tcplsof查看进程使用的文件lsof|grepkubelet|head-n10pido......
  • Mac 删除MySQL后仍然有MySQL进程且杀不掉
    如图解决方案ps-ef|grepmysql|grep-vgrep那个其实是grep进程,真正的mysql进程已经被杀掉了。并且那一行也有grep这个词。......
  • 僵尸进程问题解决
    1何为僵尸进程僵尸进程是当子进程比父进程先结束,而父进程又没有回收子进程,释放子进程占用的资源,此时子进程将成为一个僵尸进程。如果父进程先退出,子进程被init接管,子进程退出后init会回收其占用的相关资源。在UNIX系统中,一个进程结束了,但是它的父进程没有等待(调用wait/wai......
  • Linux进程调度-组调度及带宽控制
    1.概述组调度(task_group)是使用Linuxcgroup(controlgroup)的cpu子系统来实现的,可以将进程进行分组,按组来分配CPU资源等。比如,看一个实际的例子:A和B两个用户使用同一台机器,A用户16个进程,B用户2个进程,如果按照进程的个数来分配CPU资源,显然A用户会占据大量的CPU时间,这对于B用户......
  • Linux基础22 进程的优先级nice, 后台进程管理, 系统平均负载, 系统启动流程
    进程的优先级:nice值越高:表示优先级越低,例如19,该进程容易将CPU使用量让给其他进程。nice值越低:表示优先级越高,例如-20,该进程更不倾向于让出CPU。#以设定的优先级启动nice-n-10tail-f/var/log/messages#重新设置一个进程的优先级(调整sshd的优先级)[root@oldboyedu~]#......
  • 记一次将 .netcore 项目用 IIS 进程调试
    环境:win10,VisualStudio2022 在.netframework年代,我们都习惯用iis进程调试代码。因为用F5调试代码效率太低下。现在.netcore时代,这种好习惯可不能丢。简单记录一下,我的操作过程。 1.首先用IIS挂载网站,看能不能把发布的好的网站跑起来2.其次用IIS增加网站,......
  • 如何查看Linux有哪些进程
    ​在Linux中,可以使用以下命令查看当前系统中运行的进程:ps命令:用于显示当前系统中的进程信息。psaux该命令会显示所有进程的详细信息,包括进程ID、父进程ID、CPU占用率、内存占用率、进程状态等。top命令:用于实时显示当前系统中的进程信息。top该命令会实......
  • WPF单进程实例
    用互斥量Mutex实现如果已经存在Mutex,则会创建失败。注意:Mutex要声明成全局的,不能是局部变量,否则会判断失败。 重写Startup函数,加上单例判断。参考下面代码:1publicpartialclassApp:Application2{3System.Threading.Mutexmutex;45......
  • Linux多进程20-共享内存
    共享内存共享内存允许两个或多个进程共享物理内存的同一块区域(通常称为段)一个共享内存段会成为一个进程用户空间的一部分,因此这种IPC机制无需内核介入,所有需要做的就是让一个进程将数据复制进共享内存中,并且这部分数据会对其他所有共享同一个段的进程可用管道是要求发......