首页 > 系统相关 >Python多进程运行——Multiprocessing基础教程2

Python多进程运行——Multiprocessing基础教程2

时间:2023-05-22 17:12:16浏览次数:55  
标签:p1 Python multiprocessing 进程 square result 基础教程 print Multiprocessing

转载:Python多进程运行——Multiprocessing基础教程2 - 知乎 (zhihu.com)

1 数据共享

在多进程处理中,所有新创建的进程都会有这两个特点:独立运行,有自己的内存空间。

我们来举个例子展示一下:

import multiprocessing 

# empty list with global scope 
result = [] 

def square_list(mylist): 
    global result 
    # append squares of mylist to global list result 
    for num in mylist: 
        result.append(num * num) 
    # print global list result 
    print("Result(in process p1): {}".format(result)) 

if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 

    # creating new process 
    p1 = multiprocessing.Process(target=square_list, args=(mylist,)) 
    # starting process 
    p1.start() 
    # wait until process is finished 
    p1.join() 

    # print global result list 
    print("Result(in main program): {}".format(result))

这个程序的输出结果是:

Result(in process p1): [1, 4, 9, 16]
Result(in main program): []

在上面的程序中我们尝试在两个地方打印全局列表result的内容:

  • square_list()函数中,由于这个函数是由进程p1调用的,所以result列表只在进程p1的内存空间中更改。
  • 在主程序中的p1进程完成后。由于主程序由不同的进程运行,它的内存空间中的result列表仍然是空的。

我们再用一张图来帮助理解记忆不同进程间的数据关系:

图1 进程间数据关系

1.1 内存共享

如果程序需要在不同的进程之间共享一些数据的话,该怎么做呢?不用担心,multiprocessing模块提供了Array对象和Value对象,用来在进程之间共享数据。

所谓Array对象和Value对象分别是指从共享内存中分配的ctypes数组和对象。我们直接来看一个例子,展示如何用Array对象和Value对象在进程之间共享数据:

import multiprocessing 

def square_list(mylist, result, square_sum): 
    # append squares of mylist to result array 
    for idx, num in enumerate(mylist): 
        result[idx] = num * num 

    # square_sum value 
    square_sum.value = sum(result) 

    # print result Array 
    print("Result(in process p1): {}".format(result[:])) 

    # print square_sum Value 
    print("Sum of squares(in process p1): {}".format(square_sum.value)) 

if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 

    # creating Array of int data type with space for 4 integers 
    result = multiprocessing.Array('i', 4) 

    # creating Value of int data type 
    square_sum = multiprocessing.Value('i') 

    # creating new process 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum)) 

    # starting process 
    p1.start() 

    # wait until process is finished 
    p1.join() 

    # print result array 
    print("Result(in main program): {}".format(result[:])) 

    # print square_sum Value 
    print("Sum of squares(in main program): {}".format(square_sum.value))

程序输出的结果如下:

Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30

成功了!主程序和p1进程输出了同样的结果,说明程序中确实完成了不同进程间的数据共享。那么我们来详细看一下上面的程序做了什么:

在主程序中我们首先创建了一个Array对象:

result = multiprocessing.Array('i', 4)

向这个对象输入的第一个参数是数据类型:i表示整数,d代表浮点数。第二个参数是数组的大小,在这个例子中我们创建了包含4个元素的数组。

类似的,我们创建了一个Value对象:

square_sum = multiprocessing.Value('i')

我们只对Value对象输入了一个参数,那就是数据类型,与上述的方法一致。当然,我们还可以对其指定一个初始值(比如10),就像这样:

square_sum = multiprocessing.Value('i', 10)

随后,我们在创建进程对象时,将刚创建好的两个对象:result和square_sum作为参数输入给进程:

p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))

在函数中result元素通过索引进行数组赋值,square_sum通过value属性进行赋值。

注意:为了完整打印result数组的结果,需要使用result[:]进行打印,而square_sum也需要使用value属性进行打印:

print("Result(in process p1): {}".format(result[:])) 
print("Sum of squares(in process p1): {}".format(square_sum.value))

1.2 服务器进程

每当python程序启动时,同时也会启动一个服务器进程。随后,只要我们需要生成一个新进程,父进程就会连接到服务器并请求它派生一个新进程。这个服务器进程可以保存Python对象,并允许其他进程使用代理来操作它们。

multiprocessing模块提供了能够控制服务器进程的Manager类。所以,Manager类也提供了一种创建可以在不同流程之间共享的数据的方法。

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型,如列表、字典、队列、值、数组等。此外,单个管理器可以由网络上不同计算机上的进程共享。

