多线程
1. 什么是线程
就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程
车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线
流水线的工作需要电源,电源就相当于cpu
所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。
2.什么事多线程
多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源
3. 为什么要用多线程
多线程指的是,在一个进程中开启多个线程,简单的讲:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。详细的讲分为4点:
-
多线程共享一个进程的地址空间
-
线程比进程更轻量级,线程比进程更容易创建可撤销,在许多操作系统中,创建一个线程比创建一个进程要快10-100倍,在有大量线程需要动态和快速修改时,这一特性很有用
-
若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。
-
在多cpu系统中,为了最大限度的利用多核,可以开启多个线程,比开进程开销要小的多。(这一条并不适用于python)
4. 开启线程的两种方式
4.1方式一
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print(f'{name} say hello')
if __name__ == '__main__':
t=Thread(target=sayhi,args=('serein',))
t.start()
print('主线程')
4.2 方式二
from threading import Thread
import time
class Sayhi(Thread):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
time.sleep(2)
print(f'{self.name} say hello')
if __name__ == '__main__':
t = Sayhi('serein')
t.start()
print('主线程')
5. 线程相关的其他方法
Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。
threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumeate())有相同的结果。
from threading import Thread
import threading
from multiprocessing import Process
import os
def work():
import time
time.sleep(3)
print(threading.current_thread().getName())
if __name__ == '__main__':
#在主进程下开启线程
t=Thread(target=work)
t.start()
print(threading.current_thread().getName())
print(threading.current_thread()) #主线程
print(threading.enumerate()) #连同主线程在内有两个运行的线程
print(threading.active_count())
print('主线程/主进程')
'''
打印结果:
MainThread
<_MainThread(MainThread, started 140735268892672)>
[<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
主线程/主进程
Thread-1
'''
6. 守护线程
无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
需要强调的是:运行完毕并非终止运行
#1.对主进程来说,运行完毕指的是主进程代码运行完毕
#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
#3 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束
#4 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print(f'{name} say hello')
if __name__ == '__main__':
t=Thread(target=sayhi,args=('serin',))
t.setDaemon(True) #必须在t.start()之前设置
t.start()
print('主线程')
print(t.is_alive())
'''
主线程
True
7. 互斥锁
注意:
- 线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来
- join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高
示例:
# 锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁
import threading
import time
# 创建一个锁对象
my_lock = threading.Lock()
# 共享变量
shared_variable = 0
def increment():
global shared_variable
for _ in range(1000000):
# 申请锁
my_lock.acquire()
try:
shared_variable += 1
finally:
# 释放锁
my_lock.release()
# 创建两个线程
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)
# 启动线程
thread1.start()
thread2.start()
# 等待两个线程完成
thread1.join()
thread2.join()
print("Final value:", shared_variable) # 2000000
7.1 GIL和互斥锁的区别
共识:
- 锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据
- 保护不同的数据就应该加不同的锁
区别:
- GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据)
- 后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock
7.2 互斥锁与join的区别
# 不加锁:并发执行,速度快,数据不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
global n
print(f'{current_thread().getName()} is running')
temp=n
time.sleep(0.5)
n=temp-1
if __name__ == '__main__':
n=100
lock=Lock()
threads=[]
start_time=time.time()
for i in range(100):
t=Thread(target=task)
threads.append(t)
t.start()
for t in threads:
t.join()
stop_time=time.time()
print(f'主:{stop_time-start_time} n:{n}')
'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''
#不加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
from threading import current_thread,Thread,Lock
import os,time
def task():
#未加锁的代码并发运行
time.sleep(3)
print(f'{current_thread().getName()} start to run' )
global n
#加锁的代码串行运行
lock.acquire()
temp=n
time.sleep(0.5)
n=temp-1
lock.release()
if __name__ == '__main__':
n=100
lock=Lock()
threads=[]
start_time=time.time()
for i in range(100):
t=Thread(target=task)
threads.append(t)
t.start()
for t in threads:
t.join()
stop_time=time.time()
print(f'主:{stop_time-start_time} n:{n}')
'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''
#有的同学可能有疑问:既然加锁会让运行变成串行,那么我在start之后立即使用join,就不用加锁了啊,也是串行的效果啊
#没错:在start之后立刻使用jion,肯定会将100个任务的执行变成串行,毫无疑问,最终n的结果也肯定是0,是安全的,但问题是
#start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的
#单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
time.sleep(3)
print(f'{current_thread().getName()} start to run' )
global n
temp=n
time.sleep(0.5)
n=temp-1
if __name__ == '__main__':
n=100
lock=Lock()
start_time=time.time()
for i in range(100):
t=Thread(target=task)
t.start()
t.join()
stop_time=time.time()
print(f'主:{stop_time-start_time} n:{n}')
'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0
'''
8.死锁和递归锁
死锁:
- 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
示例:
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print(f'{self.name} 拿到A锁')
mutexB.acquire()
print(f'{self.name} 拿到B锁')
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print(f'{self.name} 拿到B锁')
time.sleep(2)
mutexA.acquire()
print(f'{self.name} 拿到A锁')
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''
解决方案:
-
递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
# 一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
import threading
import time
mutexA = mutexB = threading.RLock()
class MyThread(threading.Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
with mutexA:
print(f'{self.name} 拿到A锁')
with mutexB:
print(f'{self.name} 拿到B锁')
def func2(self):
with mutexB:
print(f'{self.name} 拿到B锁')
time.sleep(2)
with mutexA:
print(f'{self.name} 拿到A锁')
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
9.进程池和线程池
import time
# 池就是一个容量
# 线程池 : ThreadPoolExecutor
# 进程池 : ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# 开设多进程的时候---> 开一个进程需要开一快资源
# 五个进程 --> 五块资源
# 开出来五块空间 ---> 五个进程需要运行就将这个五个放进去
# 第六个人等着
# 池子造出来后 里面固定存在五个线程
# 这五个线程不会存在出现重复创建和销毁的过程
def run_task(i):
print(f'当前参数 {i} 开始')
time.sleep(1)
print(f'当前参数 {i} 结束')
return i ** 2
def call_back(n):
print(f"则是call_back 的参数 {n} ")
print(f"则是call_back 的参数 {n.result()} ")
return n.result() * 2
def read_data(path):
...
return path
# 线程池
def main_thread():
result_list = []
# 【1】先声明池子容量
pool = ThreadPoolExecutor(5)
# 【2】向池子中添加任务
for i in range(1, 10):
res = pool.submit(run_task, i).add_done_callback(call_back)
print(res)
# 返回值对象
# <Future at 0x2ce493e2e50 state=running>
# print(f'这是 pool 的返回值 res {res}')
# 获取到异步函数中的返回值 res.result()
# print(f'这是 pool 的返回值 {res.result()}')
# result_list.append(res)
# # 等待所有子进程完成 ,拿到结果
# pool.shutdown()
# for res in result_list:
# print(f'这是 pool 的返回值 {res.result()}')
# 进程池
def main_process():
# 【1】先声明池子容量
pool = ProcessPoolExecutor(5)
# 【2】向池子中添加任务
for i in range(1, 10):
pool.submit(run_task, i)
if __name__ == '__main__':
main_thread()
# main_process()
标签:__,Thread,threading,编程,并发,线程,time,print,多线程
From: https://www.cnblogs.com/Formerly/p/17982902