首页 > 其他分享 >互斥锁、死锁、信号量、线程、协程

互斥锁、死锁、信号量、线程、协程

时间:2022-11-21 21:34:37浏览次数:56  
标签:__ start 信号量 互斥 死锁 线程 time print import

互斥锁、死锁、信号量、线程、协程

目录

互斥锁

互斥锁对共享数据进行锁定,保证同一时刻只能有一个线程去操作。
ps:互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其他等待的线程再去抢这个锁。

互斥锁代码实操

threading模块中定义了Lock变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁。
import threading

g_num = 0
lock = threading.Lock()  # 循环100万次执行的任务


def task():
    lock.acquire()  # 上锁
    for i in range(2000000):
        global g_num  # 声明修改全局变量的内存地址
        g_num = g_num + 1
    print("task1", g_num)
    lock.release()  # 释放锁


def task2():
    lock.acquire()  # 上锁
    for i in range(1000000):
        global g_num  # 声明修改全局变量的内存地址
        g_num = g_num + 1
    print("task2", g_num)
    lock.release()  # 释放锁


if __name__ == '__main__':
    first_threat = threading.Thread(target=task)
    second_threat = threading.Thread(target=task2)
    first_threat.start()
    second_threat.start()
#输出结果
task1 2000000
task2 3000000

#互斥锁可以保证同意时刻只有一个线程会执行代码,能够保证全局变量的数据没有问题
#线程等待把互斥锁都是把多任务改成单任务去执行,保证了数据的准确性,但是执行性能会下降

ps:
    acquire和release方法之间的代码同一时刻只能有一个线程去操作。
    如果在调用acquire方法的时候,其他线程已经使用了这个互斥锁,那么此时acquire方法会堵塞,直到这个互斥锁释放后才能再次上锁。

总结:
	互斥锁的作用就是保证同一时刻只能有一个线程去操作共享数据,保证共享数据不会出现错误问题
    使用互斥锁的好处确保某段关键代码只能由一个线程从头到尾完整的去执行
    使用互斥锁会影响代码的执行效率,多任务改成了单任务执行
    互斥锁如果没有使用好容易出现死锁的情况

线程理论

线程是CPU的基本执行单位,里面包括真正要执行的代码
进程是一个资源单位,其中包括了这个程序需要的所有资源
类比的话,进程就像是一个工厂,里面包括了生产所需所有资源;线程像一条流水线,包含具体的执行步骤。
一个工厂可以包括多个线程,每个线程可以生产不同的产品。每个线程被启动后会自动创建一个线程,该线程会从程序入口开始执行代码(py程序入口就是被执行文件的第一行)被称之为主线程。
如果主线程在完成任务时耗时过长,可以开启子线程来辅助主线程。
from threading import Thread

a = 100


def task():
    global a
    a = 1
    print("这是给子线程执行的任务")


# 创建一个子线程
t = Thread(target=task)
# 启动这个子线程
t.start()
print("主")
print(a)

# 在多进程中,开启子进程需要消耗大量的资源,所以主进程会先比子进程执行
# 子线程的开启速度比进程快的多
# 在多线程中,子线程可以直接访问主线程的内容
# 多个线程之间是平等的,所以不存在父子关系
# 在今后的开发中,每当出现i/o阻塞 ,比较耗时的操作

import time, os


def task():
    time.sleep(2)
    print("子线程 run......")
    print(os.getpid())


t = Thread(target=task)
t.start()
# 主线程等到子线程结束
t.join()
print("over")
print(os.getpid())

创建线程的两种方式

from threading import Thread
from multiprocessing import Process
import time


# def task(name):
#     print(f'{name} is running')
#     time.sleep(0.1)
#     print(f'{name} is over')
#
# if __name__ == '__main__':
#     start_time = time.time()
    # p_list = []
    # for i in range(100):
    #     p = Process(target=task, args=('用户%s'%i,))
    #     p.start()
    #     p_list.append(p)
    # for p in p_list:
    #     p.join()
    # print(time.time() - start_time)
    # t_list = []
    # for i in range(100):
    #     t = Thread(target=task, args=('用户%s'%i,))
    #     t.start()
    #     t_list.append(t)
    # for t in t_list:
    #     t.join()
    # print(time.time() - start_time)

# t = Thread(target=task, args=('jason',))
# t.start()
# print('主线程')
"""
创建线程无需考虑反复执行的问题
"""


class MyThread(Thread):
    def run(self):
        print('run is running')
        time.sleep(1)
        print('run is over')

obj = MyThread()
obj.start()
print('主线程')

线程的诸多特性

  • join方法
  • 同进程内多个线程数据共享
  • current_thread()
  • active_count()

GIL全局解释器锁

# 官方文档对GIL的解释
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.
翻译:
在CPython中,全局解释器锁(GIL)是一个互斥锁,它可以防止多个本地线程同时执行Python字节码。这个锁是必要的,主要是因为CPython的内存管理不是线程安全的。(然而,自从GIL存在以来,其他特性已经依赖于它强制执行的保证。)

