首页 > 系统相关 >python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器

python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器

时间:2024-09-26 14:23:21浏览次数:3  
标签:__ name get python 信号量 队列 线程 进程

一、python线程死锁与递归锁

死锁现象

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。

此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

代码示例:

from threading import Thread,Lock
import time
 
mutexA = Lock()
mutexB = Lock()
 
class MyThread(Thread):
    def run(self):
        self.task1()
        self.task2()
 
 
    def task1(self):
        mutexA.acquire()
        print('%s task1 get A' %self.name)
 
        mutexB.acquire()
        print('%s task1 get B' % self.name)
        mutexB.release()
 
        mutexA.release()
 
    def task2(self):
        mutexB.acquire()
        print('%s task2 get B' % self.name)
        time.sleep(1)  # Thread-2 拿到执行权,执行get A出现死锁,此时thread2需要B锁,而thread1占用,与此同时,thread1需要A锁,thread2占用
        mutexA.acquire()
        print('%s task2 get A' % self.name)
 
        mutexA.release()
        mutexB.release()
 
 
if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

输出结果:

Thread-1 task1 get A
Thread-1 task1 get B
Thread-1 task2 get B
Thread-2 task1 get A # 出现死锁,整个程序被阻塞

2.递归锁

解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。

直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次

代码示例:

from threading import Thread,RLock
import time
 
mutexA = mutexB = RLock()
 
class MyThread(Thread):
    def run(self):
        self.task1()
        self.task2()
 
 
    def task1(self):
        mutexA.acquire()
        print('%s task1 get A' %self.name)
 
        mutexB.acquire()
        print('%s task1 get B' % self.name)
        mutexB.release()
 
        mutexA.release()
        time.sleep(1) # Thread-2 拿到执行权,,此时counter=0,thread2执行task1
 
    def task2(self):
        mutexB.acquire()
        print('%s task2 get B' % self.name)
 
        mutexA.acquire()
        print('%s task2 get A' % self.name)
 
        mutexA.release()
        mutexB.release()
 
 
if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

输出结果:

Thread-1 task1 get A
Thread-1 task1 get B
Thread-2 task1 get A
Thread-2 task1 get B
Thread-3 task1 get A
Thread-3 task1 get B
Thread-4 task1 get A
Thread-4 task1 get B
Thread-5 task1 get A
Thread-5 task1 get B
Thread-6 task1 get A
......

二、信号量

1.什么是信号量

信号量也是一种锁。

信号量的主要用途是用来控制线程的并发量的,Semaphore管理一个内置的计数器,每调用一次acquire()方法时,计数器-1,每调用一次release()方法时,内部计数器+1。

不过需要注意的是,Semaphore内部的计数器不能小于0!当它内部的计数器等于0的时候,这个线程会被锁定,进入阻塞状态,直到其他线程去调用release方法。

2.用处

信号量`semaphore` 是用于控制进入数量的锁。有哪些应用场景呢,比如说在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,
如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。
又比如在做爬虫的时候,有时候爬取速度太快了,会导致被网站禁止,所以这个时候就需要控制爬虫爬取网站的频率。

3.信号量使用的示例

semaphore内部维护了一个条件变量condition,构造函数是:

Semaphore(value=1) # value设置是内部维护的计数器的大小,默认为1.

主要有两个方法:

每当调用acquire()时,内置计数器-1,直到为0的时候阻塞
每当调用release()时,内置计数器+1,并让某个线程的acquire()从阻塞变为不阻塞

用爬虫来举例,假如说有一个UrlProducer线程,爬取url,多个htmlSpider线程,爬取url对应的网页。如果直接开20个htmlSpider线程,20个线程是同时执行的,现在要限制同时执行能执行三个,就可以使用信号量来控制:

import threading
import time
class htmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release() # 内部维护的计数器加1,并通知内部维护的conditon通知acquire

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire() # 内部维护的计数器减1,到0就会阻塞
            html_thread = htmlSpider("http://baidu.com/{}".format(i), self.sem)
            html_thread.start()

if __name__ == "__main__":
    sem = threading.Semaphore(3) #设置同时最多3个
    url_producer = UrlProducer(sem)
    url_producer.start()