但是,服务器进程管理器的速度比使用共享内存要慢。

让我们来看一个例子:

import multiprocessing 

def print_records(records): 
    for record in records: 
        print("Name: {0}\nScore: {1}\n".format(record[0], record[1])) 

def insert_record(record, records): 
    records.append(record) 
    print("New record added!\n") 

if __name__ == '__main__': 
    with multiprocessing.Manager() as manager: 
        # creating a list in server process memory 
        records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)]) 
        # new record to be inserted in records 
        new_record = ('Jeff', 8) 

        # creating new processes 
        p1 = multiprocessing.Process(target=insert_record, args=(new_record, records)) 
        p2 = multiprocessing.Process(target=print_records, args=(records,)) 

        # running process p1 to insert new record 
        p1.start() 
        p1.join() 

        # running process p2 to print records 
        p2.start() 
        p2.join()

这个程序的输出结果是:

New record added!

Name: Sam
Score: 10

Name: Adam
Score: 9

Name: Kevin
Score: 9

Name: Jeff
Score: 8

我们来理解一下这个程序做了什么:首先我们创建了一个manager对象

with multiprocessing.Manager() as manager:

在with语句下的所有行,都是在manager对象的范围内的。接下来我们使用这个manager对象创建了列表(类似的,我们还可以用manager.dict()创建字典)。

最后我们创建了进程p1(用于在records列表中插入一条新的record)和p2(将records打印出来),并将records作为参数进行传递。

服务器进程的概念再次用下图总结一下:

图2 服务器进程数据共享

2 数据传递

为了能使多个流程能够正常工作,常常需要在它们之间进行一些通信,以便能够划分工作并汇总最后的结果。multiprocessing模块支持进程之间的两种通信通道:Queue和Pipe。

2.1 Queue

使用队列来回处理多进程之间的通信是一种比较简单的方法。任何Python对象都可以使用队列进行传递。我们来看一个例子:

import multiprocessing 

def square_list(mylist, q): 
    # append squares of mylist to queue 
    for num in mylist: 
        q.put(num * num) 

def print_queue(q): 
    print("Queue elements:") 
    while not q.empty(): 
        print(q.get()) 
    print("Queue is now empty!") 

if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 

    # creating multiprocessing Queue 
    q = multiprocessing.Queue() 

    # creating new processes 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, q)) 
    p2 = multiprocessing.Process(target=print_queue, args=(q,)) 

    # running process p1 to square list 
    p1.start() 
    p1.join() 

    # running process p2 to get queue elements 
    p2.start() 
    p2.join()

上面这个程序的输出结果是:

Queue elements:
1
4
9
16
Queue is now empty!

我们来看一下上面这个程序到底做了什么。首先我们创建了一个Queue对象:

q = multiprocessing.Queue()

然后,将这个空的Queue对象输入square_list函数。该函数会将列表中的数平方,再使用put()方法放入队列中:

q.put(num * num)

随后使用get()方法,将q打印出来,直至q重新称为一个空的Queue对象:

while not q.empty():
    print(q.get())

我们还是用一张图来帮助理解记忆:

图2 服务器进程数据共享

2.2 Pipe

一个Pipe对象只能有两个端点。因此,当进程只需要双向通信时,它会比Queue对象更好用。

multiprocessing模块提供了Pipe()函数,该函数返回由管道连接的一对连接对象。Pipe()返回的两个连接对象分别表示管道的两端。每个连接对象都有send()recv()方法。

我们来看一个例子:

import multiprocessing 

def sender(conn, msgs): 
    for msg in msgs: 
        conn.send(msg) 
        print("Sent the message: {}".format(msg)) 
    conn.close() 

def receiver(conn): 
    while 1: 
        msg = conn.recv() 
        if msg == "END": 
            break
        print("Received the message: {}".format(msg)) 

if __name__ == "__main__": 
    # messages to be sent 
    msgs = ["hello", "hey", "hru?", "END"] 

    # creating a pipe 
    parent_conn, child_conn = multiprocessing.Pipe() 

    # creating new processes 
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs)) 
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,)) 

    # running processes 
    p1.start() 
    p2.start() 

    # wait until processes finish 
    p1.join() 
    p2.join()

上面这个程序的输出结果是:

Sent the message: hello
Sent the message: hey
Sent the message: hru?
Received the message: hello
Sent the message: END
Received the message: hey
Received the message: hru?

我们还是来看一下这个程序到底做了什么。首先创建了一个Pipe对象:

parent_conn, child_conn = multiprocessing.Pipe()