"""
1.在CPython解释器中存在全局解释器锁简称GIL
	python解释器有很多类型
	CPython JPython PyPython (常用的是CPython解释器)
2.GIL本质也是一把互斥锁 用来阻止同一个进程内多个线程同时执行(重要)
3.GIL的存在是因为CPython解释器中内存管理不是线程安全的(垃圾回收机制)
	垃圾回收机制
		引用计数、标记清除、分代回收
"""

验证GIL的存在

from threading import Thread

num = 100


def task():
    global num
    num -= 1


t_list = []
for i in range(100):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(num)
# 0

GIL与普通互斥锁

GIL只能够确保同进程内多线程数据不会被垃圾回收机制弄乱,并不能确保程序里面的数据是否安全。
import time
from threading import Thread, Lock

num = 100


def task(mutex):
    global num
    mutex.acquire()
    count = num
    time.sleep(0.1)
    num = count - 1
    mutex.release()


mutex = Lock()
t_list = []
for i in range(100):
    t = Thread(target=task, args=(mutex,))
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(num)
# 0

python多线程是否有用

首先分情况:情况1,单个CPU/多个CPU;情况2,IO密集型(代码有IO操作)/计算密集型(代码没有IO)
1.单个CPU
IO密集型:
多进程:申请额外空间,消耗更多的资源;
多线程:消耗资源相对较少,通过多道技术
#多线程有优势

计算密集型:
多进程:申请额外的空间 消耗更多的资源(总耗时+申请空间+拷贝代码+切换)
多线程:消耗资源相对较少 通过多道技术(总耗时+切换)
#多线程有优势

2.多个CPU
IO密集型:
多进程:总耗时(单个进程的耗时+IO+申请空间+拷贝代码)
多线程:总耗时(单个进程的耗时+IO)
#多线程有优势

计算密集型:
多进程:总耗时(单个进程的耗时)
多线程:总耗时(多个进程的综合)
#多进程完胜

代码实现:
from threading import Thread
from multiprocessing import Process
import os
import time


def work():
    # 计算密集型
    res = 1
    for i in range(1, 100000):
        res *= i


if __name__ == '__main__':
    # print(os.cpu_count())  # 12  查看当前计算机CPU个数
    start_time = time.time()
    # p_list = []
    # for i in range(12):  # 一次性创建12个进程
    #     p = Process(target=work)
    #     p.start()
    #     p_list.append(p)
    # for p in p_list:  # 确保所有的进程全部运行完毕
    #     p.join()
    t_list = []
    for i in range(12):
        t = Thread(target=work)
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print('总耗时:%s' % (time.time() - start_time))  # 获取总的耗时

"""
计算密集型
    多进程:5.665567398071289
    多线程:30.233906745910645
"""

def work():
    time.sleep(2)   # 模拟纯IO操作


if __name__ == '__main__':
    start_time = time.time()
    # t_list = []
    # for i in range(100):
    #     t = Thread(target=work)
    #     t.start()
    # for t in t_list:
    #     t.join()
    p_list = []
    for i in range(100):
        p = Process(target=work)
        p.start()
    for p in p_list:
        p.join()
    print('总耗时:%s' % (time.time() - start_time))

"""
IO密集型
    多线程:0.0149583816528320
    多进程:0.6402878761291504
"""

死锁现象

	两个或两个以上的进程或线程在执行过程中,因抢夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁现象。
"""
虽然我们已经掌握了互斥锁的使用
	先抢锁 后释放锁
但是在实际项目尽量少用(你也不会用!!!)
"""
from threading import Thread, Lock
import time

mutexA = Lock()  # 类名加括号每执行一次就会产生一个新的对象
mutexB = Lock()  # 类名加括号每执行一次就会产生一个新的对象
'''
Lock()是一个类,类名加括号,每执行一次就会产生一个对象
类名加括号每执行一次就会产生一个新的对象,可以用单例模式实现
'''

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        mutexB.release()
        print(f'{self.name}释放了B锁')
        mutexA.release()
        print(f'{self.name}释放了A锁')

    def func2(self):
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        time.sleep(1)
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexA.release()
        print(f'{self.name}释放了A锁')
        mutexB.release()
        print(f'{self.name}释放了B锁')


for i in range(10):
    t = MyThread()
    t.start()

信号量

互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据。
    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
锁信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念。
信号量本质也是互斥锁 只不过它是多把锁
强调:
    信号量在不同的知识体系中,意思可能有区别
        在并发编程中,信号量就是多把互斥锁
        在django中,信号量指的是达到某个条件自动触发(中间件)
        ...
我们之前使用Lock产生的是单把锁
	类似于单间厕所
信号量相当于一次性创建多间厕所
	类似于公共厕所
    

from threading import Thread, Lock, Semaphore
import time
import random