从结果可以看出,每次都几乎是三个同时的完成任务。

三、Event事件

1. 什么是事件

同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,
这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,
它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 
而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,
它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

2. Event几种方法

event.isSet()

返回event的状态值;

event.wait()

如果 event.isSet()==False将阻塞线程;

event.set()

设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统度;

event.clear()

恢复event的状态值为False。

3.代码示例

from threading import Thread,Event
import time

event=Event()

def light():
    print('红灯正亮着')
    time.sleep(3)
    event.set() #绿灯亮

def car(name):
    print('车%s正在等绿灯' %name)
    event.wait() #等灯绿 此时event为False,直到event.set()将其值设置为True,才会继续运行.
    print('车%s通行' %name)

if __name__ == '__main__':
    # 红绿灯
    t1=Thread(target=light)
    t1.start()
    # 车
    for i in range(10):
        t=Thread(target=car,args=(i,))
        t.start()

四、线程队列Queue

1. 队列分类

Queue

哪个数据先存入,取数据的时候先取哪个数据,同生活中的排队买东西

LifoQueue

哪个数据最后存入的,取数据的时候先取,同生活中手枪的弹夹,子弹最后放入的先打出

PriorityQueue

存入数据时候加入一个优先级,取数据的时候优先级最高的取出

2. Queue简介

线程队列Queue,也称FIFO,存在队列中的数据先进先出,就好比拉肚子,吃什么拉什么~~呃呃,有点重口味

Queue常用函数:

Queue.qsize()    返回队列大小

Queue.empty()  判断队列是否为空

Queue.full()        判断队列是否满了

Queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。

Queue.task_done()  从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成(与Queue.get配对使用)。

Queue.put(…[,block[,timeout]])  向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。

Queue.join()  监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。

Queue使用:

# !usr/bin/env python
# -*- coding:utf-8 _*-

import threading
import queue

q = queue.Queue(5)  # 长度,队列中最多存放5个数据


def put():
    for i in range(20):
        q.put(i)
        print("数字%d存入队列成功" % i)
    q.join()  # 阻塞进程,直到所有任务完成,取多少次数据task_done多少次才行,否则最后的ok无法打印
    print('ok')


def get():
    for i in range(20):
        value = q.get()
        print("数字%d重队列中取出" % value)
        q.task_done()  # 必须每取走一个数据,发一个信号给join
    # q.task_done()   #放在这没用,因为join实际上是一个计数器,put了多少个数据,
    # 计数器就是多少,每task_done一次,计数器减1,直到为0才继续执行


t1 = threading.Thread(target=put, args=())
t1.start()
t2 = threading.Thread(target=get, args=())
t2.start()

输出结果:

数字0存入队列成功
数字1存入队列成功
数字2存入队列成功
数字3存入队列成功
数字4存入队列成功
数字0重队列中取出
数字1重队列中取出
数字2重队列中取出
数字3重队列中取出
数字4重队列中取出
......

3. LifoQueue简介

与Queue相反,最后存入的数据最先取出,最先存入的数据最后取出,如果说FIFO是吃什么拉什么,那么LIFO就是吃什么吐什么,先吃的后吐,后吃的先吐~~真是重口味呀!

LifoQueue函数介绍:

函数不做过多介绍了,已经在 python线程队列Queue-FIFO 有了详细讲解,两者都属于Queue,函数都一样!

LifoQueue使用: 

# !usr/bin/env python
# -*- coding:utf-8 _*-

import queue
import threading
import time
 
# 可以设置队列的长度 q=queue.LifoQueue(5),意味着队列中最多存放5个元素,当队列满的时候自动进入阻塞状态
q=queue.LifoQueue()
def put():
    for i in range(10):
        q.put(i)
        print("数据%d被存入到队列中" % i)
    q.join()
    print('ok')
 
def get():
    for i in range(10):
        value = q.get()
        print("数据%d从队列中取出" % value)
        q.task_done()
 
t1=threading.Thread(target=put,args=())
t1.start()
t2=threading.Thread(target=get,args=())
t2.start()

输出结果:

