1. 基本概念
1.1 并发和并行
并发和并行的概念并不是对立的,并发(concurrent)对应的是顺序(sequential),并行(parallel)对应的是串行(serial)。
- 顺序:上一个开始执行的任务完成后,当前任务才能开始执行
- 并发:无论上一个开始执行的任务是否完成,当前任务都可以开始执行
- 串行:有一个任务执行单元,从物理上就只能一个任务、一个任务地执行
- 并行:有多个任务执行单元,从物理上就可以多个任务一起执行
顺序/并发关注的是任务的调度,而串行/并行更关注执行单元(单核/多核CPU)。
单核多线程就属于串行(只有一个实际执行任务的 CPU 核)并发(不必等上一个任务完成才开始下一个任务)。
也可以将并行理解为并发的子集:如果某个系统支持两个或者多个动作(Action)同时存在,那么这个系统就是一个并发系统。如果某个系统支持两个或者多个动作同时执行,那么这个系统就是一个并行系统。并发系统与并行系统这两个定义之间的关键差异在于“存在”这个词。
Erlang 之父 Joe Armstrong 的图很好地展示了并发和并行的区别:
1.2 进程和线程
进程:进程是资源分配的最小单位,拥有独立的地址空间。
线程:线程是CPU调度的最小单位,共享同一进程的地址空间。每个进程中可以执行多个任务,每个任务就是线程,线程可以说是程序用CPU的一个基本单元,如果程序中是单一的执行路径,那么就是单线程的,如果有多个执行路径,那么就是多线程的。对于python来说,存在GIL 全局解释器锁,它的存在使得一个CPU同一个时刻只能执行一个线程,python的多线程就更像是伪多线程,一个线程运行时,其他,多线程代码不是同时执行,而是交替执行。而不同线程之间的切换需要耗费资源的,因为需要存储线程的上下文,不断的切换就会持续耗费资源。同时,多线程也不能够有效的利用CPU的多核性能,提升程序的运行速度。
协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。一个线程上可以跑多个协程,协程是轻量级的线程。
2. 多线程
python常用多线程模块:_thread, threading, Queue
threading支持_thread中所有功能,Queue用于提供线程安全的队列类
2.1 threading.Thread类
两种使用方法:
(1)使用Thread实例化一个线程对象,传入想要执行的函数
thread1 = Thread(target=print, args=(1,))
(2)继承Thread类,重写run方法
初始化:
Thread(
group=None,
target=None, #run方法使用的可调用对象(函数)
name=None, #线程的名称
args=(), #传入的函数的参数组成的列表或元组
kwargs=None, #传入的函数的关键字参数
*,
daemon=None) #守护线程
常用方法:
- start():启动线程
- run():线程执行的代码,一般重写此方法
- join(timeout=None):阻塞调用线程(一般是主进程),等到被调用线程终止(正常或者未处理的异常)或直到超时。
- getName()/setName():获取/设置线程名字,也可直接使用name属性
- isDaemon()/setDaemon():判断和设置线程为守护线程,也可直接使用daemon属性
2.2 Daemon Thread 守护进程
也叫后台进程,它的目的是为其他线程提供服务,其存在依赖于其他线程,如果其他线程消亡,它会随之消亡。
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
super().__init__()
self.n = n
def run(self) -> None:
while True:
_count = threading.active_count()
print(self.n, f"当前活跃的线程个数:{_count}")
time.sleep(self.n)
for i in range(1, 4):
t = MyThread(i)
t.setDaemon(False)
t.start()
本例中创建了3个线程,如果setDaemon为False,当主线程终止后他们依然会在while循环中运行,如果为True,当主线程终止后它们也会终止。
2.3 阻塞进程
join方法会使调用进程被阻塞,等待被调用join方法的子进程运行结束
2.4 线程安全
为保证数据安全,需要使用线程锁来保护数据。
当两个线程同时对一全局变量进行累加,不加锁就会导致资源抢夺,从而输出不确定的值。每次上锁后都要记得及时释放(release),也可以使用with模块。
(1)Lock():原始锁
在Python中,它是能用的最低级的同步基元组件,由 _thread 扩展模块直接实现。一个线程获得一个锁后,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。
threading.Lock() 返回的原始锁有 acquire() 和 release() 两个方法,对象本身有“锁定”和“非锁定”两种状态。当状态为非锁定时调用 acquire() 方法会将状态改为锁定,并立即返回,当状态为锁定时调用 acquire() 方法会阻塞线程,直到其他线程调用了 release() 方法将锁状态改为非锁定,然后 acquire() 重置其为锁定状态并返回。release() 只在锁定状态下调用;它将状态改为非锁定并立即返回。如果尝试释放一个非锁定的锁,则会引发 RuntimeError 异常。
示例:
import time
import threading
number = 0
lock = threading.Lock() #实例化一个锁
class MyThread(threading.Thread):
def __init__(self, n, k):
super().__init__()
self.n = n
self.k = k
def run(self) -> None:
global number
for i in range(self.k):
#lock.acquire()
number += 1
#lock.release()
t1 = MyThread(1, 1000000)
t2 = MyThread(2, 1000000)
t1.start()
t2.start()
t1.join()
t2.join()
print("main", number)
acquire() 方法有两个缺省参数,一般来讲我们不传入任何参数就相当于 acquire(blocking=True, timeout=-1),当无法获得锁时,这会无限期地阻塞线程,直到获得锁,然后返回 True。设置 blocking 为 False 表示以不阻塞线程的方式尝试获得锁,并立即返回 True 或者 False。当 blocking 为 True时还可设置 timeout 表示最多阻塞线程的时间。
(2)RLock():重入锁/递归锁。
用法与原始锁基本相同,相比原始锁加入了“所属线程”和“递归等级”的概念。特点:①重入锁必须由获取它的线程释放;②一旦线程获得了重入锁,同一个线程再次获取它将不阻塞(原始锁会阻塞自己);③线程必须在每次获取它时释放一次。
2.5 threading.Condition 条件对象
当线程在系统中运行时,线程的调度具有一定的透明性,通常程序无法准确控制线程的轮换执行,如果有需要,Python 可通过线程通信来保证线程协调运行。使用 Condition 可以让那些己经得到 Lock 对象却无法继续执行的线程释放 Lock 对象,Condition 对象也可以唤醒其他处于等待状态的线程。
例子:两个线程模拟的智能机器人互相对话
天猫精灵:小爱同学
小爱:在
天猫精灵:我们来对古诗吧
小爱:好啊
天猫精灵:我住长江头
小爱:不聊了,再见
这是一个同步问题(线程同步:多个线程协按照一定的顺序协同完成某一任务),用原始锁和信号量都能实现,逻辑大概如下:
class TianMao(threading.Thread):
--snip--
def run(self):
self.lock_A.acquire()
self.lock_B.acquire()
print("tianmao:小爱同学")
self.lock_A.release()
self.lock_B.acquire()
print("tianmao:我们来对古诗吧")
self.lock_A.release()
self.lock_B.acquire()
print("tianmao:我住长江头")
self.lock_A.release()
self.lock_B.release()
class XiaoAi(threading.Thread):
--snip--
def run(self):
self.lock_A.acquire()
print("xiaoai:在")
self.lock_B.release()
self.lock_A.acquire()
print("xiaoai:好啊")
self.lock_B.release()
self.lock_A.acquire()
print("xiaoai:不聊了,再见")
self.lock_A.release()
Condition 对象也用到了两个锁对象:([python] 线程间同步之条件变量Condition - 简书 (jianshu.com))
Codition有两层锁,一把底层锁会在进入wait方法的时候释放,离开wait方法的时候再次获取,上层锁会在每次调用wait时分配一个新的锁,并放入condition的等待队列中,而notify负责释放这个锁。
acquire() 和 release() 都是 RLock 的方法,用来对底层锁的获得和释放,wait() 方法会先释放底层锁,然后阻塞调用进程,等待notify唤醒,再尝试获得底层锁;notify(n=1)由获得锁的线程调用,会唤醒等待队列中的n个线程,但不会释放底层锁,这之后必须由线程主动释放锁。
使用起来也更加容易理解:
class TianMao(threading.Thread):
--snip--
def run(self):
self.con_lock.acquire()
print("tianmao:小爱同学")
self.con_lock.wait()
print("tianmao:我们来对古诗吧")
self.con_lock.notify()
self.con_lock.wait()
print("tianmao:我住长江头")
self.con_lock.notify()
self.con_lock.release()
class XiaoAi(threading.Thread):
--snip--
def run(self):
self.con_lock.acquire()
print("xiaoai:在")
self.con_lock.notify()
self.con_lock.wait()
print("xiaoai:好啊")
self.con_lock.notify()
self.con_lock.wait()
print("xiaoai:不聊了,再见")
self.con_lock.release()
条件对象时常配合循环的条件检查使用,参考 Python 文档 提供的生产消费者案例:
# Consume one item
with cv:
while not an_item_is_available():
cv.wait()
get_an_available_item()
# Produce one item
with cv:
make_an_item_available()
cv.notify()
条件检查也可使用 wait_for(predicate, timeout=None) 方法自动化实现,predicate 是一个返回布尔值的可调用对象。
2.6 信号量
Python 通过 threading.Semaphore(value=1) 来创建信号量,value赋予内部计数器初始值,小于零会引发 ValueError 异常。acquire()方法相当于P操作,如果内部计数器大于零,则减一并立即返回,如果内部计数器等于零,阻塞线程,直到其他线程调用release()方法使内部计数器大于零,然后再减一返回(也可以像锁变量那样设置blocking和timeout);release(n=1)方法相当于V操作,将内部计数器加n。为了防止信号量被过多释放,可使用 threading.BoundedSemaphore(value=1) 创建有界信号量,使得其内部计数器值不超过初始值。
信号量通常用于保护数量有限的资源,例如数据库服务器。(连接池)
2.7 threading.Event 事件对象
这是线程之间通信的最简单机制之一:一个线程发出事件信号,而其他线程等待该信号。
(个人理解)尽管 Condition 对象能在一定程度上实现线程通信,但主要目的是为了同步调度。这中间总是涉及到对锁的争抢,如果单纯想让一个线程控制其他线程的执行,可以使用 Event 类,它的概念十分简单。①Event 创建一个事件管理flag,其取值为False(默认)或True;②set() 和 clear() 方法用于将 flag 变为 True 或 False;③当 flag 为 False 时,wait(timeout=None) 方法会阻塞线程直到 flag 变为 True;④is_set() 方法可获取当前flag标志值(从而让线程执行不同的操作)。
2.8 threading.Timer 定时器
Timer是Thread的子类,实例化一个Timer相当于创建了一个线程。
class threading.Timer(interval, function, args=[], kwargs={})
创建后需要start启动,然后会在interval秒过去后,以参数 args 和关键字参数 kwargs 运行function。
如果把 function 设置为另一个线程的 start 方法,则会延迟启动该线程;在执行function前可以调用cancel方法取消线程执行
2.9 threading.Barrier 栅栏对象
栅栏类提供一个简单的同步原语,用于应对固定数量的线程需要彼此相互等待的情况。线程调用 wait() 方法后将阻塞,直到所有线程都调用了 wait() 方法。此时所有线程将被同时释放。
(略)