sp = Semaphore(5)  # 一次性产生五把锁

class MyThread(Thread):
    def run(self):
        sp.acquire()
        print(self.name)
        time.sleep(random.randint(1, 3))
        sp.release()

for i in range(20):
    t = MyThread()
    t.start()

event事件

一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于发射信号一样
from threading import Thread, Event
import time


event = Event()  # 造了一个红绿灯


def light():
    print('红灯亮着的')
    time.sleep(3)
    print('绿灯亮了')
    # 告诉等待红灯的人可以走了
    event.set()


def car(name):
    print('%s 车正在灯红灯'%name)
    event.wait()  # 等待别人给你发信号
    print('%s 车加油门飙车走了'%name)


if __name__ == '__main__':
    t = Thread(target=light)
    t.start()

    for i in range(20):
        t = Thread(target=car, args=('%s'%i, ))
        t.start()

进程池与线程池

先回顾之前TCP服务端实现并发的效果是怎么玩的
每来一个人就开设一个进程或者线程去处理
"""
无论是开设进程也好还是开设线程也好 是不是都需要消耗资源
只不过开设线程的消耗比开设进程的稍微小一点而已

我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!!
硬件的开发速度远远赶不上软件呐

我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它
"""
# 池的概念
"""
什么是池?
	池是用来保证计算机硬件安全的情况下最大限度的利用计算机
	它降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行
"""

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os


# pool = ThreadPoolExecutor(5)  # 池子里面固定只有五个线程
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
pool = ProcessPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
"""
池子造出来之后 里面会固定存在五个线程
这个五个线程不会出现重复创建和销毁的过程
池子造出来之后 里面会固定的几个进程
这个几个进程不会出现重复创建和销毁的过程

池子的使用非常的简单
你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
"""


def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**n

def call_back(n):
    print('call_back>>>:',n.result())
"""
任务的提交方式
    同步:提交任务之后原地等待任务的返回结果 期间不做任何事
    异步:提交任务之后不等待任务的返回结果 执行继续往下执行
        返回结果如何获取???
        异步提交任务的返回结果 应该通过回调机制来获取
        回调机制
            就相当于给每个异步任务绑定了一个定时炸弹
            一旦该任务有结果立刻触发爆炸
"""
if __name__ == '__main__':
    # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
    # print('主')
    t_list = []
    for i in range(20):  # 朝池子中提交20个任务
        # res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
        res = pool.submit(task, i).add_done_callback(call_back)
        # print(res.result())  # result方法   同步提交
        # t_list.append(res)
    # 等待线程池中所有的任务执行完毕之后再继续往下执行
    # pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
    # for t in t_list:
    #     print('>>>:',t.result())  # 肯定是有序的
"""
程序有并发变成了串行
任务的为什么打印的是None
res.result() 拿到的就是异步提交的任务的返回结果
"""

协程

"""
进程:资源单位
线程:执行单位
协程:这个概念完全是程序员自己意淫出来的 根本不存在
		单线程下实现并发
		我们程序员自己再代码层面上检测我们所有的IO操作
		一旦遇到IO了 我们在代码级别完成切换
		这样给CPU的感觉是你这个程序一直在运行 没有IO
		从而提升程序的运行效率
	
多道技术
	切换+保存状态
	CPU两种切换
		1.程序遇到IO
		2.程序长时间占用

TCP服务端 
	accept
	recv
	
代码如何做到
	切换+保存状态

切换
	切换不一定是提升效率 也有可能是降低效率
	IO切			提升
	没有IO切 降低
		
保存状态
	保存上一次我执行的状态 下一次来接着上一次的操作继续往后执行
	yield
"""

验证切换是否就一定提升效率
# import time
#
# # 串行执行计算密集型的任务   1.2372429370880127
# def func1():
#     for i in range(10000000):
#         i + 1
#
# def func2():
#     for i in range(10000000):
#         i + 1
#
# start_time = time.time()
# func1()
# func2()
# print(time.time() - start_time)

# 切换 + yield  2.1247239112854004
# import time
#
#
# def func1():
#     while True:
#         10000000 + 1
#         yield
#
#
# def func2():
#     g = func1()  # 先初始化出生成器
#     for i in range(10000000):
#         i + 1
#         next(g)
#
# start_time = time.time()
# func2()
# print(time.time() - start_time)

协程实现并发

# 服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn


def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communication, conn)


if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()

    
# 客户端
from threading import Thread, current_thread
import socket


def x_client():
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 0
    while True:
        msg = '%s say hello %s'%(current_thread().name,n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=x_client)
        t.start()

"""
理想状态:
	我们可以通过
	多进程下面开设多线程
	多线程下面再开设协程序
	从而使我们的程序执行效率提升
"""

标签:__,start,信号量,互斥,死锁,线程,time,print,import
From: https://www.cnblogs.com/zhiliaowang/p/16913427.html

相关文章