数据0被存入到队列中
数据1被存入到队列中
数据2被存入到队列中
数据3被存入到队列中
数据4被存入到队列中
数据5被存入到队列中
数据6被存入到队列中
数据7被存入到队列中
数据8被存入到队列中
数据9被存入到队列中
数据9从队列中取出
数据8从队列中取出
数据7从队列中取出
数据6从队列中取出
数据5从队列中取出
数据4从队列中取出
数据3从队列中取出
数据2从队列中取出
数据1从队列中取出
数据0从队列中取出
ok

4. PriorityQueue简介

在数据存入的时候设置优先级,设置的值越小,优先级越高,取数据的时候默认按照优先级最高的取出

PriorityQueue函数介绍:

函数不做过多介绍了,已经在 python线程队列Queue-FIFO 有了详细讲解,两者都属于Queue,函数都一样!

PriorityQueue使用:

按优先级:不管是数字、字母、列表、元组等(字典、集合没测),使用优先级存数据取数据,队列中的数据必须是同一类型,都是按照实际数据的ascii码表的顺序进行优先级匹配,汉字是按照unicode表。

 

# !usr/bin/env python
# -*- coding:utf-8 _*-
 
import queue
import threading
import time
 
q = queue.PriorityQueue()
q.put([1, 'ace'])
q.put([40, 333])
q.put([3, 'afd'])
q.put([5, '4asdg'])
# 1是级别最高的,
while not q.empty():  # 不为空时候执行
    print(q.get())
 
 
 
q = queue.PriorityQueue()
q.put('我')
q.put('你')
q.put('他')
q.put('她')
q.put('ta')
while not q.empty():
    print(q.get())

运行结果: 

[1, 'ace']
[3, 'afd']
[5, '4asdg']
[40, 333]
ta
他
你
她
我

五、Python进程池、线程池、回调函数

1. 池的概念

不管是线程还是进程,都不能无限制的开下去,总会消耗和占用资源。

也就是说,硬件的承载能力是有限度的,在保证高效率工作的同时应该还需要保证硬件的资源占用情况,所以需要给硬件设置一个上限来减轻硬件的压力,所以就有了池的概念。

2. 进程池与线程池的使用方法(除了使用模块不一样外其他都相同)

from concurrent.futures import ProcessPoolExecutor  # 导入进程池模块
from concurrent.futures import ThreadPoolExecutor # 导入线程池模块
import os
import time
import random

# 下面以进程池为例,线程池只是使用导入模块不一样,仅此而已。
def task(name):
    print('name:[%s]|进程:[%s]正在运行' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))   # 模拟进程运行耗费时间。

# 这一步的必要性:在创建进程时,会将代码以模块的方式从头到尾导入加载执行一遍
# (所以创建线程如果不写在main里面的话,这个py文件里面的所有代码都会从头到尾加载执行一遍
# 就会导致在创建进程的时候产生死循环。)
if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # 设置线程池的大小,默认等于cpu的核心数。
    for i in range(10):
        pool.submit(task, '进程%s' % i)  # 异步提交(提交后不等待)
    
    pool.shutdown(wait=True)  # 关闭进程池入口不再提交,同时等待进程池全部运行完毕。(类似join方法,但是shutdown等待全部线程结束后在执行主进程,默认wait等于true)
    print('主') # 标识一下主进程的完毕之前的语句

运行结果:

name:[进程0]|进程:[17656]正在运行
name:[进程1]|进程:[14380]正在运行
name:[进程2]|进程:[18956]正在运行
name:[进程3]|进程:[3564]正在运行
name:[进程4]|进程:[14380]正在运行
name:[进程5]|进程:[18956]正在运行
name:[进程6]|进程:[3564]正在运行
name:[进程7]|进程:[18956]正在运行
name:[进程8]|进程:[3564]正在运行
name:[进程9]|进程:[17656]正在运行
主

3.同步调用:提交任务,原地等待该任务执行完毕,拿到结果后再执行下一个任务,导致程序串行执行!

from concurrent.futures import ProcessPoolExecutor  # 导入进程池模块
from concurrent.futures import ThreadPoolExecutor # 导入线程池模块
import os
import time
import random


def task(name):
    print('name:[%s]|进程[%s]正在运行...' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))
    return '拿到[%s]|进程%s的结果...' % (name, os.getpid())