与上文说的一样,该对象返回了一对管道两端的两个连接对象。然后使用send()方法和recv()方法进行信息的传递。就这么简单。在上面的程序中,我们从一端向另一端发送一串消息。在另一端,我们收到消息,并在收到END消息时退出。

要注意的是,如果两个进程(或线程)同时尝试从管道的同一端读取或写入管道中的数据,则管道中的数据可能会损坏。不过不同的进程同时使用管道的两端是没有问题的。还要注意,Queue对象在进程之间进行了适当的同步,但代价是增加了计算复杂度。因此,Queue对象对于线程和进程是相对安全的。

最后我们还是用一张图来示意:

图4 用Pipe完成进程间数据传输

 

标签:p1,Python,multiprocessing,进程,square,result,基础教程,print,Multiprocessing
From: https://www.cnblogs.com/zhiminyu/p/17421123.html

相关文章

  • 聊聊python的字符编码
    什么是字符编码?在计算机内部,所有的数据都是二进制形式存储的,无法直接存储我们人类的语言文字符号等,所以我们需要制定一种转换规则来明确计算机内部二进制与我们的数字符号文字之间的对应关系,这就出现了‘字符编码’。字符编码的发展史阶段一现代计算机起源于美国,所以......
  • 比较不同Python图形处理库或图像处理库的异同点
    python的图像处理库有很多种比如:pillow库、Numpy库、Scipy库、opencv库、pgmagic库等其中较常用的是NUmapy库、pillow库、openCV库,今天我们就这三种图像处理库来进行比较首先是numapy库;他是一个python库可以帮助我们处理所有类型的科学计算,他是在执行任何数据预处理或数据科......
  • Python中for循环
    在Python中,for循环用于遍历可迭代对象(如列表、元组、字符串等)中的元素,并执行相应的操作。for循环的基本语法如下:forelementiniterable:#执行操作其中,element是一个变量,用于存储每次循环迭代的元素值,iterable是一个可迭代对象,包含一系列元素。以下是一个简单的示例,演示如......
  • AcWing901. 滑雪(python)
    题目详情知识点记忆化DP思路自己的思路(仅参考):一开始想的是找最大值,然后从最大值开始向下滑,但是我们是要求最长路径,不一定是从最高的点滑下去的,也不一定是滑到最低点,而且会存在最大值不止一个的情况,所以我们应该是针对每一个点,都求出当前该点出发能去的最长路径,然后求完之后......
  • Python竖版大屏 | 用pyecharts开发可视化的奇妙探索!
    你好!我是@马哥python说,一枚10年程序猿......
  • python控制微信发消息
    使用pyautogui控制PC版微信,发消息。importpyautoguiimporttimedefOpen_Wechat():#使用快捷键打开微信。这个微信的默认设置的快捷键。pyautogui.hotkey('ctrl','alt','w')time.sleep(1)defChat_Who(ContactPerson):#使用快捷键打开查找,找一个......
  • <Python全景系列-1> Hello World,1分钟配置好你的python环境
    《从此开始:1分钟配置好你的python环境》欢迎来到我们的系列博客《Python360全景》!在这个系列中,我们将带领你从Python的基础知识开始,一步步深入到高级话题,帮助你掌握这门强大而灵活的编程语法。无论你是编程新手,还是有一定基础的开发者,这个系列都将提供你需要的知识和技能。这是我......
  • < Python全景系列-2 > Python数据类型大盘点
    欢迎来到我们的系列博客《Python全景系列》!在这个系列中,我们将带领你从Python的基础知识开始,一步步深入到高级话题,帮助你掌握这门强大而灵活的编程语法。无论你是编程新手,还是有一定基础的开发者,这个系列都将提供你需要的知识和技能。Python作为一门强大且灵活的编程语言,拥有丰富......
  • < Python全景系列-3 > Python控制流程盘点及高级用法、神秘技巧大揭秘!
    欢迎来到我们的系列博客《Python全景系列》!在这个系列中,我们将带领你从Python的基础知识开始,一步步深入到高级话题,帮助你掌握这门强大而灵活的编程语法。无论你是编程新手,还是有一定基础的开发者,这个系列都将提供你需要的知识和技能。这是系列第三篇,在这篇文章中我们将全面深入地......
  • < Python全景系列-4 > 史上最全文件类型读写库大盘点!什么?还包括音频、视频?
    欢迎来到我们的系列博客《Python全景系列》!在这个系列中,我们将带领你从Python的基础知识开始,一步步深入到高级话题,帮助你掌握这门强大而灵活的编程语言!本文系列第四篇,介绍史上最全PYTHON文件类型读写库大盘点!包含常用和不常用的大量文件格式!文本、音频、视频应有尽有!废话不多说!走......