进程池Pool
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
一:使用进程池
例1:非阻塞
from multiprocessing import Pool
import os, time, random
def worker(name):
t_start = time.time()
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
t_stop = time.time()
print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))
def main():
po = Pool(5) # 定义一个进程池,最大进程数5
# 往进程池中添加任务
for i in range(10):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
po.apply_async(worker, (f'liang{i}',))
print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("----all_done----")
if __name__ == '__main__':
main()
执行结果
----start----
liang0开始执行,进程号为10404
liang1开始执行,进程号为9920
liang2开始执行,进程号为13136
liang3开始执行,进程号为10180
liang4开始执行,进程号为7708
liang4 执行完毕,耗时0.57
liang5开始执行,进程号为7708
liang2 执行完毕,耗时1.20
liang6开始执行,进程号为13136
liang1 执行完毕,耗时1.33
liang7开始执行,进程号为9920
liang0 执行完毕,耗时1.34
liang8开始执行,进程号为10404
liang3 执行完毕,耗时1.96
liang9开始执行,进程号为10180
liang5 执行完毕,耗时1.73
liang9 执行完毕,耗时0.54
liang8 执行完毕,耗时1.28
liang7 执行完毕,耗时1.37
liang6 执行完毕,耗时1.88
----all_done----
函数解释:
- apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表, kwds为传递给func的关键字参数列表;
- apply(func[, args[, kwds]]):使用阻塞方式调用func
- close():关闭Pool,使其不再接受新的任务;
- terminate():不管任务是否完成,立即终止;
- join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;
执行说明:创建一个进程池pool,并设定进程的数量为5,range(10)会相继产生10个对象,10个对象被提交到pool中,因pool指定进程数为5,所以0、1、2、3、4会直接送到进程中执行,当其中一个执行完后才空出一个进程处理对象,继续去执行新的对象,所以会出现输出“liang5开始执行,进程号为7708”出现在"liang4 执行完毕,耗时0.57"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“----start----”,主程序在pool.join()处等待各个进程的结束。
例2:阻塞
from multiprocessing import Pool
import os, time, random
def worker(name):
t_start = time.time()
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
t_stop = time.time()
print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))
def main():
po = Pool(3) # 定义一个进程池,最大进程数3
# 往进程池中添加任务
for i in range(0, 5):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
po.apply(worker, (f'liang{i}',))
print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("----all_done----")
if __name__ == '__main__':
main()
输出
liang0开始执行,进程号为1976
liang0 执行完毕,耗时1.75
liang1开始执行,进程号为12624
liang1 执行完毕,耗时0.57
liang2开始执行,进程号为12444
liang2 执行完毕,耗时0.52
liang3开始执行,进程号为1976
liang3 执行完毕,耗时1.23
liang4开始执行,进程号为12624
liang4 执行完毕,耗时0.85
----start----
----all_done----
因为是阻塞,主函数会等待进程的执行,执行完之后才会继续往下,所以运行完所有进程后才输出“----start----”
例3、使用进程池,并返回结果
from multiprocessing import Pool
import os, time, random
def worker(name):
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
return name,os.getpid()
def main():
po = Pool(3) # 定义一个进程池,最大进程数3
res=[]
# 往进程池中添加任务
for i in range(0, 5):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
res.append(po.apply_async(worker, (f'liang{i}',)))
print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
for result in res:
print(result.get()) #get()函数得出每个返回结果的值
print("----all_done----")
if __name__ == '__main__':
main()
输出结果:
----start----
liang0开始执行,进程号为14012
liang1开始执行,进程号为13000
liang2开始执行,进程号为14120
liang3开始执行,进程号为14012
liang4开始执行,进程号为14012
('liang0', 14012)
('liang1', 13000)
('liang2', 14120)
('liang3', 14012)
('liang4', 14012)
----all_done----
例4、多进程执行多个任务
from multiprocessing import Pool
import os, time, random
def worker1(name):
print("%s开始执行work1,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
def worker2(name):
print("%s开始执行work2,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
def worker3(name):
print("%s开始执行work3,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
def main():
po = Pool(4) # 定义一个进程池,最大进程数3
work_list=[worker1,worker2,worker3]
# 往进程池中添加任务
for work in work_list:
for i in range(3):
po.apply_async(work, (f'liang{i}',))
print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("----all_done----")
if __name__ == '__main__':
main()
线程池4个线程执行3个任务,每个任务执行3次。
输出:
----start----
liang0开始执行work1,进程号为13088
liang1开始执行work1,进程号为4908
liang2开始执行work1,进程号为4200
liang0开始执行work2,进程号为8124
liang1开始执行work2,进程号为4908
liang2开始执行work2,进程号为13088
liang0开始执行work3,进程号为8124
liang1开始执行work3,进程号为4200
liang2开始执行work3,进程号为4908
----all_done----
二、进程池进程之间的通讯
进程池中进程的通讯队列
from multiprocessing import Pool, Manager
q = Manager().Queue()
import os
import time
from multiprocessing import Pool, Manager
def work(name, q):
time.sleep(1)
print(f"{name}:---{os.getpid()}---{q.get()}")
def main():
# 创建一个用于进程池通信的队列
q = Manager().Queue()
for i in range(1000):
q.put(f'data-{i}')
# 创建一个拥有五个进程的进程池
po = Pool(5)
# 往进程池中添加20个任务
for i in range(20):
po.apply_async(work, (f'liang{i}', q))
# close:关闭进程池(进程池停止接收任务)
po.close()
# 主进程等待进程池中的任务结束再往下执行
po.join()
if __name__ == '__main__':
main()
三、多进程+协程实现并发
小练习:
假设一个队列中有100000个URL地址,每个请求需要1秒钟,尝试用4个进程,每个进程中开启1000个协程去请求!统计运行时间
from gevent import monkey
monkey.patch_all(thread=False)
import gevent
import time
from multiprocessing import Process, Queue
import os
def time_count(func):
def wrapper(*args, **kwargs):
start_time = time.time()
func(*args, **kwargs)
end_time = time.time()
print('总耗时:', end_time - start_time)
return wrapper
class Myprocess(Process):
def __init__(self, que):
super().__init__()
self.que = que
#重写进程子类的run函数
def run(self):
cos = []
#开启多协程
for i in range(1000):
#调用工作函数
cor = gevent.spawn(self.work)
cos.append(cor)
gevent.joinall(cos)
#定义工作函数
def work(self):
while self.que.qsize() > 0:
try:
url = self.que.get(timeout=1)
time.sleep(1)
print(f"{os.getpid()}正在请求url:{url}")
except Exception as e:
print(e.__repr__())
break
@time_count
def main():
q = Queue()
for i in range(100000):
q.put(f'https://www.baidu.com--{i}')
process_list = []
for i in range(4):
p = Myprocess(q)
process_list.append(p)
p.start()
for p in process_list:
p.join()
print("任务结束")
if __name__ == '__main__':
main()
运行时间27秒