多进程实现TCP服务端并发
import socket from multiprocessing import Process def get_server(): server = socket.socket() server.bind(('127.0.0.1', 8081)) server.listen(5) return server def get_talk(sock): while True: data = sock.recv(1024) print(data.decode('utf8')) sock.send(data.upper()) if __name__ == '__main__': server = get_server() while True: sock, addr = server.accept() # 开设多进程进行任务 p = Process(target=get_talk, args=(sock,)) p.start()
互斥锁
我们使用互斥锁的目的是为了,避免多个程序操作一份数据时发生了错乱,比如秒杀,抢票 互斥锁会将并发变为串行: 程序的执行效率降低了,但是也保证了数据安全 互斥锁只出现在多个程序操作数据的地方 其他位置要尽量避免添加 # 互斥锁分类 行锁 锁表格一行 表锁 锁表格 乐观锁 执行乐观,不上锁 悲观锁 执行悲观,执行就上锁
线程理论
线程与进程的区别 进程是资源单元: 进程负责给内部线程提供相应的资源(类似于车间) 线程是执行单位: 线程负责真正的功能(流水线)
创建线程的两种方式
第一种: from threading import Thread import time def task(name): print(f'{name}正在运行') time.sleep(3) print(f'{name}运行结束') if __name__ == '__main__': t = Thread(target=task,args=('tom',)) t.start() print('主进程')
第二种:
from threading import Thread
import time
class MyThread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print(f'{self.name}正在运行')
time.sleep(3)
print(f'{self.name}运行结束')
obj = MyThread('tom')
obj.start()
print('主线程')
结果:
线程的诸多特性
# 1.join方法 """ 主线程等到子线程运行结束之后再运行""" from threading import Thread import time def task(): print('正在执行') time.sleep(3) print('运行结束') t = Thread(target=task) t.start() t.join() print('主线程') # 注意join添加的位置 # 2.同进程内多个线程数据共享 from threading import Thread money = 1000 def func(): global money money = 666 t = Thread(target=func) t.start() t.join() # 确保线程运行完毕,再查找money,更具有说服性 print(money) # 666 # 3.current_thread() 线程名: from threading import Thread,current_thread current_thread().name # 查看线程名 主进程的标识:MainThread 子进程的标识:Thread-N (随机的编号) # 4.active_count() 查看进程下的线程数 # 5.进程号 同一个进程下开设的多个线程拥有相同的进程号
GIL全局解释器锁
对于GIL的官方文献: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) 翻译: # 1.GIL的研究是Cpython解释器的特点 不是python语言的特点 """python解释器也是由编程语言写出来的""" Cpython 用C写出来的 Jpython 用Java写出来的 Pypython 用python写出来的 # 2.GIL本质也是一把互斥锁 # 3.GIL的存在使得同一个进程下的多个线程无法同时执行(关键) 单进程下的多线程无法利用多核优势 效率低! # 4.GIL的存在主要是因为cpython解释器中垃圾回收机制不是线程安全的
(垃圾回收机制)
验证GIL存在
GIL:python解释器层面的一把锁 # 并发 from threading import Thread num = 100 def task(): global num num -= 1 # 通过global让num每次都-1 t_list = [] for i in range(100): # 循环执行100个进程 t = Thread(target=task) t.start() # 开始执行 t_list.append(t) # 每次通过global获取的新数字添加到列表中保存 for t in t_list: # 循环t_list内拿到的数 t.join() # 主线程等待子线程代码运行结束之后在往下运行 print(num) # 0 """ 大致流程为: 一百个进程抢一把锁,抢到之后执行进程操作,全局中的num每抢一次就自减一, 当前进程运行完毕。其他进程又进行此操作,随后重复该操作 """ # GIL的特点: # 并发 from threading import Thread import time num = 100 def task(): global num tmp = num time.sleep(0.1) # 进入IO操作放出GIL锁 num = tmp - 1 t_list = [] for i in range(100): # 循环执行100个进程 t = Thread(target=task) t.start() # 开始执行 t_list.append(t) # 每次通过global获取的新数字添加到列表中保存 for t in t_list: # 循环t_list内拿到的数 t.join() # 主线程等待子线程代码运行结束之后在往下运行 print(num) # 99 """ 大致流程: 一百个进程抢一把锁,抢到之后执行进程操作,全局中的num每抢一次先进行 IO操作,放出抢到的锁,其他进程又可以进行抢锁。每个程序睡眠结束后自减1, 没有锁,所以每次不会继承上一次的进程的num,得到的数仍为num-1之后的数。 """ """ GIL不会影响程序层面的数据也不会保证它的修改是安全的要想保证得自己加锁 """ # 将上面的并发变为串行 from threading import Thread,Lock import time money = 100 mutex = Lock() def task(): mutex.acquire() # 再给进程加一把互斥锁 global money tmp = money time.sleep(0.1) # 进行IO操作 money = tmp - 1 mutex.release() t_list = [] for i in range(100): t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() print(money)
GIL与普通互斥锁
""" GIL只能够确保同进程内多线程数据不会被垃圾回收机制弄乱 ,并不能确保程序里面的数据是否安全 """ import time from threading import Thread,Lock num = 100 def task(mutex): global num mutex.acquire() count = num time.sleep(0.1) num = count - 1 mutex.release() mutex = Lock() t_list = [] for i in range(100): t = Thread(target=task,args=(mutex,)) t.start() t_list.append(t) for t in t_list: t.join() print(num)
验证python的多线程是否有用
# 1.单个cpu的IO密集型 多进程: 申请额外的空间,消耗的资源更多 多线程: 资源消耗相比较少,通过多道技术 """此类型中多线程具有一定优势""" # 2.单个cpu的计算密集型 多进程: 申请额外的空间 消耗更多的资源 多线程: 资源消耗相比较少,通过多道技术 """此类型中多线程具有一定优势""" # 3.多个cpu的IO密集型 多进程: 总耗时(单个进程的耗时+IO+申请空间+拷贝代码) 多线程: 总耗时(单个进程的耗时+IO) """此类型中多线程具有一定优势""" from threading import Thread from multiprocessing import Process import os import time def work(): time.sleep(2) # 模拟纯IO操作 if __name__ == '__main__': start_time = time.time() # 多线程 t_list = [] for i in range(100): t = Thread(target=work) t.start() for t in t_list: t.join() print('总耗时:%s' % (time.time() - start_time)) # 多进程 p_list = [] for i in range(100): p = Process(target=work) p.start() for p in p_list: p.join() print('总耗时:%s' % (time.time() - start_time)) """ IO密集型 多线程:0.0149583816528320 多进程:0.6402878761291504 """ # 4.多个cpu的计算密集型 多进程: 总耗时(单个进程的耗时) 多线程: 总耗时(多个进程的综合) """此类型中的多进程优势很大""" from threading import Thread from multiprocessing import Process import os import time def work(): # 计算密集型 res = 1 for i in range(1, 100000): res *= i if __name__ == '__main__': start_time = time.time() # 多进程 p_list = [] for i in range(12): # 一次性创建12个进程 p = Process(target=work) p.start() p_list.append(p) for p in p_list: # 确保所有的进程全部运行完毕 p.join() # 多线程 t_list = [] for i in range(12): t = Thread(target=work) t.start() t_list.append(t) for t in t_list: t.join() print('总耗时:%s' % (time.time() - start_time)) # 获取总的耗时 """ 计算密集型 多进程:5.665567398071289 多线程:30.233906745910645 """
死锁现象
# 1.为什么会出现死锁现象? 在抢锁的时候 通过一些IO操作形成了闭环,互拿对方的锁 Lock() # 类名加括号每执行一次就会产生一个新的对象 acquire() release() from threading import Thread, Lock import time mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print(f'{self.name}抢到了A锁') mutexB.acquire() print(f'{self.name}抢到了B锁') mutexB.release() print(f'{self.name}释放了B锁') mutexA.release() print(f'{self.name}释放了A锁') def func2(self): mutexB.acquire() print(f'{self.name}抢到了B锁') time.sleep(1) mutexA.acquire() print(f'{self.name}抢到了A锁') mutexA.release() print(f'{self.name}释放了A锁') mutexB.release() print(f'{self.name}释放了B锁') for i in range(10): t = MyThread() t.start()
信号量
# 本质: 互斥锁,是多把互斥锁 在并发编程中 : 信号量就是多把互斥锁 在django中 : 信号量指的是达到某个条件自动触发(中间件) # Lock产生的是一把锁,信号量产生的是多把锁 # 如何使用 from threading import Thread, Lock, Semaphore import time import random sp = Semaphore(5) # 一次性产生五把锁 class MyThread(Thread): def run(self): sp.acquire() print(self.name) time.sleep(random.randint(1, 3)) sp.release() for i in range(20): t = MyThread() t.start()
event事件
# 子进程或者子线程之间可以彼此等待彼此 eg:子A运行到某一个代码位置后发信号告诉子B开始运行 from threading import Thread, Event import time event = Event() # 类似于造了一个红绿灯 def light(): print('红灯亮 不能动') time.sleep(3) print('绿灯亮 可以动') event.set() def car(name): print('%s正在等红灯' % name) event.wait() print('%s车辆行驶了' % name) t = Thread(target=light) t.start() for i in range(20): t = Thread(target=car, args=('熊猫PRO%s' % i,)) t.start()
进程池与线程池
# 1.为什么要有进程/线程池呢? 为了保证计算机硬件的安全,但是代价是降低了程序的执行效率。我们需要考虑硬件的承受能力 # 2.进程池提前创建好固定个数的进程供程序使用,后续不会再创建 # 3.线程池提前创建好固定个数的线程供程序使用,后续不会再创建 实操: from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import current_thread import os import time pool = ProcessPoolExecutor(5) # 产生五个线程 def task(n): print(os.getpid()) time.sleep(1) return '返回结果' def func(*args, **kwargs): print('func', args, kwargs) print(args[0].result()) if __name__ == '__main__': for i in range(20): pool.submit(task, 123).add_done_callback(func) #异步回调:异步任务执行完成后有结果就会自动触发该机制
协程与协程实操
# 1.协程:程序员自己定义出出来的一种方法,可以在单线程下实现并发,效率极高 本质:其实就是戏法,让cpu误以为没有IO操作。实际上IO操作被我们自己写的代码检测,有的话立刻让代码执行别的。 核心: 自己写代码完成切换+保存状态 from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁) from gevent import spawn import time from gevent import monkey; monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁) from gevent import spawn def func1(): print('func1 running') time.sleep(3) print('func1 over') def func2(): print('func2 running') time.sleep(5) print('func2 over') if __name__ == '__main__': start_time = time.time() s1 = spawn(func1) # 检测代码 一旦有IO自动切换(执行没有io的操作 变向的等待io结束) s2 = spawn(func2) s1.join() s2.join() print(time.time() - start_time) # 8.01237154006958 协程 5.015487432479858 # 1.协程实操
import socket from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁) from gevent import spawn def communication(sock): while True: data = sock.recv(1024) print(data.decode('utf8')) sock.send(data.upper()) def get_server(): server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) while True: sock, addr = server.accept() # IO操作 spawn(communication, sock) s1 = spawn(get_server) s1.join()
"""
如何不断的提升程序的运行效率
多进程下开多线程 多线程下开协程
"""
标签:__,name,Thread,编程,并发,线程,time,print,import From: https://www.cnblogs.com/juzijunjun/p/16913040.html