首页 > 系统相关 >Python跨进程共享数据/对象

Python跨进程共享数据/对象

时间:2023-05-21 21:22:24浏览次数:45  
标签:... Python self 对象 进程 共享 def

转载:(14条消息) Python跨进程共享数据/对象_python多进程共享对象_alpha.5的博客-CSDN博客

1. 跨进程共享方式
在multiprocess库中,跨进程对象共享有三种方式:

(1)第一种仅适用于原生机器类型,即python.ctypes当中的类型,这种在mp库的文档当中称为shared memory方式,即通过共享内存共享对象

(2)另外一种称之为server process,即有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象;

(3)最后一种在mp文档当中没有单独提出,但是在其中多次提到,而且是mp库当中最重要的一种共享方式,称为inheritance,即继承,对象在 父进程当中创建,然后在父进程是通过multiprocessing.Process创建子进程之后,子进程自动继承了父进程当中的对象,并且子进程对这 些对象的操作都是反映到了同一个对象。

2. Shared Memory模型
shared memory模型能共享ctypes当中的类型,通过RawValue,RawArray等包装类提供。通过查看multiprocess的源码可以看到支持的类型有:

Value、Array、Lock等,

def Event() -> synchronize.Event: ...
def Lock() -> synchronize.Lock: ...
def RLock() -> synchronize.RLock: ...
def Semaphore(value: int = ...) -> synchronize.Semaphore: ...
def Pipe(duplex: bool = ...) -> Tuple[connection.Connection, connection.Connection]: ...
def Pool(
    processes: Optional[int] = ...,
    initializer: Optional[Callable[..., Any]] = ...,
    initargs: Iterable[Any] = ...,
    maxtasksperchild: Optional[int] = ...,
) -> pool.Pool: ...
 
# Functions Array and Value are copied from context.pyi.
# See https://github.com/python/typeshed/blob/ac234f25927634e06d9c96df98d72d54dd80dfc4/stdlib/2and3/turtle.pyi#L284-L291
# for rationale
def Array(typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...) -> sharedctypes._Array: ...
def Value(typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: ...

共享的使用方法示例如下:

import multiprocessing
 
 
def func(num):
    num.value=11.11  # 子进程改变数值的值,主进程跟着改变
 
 
if __name__=="__main__":
    # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value)
    num=multiprocessing.Value("d", 10.0) 
    p=multiprocessing.Process(target=func,args=(num,))
    p.start()
    p.join()
 
    print(num.value)

3. Server Process模型
这个模式支持跨进程共享所有对象,也即是想要共享 “自定义对象”,只能使用这个方式!

在server process模型中,有一个manager进程(就是那个server进程),负责管理实际的对象,真正的对象也是在manager进程的内存空间中。所有需要访问该对象的进程都需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object)。这个模型是一个典型的RPC(远程过程调用)的模型。因为每个客户进程实际上都是在访问manager进程当中的对象,因此完全可以通过这个实现对象共享。

(1)Manager支持类型

通过查看源码可以发现,Manage() 支持的类型有:

def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...
def Condition(self, lock: Any = ...) -> threading.Condition: ...
def Event(self) -> threading.Event: ...
def Lock(self) -> threading.Lock: ...
def Namespace(self) -> _Namespace: ...
def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ...
def RLock(self) -> threading.RLock: ...
def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...
def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...
def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ...
def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...
def list(self, sequence

使用示例如下:

import multiprocessing
 
 
def func(dict_in,list_in):
    # 跨进程共享, 子进程修改,主进程跟着改变
    dict_in["index1"]="xxx"
 
    list_in.append("xx")
    list_in.append("yy")
 
 
if __name__=="__main__":
with multiprocessing.Manager() as mg:
    # 创建主进程与子进程之间共享的dict/list
    mydict=multiprocessing.Manager().dict()
    mylist=multiprocessing.Manager().list(range(5))
    p=multiprocessing.Process(target=func,args=(mydict,mylist))
    p.start()
    p.join()
    
    print(mylist)
    print(mydict)

(2)共享自定义类

很多场景下,Manager自带的类并不能满足我们的需求,这时候就需要用到Manager对自定义类的支持。Server Process模型共享自定义对象的实现流程如下:

(1) 基于multiprocessing.managers 重写MyManager,类内部啥都不用实现:

class MyManager(managers.BaseManager):
    """
    自定义Manager
    """
    # Pass is really enough. Nothing needs to be done here.
    pass

(2) 注册自定义类,如:RedisService、MySQLService,

# RedisService/MySQLService是自定义类, 类内部分别包含Redis连接和MySQL连接,类定义此处省略
MyManager.register("RedisService", RedisService)
 MyManager.register("MySQLService", MySQLService)

(3) 构造MyManager的实例,并由它创建多进程共享的自定义对象,

manager = MyManager()
manager.start()
 
# 创建共享对象
self.redis_service = manager.RedisService(settings)
self.mysql_service = manager.MySQLService(settings)

(4) 该对象以参数形式传入到子进程中,子进程直接使用。

全流程的代码示例如下:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import managers
 
 
class MyManager(managers.BaseManager):
    """
    自定义Manager
    """
    # Pass is really enough. Nothing needs to be done here.
    pass
 
def proc_worker(redis_service, mysql_service, task_id):
    """
    TODO::工作进程
    """
    
    # 此处可直接使用进程池共享的redis和mongo服务
    rst = redis_service.get(task_id)
    rst = mysql_service.save_result(rst)
    ...
 
class ServerExecutor:
    """
    调度执行器
    """
    def __init__(self, settings):
        # 配置信息, 用于创建redis链接和mysql链接
        self.settings = settings
 
        # 在Manager中注册自定义类(RedisService/MySQLService是我的自定义类, 类内部分别包含Redis连接和MySQL连接)
        MyManager.register("RedisService", RedisService)
        MyManager.register("MySQLService", MySQLService)
        manager = MyManager()
        manager.start()
 
        # 创建共享对象
        self.redis_service = manager.RedisService(settings)
        self.mysql_service = manager.MySQLService(settings)
 
        # 这里不仅可以是ProcessPoolExecutor,也可以是多进程Process或者进程池Pool,各自用法略有不同
        self.executor = ProcessPoolExecutor(settings.executor_num)
 
    def submit(self, task_id):
        """
        提交任务
        """
        future = self.executor.submit(proc_work,
                                      self.redis_service,
                                      self.mysql_service,
                                      task_id)
        return future
 
 
# demo
...
executor = ServerExecutor(settings)
future = executor.submit(task_id)
future.add_done_callback(task_done_callback)
...
 

 

标签:...,Python,self,对象,进程,共享,def
From: https://www.cnblogs.com/zhiminyu/p/17419196.html

相关文章

  • Python字符串的encode与decode
    首先要搞清楚,字符串在Python内部的表示是unicode编码.因此,在做编码转换时,通常需要以unicode作为中间编码,即先将其他编码的字符串解码(decode)成unicode,再从unicode编码(encode)成另一种编码。decode的作用是将其他编码的字符串转换成unicode编码,如str1.decode('gb2312'),表示将gb2312编......
  • python解析XML
    xml简介XML全称ExtensibleMarkupLanguage,中文译为可扩展标记语言。XML之前有两个先行者:SGML和HTML,率先登场的是SGML,尽管它功能强大,但文档结构复杂,既不容易学也不易于使用,因此几个主要的浏览器厂商均拒绝支持SGML,这些因素限制了SGML在网上的传播性;1989年HTML登场,它继......
  • 众惠生活是一种以共享、互助为核心的新型方式
    众惠生活是一种新型的生活方式,它强调的是人与人之间的互助和共享。在众惠生活中,人们可以通过分享资源、技能和知识来实现共赢,同时也能够减少浪费和消费的压力。众惠生活的出现源于当今社会快速发展和资源日益紧张的状况。随着城市化进程的加速,人们的生活变得越来越快节奏,同时也面临......
  • 以共享、互助为核心的新型生活方式——众惠生活
    众惠生活,是近年来出现的一种新型生活方式。它强调的是人与人之间的互助和共享,通过分享资源、技能和知识来满足各自的需求,从而实现共赢。在这个快节奏、压力巨大的都市生活中,众惠生活为人们提供了一种全新的选择。众惠生活的核心理念就是共享。在这种生活方式中,人们可以通过共享房屋......
  • python datetime时区转换
    比如把格林威治时间转换为上海时间:fromdatetimeimportdatetimeimportpytzprint('格林威治时间:',datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))print('上海时间:',datetime.now().astimezone(pytz.timezone("Asia/Shanghai")).strftime("......
  • Python 多进程之间共享变量
    转载:Python多进程之间共享变量-知乎(zhihu.com)Python多线程之间共享变量很简单,直接定义全局global变量即可。而多进程之间是相互独立的执行单元,这种方法就不可行了。不过Python标准库已经给我们提供了这样的能力,使用起来也很简单。但要分两种情况来看,一种是Process......
  • python类中调用类方法时,报错self参数未填
    转载:(14条消息)python类中调用类方法时,报错self参数未填。_追天一方的博客-CSDN博客又碰到了一个小错误比如一个类如下:classprint_number(object):def__init__(self,string="数字是"):self.string=stringdefprint_(self,ss=3):print("{}:{}".......
  • 网络编辑的使用和知识点,进程线程之间实现交互
    软件开放的框架c/s架构c就是Client客户端就是要去请求数据的s就是Server服务端就是给客服端根据客户的要求提供数据的服务端的必备条件时刻提供服务等待客服端的访问有一个固定的地址能够接受多个服务端的请求(高并发)B/s架构B就是Browser就是一个浏览器充当所有服务端......
  • 阿里云对象存储OSS————跨域资源共享(CORS)(m3u8 无法加载m3u8:跨域访问被拒绝)...
    今天在做视频直播录像的时候,添加一个录制APP的.M3U8文件到OSS的一个test文件中存储,结果是访问不到了:提示:无法加载m3u8:跨域访问被拒绝!!!!!项目代码测试地址:https://github.com/Tinywan/ThinkPhpStudy阿里云帮助文档:https://help.aliyun.com/document_detail/31928.html......
  • 1.脚本高级命令,进程优先级命令,进程管理工具,任务相关命令
    一.总结脚本高级命令trap,install,mktemp,expect,进程优先级命令:nice,renice,进程管理工具:ps,pstree,prtstat,pgrep,pidof,uptime,mpstat,top,htop,free,pmap,vmstat,iostat,iotop,iftop,nload,nethogs,iptraf-ng,dstat,glances,cockpit,kill,job,任务......