今日内容详细
多进程实现TCP服务端并发
import socket
from multiprocessing import Process
def get_server():
server = socket.socket()
server.bind(('192.168.1.160', 8080))
server.listen(5)
return server
def get_talk(sock):
while True:
data = sock.recv(1024)
print(data.decode('utf8'))
sock.send(data.upper())
if __name__ == '__main__':
server = get_server()
while True:
sock, addr = server.accept()
# 开设多进程去聊天
p = Process(target=get_talk, args=(sock,))
p.start()
互斥锁代码实操
锁:建议只加载操作数据的部分 否则整个程序的效率会极低
from multiprocessing import Process, Lock
import time
import json
import random
def search(name):
with open(r'data.json', 'r', encoding='utf8') as f:
data = json.load(f)
print(f'{name}查看票 目前剩余:{data.get("ticket_num")}')
def buy(name):
# 先查询票数
with open(r'data.json', 'r', encoding='utf8') as f:
data = json.load(f)
# 模拟网络延迟
time.sleep(random.randint(1, 3))
# 买票
if data.get('ticket_num') > 0:
with open(r'data.json', 'w', encoding='utf8') as f:
data['ticket_num'] -= 1
json.dump(data, f)
print(f'{name}买票成功')
else:
print(f'{name}买票失败 很可怜 没车回去了!!!!')
def run(name, mutex):
search(name)
mutex.acquire() # 抢锁
buy(name)
mutex.release() # 释放锁
if __name__ == '__main__':
mutex = Lock()
for i in range(10):
p = Process(target=run, args=(f'用户{i}号', mutex))
p.start()
"""
锁有很多种 但是作用都一样
行锁 表锁......
"""
线程
线程理论
进程
进程其实是资源单位 表示一块内存空间
线程
线程才是执行单位 表示真正的代码指令
我们可以将进程比喻是车间 线程是车间里面的流水线
一个进程内部至少含有一个线程
1.一个进程内可以开设多个线程
2.同一个进程下的多个线程数据是共享的
3.创建进程与线程的区别
创建进程的消耗要远远大于线程
创建线程的两种方式
from threading import Thread
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(0.1)
print(f'{name} is over')
if __name__ == '__main__':
start_time = time.time()
# p_list = []
# for i in range(100):
# p = Process(target=task, args=(f'用户{i}',))
# p.start()
# p_list.append(p)
# for p in p_list:
# p.join()
# print(time.time() - start_time) # 0.7657256126403809
# t_list = []
# for i in range(100):
# t = Thread(target=task, args=(f'用户{i}',))
# t.start()
# t_list.append(t)
# for t in t_list:
# t.join()
# print(time.time() - start_time) # 0.1122446060180664
# t = Thread(target=task, args=('jason',))
# t.start()
# print('主线程')
"""
创建线程无需考虑反复执行的问题
"""
class MyThread(Thread):
def run(self):
print('run is running')
time.sleep(1)
print('run is over')
obj = MyThread()
obj.start()
print('主线程')
线程的诸多特性
1.join方法
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(1)
print(f'{name} is over')
t = Thread(target=task, args=('程序', ))
t.start()
t.join()
print('主线程')
2.同进程内多个线程数据共享
3.current_thread()
4.active_count()
import time
from threading import Thread, current_thread, active_count
import os
money = 1000
def task():
time.sleep(3)
global money
money = 666
print(current_thread().name)
print('子线程进程号>>>:', os.getpid())
for i in range(10):
t = Thread(target=task)
t.start()
print(money)
print('存活的线程数', active_count())
print(current_thread().name)
print('主线程进程号>>>:', os.getpid())
GIL全局解释器锁
官方文档对GIL的解释
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.
"""
1.在CPython解释器中存在全局解释器锁简称GIL
python解释器有很多类型
CPython JPython PyPython(常用的是CPython解释器)
2.GIL本质也是一把互斥锁 用来阻止同一个进程内多个线程同时执行(重要)
3.GIL的存在是因为CPython解释器中内存管理不是线程安全的(垃圾回收机制)
垃圾回收机制
引用计数、标记清楚、分带回收
"""
验证GIL的存在
from threading import Thread
num = 100
def task():
global num
num -= 1
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(num) # 0
GIL与普通互斥锁
既然CPython解释器中有GIL 那么我们以后写代码是不是就不需要操作锁了!!!
"""
GIL只能够确保同进程内多线程数据不会被垃圾回收机制弄乱
并不能确保程序里面的数据是否安全
"""
import time
from threading import Thread, Lock
num = 100
def task(mutex):
global num
mutex.acquire()
count = num
time.sleep(0.1)
num = count - 1
mutex.release()
mutex = Lock()
t_list = []
for i in range(100):
t = Thread(target=task, args=(mutex,))
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(num) # 0
GIL锁是解释器级别的锁,保证同一时刻进程中只有一个线程拿到GIL锁,拥有执行权限。而线程互斥锁是保证同一时刻只有一个线程能对数据进行操作,是数据级别的锁
python多线程是否有用
需要分情况
情况1
单个CPU
多个CPU
情况2
IO密集型(代码有IO操作)
计算密集型(代码没有IO)
1.单个CPU
IO密集型
多进程
申请额外的空间 消耗更多的资源
多线程
消耗资源相对较少 通过多道技术
ps:多线程有优势!!!
计算密集型
多进程
申请额外的空间 消耗更多的资源(总耗时+申请空间+拷贝代码+切换)
多线程
消耗资源相对较少 通过多道技术(总耗时+切换)
ps:多线程有优势!!!
2.多个CPU
IO密集型
多进程
总耗时(单个进程的耗时+IO+申请空间+拷贝代码)
多线程
总耗时(单个进程的耗时+IO)
ps:多线程有优势!!!
计算密集型
多进程
总耗时(单个进程的耗时)
多线程
总耗时(多个进程的综合)
ps:多进程完胜!!!
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
# 计算密集型
res = 1
for i in range(1, 100000):
res *= i
if __name__ == '__main__':
print(os.cpu_count()) # 20 查看当前计算机CPU个数
start_time = time.time()
# p_list = []
# for i in range(12): # 一次性创建12个进程
# p = Process(target=work)
# p.start()
# p_list.append(p)
# for p in p_list: # 确保所有的进程全部运行完毕
# p.join()
t_list = []
for i in range(12):
t = Thread(target=work)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print('总耗时:%s' % (time.time() - start_time)) # 获取总的耗时
"""
计算密集型
多进程:5.665567398071289
多线程:30.233906745910645
"""
def work():
time.sleep(2) # 模拟纯IO操作
if __name__ == '__main__':
start_time = time.time()
# t_list = []
# for i in range(100):
# t = Thread(target=work)
# t.start()
# for t in t_list:
# t.join()
p_list = []
for i in range(100):
p = Process(target=work)
p.start()
for p in p_list:
p.join()
print('总耗时:%s' % (time.time() - start_time))
"""
IO密集型
多线程:0.0149583816528320
多进程:0.6402878761291504
"""
死锁现象
死锁的概念
两个或两个以上的进程(线程)在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,程序无法推进
acquire()
release()
from threading import Thread,Lock
import time
mutexA = Lock() # 产生一把锁
mutexB = Lock() # 产生一把锁
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexB.acquire()
print(f'{self.name}抢到了B锁')
mutexB.release()
print(f'{self.name}释放了B锁')
mutexA.release()
print(f'{self.name}释放了A锁')
def func2(self):
mutexB.acquire()
print(f'{self.name}抢到了B锁')
time.sleep(1) # 死锁在这里,后面的代码不会继续执行
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexA.release()
print(f'{self.name}释放了A锁')
mutexB.release()
print(f'{self.name}释放了B锁')
for i in range(10):
obj = MyThread()
obj.start()
信号量
信号量和互斥锁
互斥锁同时只允许一个进程修改数据,而信号量则允许多个人同时修改数据
在python并发编程中信号量相当于多把互斥锁
"""
如果将自定义互斥锁比喻成是单个厕所(一个坑位)
那么信号量相当于是公共厕所(多个坑位)
互斥锁就相当于你家的卫生间,只有一个坑位,同时只能一个人上厕所;而信号量则相当于公共厕所,有多个坑位,可以多个人同时上厕所。比如现在有10个人需要上厕所,现在公共厕所只有3个坑位,这时候就先有3个人上厕所,等其中一个或者多个人出来之后,另外的人才能进去。
"""
from threading import Thread, Lock, Semaphore
import time
import random
sp = Semaphore(5) # 一次性产生五把锁
class MyThread(Thread):
def run(self):
sp.acquire()
print(self.name)
time.sleep(random.randint(1, 3))
sp.release()
for i in range(20):
t = MyThread()
t.start()
event事件
"""
子线程的运行可以由其他子线程决定!!!
"""
from threading import Thread, Event
import time
event = Event() # 类似于造了一个红绿灯
def light():
print('红灯亮着的 所有人都不能动')
time.sleep(3)
print('绿灯亮了 油门踩到底 给我冲!!!')
event.set() # 绿灯亮了
def car(name):
print('%s正在等红灯' % name)
event.wait() # 等灯绿 此时 event为False 直到event.set()将其值设置为True,才继续
print('%s加油门 飙车了' % name)
t = Thread(target=light)
t.start()
for i in range(20):
t = Thread(target=car, args=('熊猫PRO%s' % i,))
t.start()
event.isSet() | 返回event的状态值 |
---|---|
event.wait() | 如果event.isSet() == False将阻塞线程 |
event.set() | 设置event的状态值为True,等待系统调度 |
event.clear() | 恢复event的状态值为False |
进程池与线程池
进程和线程能否无限制的创建 不可以
因为硬件的发展赶不上软件 有物理极限 如果我们在编写代码的过程中无限制的创建进程或者线程可能会导致计算机崩溃
池
降低程序的执行效率 但是保证了计算机硬件的安全
进程池
提前创建好固定数量的进程供后续程序的调用超出则等待
线程池
提前创建好固定数量的线程供后续程序的调用超出则等待
#Python标准模块——concurrent.futures
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor:进程池,提供异步调用
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random
from threading import current_thread
# 1.产生含有固定数量线程的线程池
# pool = ThreadPoolExecutor(10)
pool = ProcessPoolExecutor(5)
def task(n):
print('task is running')
time.sleep(random.randint(1, 3))
print('task is over', n, current_thread().name)
print('task is over', os.getpid())
return '我是task函数的返回值'
def func(*args, **kwargs):
print('from func')
if __name__ == '__main__':
# 2.将任务提交给线程池即可
for i in range(20):
res = pool.submit(task, 123) # 朝线程池提交任务
print(res.result()) # 不能直接获取返回值
pool.submit(task, 123).add_done_callback(func)
"""add_done_callback只要任务有结果了 就会自动调用括号内的函数处理"""
协程
"""
进程:资源单位
线程:执行单位
协程:单线程下实现并发(效率极高)
在代码层面欺骗CPU 让CPU觉得我们的代码里面没有IO操作
实际上IO操作被我们自己写的代码检测 一旦有 立刻让代码执行别的
(该技术完全是程序员自己弄出来的 名字也是程序员自己起的)
核心:自己写的代码完成切换+保存状态
"""
import time
from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁)
from gevent import spawn
def func1():
print('func1 running')
time.sleep(3)
print('func1 over')
def func2():
print('func2 running')
time.sleep(5)
print('func2 over')
if __name__ == '__main__':
start_time = time.time()
# func1()
# func2()
s1 = spawn(func1) # 检测代码 一旦有IO自动切换(执行没有io的操作 变向的等待io结束)
s2 = spawn(func2)
s1.join()
s2.join()
print(time.time() - start_time) # 8.01237154006958 协程 5.015487432479858
协程实现并发
import socket
from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁)
from gevent import spawn
def communication(sock):
while True:
data = sock.recv(1024)
print(data.decode('utf8'))
sock.send(data.upper())
def get_server():
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
while True:
sock, addr = server.accept() # IO操作
spawn(communication, sock)
s1 = spawn(get_server)
s1.join()
"""
终极结论
python可以通过开设多进程 在多进程下开设多线程 在多线程使用协程
从而让程序执行的效率达到极致!!!
但是实际业务中很少需要如此之高的效率(一直占着CPU不放)
因为大部分程序都是IO密集型的
所以协程我们知道它的存在即可 几乎不会真正去自己编写
"""
标签:__,name,网络,并发,线程,time,print,import
From: https://www.cnblogs.com/qian-yf/p/16913238.html