multiprocessing 文档:https://docs.python.org/zh-cn/3.10/library/multiprocessing.html Process、Lock、Semaphore、Queue、Pipe、Pool:https://cuiqingcai.com/3335.html
把一个多线程改成多进程,主要有下面几种方法:
- subprocess
- signal
- threading
- multiprocessing
- concurrent.futures 标准库。它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对 threading 和 multiprocessing 的更高级的抽象,对编写 线程池/进程池 提供了直接的支持。
使用示例代码:
# -*- coding: utf-8 -*-
import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor
r = redis.Redis(host='127.0.0.1', port=6379)
# 减库存函数, 循环直到减库存完成
# 库存充足, 减库存成功, 返回True
# 库存不足, 减库存失败, 返回False
def reduce_stock():
# python中redis事务是通过pipeline的封装实现的
with r.pipeline() as pipe:
while True:
try:
# watch库存键, multi后如果该key被其他客户端改变,
# 事务操作会抛出WatchError异常
pipe.watch('stock:count')
count = int(pipe.get('stock:count'))
if count > 0: # 有库存
# 事务开始
pipe.multi()
pipe.decr('stock:count')
# 把命令推送过去
# execute返回命令执行结果列表, 这里只有一个decr返回当前值
print(pipe.execute()[0])
return True
else:
return False
except WatchError as ex:
# 打印WatchError异常, 观察被watch锁住的情况
print(ex)
pipe.unwatch()
def worker():
while True:
# 没有库存就退出
if not reduce_stock():
break
if __name__ == "__main__":
# 设置库存为100
r.set("stock:count", 100)
# 多进程模拟多个客户端提交
with ProcessPoolExecutor() as pool:
for _ in range(10):
pool.submit(worker)
1、Python 单线程 和 多线程
Python 单线程
# -*- coding:utf-8 -*-
import time
import datetime
def music(argv):
for i in range(2):
print("听音乐 %s. %s" % (argv, datetime.datetime.now()))
time.sleep(1)
def movie(argv):
for i in range(2):
print("看电影 {}. {}".format(argv, datetime.datetime.now()))
time.sleep(5)
if __name__ == '__main__':
music('trouble is a friend')
movie('变形金刚')
print(f"all over {datetime.datetime.now()}")
Python 多线程
Python 中使用线程有两种方式:
- 通过使用 start_new_thread函数
- 用类来包装线程对象。
start_new_thread() 函数来产生新线程
函数式:调用 _thread 模块中的 start_new_thread() 函数来产生新线程
语法如下:
- _thread.start_new_thread ( function, args[, kwargs] )
参数说明:
- function - 线程函数。
- args - 传递给线程函数的参数,他必须是个tuple类型。
- kwargs - 可选参数。
使用示例:
import time
import _thread
# 为线程定义一个函数
def print_time(thread_name, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print("%s: %s" % (thread_name, time.ctime(time.time())))
if __name__ == '__main__':
try:
# 创建两个线程
_thread.start_new_thread(print_time, ("Thread-1", 2,))
_thread.start_new_thread(print_time, ("Thread-2", 4,))
except BaseException as be:
print(f"Error: 无法启动线程 ---> {be}")
while True:
pass
通过 继承 threading.Thread
使用 Threading 模块创建线程,直接从 threading.Thread 继承,然后重写 run 方法:
import threading
import time
exitFlag = 0
class myThread(threading.Thread):
def __init__(self, threadID, name, delay):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.delay = delay
def run(self):
print("开始线程:" + self.name)
print_time(self.name, self.delay, 5)
print("退出线程:" + self.name)
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("退出主线程")
_thread 和 threading 模块( 强烈建议直接使用 threading )。python 提供了两个模块来实现多线程。 thread 有一些缺点,在 threading 得到了弥补。
setDaemon(True) 设置 精灵进程 。
setDaemon(True) 将线程声明为守护线程,必须在 start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句 print("父进程退出") 后,没有等待子线程,直接就退出了,同时子线程也一同结束。
# -*- coding: utf-8 -*-
import time
import threading
import datetime
def music(music_name):
for i in range(2):
print(f"[{datetime.datetime.now().replace(microsecond=0)}] 听音乐 {music_name}")
time.sleep(1)
def movie(movie_name):
for i in range(2):
print(f"[{datetime.datetime.now().replace(microsecond=0)}] 看电影 {movie_name}")
time.sleep(5)
tid_list = [
threading.Thread(target=music, args=('trouble is a friend',)),
threading.Thread(target=movie, args=('变形金刚',))
]
if __name__ == '__main__':
for tid in tid_list:
tid.setDaemon(True)
tid.start()
print("父进程退出")
pass
join() 方法,等待线程终止
join() 的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。注意:上面程序中 join() 方法的位置是在 for 循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程。
# -*- coding: utf-8 -*-
import time
import threading
import datetime
def music(music_name):
for i in range(2):
print(f"[{datetime.datetime.now().replace(microsecond=0)}] 听音乐 {music_name}")
time.sleep(1)
def movie(movie_name):
for i in range(2):
print(f"[{datetime.datetime.now().replace(microsecond=0)}] 看电影 {movie_name}")
time.sleep(5)
tid_list = [
threading.Thread(target=music, args=('trouble is a friend',)),
threading.Thread(target=movie, args=('变形金刚',))
]
if __name__ == '__main__':
for tid in tid_list:
tid.start()
for tid in tid_list:
tid.join()
print("父进程退出")
threading.Thread
t.start() 激活线程,
t.getName() 获取线程的名称
t.setName() 设置线程的名称
t.name 获取或设置线程的名称
t.is_alive() 判断线程是否为激活状态
t.isAlive() 判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);
通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon() 判断是否为守护线程
t.ident 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() 线程被cpu调度后自动执行线程对象的run方法
2、线程同步
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。考虑这样一种情况:一个列表里所有元素都是 0,线程 "set" 从后向前把所有元素改成1,而线程 "print" 负责从前往后读取列表并打印。那么,可能线程 "set" 开始改的时候,线程 "print" 便来打印列表了,输出就成了一半 0 一半 1,这就是 数据的不同步。
为了避免这种情况,引入了 锁 的概念。锁有两种状态:锁定 和 未锁定。
每当一个线程比如 "set" 要访问共享数据时,必须先获得锁定;如果已经有别的线程比如 "print" 获得锁定了,那么就让线程 "set" 暂停,也就是同步阻塞;等到线程 "print" 访问完毕,释放锁以后,再让线程 "set" 继续。经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。
线程锁 threading.RLock 和 threading.Lock
使用 Thread对象 的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire方法 和 release方法。对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和release 方法之间。
threading.RLock 和 threading.Lock
- RLock 允许在同一线程中被多次 acquire。而 Lock 却不允许这种情况。使用 RLock 时 acquire 和 release 必须成对出现,即调用了 n 次 acquire,必须调用 n 次的release 才能真正释放所占用的琐。
# -*- coding: utf-8 -*-
import threading
lock = threading.Lock() # Lock对象
rLock = threading.RLock() # RLock对象
def main_1():
lock.acquire()
lock.acquire() # 产生了死琐。
lock.release()
lock.release()
def main_2():
rLock.acquire()
rLock.acquire() # 在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()
示例:
# -*- coding: utf-8 -*-
import time
import datetime
import threading
# 定义一个 "线程锁"
threadLock = threading.Lock()
count = 20
class MyThread(threading.Thread):
def __init__(self, name, delay_second):
threading.Thread.__init__(self)
self.name = name
self.delay_second = delay_second
def run(self):
print("Starting " + self.name)
result = print_time()
while result > 0:
print(f"[{datetime.datetime.now().replace(microsecond=0)}] {self.name} ---> {result}")
result = print_time()
time.sleep(self.delay_second)
def print_time():
# 获得锁,成功获得锁定后返回 True
# 可选的 timeout 参数不填时将一直阻塞直到获得锁定
# 否则超时后将返回 False
threadLock.acquire()
global count
count -= 1
# 释放锁
threadLock.release()
return count
if __name__ == '__main__':
tid_list = [
MyThread("thread_1", 1),
MyThread("thread_2", 1)
]
for tid in tid_list:
tid.start()
for tid in tid_list:
tid.join()
print("主线程退出")
pass
示例:
import threading
import time
globals_num = 0
lock = threading.RLock()
def func():
lock.acquire() # 获得锁
global globals_num
globals_num += 1
time.sleep(1)
print(globals_num)
lock.release() # 释放锁
for i in range(10):
t = threading.Thread(target=func)
t.start()
pass
Python 的 queue ( 线程安全
:https://docs.python.org/zh-cn/3.10/search.html?q=queue+&check_keywords=yes&area=default
Python 的 queue 模块中提供了同步的、线程安全的队列类。包括
- FIFO(先入先出) 队列
- LIFO(后入先出)队列
- 优先级队列 PriorityQueue
这些队列都实现了 锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
Queue 模块中的常用方法:
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]]) 获取队列,timeout是等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout是等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
示例:
# -*- coding: utf-8 -*-
import time
import queue
import threading
task_queue = queue.Queue()
def produce():
while True:
for num in range(100):
task_queue.put(num)
time.sleep(0.1)
def consume():
while True:
if task_queue.empty():
print('队列为空')
continue
num = task_queue.get()
print(num)
time.sleep(1)
if __name__ == '__main__':
thread_list = []
t1 = threading.Thread(target=produce)
thread_list.append(t1)
for i in range(3):
t_id = threading.Thread(target=consume)
thread_list.append(t_id)
for index in thread_list:
index.start()
for index in thread_list:
index.join()
threading.Condition
一个 condition 变量总是与某些类型的锁相联系,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁 是 conditon 对象的一部分:没有必要分别跟踪。
Condition 类实现了一个 conditon 变量。这个 conditiaon 变量允许一个或多个线程等待,直到他们被另一个线程通知。
- 如果 lock 参数非空,那么他必须是一个 lock 或者 Rlock 对象,它用来做底层锁。
- 如果 lock 参数为空,则会创建一个新的 Rlock 对象,用来做底层锁。
condition 变量服从上下文管理协议:with 语句块封闭之前可以获取与锁的联系。
acquire() 和 release() 会调用与锁相关联的相应的方法。
其他和锁关联的方法必须被调用,wait()方法会释放锁,
当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,
wait(timeout=None) :等待通知,或者等到设定的超时时间。
当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError异常。
wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前会一直阻塞。
wait()还可以指定一个超时时间。 如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。
注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。
除非线程调用notify()和notify_all()之后放弃了锁的所有权。
在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。
修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。
例子:生产者-消费者模型
import threading
import time
def consumer(cond):
with cond:
print("consumer before wait")
cond.wait()
print("consumer after wait")
def producer(cond):
with cond:
print("producer before notifyAll")
cond.notifyAll()
print("producer after notifyAll")
condition = threading.Condition()
consumer_1 = threading.Thread(name="c1", target=consumer, args=(condition,))
consumer_2 = threading.Thread(name="c2", target=consumer, args=(condition,))
producer = threading.Thread(name="p", target=producer, args=(condition,))
consumer_1.start()
time.sleep(2)
consumer_2.start()
time.sleep(2)
producer.start()
python 多进程共享变量
共享内存 (Shared memory)
Data can be stored in a shared memory map using Value or Array.
For example, the following code. 16.6. multiprocessing — Process-based “threading” interface — Python 2.7.18 documentation
在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。如果你真有需要要共享数据, multiprocessing提供了两种方式。
multiprocessing 中的 Array 和 Value。数据可以用 Value 或 Array 存储在一个共享内存地图里,如下:
from multiprocessing import Array, Value, Process
def func(a, b):
a.value = 3.333333333333333
for j in range(len(b)):
b[j] = -b[j]
if __name__ == "__main__":
num = Value('d', 0.0)
arr = Array('i', range(11))
if 0:
t = Process(target=func, args=(num, arr))
t.start()
t.join()
else:
c = Process(target=func, args=(num, arr))
d = Process(target=func, args=(num, arr))
c.start()
d.start()
c.join()
d.join()
print(num.value)
print(arr[:])
for i in arr:
print i,
输出
3.33333333333
0 1 2 3 4 5 6 7 8 9 10
创建 num 和 arr 时,“d”和“i”参数 由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。
Array(‘i’, range(10))中的‘i’参数:
‘c’: ctypes.c_char
‘u’: ctypes.c_wchar
‘b’: ctypes.c_byte
‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short
‘H’: ctypes.c_ushort
‘i’: ctypes.c_int
‘I’: ctypes.c_uint
‘l’: ctypes.c_long,
‘L’: ctypes.c_ulong
‘f’: ctypes.c_float
‘d’: ctypes.c_double
Server process
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.
16.6. multiprocessing — Process-based “threading” interface — Python 2.7.18 documentation
multiprocessing 中的 Manager()
Python中进程间共享数据,除了基本的queue,pipe和value+array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。
Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
from multiprocessing import Process, Manager
def f(d, l):
d["name"] = "king"
d["age"] = 100
d["Job"] = "python"
l.reverse()
if __name__ == "__main__":
with Manager() as man:
d_temp = man.dict()
l_temp = man.list(range(10))
p = Process(target=f, args=(d_temp, l_temp))
p.start()
p.join()
print(d_temp)
print(l_temp)
Server process manager 比 shared memory 更灵活,因为它可以支持任意的对象类型。另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。
3、Python 线程的事件 ( threading.Event )
Python 线程的事件用于主线程控制其他线程的执行。
事件主要提供了三个方法
- set 。将 “Flag” 设置为 False
- wait 。将 “Flag” 设置为 True
- clear 。判断标识位是否为Ture。
事件处理的机制:全局定义了一个 “Flag”,如果 “Flag” 值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果 “Flag” 值为True,那么 event.wait 方法时便不再阻塞。
import threading
def do(event):
print('start')
event.wait()
print('execute')
event_obj = threading.Event()
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start()
event_obj.clear()
# inp = input('input:')
inp = raw_input('input:')
if inp == 'true':
event_obj.set()
当线程执行的时候,如果 flag 为False,则线程会阻塞,当 flag 为True 的时候,线程不会阻塞。它提供了 本地 和 远程 的并发性。
4、Python 协程
关于协程,可以参考 greenlet、stackless、gevent、eventlet 等的实现。
我们知道并发(不是并行)编程目前有四种方式,多进程,多线程,异步,和协程。
多线程编程 Python 中有 Thread 和 threading,在 linux 下所谓的线程,实际上是 LWP 轻量级进程,其在内核中具有和进程相同的调度方式,有关 LWP,COW(写时拷贝),fork,vfork,clone等的资料较多,不再赘述。异步在 linux 下主要有三种实现 select,poll,epoll 。
协程 又称 微线程 。英文名 Coroutine。
协程的好处:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将单个 CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
"函数 ( 又叫 子程序 ) " 在所有语言中都是层级调用,比如 A 调用 B,B 在执行过程中又调用了 C,C 执行完毕返回,B 执行完毕返回,最后是 A 执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
- 子程序调用总是一个入口,一次返回,调用顺序是明确的。
- 协程的调用 和 子程序不同。协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。注意:是在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。
比如:子程序 A、B:
def A():
print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'
假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B 也可能在执行过程中中断再去执行A,结果可能是:
1
2
x
y
3
z
但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。看起来 A、B 的执行有点像多线程,但协程的特点在于是一个线程执行,
协程和多线程比,协程有何优势?
- 协程最大的优势就是协程极高的执行效率。因为是子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
- 第二大优势就是不需要多线程的锁机制。因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
因为协程是一个线程执行,那怎么利用多核CPU呢 ?
- 多进程 + 协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
一个例子:
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:
import time
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)
执行结果:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:
1. 首先调用c.next()启动生成器;
2. 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
3. consumer通过yield拿到消息,处理,又通过yield把结果传回;
4. produce拿到consumer处理的结果,继续生产下一条消息;
5. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
最后套用Donald Knuth的一句话总结协程的特点:“子程序就是协程的一种特例”
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。
协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),
event loop是协程执行的控制点,如果你希望执行协程,就需要用到它们。
event loop提供了如下的特性:
注册、执行、取消延时调用(异步函数)、创建用于通信的client和server协议(工具)、创建和别的程序通信的子进程和协议(工具) 把函数调用送入线程池中
协程示例:
import asyncio
async def cor1():
print("COR1 start")
await cor2()
print("COR1 end")
async def cor2():
print("COR2")
asyncio.run(cor1())
python 的 greenlet 模块
import greenlet
def fun1():
print("12")
gr2.switch()
print("56")
gr2.switch()
def fun2():
print("34")
gr1.switch()
print("78")
gr1 = greenlet.greenlet(fun1)
gr2 = greenlet.greenlet(fun2)
gr1.switch()
greenlet、Eventlet,gevent ( 推荐
- greenlet :是使用生成器实现的协程,调度起来很麻烦,而且不是正在意义上的协程,只是实现的代码执行过程中的挂起,唤醒操作。Greenlet没有自己的调度过程,所以一般不会直接使用。greenlet:http://greenlet.readthedocs.org/en/latest/
- eventlet:是在 greenlet 的基础上实现了自己的 GreenThread,实际上就是 greenlet 类的扩展封装,而与Greenlet的不同是,Eventlet实现了自己调度器称为Hub,Hub类似于Tornado的IOLoop,是单实例的。在Hub中有一个event loop,根据不同的事件来切换到对应的GreenThread。同时 Eventlet 还实现了一系列的补丁来使 Python 标准库中的 socket 等等module 来支持 GreenThread 的切换。Eventlet 的 Hub 可以被定制来实现自己调度过程。eventlet 目前支持 CPython 2.7 和 3.4+ 并将在未来删除,仅保留 CPython 3.5+ 支持。
- Gevent:基于 libev 与 Greenlet 实现。不同于 Eventlet 的用 python 实现的 hub 调度,Gevent 通过 Cython 调用 libev 来实现一个高效的 event loop 调度循环。同时类似于 Event,Gevent 也有自己的 monkey_patch,在打了补丁后,完全可以使用 python 线程的方式来无感知的使用协程,减少了开发成本。
在 Python 的世界里由于 GIL 的存在,线程一直都不是很好用,所以就有了各种协程的 hack。Gevnet 是当前使用起来最方便的协程,但是由于依赖于 libev 所以不能在 pypy上 跑,如果需要在pypy上使用协程,Eventlet 是最好的选择。
gevent 属于第三方模块需要下载安装包
pip3 install --upgrade pip3
pip3 install gevent
示例:
import gevent
def fun1():
print("www.baidu.com") # 第一步
gevent.sleep(0)
print("end the baidu.com") # 第三步
def fun2():
print("www.zhihu.com") # 第二步
gevent.sleep(0)
print("end th zhihu.com") # 第四步
gevent.joinall([
gevent.spawn(fun1),
gevent.spawn(fun2),
])
示例:遇到 IO 操作自动切换:
import gevent
from gevent import monkey
from datetime import datetime
monkey.patch_all()
import requests
def func(url):
print(f"[{datetime.now().replace(microsecond=0)}] 开始请求 {url}")
gevent.sleep(0)
proxies = {
"http": "http://172.17.18.80:8080",
"https": "http://172.17.18.80:8080",
}
proxies = None
resp = requests.get(url, proxies=proxies)
print(f"[{datetime.now().replace(microsecond=0)}] {{resp.url}} ---> {len(resp.content)}")
gevent.joinall([
gevent.spawn(func, 'https://www.baidu.com/'),
gevent.spawn(func, 'https://www.sina.com.cn/'),
gevent.spawn(func, 'https://www.qq.com/'),
])
多线程 + 协程:
# -*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey
monkey.patch_all()
import sys
from importlib import reload
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
try:
s = requests.Session()
r = s.get(url, timeout=1) # 在这里抓取页面
except BaseException as be:
print(be)
return None
def setup_process(url_list):
tasks = []
for url in url_list:
tasks.append(gevent.spawn(fetch, url))
gevent.joinall(tasks) # 使用协程来执行
def main(filepath, per_task=100000): # 每10W条url启动一个进程
with open(filepath, 'r') as f: # 从给定的文件中读取url
url_list = f.readlines()
url_list = [temp.strip() for temp in url_list]
url_len = len(url_list)
process_count = int(url_len / per_task) if url_len % per_task == 0 else int(url_len / per_task) + 1
for index in range(process_count):
task_list = url_list[index * per_task: (index + 1) * per_task]
p = Process(target=setup_process, args=(url_list,))
p.start()
if __name__ == '__main__':
main('./test_data.txt') # 读取指定文件
例子中隐藏了一个问题:进程的数量会随着 url 数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool 来控制进程数量的原因是 multiprocessing.Pool 和 gevent 有冲突不能同时使用,但是有兴趣的同学可以研究一下 gevent.pool 这个协程池。
"""
对于有些人来说Gevent和multiprocessing组合在一起使用算是个又高大上又奇葩的工作模式.
Python的多线程受制于GIL全局锁的特性,Gevent身为协程也是线程的一种,只是io调度上自己说了算而已。
那么如何使用多个cpu核心?
可以利用多进程 mutliprocessing 来进行多核并行工作,
在多进程里面使用gevent协程框架可以更好的做io调度,相比线程来说减少了无谓的上下文切换.
废话少说,直接上个例子.
下面是多进程下生产者消费者的工作模式
"""
import datetime
from multiprocessing import Process, cpu_count, Queue, JoinableQueue
import gevent
from gevent import monkey
monkey.patch_all()
class Consumer(object):
def __init__(self, task_queue, task_list, consumer_name):
self.task_queue = task_queue
self.task_list = task_list
self.consumer_name = consumer_name
self.__run_gevent()
def __run_gevent(self):
jobs = [gevent.spawn(self.__print_value) for x in range(self.task_list)]
gevent.joinall(jobs)
def __print_value(self):
while True:
value = self.task_queue.get()
if value is None:
self.task_queue.task_done()
break
else:
print(f"[{datetime.datetime.now()}] {self.consumer_name} ---> value: {value}")
return
class Producer(object):
def __init__(self, task_queue, task_list, producer_name, consumers_tasks):
self.task_queue = task_queue
self.task_list = task_list
self.producer_name = producer_name
self.consumer_tasks = consumers_tasks
self.__run_gevent()
def __run_gevent(self):
jobs = [gevent.spawn(self.produce) for x in range(self.task_list)]
gevent.joinall(jobs)
for x in range(self.consumer_tasks):
self.task_queue.put_nowait(None)
self.task_queue.close()
def produce(self):
for no in range(10000):
print(no)
self.task_queue.put(no, block=False)
return
def main():
worker_count = cpu_count() * 2
task_queue = JoinableQueue()
producer_gevent = 10
consumer_gevent = 7
pid_list = []
for index in range(worker_count):
if not index % 2:
pid = Process(
target=Producer,
args=(task_queue, 10, f"producer_{index}", consumer_gevent)
)
pid.start()
pid_list.append(pid)
else:
pid = Process(
target=Consumer,
args=(task_queue, consumer_gevent, f"consumer_{index}")
)
pid.start()
pid_list.append(pid)
for pid in pid_list:
pid.join()
if __name__ == '__main__':
main()
5、Python 的 multiprocessing
5.1 关于 multiprocessing
由于 Python 设计的限制 ( 这里指 CPython,GLI )。最多只能用满1个CPU核心。但是 Python 的多进程包 multiprocessing 可以轻松完成从单进程到并发执行的转换。像 线程一样管理进程,这个是 mutilprocess 的核心,他与 threading 很是相像,对多核CPU的利用率会比 threading 好的多。
此外 multiprocessing包中也有 Lock / Event / Semaphore / Condition类,用来同步进程,其用法也与 threading 包中的同名类一样。multiprocessing 的很大一部份与 threading 使用同一套 API,只不过换到了多进程的情境。但在使用这些共享API的时候,我们要注意以下几点:
- 在 UNIX 平台上,当某个进程终结之后,该进程需要被其父进程调用 wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
- multiprocessing 提供了 threading 包中没有的 IPC ( 比如:Pipe 和 Queue ),效率上更高。应优先考虑 Pipe 和 Queue,避免使用 Lock / Event / Semaphore / Condition 等同步方式 (因为它们占据的不是用户进程的资源 )。
- 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如 使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过 "共享内存" 和 "Manager 的方法" 来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
- Process.PID中保存有PID,如果进程还没有start(),则PID为None。
5.2 multiprocessing.Process
Process 类的构造方法:
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
参数说明:
group:进程所属组。基本不用
target:表示调用对象。
args:表示调用对象的位置参数元组。
name:别名
kwargs:表示调用对象的字典。
利用 multiprocessing.Process 可以创建一个 Process 对象,该对象与 Thread对象 的用法相同,拥有如下方法:
- is_alive()
- join([timeout])
- run()
- start()
- terminate()
- ........
属性有:
- authkey
- daemon(要通过start()设置)、
- exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、
- name
- pid
- ........
直接 创建 进程
# -*- coding: utf-8 -*-
import multiprocessing
def worker_func(num):
print(f'worker_func ---> {num}')
def main():
pid_list = []
for i in range(5):
pid = multiprocessing.Process(target=worker_func, args=(i,))
pid_list.append(pid)
for pid in pid_list:
pid.start()
for pid in pid_list:
pid.join()
if __name__ == '__main__':
main()
继承 multiprocessing.Process 创建 进程
利用 class 来创建进程,定制子类
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print('In %s' % self.name)
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
守护进程
守护进程就是不阻挡主程序退出,自己干自己的。 mutilprocess.setDaemon(True)
就这句。
等待守护进程退出,要加上 join,join 可以传入浮点数值,等待n久就不等了
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
time.sleep(2)
print('Exiting :', name)
def non_daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
print('Exiting :', name)
if __name__ == '__main__':
pid_1 = multiprocessing.Process(name='daemon', target=daemon)
pid_1.daemon = True
pid_2 = multiprocessing.Process(name='non-daemon', target=non_daemon)
pid_2.daemon = False
pid_1.start()
pid_2.start()
pid_1.join(1)
print(f'pid_2.is_alive() ---> {pid_2.is_alive()}')
pid_2.join()
终止进程
最好使用 poison pill,强制的使用 terminate()。注意 terminate 之后要 join,使其可以更新状态
import multiprocessing
import time
def slow_worker():
print('Starting worker')
time.sleep(0.1)
print('Finished worker')
if __name__ == '__main__':
p = multiprocessing.Process(target=slow_worker)
print('BEFORE:', p, p.is_alive())
p.start()
print('DURING:', p, p.is_alive())
p.terminate()
print('TERMINATED:', p, p.is_alive())
p.join()
print('JOINED:', p, p.is_alive())
进程的退出状态
- == 0 未生成任何错误
- 0 进程有一个错误,并以该错误码退出
- < 0 进程由一个-1 * exitcode信号结束
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
jobs = []
for f in [exit_error, exit_ok, return_value, raises, terminated]:
print('Starting process for', f.func_name)
j = multiprocessing.Process(target=f, name=f.func_name)
jobs.append(j)
j.start()
jobs[-1].terminate()
for j in jobs:
j.join()
print('%15s.exitcode = %s' % (j.name, j.exitcode))
Python 进程间传递消息
Python多线程与同步:javascript:void(0)
一般的情况是 Queue 来传递。
import multiprocessing
class MyFancyClass(object):
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print(f"{proc_name} ---> {self.name}")
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
示例:
import time
import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
super(Consumer, self).__init__()
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print(f"退出进程 ---> {proc_name}")
self.task_queue.task_done()
break
print(f'{proc_name} 下一个任务 ---> {next_task}')
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return f'{self.a} * {self.b} = {self.a * self.b}'
def __str__(self):
return str(self.a * self.a)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print(f"创建 {num_consumers} 消费者")
consumers = [Consumer(tasks, results) for i in range(num_consumers)]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in range(num_consumers):
tasks.put(None)
# Wait for all tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print(f"结果 ---> {result}")
num_jobs -= 1
进程间信号传递
Event 提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
w1.start()
w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
multiprocessing 进程 同步
我们可以从下面的程序中看到 Thread 对象和 Process对象 在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用 Lock 同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。
import os
import threading
import multiprocessing
# worker function
def worker(sign=None, t_lock=None):
t_lock.acquire()
print(sign, os.getpid())
t_lock.release()
# Main
print('Main:', os.getpid())
# Multi-thread
record = []
threading_lock = threading.Lock()
for i in range(5):
thread = threading.Thread(target=worker, args=('thread', threading_lock))
thread.start()
record.append(thread)
for thread in record:
thread.join()
# Multi-process
record = []
process_lock = multiprocessing.Lock()
for i in range(5):
process = multiprocessing.Process(target=worker, args=('process', process_lock))
process.start()
record.append(process)
for process in record:
process.join()
所有 Thread 的 PID 都与主程序相同,而每个 Process 都有一个不同的 PID。
Pipe ( 管道 ) 和 mutiprocessing.Queue( 队列 )
正如我们在 Linux多线程 中介绍的管道PIPE和消息队列 message queue,multiprocessing 包中有Pipe类 和 Queue类 来分别支持这两种 IPC 机制。Pipe 和 Queue 可以用来传送常见的对象。
Pipe 可以是单向(half-duplex),也可以是双向(duplex)。
通过mutiprocessing.Pipe(duplex=False) 创建单向管道 (默认为双向)。一个进程从 PIPE 一端输入对象,然后被 PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。下面的程序展示了 Pipe 的使用:( 这里的 Pipe 是双向的。 )
import multiprocessing as mul
def proc1(pipe=None):
pipe.send('hello')
print('proc1 rec:', pipe.recv())
def proc2(pipe=None):
print('proc2 rec:', pipe.recv())
pipe.send('hello, too')
# Build a pipe
pipe = mul.Pipe()
# Pass an end of the pipe to process 1
p1 = mul.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2 = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表 Pipe 的一端(Connection对象)。对 Pipe 的某一端调用 send() 方法来传送对象,在另一端使用 recv() 来接收。
mutiprocessing.Queue
Queue 与 Pipe 相类似,都是先进先出的结构。但 Queue 允许多个进程放入,多个进程从队列取出对象。Queue 使用 mutiprocessing.Queue(maxsize) 创建,maxsize 表示队列中可以存放对象的最大数量。下面的程序展示了 Queue 的使用:
import os
import multiprocessing
import time
# input worker
def input_queue(queue=None):
info = str(os.getpid()) + '(put):' + str(time.time())
queue.put(info)
# output worker
def output_queue(queue=None, lock):
info = queue.get()
lock.acquire()
print(str(os.getpid()) + '(get):' + info)
lock.release()
# ===================
# Main
record1 = [] # store input processes
record2 = [] # store output processes
lock = multiprocessing.Lock() # To prevent messy print
queue = multiprocessing.Queue(3)
# input processes
for i in range(10):
process = multiprocessing.Process(target=input_queue, args=(queue,))
process.start()
record1.append(process)
# output processes
for i in range(10):
process = multiprocessing.Process(target=output_queue, args=(queue, lock))
process.start()
record2.append(process)
for p in record1:
p.join()
queue.close() # No more object will come, close the queue
for p in record2:
p.join()
一些进程使用 put() 在 Queue 中放入字符串,这个字符串中包含 PID 和时间。另一些进程从Queue 中取出,并打印自己的 PID 以及 get() 的字符串
共享资源 --- 共享内存
应该尽量避免多进程共享资源。多进程共享资源必然会带来进程间相互竞争。而这种竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。但如果需要,我们依然可以通过 共享内存 和 Manager对象
例子:
import multiprocessing
def f(n, a):
n.value = 3.14
a[0] = 5
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
这里我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为0.0。而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。
共享资源 --- Manager
Manager 对象类似于 服务器 与 客户 之间的通信 (server-client),与我们在 Internet 上的活动很类似。我们用一个进程作为服务器,建立 Manager 来真正存放资源。其它的进程可以通过参数传递或者根据地址来访问Manager,建立连接后,操作服务器上的资源。在防火墙允许的情况下,我们完全可以将Manager运用于多计算机,从而模仿了一个真实的网络情境。下面的例子中,我们对Manager的使用类似于shared memory,但可以共享更丰富的对象类型。
import multiprocessing
def f(x, arr, l):
x.value = 3.14
arr[0] = 5
l.append('Hello')
server = multiprocessing.Manager()
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list()
proc = multiprocessing.Process(target=f, args=(x, arr, l))
proc.start()
proc.join()
print(x.value)
print(arr)
print(l)
Manager 利用 list() 方法提供了表的共享方式。实际上你可以利用 dict() 来共享词典,Lock() 来共享 threading.Lock ( 注意,我们共享的是 threading.Lock,而不是进程的 mutiprocessing.Lock。后者本身已经实现了进程共享) 等。 这样 Manager 就允许我们共享更多样的对象。
5.3 multiprocessing.Pool
在使用 Python 进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用 Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时 进程池 就派上用场了。进程池 (Process Pool) 可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的士兵。比如下面的程序:
Pool 类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
Pool 类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。我们可以用 Pool 类创建一个进程池,展开提交的任务给进程池。
一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持 超时 和 回调的异步结果,有一个类似 map 的实现。
参数
- processes :进程的数量,如果 processes 是 None 那么使用 os.cpu_count() 返回的数量。
- initializer:如果是 None,那么每一个工作进程在开始的时候会调 initializer(*initargs)。
- maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild 默认是 None,意味着只要 Pool 存在工作进程就会一直存活。
- context:用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者 一个context 对象的 Pool() 方法来创建一个池,两种方法都适当的设置了context
进程池的方法
- apply(func[, args[, kwds]]) :调用 func 函数并传递 args 和 kwds,不建议使用,并且 3.x 以后不在出现。结果返回前主进程会被阻塞直到函数执行结束,由于这个原因,apply_async() 更适合并发执行。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply() 方法的一个变体,会返回一个结果对象。 如果 callback 被指定,那么 callback 可以接收一个参数然后被调用,当结果准备好回调时会调用 callback,调用失败时,则用 error_callback 替换 callback。 Callbacks 应被立即完成,否则处理结果的线程会被阻塞。非阻塞 版本
- close() :阻止更多的任务提交到 pool,待任务完成后,工作进程会退出。
- terminate() :不管任务是否完成,立即停止工作进程。在对 pool 对象进程垃圾回收的时候,会立即调用 terminate()。
- join():wait 工作线程的退出,在调用 join() 前,必须调用 close() 或者 terminate()。这样是因为被终止的进程需要被父进程调用 wait(join等价与wait),否则进程会成为僵尸进程。(主进程阻塞,等待子进程的退出, join 方法要在 close 或 terminate 之后使用。 )
- map(func, iterable[, chunksize])
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- imap(func, iterable[, chunksize])
- imap_unordered(func, iterable[, chunksize])
- starmap(func, iterable[, chunksize])
- starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
非阻塞 进程池 --- apply_async
注意:是 "进程池",不是 "线程池"。它可以让你跑满多核CPU,而且使用方法非常简单。
注意要用 apply_async,如果省略了 async,就变成阻塞版本了。
使用 进程池( 非阻塞 版本)
# -*- coding: utf-8 -*-
import time
import multiprocessing
def worker_func(process_name=None, arg=None):
for i in range(3):
print(f"{process_name} ---> {arg}")
time.sleep(1)
if __name__ == "__main__":
cpu_count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cpu_count)
for index in range(10):
data = f"hello {index}"
pool.apply_async(worker_func, (f"process_name_{index}", data))
pool.close()
pool.join()
print("主线程退出")
示例:
# -*- coding: utf-8 -*-
import os
import time
import random
import multiprocessing
def func_1():
# os.getpid()获取当前的进程的ID
print(f"func_1 ---> {os.getpid()}")
start = time.time()
time.sleep(random.random() * 10)
end = time.time()
print(f'func_1 ---> 运行 {end - start}秒')
def func_2():
print(f"func_2 ---> {os.getpid()}")
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print(f'func_2 ---> 运行 {end - start}秒')
def func_3():
print(f"func_3 ---> {os.getpid()}")
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print(f'func_3 ---> 运行 {end - start}秒')
if __name__ == '__main__':
func_list = [func_1, func_2, func_3]
print(f"父进程 ---> {os.getpid()}")
pool = multiprocessing.Pool(4)
for func in func_list:
# Pool执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
pool.apply_async(func)
print('等待字进程...')
pool.close()
# 调用join之前,一定要先调用close() 函数,否则会出错
# close()执行后不会有新的进程加入到 pool, join 函数等待素有子进程结束
pool.join()
print('所有进程都完成')
pass
阻塞 进程池 --- apply
# coding: utf-8
import time
import multiprocessing
def worker_func(arg):
print(f"worker_func ---> {arg}")
time.sleep(3)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=3)
for i in range(4):
data = f"hello {i}"
# 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool.apply(worker_func, (data,))
pool.close()
pool.join()
map()
函数原型:map(func, iterable[, chunksize=None])
Pool 类中的 map 方法,与内置的 map 函数用法行为基本一致,它会使进程阻塞直到返回结果。
注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
import time
import multiprocessing
def worker_func(arg=None):
time.sleep(1)
print('arg * arg')
return arg * arg
if __name__ == "__main__":
temp_list = [1, 2, 3, 4, 5, 6,7,8,9]
start_time = time.time()
for item in temp_list:
worker_func(item)
end_time = time.time()
print("顺序执行时间:", int(end_time - start_time))
pool = multiprocessing.Pool(5) # 创建拥有5个进程数量的进程池
start_time = time.time()
result = pool.map(worker_func, temp_list) # 使进程阻塞直到返回结果
pool.close() # 关闭进程池,不再接受新的进程
pool.join() # 主进程阻塞等待子进程的退出
end_time = time.time()
print("并行执行时间:", int(end_time - start_time))
print(f'map 的所有子进程返回的结果列表: {result}')
上例是一个创建多个进程并发处理与顺序执行处理同一数据,所用时间的差别。从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。程序中的 result 表示全部进程执行结束后全部的返回结果集,run 函数有返回值,所以一个进程对应一个返回结果,这个结果存在一个列表中。
对 Pool对象调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close(),让其不再接受新的 Process。
close()
关闭进程池(pool),使其不在接受新的任务。
terminate()
结束工作进程,不在处理未处理的任务。
join()
主进程阻塞等待子进程的退出,join 方法必须在 close 或 terminate 之后使用。
使用 Pool,并需要关注结果
更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:
# -*- coding: utf-8 -*-
import multiprocessing
import time
def func(msg):
for i in range(3):
print(msg)
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for index in range(10):
msg = f"hello {index}"
result.append(pool.apply_async(func, (msg,)))
pool.close()
pool.join()
for res in result:
print(res.get())
print("Sub-process(es) done.")
示例
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print('Starting', multiprocessing.current_process().name)
if __name__ == '__main__':
inputs = list(range(10))
print('Inputs :', inputs)
builtin_output = list(map(do_calculation, inputs))
print('Build-In :', builtin_output)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, )
# 默认情况下,Pool会创建固定数目的工作进程,并向这些工作进程传递作业,直到再没有更多作业为止。
# maxtasksperchild 参数为每个进程执行 task 的最大数目,
# 设置 maxtasksperchild参数可以告诉池在完成一定数量任务之后重新启动一个工作进程,
# 来避免运行时间很长的工作进程消耗太多的系统资源。
# pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, maxtasksperchild=2)
print('-' * 20)
pool_outputs = pool.map(do_calculation, inputs)
pool.close()
pool.join()
print('Pool :', pool_outputs)