一、python线程死锁与递归锁
死锁现象
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。
此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
代码示例:
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()
class MyThread(Thread):
def run(self):
self.task1()
self.task2()
def task1(self):
mutexA.acquire()
print('%s task1 get A' %self.name)
mutexB.acquire()
print('%s task1 get B' % self.name)
mutexB.release()
mutexA.release()
def task2(self):
mutexB.acquire()
print('%s task2 get B' % self.name)
time.sleep(1) # Thread-2 拿到执行权,执行get A出现死锁,此时thread2需要B锁,而thread1占用,与此同时,thread1需要A锁,thread2占用
mutexA.acquire()
print('%s task2 get A' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
输出结果:
Thread-1 task1 get A
Thread-1 task1 get B
Thread-1 task2 get B
Thread-2 task1 get A # 出现死锁,整个程序被阻塞
2.递归锁
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。
直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次
代码示例:
from threading import Thread,RLock
import time
mutexA = mutexB = RLock()
class MyThread(Thread):
def run(self):
self.task1()
self.task2()
def task1(self):
mutexA.acquire()
print('%s task1 get A' %self.name)
mutexB.acquire()
print('%s task1 get B' % self.name)
mutexB.release()
mutexA.release()
time.sleep(1) # Thread-2 拿到执行权,,此时counter=0,thread2执行task1
def task2(self):
mutexB.acquire()
print('%s task2 get B' % self.name)
mutexA.acquire()
print('%s task2 get A' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
输出结果:
Thread-1 task1 get A
Thread-1 task1 get B
Thread-2 task1 get A
Thread-2 task1 get B
Thread-3 task1 get A
Thread-3 task1 get B
Thread-4 task1 get A
Thread-4 task1 get B
Thread-5 task1 get A
Thread-5 task1 get B
Thread-6 task1 get A
......
二、信号量
1.什么是信号量
信号量也是一种锁。
信号量的主要用途是用来控制线程的并发量的,Semaphore管理一个内置的计数器,每调用一次acquire()方法时,计数器-1,每调用一次release()方法时,内部计数器+1。
不过需要注意的是,Semaphore内部的计数器不能小于0!当它内部的计数器等于0的时候,这个线程会被锁定,进入阻塞状态,直到其他线程去调用release方法。
2.用处
信号量`semaphore` 是用于控制进入数量的锁。有哪些应用场景呢,比如说在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,
如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。
又比如在做爬虫的时候,有时候爬取速度太快了,会导致被网站禁止,所以这个时候就需要控制爬虫爬取网站的频率。
3.信号量使用的示例
semaphore
内部维护了一个条件变量condition
,构造函数是:
Semaphore(value=1) # value设置是内部维护的计数器的大小,默认为1.
主要有两个方法:
每当调用acquire()时,内置计数器-1,直到为0的时候阻塞
每当调用release()时,内置计数器+1,并让某个线程的acquire()从阻塞变为不阻塞
用爬虫来举例,假如说有一个UrlProducer
线程,爬取url
,多个htmlSpider
线程,爬取url
对应的网页。如果直接开20个htmlSpider
线程,20个线程是同时执行的,现在要限制同时执行能执行三个,就可以使用信号量来控制:
import threading
import time
class htmlSpider(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem
def run(self):
time.sleep(2)
print("got html text success")
self.sem.release() # 内部维护的计数器加1,并通知内部维护的conditon通知acquire
class UrlProducer(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(20):
self.sem.acquire() # 内部维护的计数器减1,到0就会阻塞
html_thread = htmlSpider("http://baidu.com/{}".format(i), self.sem)
html_thread.start()
if __name__ == "__main__":
sem = threading.Semaphore(3) #设置同时最多3个
url_producer = UrlProducer(sem)
url_producer.start()
从结果可以看出,每次都几乎是三个同时的完成任务。
三、Event事件
1. 什么是事件
同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,
这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,
它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,
而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,
它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
2. Event几种方法
event.isSet() | 返回event的状态值; |
event.wait() | 如果 event.isSet()==False将阻塞线程; |
event.set() | 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统度; |
event.clear() | 恢复event的状态值为False。 |
3.代码示例
from threading import Thread,Event
import time
event=Event()
def light():
print('红灯正亮着')
time.sleep(3)
event.set() #绿灯亮
def car(name):
print('车%s正在等绿灯' %name)
event.wait() #等灯绿 此时event为False,直到event.set()将其值设置为True,才会继续运行.
print('车%s通行' %name)
if __name__ == '__main__':
# 红绿灯
t1=Thread(target=light)
t1.start()
# 车
for i in range(10):
t=Thread(target=car,args=(i,))
t.start()
四、线程队列Queue
1. 队列分类
Queue | 哪个数据先存入,取数据的时候先取哪个数据,同生活中的排队买东西 |
LifoQueue | 哪个数据最后存入的,取数据的时候先取,同生活中手枪的弹夹,子弹最后放入的先打出 |
PriorityQueue | 存入数据时候加入一个优先级,取数据的时候优先级最高的取出 |
2. Queue简介
线程队列Queue,也称FIFO,存在队列中的数据先进先出,就好比拉肚子,吃什么拉什么~~呃呃,有点重口味
Queue常用函数:
Queue.qsize() 返回队列大小
Queue.empty() 判断队列是否为空
Queue.full() 判断队列是否满了
Queue.get([block[,timeout]]) 从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。
Queue.task_done() 从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成(与Queue.get配对使用)。
Queue.put(…[,block[,timeout]]) 向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。
Queue.join() 监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。
Queue使用:
# !usr/bin/env python
# -*- coding:utf-8 _*-
import threading
import queue
q = queue.Queue(5) # 长度,队列中最多存放5个数据
def put():
for i in range(20):
q.put(i)
print("数字%d存入队列成功" % i)
q.join() # 阻塞进程,直到所有任务完成,取多少次数据task_done多少次才行,否则最后的ok无法打印
print('ok')
def get():
for i in range(20):
value = q.get()
print("数字%d重队列中取出" % value)
q.task_done() # 必须每取走一个数据,发一个信号给join
# q.task_done() #放在这没用,因为join实际上是一个计数器,put了多少个数据,
# 计数器就是多少,每task_done一次,计数器减1,直到为0才继续执行
t1 = threading.Thread(target=put, args=())
t1.start()
t2 = threading.Thread(target=get, args=())
t2.start()
输出结果:
数字0存入队列成功
数字1存入队列成功
数字2存入队列成功
数字3存入队列成功
数字4存入队列成功
数字0重队列中取出
数字1重队列中取出
数字2重队列中取出
数字3重队列中取出
数字4重队列中取出
......
3. LifoQueue简介
与Queue相反,最后存入的数据最先取出,最先存入的数据最后取出,如果说FIFO是吃什么拉什么,那么LIFO就是吃什么吐什么,先吃的后吐,后吃的先吐~~真是重口味呀!
LifoQueue函数介绍:
函数不做过多介绍了,已经在 python线程队列Queue-FIFO 有了详细讲解,两者都属于Queue,函数都一样!
LifoQueue使用:
# !usr/bin/env python
# -*- coding:utf-8 _*-
import queue
import threading
import time
# 可以设置队列的长度 q=queue.LifoQueue(5),意味着队列中最多存放5个元素,当队列满的时候自动进入阻塞状态
q=queue.LifoQueue()
def put():
for i in range(10):
q.put(i)
print("数据%d被存入到队列中" % i)
q.join()
print('ok')
def get():
for i in range(10):
value = q.get()
print("数据%d从队列中取出" % value)
q.task_done()
t1=threading.Thread(target=put,args=())
t1.start()
t2=threading.Thread(target=get,args=())
t2.start()
输出结果:
数据0被存入到队列中
数据1被存入到队列中
数据2被存入到队列中
数据3被存入到队列中
数据4被存入到队列中
数据5被存入到队列中
数据6被存入到队列中
数据7被存入到队列中
数据8被存入到队列中
数据9被存入到队列中
数据9从队列中取出
数据8从队列中取出
数据7从队列中取出
数据6从队列中取出
数据5从队列中取出
数据4从队列中取出
数据3从队列中取出
数据2从队列中取出
数据1从队列中取出
数据0从队列中取出
ok
4. PriorityQueue简介
在数据存入的时候设置优先级,设置的值越小,优先级越高,取数据的时候默认按照优先级最高的取出
PriorityQueue函数介绍:
函数不做过多介绍了,已经在 python线程队列Queue-FIFO 有了详细讲解,两者都属于Queue,函数都一样!
PriorityQueue使用:
按优先级:不管是数字、字母、列表、元组等(字典、集合没测),使用优先级存数据取数据,队列中的数据必须是同一类型,都是按照实际数据的ascii码表的顺序进行优先级匹配,汉字是按照unicode表。
# !usr/bin/env python
# -*- coding:utf-8 _*-
import queue
import threading
import time
q = queue.PriorityQueue()
q.put([1, 'ace'])
q.put([40, 333])
q.put([3, 'afd'])
q.put([5, '4asdg'])
# 1是级别最高的,
while not q.empty(): # 不为空时候执行
print(q.get())
q = queue.PriorityQueue()
q.put('我')
q.put('你')
q.put('他')
q.put('她')
q.put('ta')
while not q.empty():
print(q.get())
运行结果:
[1, 'ace']
[3, 'afd']
[5, '4asdg']
[40, 333]
ta
他
你
她
我
五、Python进程池、线程池、回调函数
1. 池的概念
不管是线程还是进程,都不能无限制的开下去,总会消耗和占用资源。
也就是说,硬件的承载能力是有限度的,在保证高效率工作的同时应该还需要保证硬件的资源占用情况,所以需要给硬件设置一个上限来减轻硬件的压力,所以就有了池的概念。
2. 进程池与线程池的使用方法(除了使用模块不一样外其他都相同)
from concurrent.futures import ProcessPoolExecutor # 导入进程池模块
from concurrent.futures import ThreadPoolExecutor # 导入线程池模块
import os
import time
import random
# 下面以进程池为例,线程池只是使用导入模块不一样,仅此而已。
def task(name):
print('name:[%s]|进程:[%s]正在运行' % (name, os.getpid()))
time.sleep(random.randint(1, 3)) # 模拟进程运行耗费时间。
# 这一步的必要性:在创建进程时,会将代码以模块的方式从头到尾导入加载执行一遍
# (所以创建线程如果不写在main里面的话,这个py文件里面的所有代码都会从头到尾加载执行一遍
# 就会导致在创建进程的时候产生死循环。)
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 设置线程池的大小,默认等于cpu的核心数。
for i in range(10):
pool.submit(task, '进程%s' % i) # 异步提交(提交后不等待)
pool.shutdown(wait=True) # 关闭进程池入口不再提交,同时等待进程池全部运行完毕。(类似join方法,但是shutdown等待全部线程结束后在执行主进程,默认wait等于true)
print('主') # 标识一下主进程的完毕之前的语句
运行结果:
name:[进程0]|进程:[17656]正在运行
name:[进程1]|进程:[14380]正在运行
name:[进程2]|进程:[18956]正在运行
name:[进程3]|进程:[3564]正在运行
name:[进程4]|进程:[14380]正在运行
name:[进程5]|进程:[18956]正在运行
name:[进程6]|进程:[3564]正在运行
name:[进程7]|进程:[18956]正在运行
name:[进程8]|进程:[3564]正在运行
name:[进程9]|进程:[17656]正在运行
主
3.同步调用:提交任务,原地等待该任务执行完毕,拿到结果后再执行下一个任务,导致程序串行执行!
from concurrent.futures import ProcessPoolExecutor # 导入进程池模块
from concurrent.futures import ThreadPoolExecutor # 导入线程池模块
import os
import time
import random
def task(name):
print('name:[%s]|进程[%s]正在运行...' % (name, os.getpid()))
time.sleep(random.randint(1, 3))
return '拿到[%s]|进程%s的结果...' % (name, os.getpid())
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
result = [] # 创建一个空列表来搜集执行结果
for i in range(10):
res = pool.submit(task, '进程%s' % i).result() # 使用.result()方法得到每次的结果,同步调用
result.append(res)
pool.shutdown(wait=True)
for j in result:
print(j)
print('主进程')
运行结果:
name:[进程0]|进程[3376]正在运行...
name:[进程1]|进程[27124]正在运行...
name:[进程2]|进程[10176]正在运行...
name:[进程3]|进程[28636]正在运行...
name:[进程4]|进程[3376]正在运行...
name:[进程5]|进程[27124]正在运行...
name:[进程6]|进程[10176]正在运行...
name:[进程7]|进程[28636]正在运行...
name:[进程8]|进程[3376]正在运行...
name:[进程9]|进程[27124]正在运行...
拿到[进程0]|进程3376的结果...
拿到[进程1]|进程27124的结果...
拿到[进程2]|进程10176的结果...
拿到[进程3]|进程28636的结果...
拿到[进程4]|进程3376的结果...
拿到[进程5]|进程27124的结果...
拿到[进程6]|进程10176的结果...
拿到[进程7]|进程28636的结果...
拿到[进程8]|进程3376的结果...
拿到[进程9]|进程27124的结果...
主进程
异步调用:提交任务,不去等结果,继续执行
from concurrent.futures import ProcessPoolExecutor
import os
import random
import time
def task(name):
time.sleep(random.randint(1, 3))
print('name: %s 进程[%s]运行...' % (name, os.getpid()))
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
for i in range(10):
pool.submit(task, '进程%s' % i) # 异步调用,提交后不等待结果,继续执行代码
pool.shutdown(wait=True)
print('主进程')
运行结果:
name: 进程3 进程[10016]运行...
name: 进程0 进程[12736]运行...
name: 进程1 进程[4488]运行...
name: 进程2 进程[3920]运行...
name: 进程5 进程[12736]运行...
name: 进程6 进程[4488]运行...
name: 进程4 进程[10016]运行...
name: 进程9 进程[4488]运行...
name: 进程8 进程[12736]运行...
name: 进程7 进程[3920]运行...
主进程
六、回调函数
1.什么是回调函数
上面我们在演示异步调用时候,说过提交任务不等待执行结果,继续往下执行代码,那么,执行的结果我们怎么得到呢?
可以为进程池和线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发并接收任务的返回值当做参数,这个函数就是回调函数。
代码示例:
from concurrent.futures import ThreadPoolExecutor
import time
import random
import requests
def task(url):
print('获取网站[%s]信息' % url)
response = requests.get(url) # 下载页面
time.sleep(random.randint(1, 3))
return {'url': url, 'content': response.text} # 返回结果:页面地址和页面内容
futures = []
def back(res):
res = res.result() # 取到提交任务的结果(回调函数固定写法)
res = '网站[%s]内容长度:%s' % (res.get('url'), len(res.get('content')))
futures.append(res)
return futures
if __name__ == '__main__':
urls = [
'http://www.baidu.com',
'http://www.dgtle.com/',
'https://www.bilibili.com/'
]
pool = ThreadPoolExecutor(4)
futures = []
for i in urls:
pool.submit(task, i).add_done_callback(back) # 执行完线程后,使用回调函数
pool.shutdown(wait=True)
for j in futures:
print(j)
运行结果:
获取网站[http://www.baidu.com]信息
获取网站[http://www.dgtle.com/]信息
获取网站[https://www.bilibili.com/]信息
网站[http://www.dgtle.com/]内容长度:39360
网站[https://www.bilibili.com/]内容长度:69377
网站[http://www.baidu.com]内容长度:2381
七、定时器
Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。
构造方法:Timer(interval, function, args=[], kwargs={})
interval: 指定的延迟时间
function: 要执行的方法
args/kwargs: 方法的参数
代码示例:
import time
from threading import Timer
def task(name):
print("%s 大帅比"%name)
if __name__ == '__main__':
t = Timer(2,task,kwargs={"name":"lqz"}) #2表示间隔的时间
t.start() #正常情况下会延迟两秒后执行task函数
time.sleep(1)
t.cancel() #如果Timer没有结束就会被停止执行
"一劳永逸" 的话,有是有的,而 "一劳永逸" 的事却极少