转载:(14条消息) Python跨进程共享数据/对象_python多进程共享对象_alpha.5的博客-CSDN博客
1. 跨进程共享方式
在multiprocess库中,跨进程对象共享有三种方式:
(1)第一种仅适用于原生机器类型,即python.ctypes当中的类型,这种在mp库的文档当中称为shared memory方式,即通过共享内存共享对象
(2)另外一种称之为server process,即有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象;
(3)最后一种在mp文档当中没有单独提出,但是在其中多次提到,而且是mp库当中最重要的一种共享方式,称为inheritance,即继承,对象在 父进程当中创建,然后在父进程是通过multiprocessing.Process创建子进程之后,子进程自动继承了父进程当中的对象,并且子进程对这 些对象的操作都是反映到了同一个对象。
2. Shared Memory模型
shared memory模型能共享ctypes当中的类型,通过RawValue,RawArray等包装类提供。通过查看multiprocess的源码可以看到支持的类型有:
Value、Array、Lock等,
def Event() -> synchronize.Event: ... def Lock() -> synchronize.Lock: ... def RLock() -> synchronize.RLock: ... def Semaphore(value: int = ...) -> synchronize.Semaphore: ... def Pipe(duplex: bool = ...) -> Tuple[connection.Connection, connection.Connection]: ... def Pool( processes: Optional[int] = ..., initializer: Optional[Callable[..., Any]] = ..., initargs: Iterable[Any] = ..., maxtasksperchild: Optional[int] = ..., ) -> pool.Pool: ... # Functions Array and Value are copied from context.pyi. # See https://github.com/python/typeshed/blob/ac234f25927634e06d9c96df98d72d54dd80dfc4/stdlib/2and3/turtle.pyi#L284-L291 # for rationale def Array(typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...) -> sharedctypes._Array: ... def Value(typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: ...
共享的使用方法示例如下:
import multiprocessing def func(num): num.value=11.11 # 子进程改变数值的值,主进程跟着改变 if __name__=="__main__": # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value) num=multiprocessing.Value("d", 10.0) p=multiprocessing.Process(target=func,args=(num,)) p.start() p.join() print(num.value)
3. Server Process模型
这个模式支持跨进程共享所有对象,也即是想要共享 “自定义对象”,只能使用这个方式!
在server process模型中,有一个manager进程(就是那个server进程),负责管理实际的对象,真正的对象也是在manager进程的内存空间中。所有需要访问该对象的进程都需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object)。这个模型是一个典型的RPC(远程过程调用)的模型。因为每个客户进程实际上都是在访问manager进程当中的对象,因此完全可以通过这个实现对象共享。
(1)Manager支持类型
通过查看源码可以发现,Manage() 支持的类型有:
def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ... def Condition(self, lock: Any = ...) -> threading.Condition: ... def Event(self) -> threading.Event: ... def Lock(self) -> threading.Lock: ... def Namespace(self) -> _Namespace: ... def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ... def RLock(self) -> threading.RLock: ... def Semaphore(self, value: Any = ...) -> threading.Semaphore: ... def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ... def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ... def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ... def list(self, sequence
使用示例如下:
import multiprocessing def func(dict_in,list_in): # 跨进程共享, 子进程修改,主进程跟着改变 dict_in["index1"]="xxx" list_in.append("xx") list_in.append("yy") if __name__=="__main__": with multiprocessing.Manager() as mg: # 创建主进程与子进程之间共享的dict/list mydict=multiprocessing.Manager().dict() mylist=multiprocessing.Manager().list(range(5)) p=multiprocessing.Process(target=func,args=(mydict,mylist)) p.start() p.join() print(mylist) print(mydict)
(2)共享自定义类
很多场景下,Manager自带的类并不能满足我们的需求,这时候就需要用到Manager对自定义类的支持。Server Process模型共享自定义对象的实现流程如下:
(1) 基于multiprocessing.managers 重写MyManager,类内部啥都不用实现:
class MyManager(managers.BaseManager): """ 自定义Manager """ # Pass is really enough. Nothing needs to be done here. pass
(2) 注册自定义类,如:RedisService、MySQLService,
# RedisService/MySQLService是自定义类, 类内部分别包含Redis连接和MySQL连接,类定义此处省略 MyManager.register("RedisService", RedisService) MyManager.register("MySQLService", MySQLService)
(3) 构造MyManager的实例,并由它创建多进程共享的自定义对象,
manager = MyManager() manager.start() # 创建共享对象 self.redis_service = manager.RedisService(settings) self.mysql_service = manager.MySQLService(settings)
(4) 该对象以参数形式传入到子进程中,子进程直接使用。
全流程的代码示例如下:
from concurrent.futures import ProcessPoolExecutor from multiprocessing import managers class MyManager(managers.BaseManager): """ 自定义Manager """ # Pass is really enough. Nothing needs to be done here. pass def proc_worker(redis_service, mysql_service, task_id): """ TODO::工作进程 """ # 此处可直接使用进程池共享的redis和mongo服务 rst = redis_service.get(task_id) rst = mysql_service.save_result(rst) ... class ServerExecutor: """ 调度执行器 """ def __init__(self, settings): # 配置信息, 用于创建redis链接和mysql链接 self.settings = settings # 在Manager中注册自定义类(RedisService/MySQLService是我的自定义类, 类内部分别包含Redis连接和MySQL连接) MyManager.register("RedisService", RedisService) MyManager.register("MySQLService", MySQLService) manager = MyManager() manager.start() # 创建共享对象 self.redis_service = manager.RedisService(settings) self.mysql_service = manager.MySQLService(settings) # 这里不仅可以是ProcessPoolExecutor,也可以是多进程Process或者进程池Pool,各自用法略有不同 self.executor = ProcessPoolExecutor(settings.executor_num) def submit(self, task_id): """ 提交任务 """ future = self.executor.submit(proc_work, self.redis_service, self.mysql_service, task_id) return future # demo ... executor = ServerExecutor(settings) future = executor.submit(task_id) future.add_done_callback(task_done_callback) ...
标签:...,Python,self,对象,进程,共享,def From: https://www.cnblogs.com/zhiminyu/p/17419196.html