if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)
    result = []  # 创建一个空列表来搜集执行结果
    for i in range(10):
        res = pool.submit(task, '进程%s' % i).result()  # 使用.result()方法得到每次的结果,同步调用
        result.append(res)
    pool.shutdown(wait=True)
    for j in result:
        print(j)
    print('主进程')

运行结果:

name:[进程0]|进程[3376]正在运行...
name:[进程1]|进程[27124]正在运行...
name:[进程2]|进程[10176]正在运行...
name:[进程3]|进程[28636]正在运行...
name:[进程4]|进程[3376]正在运行...
name:[进程5]|进程[27124]正在运行...
name:[进程6]|进程[10176]正在运行...
name:[进程7]|进程[28636]正在运行...
name:[进程8]|进程[3376]正在运行...
name:[进程9]|进程[27124]正在运行...
拿到[进程0]|进程3376的结果...
拿到[进程1]|进程27124的结果...
拿到[进程2]|进程10176的结果...
拿到[进程3]|进程28636的结果...
拿到[进程4]|进程3376的结果...
拿到[进程5]|进程27124的结果...
拿到[进程6]|进程10176的结果...
拿到[进程7]|进程28636的结果...
拿到[进程8]|进程3376的结果...
拿到[进程9]|进程27124的结果...
主进程

异步调用:提交任务,不去等结果,继续执行

from concurrent.futures import ProcessPoolExecutor
import os
import random
import time

def task(name):
    time.sleep(random.randint(1, 3))
    print('name: %s 进程[%s]运行...' % (name, os.getpid()))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        pool.submit(task, '进程%s' % i)   # 异步调用,提交后不等待结果,继续执行代码

    pool.shutdown(wait=True)
    print('主进程')

运行结果:

name: 进程3 进程[10016]运行...
name: 进程0 进程[12736]运行...
name: 进程1 进程[4488]运行...
name: 进程2 进程[3920]运行...
name: 进程5 进程[12736]运行...
name: 进程6 进程[4488]运行...
name: 进程4 进程[10016]运行...
name: 进程9 进程[4488]运行...
name: 进程8 进程[12736]运行...
name: 进程7 进程[3920]运行...
主进程

六、回调函数

1.什么是回调函数

上面我们在演示异步调用时候,说过提交任务不等待执行结果,继续往下执行代码,那么,执行的结果我们怎么得到呢?

可以为进程池和线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发并接收任务的返回值当做参数,这个函数就是回调函数。

代码示例:

from concurrent.futures import ThreadPoolExecutor
import time
import random
import requests


def task(url):
    print('获取网站[%s]信息' % url)
    response = requests.get(url)  # 下载页面
    time.sleep(random.randint(1, 3))
    return {'url': url, 'content': response.text}  # 返回结果:页面地址和页面内容

futures = []
def back(res):
    res = res.result()  # 取到提交任务的结果(回调函数固定写法)
    res = '网站[%s]内容长度:%s' % (res.get('url'), len(res.get('content')))
    futures.append(res)
    return futures

if __name__ == '__main__':
    urls = [
        'http://www.baidu.com',
        'http://www.dgtle.com/',
        'https://www.bilibili.com/'
    ]
    pool = ThreadPoolExecutor(4)
    futures = []
    for i in urls:
        pool.submit(task, i).add_done_callback(back)  # 执行完线程后,使用回调函数

    pool.shutdown(wait=True)
    for j in futures:
        print(j)

运行结果:

获取网站[http://www.baidu.com]信息
获取网站[http://www.dgtle.com/]信息
获取网站[https://www.bilibili.com/]信息
网站[http://www.dgtle.com/]内容长度:39360
网站[https://www.bilibili.com/]内容长度:69377
网站[http://www.baidu.com]内容长度:2381

七、定时器

Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。

构造方法:Timer(interval, function, args=[], kwargs={}) 

interval: 指定的延迟时间 
function: 要执行的方法 
args/kwargs: 方法的参数

代码示例:

import time
from threading import Timer


def task(name):
    print("%s 大帅比"%name)

if __name__ == '__main__':
    t = Timer(2,task,kwargs={"name":"lqz"})  #2表示间隔的时间
    t.start()   #正常情况下会延迟两秒后执行task函数
    time.sleep(1)
    t.cancel()  #如果Timer没有结束就会被停止执行

"一劳永逸" 的话,有是有的,而 "一劳永逸" 的事却极少



标签:__,name,get,python,信号量,队列,线程,进程
From: https://blog.51cto.com/u_8901540/12118821

相关文章

  • vscode设置python解释器以及函数无法点击跳转问题
    1.下载插件1.1Python1.2Pylance1.3Remote-SSH2.设置本地/远程python解释器2.1本地设置2-1-1设置解释器路径设置自定义python解释器路径,mac快捷键command+p>python:selectinterpreter选择或者输入解释器2-1-2查看设置结果设置完python-venv路径后,打开......
  • 基于SqlAlchemy+Pydantic+FastApi的Python开发框架的路由处理
    在前面随笔《基于SqlAlchemy+Pydantic+FastApi的Python开发框架 》中介绍了框架总体的内容,其中主要的理念就是通过抽象接口的方式,实现代码的重用,提高开发效率。本篇随笔深入介绍一下FastApi的路由处理部分的内容,通过基类继承的方式,我们可以简化路由器(或者叫WebAPI控制器)的基础......
  • Python学习
    Python学习1.执行python程序大多数程序语言,入门编程代码是"HelloWorld!",以下代码为使用Python输出(打印)"HelloWorld!"print("HelloWorld!")可以理解为想打印什么东西只需要把print("")写好,把想打印的内容放在""里即可2、打印多个HelloWorld!需求:打印1000......
  • 深入解析:Unicode 与 UTF-8 在 Python 中的秘密武器
    引言字符编码是计算机科学中的一个重要领域,它定义了如何将人类可读的文字转换为机器能够理解的形式。随着互联网的发展,不同的语言和符号需要在全球范围内共享,这就对字符编码提出了更高的要求。Unicode标准就是为了满足这种需求而诞生的,它提供了一套统一的字符集,几乎涵盖了所有现代......
  • Python中的“with”语句:开启优雅文件操作的新篇章
    引言在日常开发工作中,我们经常需要与各种类型的文件打交道。无论是简单的日志记录还是复杂的文本分析,正确地打开、读取、写入以及关闭文件都是必不可少的步骤。然而,在传统的文件操作模式下(如使用open()函数),一旦在操作过程中发生异常,可能会导致文件未被正确关闭,进而引发资源泄露等......
  • Python设计模式速通
    目录先导对象的事情类的事情方法面对对象程序设计的几个基本要点封装多态继承抽象组合面对对象程序设计的准则开放/封闭原则控制反转原则接口隔离原则单一职责原则替换原则规定三大模式创建型模式结构型模式行为型模式先导我们开始设计模式之前,首先第......
  • 2024.9.25 Python,单词替换,优美的排列 II,sort的用法前K个高频单词,广度优先搜索腐烂的橘
    1.单词替换在英语中,我们有一个叫做词根(root)的概念,可以词根后面添加其他一些词组成另一个较长的单词——我们称这个词为衍生词(derivative)。例如,词根help,跟随着继承词“ful”,可以形成新的单词“helpful”。现在,给定一个由许多词根组成的词典dictionary和......
  • Java线程池详解
    目录前言线程池概述线程池的实现线程池的构造拒绝策略任务队列线程池的工作原理线程池的监控Executors线程池工厂自定义线程池使用线程池的好处应用场景总结本文详细探讨了线程池在并发编程领域的应用,介绍了ThreadPoolExecutor的核心组件、工作原理,线程池的构造......
  • Python日志管理之Loguru
    1.安装pipinstallloguru2.快速使用fromloguruimportloggerlogger.add("my_log.log",rotation="10MB")#自动分割日志文件logger.info("这是一个信息级别的日志")3.日志器配置方式1.导入即用fromloguruimportlogger,有且只有1个日志器对象,简化配置复杂性2.日志器配......
  • 计算机专业毕设选题推荐-基于python的企业工作考勤管理系统 企业员工考勤系统
    精彩专栏推荐订阅:在下方主页......