多任务就是操作系统能同时执行多个程序,比如:看电影,聊天,查看网页。进程(process)是计算机中已经运行程序的实体,一个任务就是一个进程。
1. 使用multiprocessing模块创建进程
1.1 用Process类创建进程
在这个模块中有一个Process类代表一个进程对象:
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group: 参数未使用,值始终为None
- target: 表示当前进程启动时执行的可调用对象
- name: 进程 实例的别名
- args: 传递给target函数的参数(元组)
- kwargs:传递给target函数的参数(字典)
- daemon:
from multiprocessing import Process
def test(*args):
print("我是子进程:")
for i in args:
print(i, end=', ')
def main():
print("主进程开始")
# 创建进程对象
p = Process(target=test, args=(1, "abc", 2, 3, 4))
# 启动子进程
p.start()
print("主进程结束")
if __name__ == '__main__':
main()
输出:
主进程开始
主进程结束
我是子进程:
1, abc, 2, 3, 4,
Process的实例方法:
方法 | 说明 |
---|---|
is_alive() | 判断进程实例是否还在执行(True, False) |
start() | 启动进程实例(创建子进程) |
join([timeout]) | 等待进程实例执行结束,或等待多少秒 |
terminate() | 不管任务是否完成,立即终止 |
run() | 如果没有给定target参数,执行start时,就执行对象中的run方法 |
属性 | |
name | 进程别名 |
pid | 进程PID值 |
import os
from multiprocessing import Process
from time import sleep
def test(args):
print(f"我是子进程:{os.getpid()}")
sleep(args)
print(f"子进程:{os.getpid()}结束")
def main():
print("主进程开始")
# 创建进程对象
p1 = Process(target=test, args=(1,))
p2 = Process(target=test, args=(3,))
# 启动子进程
p1.start()
p2.start()
print(f"p1.pid:{p1.pid}, p2.pid:{p2.pid}")
p1.join()
p2.join()
print("主进程结束")
if __name__ == '__main__':
main()
1.2 用Process子类创建进程
- 定义子类,继承Process
- 重写Process中的run方法
from multiprocessing import Process
from time import sleep
class SubProcess(Process):
def __init__(self, time):
Process.__init__(self)
self.time = time
# 重写run方法,调用父类的start()时会自动执行run()
def run(self):
print(f"子进程{self.name}开始执行")
sleep(self.time)
print(f"子进程{self.name}执行结束")
def main():
print("主进程开始")
p1 = SubProcess(2)
p2 = SubProcess(1)
p1.start()
p2.start()
p1.join()
p2.join()
print("主进程结束")
if __name__ == "__main__":
main()
1.3 用Pool进程池创建进程
进程池可指定大小,之后添加N个任务,进程池自动管理进程的运行。
Pool类的方法:
方法 | 说明 |
---|---|
apply_async( func, args=(), kwds={}, callback=None, error_callback=None) | 非阻塞方式调用func,并行执行任务 |
apply(func, args=(), kwds={}) | 阻塞方式调用func,需要等上一个进程结束才能执行下一个进程。(不并行了) |
close() | 关闭Pool,不再接受新的任务 |
terminate() | 不管任务是否完成,立即终止 |
join() | 等待全部进程完成,必须在close, terminate之后使用 |
import os
import random
from multiprocessing import Pool
from time import sleep
def task(time):
print(f"进程{os.getpid()}开始执行, time:{time}")
sleep(time)
print(f"\t进程{os.getpid()}执行结束")
def main():
print("主进程开始")
p = Pool(3)
# 添加10个任务
for i in range(10):
p.apply_async(func=task, args=(random.randint(1, 5),))
p.close()
p.join()
print("主进程结束")
if __name__ == "__main__":
main()
执行结果:
主进程开始
进程3704开始执行, time:3
进程10648开始执行, time:3
进程10580开始执行, time:1
进程10580执行结束
进程10580开始执行, time:3
进程10648执行结束
进程3704执行结束
进程10648开始执行, time:1
进程3704开始执行, time:4
进程10648执行结束
进程10580执行结束
进程10648开始执行, time:5
进程10580开始执行, time:2
进程10580执行结束
进程10580开始执行, time:3
进程3704执行结束
进程3704开始执行, time:4
进程10648执行结束
进程10580执行结束
进程3704执行结束
主进程结束
可以看出,同时执行3个进程,完成一个后,另一个进程再开始执行。好比:有三个水龙头放水,要接10盆水一样,最多三个进程同时工作。
2. 进程间通信
每个进程都有自己的地址空间,内存,数据栈及其它记录其运行状态的辅助数据。
# 全局
global_value = 100
def add(time):
global global_value
print(f"进程{os.getpid()}开始运行,值为:{global_value}")
global_value += 100
sleep(time)
print(f"进程{os.getpid()}运行结束, 值为:{global_value}")
def sub(time):
global global_value
print(f"进程{os.getpid()}开始运行,值为:{global_value}")
global_value -= 100
sleep(time)
print(f"进程{os.getpid()}运行结束, 值为:{global_value}")
def main():
print("主进程开始")
p1 = Process(target=add, args=(1,))
p2 = Process(target=sub, args=(2,))
p1.start()
p2.start()
p1.join()
p2.join()
global global_value
print("global:", global_value)
print("主进程结束")
if __name__ == "__main__":
main()
结果:
主进程开始
进程10320开始运行,值为:100
进程10320运行结束, 值为:200
进程7300开始运行,值为:100
进程7300运行结束, 值为:0
global: 100
主进程结束
可以看出global全局变量并没有在进程之间数据共享。
2.1 通过队列实现进程间的通信
使用multiprocessing模块中的Queue实现进程之间的数据传递。
q = Queue(num)
创建一个Queue对列对象
方法 | 说明 |
---|---|
q.qsize() | 当前队列包含的消息数量 |
q.empty() | 队列为空返回True |
q.full() | 队列满了返回True |
q.get(block, timeout) | 从队列中获取一条消息,然后将其从队列中移除 |
q.get_nowait() | 队列为空(读不到数据)抛出异常 |
q.put(item,block,timeout) | 将item写入队列,block默认为True |
q.put_nowait(item) | 队列满了(存不了数据)抛出异常 |
- block默认为True。如果block为True,而且没有指定timeout(秒),消息队列将被阻塞,直到能读取或写入消息为止。如果设置了timeout,时间到了还没读到消息或写入消息,则抛出Queue.Empty异常
- block为False,如果消息队列满或空,即不能写入或读取直接抛出异常。
from multiprocessing import Process, Queue
from time import sleep
def write(q: Queue):
for i in range(10):
message = f"{i}"
q.put(message)
print(f"写入消息:{message}")
q.put("end")
def read(q: Queue):
message = ""
while message != "end":
sleep(1)
message = q.get()
print(f"\t读取消息:{message}")
def main():
print("主进程开始")
q = Queue(2)
p1 = Process(target=write, args=(q,))
p2 = Process(target=read, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
print("主进程结束")
结果:
主进程开始
写入消息:0
写入消息:1
读取消息:0
写入消息:2
读取消息:1
写入消息:3
读取消息:2
写入消息:4
读取消息:3
写入消息:5
读取消息:4
写入消息:6
读取消息:5
写入消息:7
读取消息:6
写入消息:8
读取消息:7
写入消息:9
读取消息:8
读取消息:9
读取消息:end
主进程结束
标签:__,Process,python,global,print,线程,time,进程
From: https://www.cnblogs.com/three-sheep/p/17438070.html