目录
一. 理论知识
1.1 多道技术相关
##操作系统发展史: https://www.cnblogs.com/Dominic-Ji/articles/10929381.html>
#01 多道技术相关
1)并发:看起来像同时运行的就可以称之为并发
2)并行:真正意义上的同时执行
单核多道技术 实现并发原理:
"""
切换(CPU)分为两种情况
1.当一个程序遇到IO操作的时候,操作系统会剥夺该程序的CPU执行权限
作用:提高了CPU的利用率 并且也不影响程序的执行效率
2.当一个程序长时间占用CPU的时候,操作吸引也会剥夺该程序的CPU执行权限
弊端:降低了程序的执行效率(原本时间+切换时间)
"""
2.1 同步和异步 阻塞和非阻塞
- 同步和异步
"""描述的是任务的提交方式"""
同步:任务提交之后,原地等待任务的返回结果,等待的过程中不做任何事(干等)
程序层面上表现出来的感觉就是卡住了
异步:任务提交之后,不原地等待任务的返回结果,直接去做其他事情
我提交的任务结果如何获取?
任务的返回结果会有一个异步回调机制自动处理
- 阻塞非阻塞
"""描述的程序的运行状态"""
阻塞:阻塞态
非阻塞:就绪态、运行态
理想状态:我们应该让我们的写的代码永远处于就绪态和运行态之间切换
上述概念的组合:最高效的一种组合就是异步非阻塞
二. 进程对象编程
2.1 开启进程的方式
- 函数方式 multiprocessing
- 类方式 ()
'''
方式一: 使用Process类 创建一个进程对象 q = Process(target=函数,args=(参数)) 主要用于 函数是使用
方式二: 创建一个类 去继承 Process类 Mypeoess(Process类) , p = MyProcess('zhang') 主要用于类的继承
注意继承类,需要使用 super().__init__() 方法 才可以传参数
'''
##第一种方式 主要用这个
from multiprocessing import Process
import time
def task(name):
print(f' is running {name}')
time.sleep(3)
print(f'is over {name}')
if __name__ == '__main__':
#01 创建一个实例化对象 target=task 函数名称 ,args=('msg',) args 函数参数
P = Process(target=task,args=('qq',))
# 容器类型哪怕只有一个元素 建议用逗号隔开
#02 开启进程,告诉操作系统帮你创建进程
P.start()
print('主进程')
执行结果
主进程
is running qq
is over qq
这里发现主进程 优先被打印出来
## 第二种方式 针对类
# 第二种方式 类的继承
from multiprocessing import Process
import time
class MyProcess(Process): #这里是固定写法 类名称--->> 继承(Process)
def __init__(self,name,age):
'''
这里继承了 Process 类的方法,但是又不知道 Process 类里有什么
这里需要使用 super().__init__() 方式调用下 重新赋值即可
下面函数 run()才可以传参数
'''
super().__init__()
self.name = name
self.age = age
def run(self):
print(f'hello {self.name} {self.age}')
time.sleep(1)
print('get out!')
if __name__ == '__main__':
p = MyProcess('zhang',18)
p.start()
print('主')
- 总结
"""
创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去
一个进程对应在内存中就是一块独立的内存空间
多个进程对应在内存中就是多块独立的内存空间
进程与进程之间数据默认情况下是无法直接交互,如果想交互可以借助于第三方工具、模块
"""
2.2 join方法
join是让主进程等待子进程代码运行结束之后,再继续运行。不影响其他子进程的执行
from multiprocessing import Process
import time
def task(name,n):
print(f'is running {name}')
time.sleep(n)
print(f'is over {name}')
if __name__ == '__main__':
# #01 创建一个实例化对象 target=task 函数名称 ,args=('msg',) args 函数参数
# P1 = Process(target=task,args=('qq1', 1))
# P2 = Process(target=task, args=('weixin2', 2))
# P3 = Process(target=task, args=('qq3', 3))
#
# start_time = time.time()
# P1.start()
# P2.start()
# P3.start() # 仅仅是告诉操作系统要创建进程
#
# P1.join() # 主进程等待子进程p运行结束之后再继续往后执行
# P2.join()
# P3.join()
## for 循环方式
start_time = time.time()
p_list = []
for i in range(4):
Pi = Process(target=task, args=(f'qq{i}', i))
Pi.start()
p_list.append(Pi)
for i in p_list:
Pi.join()
print('主进程',time.time() - start_time)
解释下:这里等所有的子程序都执行完毕后 才会打印 print('主进程',time.time() - start_time)
执行结果
is running qq0
is over qq0
is running qq1
is running qq2
is running qq3
is over qq1
is over qq2
is over qq3
主进程 3.112183094024658
2.3 进程间数据隔离
- 进程之间会有数据隔离
from multiprocessing import Process
money = 100
def task():
global money # 局部修改全局
money = 666
print('子',money)
if __name__ == '__main__':
p = Process(target=task)
p.start()
p.join()
print(money)
解释下:global money 子进程修改了全局变量 但是只局限于 该子进程自己的,主进程的并未修改
2.4 进程对象其它方法pid
from multiprocessing import Process, current_process
import time
import os
def task(name):
print(f'子进程正在运行 pid {current_process().pid}') #查看进程的 pid
print(f'子进程正在运行 pid {os.getpid()}') #查看进程的 pid
print(f'子进程正在运行 ppid(-->>父进程) {os.getppid()}') #查看进程的 ppid
time.sleep(1)
print('子进程挂了')
if __name__ == '__main__':
P = Process(target=task,args=('qq',))
P.start()
print(P.is_alive()) #判断进程是否存活
P.terminate() #杀死进程 P
P.join()
print('主进程pid',current_process().pid) #查看进程的 pid
print('主进程pid', os.getpid()) #查看进程的 pid
print('主进程 ppid(-->>父进程)', os.getppid()) #查看进程的 ppid
解释下:
os.getpid() # 查看当前进程进程号
os.getppid() # 查看当前进程的父进程进程号
p.terminate() # 杀死当前进程 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
print(p.is_alive()) # 判断当前进程是否存活
2.4 守护进程
- p.daemon = True
from multiprocessing import Process
import time
def task(name):
print('%s总管正在活着'% name)
time.sleep(1)
print('%s总管正在死亡' % name)
if __name__ == '__main__':
p = Process(target=task,args=('egon',))
# p = Process(target=task,kwargs={'name':'egon'})
p.daemon = True # 将进程p设置成守护进程 这一句一定要放在start方法上面才有效否则会直接报错
p.start()
print('皇帝jason寿终正寝')
#解释下
守护进程,如果主进程死了 守护进程会一起挂掉,这里主进程执行完毕后 退出,子进程P(主进程直接死的 所以子进程内代码来不及执行 就死了) 也会退出
2.5 互斥锁
多个进程操作同一份数据的时候,会出现数据错乱的问题
针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但是保证了数据的安全
import json
import random
import time
from multiprocessing import Process, Lock
import multiprocessing
# 01 查票
def selete(user):
with open('data.txt', 'r', encoding='utf8') as f:
dic = json.load(f)
msg = f"用户 {user} 查询余票成功,剩余 {dic.get('ticket_num')} "
print(f"{msg}")
# 02 查询 + 购买
def update(user):
# 获取查询结果
with open('data.txt', 'r', encoding='utf8') as f:
msg_dic = json.load(f)
time.sleep(random.randint(1, 2))
# 判断是否存在余票
if msg_dic.get('ticket_num') > 0:
# 修改票的结果
num = msg_dic.get('ticket_num')
msg_dic['ticket_num'] -= 1
# 写入文件
with open('data.txt', 'w', encoding='utf8') as f:
json.dump(msg_dic, f)
print(f'当前余票为:{num} 用户{user} 购票成功')
else:
print(f'当前余票为:{msg_dic.get("ticket_num")} 用户{user} 购票失败')
def run(user, mutex):
selete(user)
# 给抢票环节加锁
mutex.acquire()
update(user)
# 释放锁
mutex.release()
if __name__ == '__main__':
# 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票 谁就可以操作数据
multiprocessing.set_start_method('fork')
mutex = Lock()
for i in range(1, 11):
P = Process(target=run, args=(f'{i}', mutex))
P.start()
2.6 进程之间的通信
1, 队列Queue模块
"""
管道:subprocess
stdin stdout stderr
队列:管道+锁
队列:先进先出
堆栈:先进后出
基本语法
q = queue.Queue(5) #创建队列 ()标示生成的队列最大可以同时存放的数据量
q.full()) #判断当前队列是否满了 满了返回True
q.empty() #判断当前队列是否空了 空的回返回True
q.get_nowait() #没有数据直接报错queue.Empty
q.get(timeout=3) #没有数据之后原地等待三秒之后再报错 queue.Empty
"""
# 创建一个队列
q = queue.Queue(5)
# 往队列存数据
q.put(111)
q.put(222)
q.put(333)
q.put(444)
print(q.empty())
q.put(555)
print(q.full()) #判断当前队列是否满了 满了返回True
print(q.empty()) #判断当前队列是否空了 空的回返回True
# 取数据
v1 = q.get()
v2 = q.get()
v3 = q.get()
v4 = q.get()
v5 = q.get()
# v6 = q.get_nowait() #没有数据直接报错
try:
v6 = q.get(timeout=3)
print(v6)
except Exception as e:
print(f'一滴都没了 {e}')
2, ipc机制
"""
研究思路:
1 主进程跟子进程借助队列进行通信
2 子进程和子进程借助对列进行通信
"""
from multiprocessing import Process,Queue
import multiprocessing
def producer(q):
q.put('68号技师为您服务')
def consumer(q):
print(q.get())
if __name__ == '__main__':
multiprocessing.set_start_method('fork')
q = Queue() #生成一个队列
P = Process(target=producer,args=(q,)) #启动一个进程 存数据到队列内
P1 = Process(target=consumer, args=(q,)) #再启动一个进程 取队列内的值
P.start()
P1.start()
3, 生产者消费者模式
- JoinableQueue
"""
生产者:生产/制造东西的
消费者:消费/处理东西的
该模型除了上述两个之外还需要一个媒介
生活中的例子做包子的将包子做好后放在蒸笼(媒介)里面,买包子的取蒸笼里面拿
厨师做菜做完之后用盘子装着给你消费者端过去
生产者和消费者之间不是直接做交互的,而是借助于媒介做交互
生产者(做包子的) + 消息队列(蒸笼) + 消费者(吃包子的)
JoinableQueue
q.task_done() # 告诉队列你已经从里面取出了一个数据并且处理完毕了
q.join() 当计数器为0的时候 才往后运行
"""
import multiprocessing
import random
import time
from multiprocessing import Process, Queue,JoinableQueue
# 生产者
def producer(user, name, q):
for i in range(1, 5):
data = f'厨师 {user} 完成菜品:{name} {i} '
print(data)
q.put(data)
# 消费者
def consumer(user, q):
while True:
msg = q.get()
time.sleep(random.randint(1, 3))
data = f'客人 {user} 购买了 {msg}'
print(data)
q.task_done() # 告诉队列你已经从里面取出了一个数据并且处理完毕了
if __name__ == '__main__':
multiprocessing.set_start_method('fork')
q = JoinableQueue()
#生产者
P = Process(target=producer, args=('百夫长', '佛跳墙', q,))
P1 = Process(target=producer, args=('李莲花', '黄花鱼', q,))
#消费者
C = Process(target=consumer, args=('客人:花千骨', q))
C1 = Process(target=consumer, args=('客人:白子画', q))
P.start()
P1.start()
C.daemon = True
C1.daemon = True
C.start()
C1.start()
P.join()
P.join()
q.join() # 等待队列中所有的数据被取完再执行往下执行代码
"""
JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
每当你调用task_done的时候 计数器-1
q.join() 当计数器为0的时候 才往后运行
但是即使往后走 由于q.get()还在获取数据, 消费者进程并没有死
所以需要将消费者设置为守护进程 只要 q.join执行完毕 队列内没有数据了 那么消费者也没有存在必要了
"""
# 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了
print('下班了')
解释下:
q.join() 这是执行完毕后 虽然子进程没有死 但是代码会继续往下执行,
执行到 print('下班了') 主进程退出,相对应的守护进程 (消费者)也会退出
- 方式二(不推荐)
import multiprocessing
import random
import time
from multiprocessing import Process, Queue
def producer(user, name, q):
for i in range(1, 5):
data = f'厨师 {user} 完成菜品:{name} {i} '
print(data)
q.put(data)
def consumer(user, q):
while True:
msg = q.get()
# 判断队列内无内容 则结束代码
if not msg: break
time.sleep(random.randint(1, 3))
data = f'客人 {user} 购买了 {msg}'
print(data)
if __name__ == '__main__':
multiprocessing.set_start_method('fork')
q = Queue()
#生产者
P = Process(target=producer, args=('百夫长', '佛跳墙', q,))
P1 = Process(target=producer, args=('李莲花', '黄花鱼', q,))
#消费者
C = Process(target=consumer, args=('客人:花千骨', q))
C1 = Process(target=consumer, args=('客人:白子画', q))
P.start()
P1.start()
P.join()
P.join()
# 等待生产者生产完毕之后 往队列中添加特定的结束符号
# q.put(None) # 肯定在所有生产者生产的数据的末尾
# q.put(None) # 肯定在所有生产者生产的数据的末尾
# 在消费者内 判断如果 取出队列的值是空 则结果该代码
# 但是需要每个消费者都应该要取出来一个 空值,我们并不知道有多少消费者 所以不推荐
q.put(None)
q.put(None)
C.start()
C1.start()
三. 线程对象编程
3.1 开启线程两种方式
from threading import Thread
- 函数方式
'''
开启线程不需要在main下面执行代码 直接书写就可以
但是我们还是习惯性的将启动命令写在main下面
线程是不需要申请内存空间和拷贝代码的 所以速度很快
方式一: 使用Process类 创建一个线程对象 q = Thread(target=函数,args=(参数)) 主要用于 函数是使用
方式二: 创建一个类 去继承 Process类 Mypeoess(Thread线程类) , p = MyProcess('zhang') 主要用于类的继承
注意继承类,需要使用 super().__init__() 方法 才可以传参数
'''
from multiprocessing import Process
import multiprocessing
from threading import Thread
import time
def task(x):
print(f'is run {x}')
if __name__ == '__main__':
t = Thread(target=task, args=('线程',))
t.start()
print('主')
'''
执行结果
is run 线程
主
is run 进程
线程速度-->> 进程
线程是不需要申请内存空间和拷贝代码的 所以速度很快
'''
- 类的继承方式
from multiprocessing import Process
import multiprocessing
from threading import Thread
import time
class Mythread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print(f'is name {self.name}')
time.sleep(1)
print(f'{self.name} is not avit')
if __name__ == '__main__':
t = Mythread('线程')
t.start()
print('主')
'''
执行结果:
is name 线程
主
线程 is not avit
发现线程速度很快 打印在主进程前面
'''
3.2 TCP实现服务端并发效果
1)进程方式
##服务端
import socket
from threading import Thread
import multiprocessing
from multiprocessing import Process
multiprocessing.set_start_method('fork')
server = socket.socket()
server.bind(('127.0.0.1', 8093))
server.listen(5)
print('开始监听')
# 具体执行流程代码 封装为一个函数接口
def talk(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0:
break
print(data.decode('utf8'))
conn.send(data.upper())
except Exception as e:
print(e)
break
conn.close()
while True:
# 循环创建连接池
conn, adder_client = server.accept()
# 如果连接池有新连接 则启动一个线程去执行 talk(新的连接 conn) 函数
##线程方式
t = Thread(target=talk, args=(conn,))
t.start()
#进程方式
#P = Process(target=talk, args=(conn,))
#P.start()
#客户端
import socket
import time
client = socket.socket()
client.connect(('127.0.0.1',8093))
while True:
time.sleep(1)
client.send(b'hello word')
date = client.recv(1024)
print(date.decode('utf8'))
2)线程方式
import socket
from threading import Thread,current_thread
def conn_server(conn):
while True:
print(f'线程3 {current_thread().name}')
try:
data = conn.recv(1024)
print(data.decode('utf8'))
if len(data) == 0:
break
print(data.decode('utf8'))
conn.send(data.upper())
except Exception as e:
print(e)
break
def server_def(ip, port):
print(f'线程2 {current_thread().name}')
server = socket.socket()
server.bind((ip, port))
server.listen(5)
print(f'开始监听地址是:{ip}:{port}')
while True:
conn, addr = server.accept()
print(addr)
# 连接池有内容 开设一个线程去执行
t = Thread(target=conn_server,args=(conn,))
t.start()
if __name__ == '__main__':
# 开设线程去执行函数 创建套接字 ;连接池
s = Thread(target=server_def, args=('127.0.0.1',8093))
s.start()
print(f'线程1 {current_thread().name}')
#客户端
import socket
import time
client = socket.socket()
client.connect(('127.0.0.1',8093))
while True:
client.send(b'hello world')
data = client.recv(1024)
print(data.decode('utf8'))
time.sleep(1)
3.3 线程对象的join方法
- join是让主进程等待线程代码运行结束之后,再继续运行。不影响其他线程的执行
from threading import Thread
import time
def task(name):
print(f'线程 is start {name}')
time.sleep(1)
print(f'线程 is stop {name}')
if __name__ == '__main__':
t = Thread(target=task, args=('花千骨', ))
t.start()
# t.join() 等线程执行完毕后 继续往下执行
t.join()
print('主')
'''
--->> t.join() 结果
线程 is start 花千骨
线程 is stop 花千骨
主
进程已结束,退出代码0
--->> ##没有加t.join() 结果
线程 is start 花千骨
主
线程 is stop 花千骨
'''
3.4 同进程下多个线程数据共享
from threading import Thread
import time
money = 100
def task():
global money
money = 666
print(f'线程修改后 --->> {money}')
if __name__ == '__main__':
print(f'原始数据 {money}')
t = Thread(target=task)
t.start()
t.join()
print(f'由线程修改后 主进程打印结果 {money}')
'''
执行结果如下:
原始数据 100
线程修改后 --->> 666
由线程修改后 主进程打印结果 666
结论:
同一进程下 多个线程下 数据共享
'''
3.5 线程对象属性及其他方法
'''
current_thread().name 获取线程的名称
主线程名称:MainThread
子线程名称:Thread-1 Thread-2 Thread-3...
active_count()
获取当前线程活跃的链接数:包含主线程在内
'''
import time
from threading import Thread,current_thread,active_count
import os
import time
def run(n):
time.sleep(n)
print(f'线程 线程的名称:',current_thread().name)
# print(f'线程pid ',os.getpid())
# print(f'线程ppid 父进程 ', os.getppid())
if __name__ == '__main__':
t = Thread(target=run, args=(2, ))
t1 = Thread(target=run, args=(1, ))
t.start()
t1.start()
t1.join()
print('主:当前正在活跃的线程数 ',active_count())
# print('主',os.getpid())
# print('主 ppid', os.getppid())
# print('主 线程名称',current_thread().name)
3.6 守护线程
'''
主线程结束后 不会立刻回收资源,会等待其他非守护线程结束才算结束
如果有守护线程和非守护线程 结束顺序如下
主进程代码结束(此时等待非守护线程结束)-->> 非守护线程结束 --->> 主进程结束 --->> 守护进程跟着结束
ps: 守护线程会在主线程结束后 立刻结束
因为主线程结束意味着所有线程结束
'''
import time
from threading import Thread
def run(name):
print(f'线程{name} 正在运行')
time.sleep(1)
print(f'{name} 线程已经停止')
print()
if __name__ == '__main__':
t = Thread(target=run, args=('egen',))
t.daemon = True
t.start()
print('主')
#案例
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print('end123')
def func():
print(456)
time.sleep(3)
print('end456')
if __name__ == '__main__':
t1 = Thread(target=foo)
t2 = Thread(target=func)
t1.daemon = True
t1.start()
t2.start()
print('主.......')
'''
打印结果
123
456
主.......
end123
end456
其中t1是守护进程
t1.start() 启动打印123 sleep 1s
t2.start() 启动打印456 sleep 3s
主进程 打印主(代码执行完毕)等待非守护进程结束 t2.start()
t1.start() sleep 1s后 继续打印 print('end123') 执行完毕 该进程结束
t2.start() sleep 3s 结束
主进程等待 t2.start() 结束后 也结束
'''
3.7 线程互斥锁
'''
#创建全局锁
mutex = Lock()
#对于数据加个锁 保证每次都只能是一个线程处理数据 处理完毕后
mutex.acquire()
# 处理完后 释放锁
mutex.release()
'''
from threading import Thread,current_thread,Lock
import time
moeny = 100
#创建全局锁
mutex = Lock()
def stak():
global moeny
#对于数据加个锁 保证每次都只能是一个线程处理数据 处理完毕后
mutex.acquire()
tmp = moeny
time.sleep(0.1)
print(current_thread().name)
moeny = tmp -1
# 处理完后 释放锁
mutex.release()
if __name__ == '__main__':
t_list = []
for i in range(1,100):
t = Thread(target=stak)
t.start()
t_list.append(t)
# 保证每个线程对象都执行完毕后 再执行主线程 print(moeny)
for i in t_list:
i.join()
print(moeny)
3.8 GIL全局解释锁
内存管理-->>垃圾回收机制(回收机制是通过线程完成的)
同一个进程下的多个线程的无法同时执行即无法利用多核优势
"""
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
"""
"""
python解释器其实有多个版本
Cpython
Jpython
Pypypython
但是普遍使用的都是CPython解释器
在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行
同一个进程下的多个线程无法利用多核优势!!!
疑问:python的多线程是不是一点用都没有???无法利用多核优势
因为cpython中的内存管理不是线程安全的
内存管理(垃圾回收机制)
1.应用计数
2.标记清楚
3.分代回收
"""
"""
重点:
1.GIL不是python的特点而是CPython解释器的特点
2.GIL是保证解释器级别的数据的安全
3.GIL会导致同一个进程下的多个线程无法同时执行即无法利用多核优势(******)
4.针对不同的数据还是需要加不同的锁处理
5.解释型语言的通病:同一个进程下多个线程无法利用多核优势
"""
3.9 GIL锁和互斥锁区别
'''
1 首先100线程起来后 会先去抢 GIL锁
2 抢到GIL锁后 执行代码 然后抢到互斥锁 修改数据,遇见io sleep0.1 代码后 释放GIL锁
3 但是我手上还有一把互斥锁 别的线程即使抢到GIL锁 遇见 代码内加锁 mutex.acquire() 进入IO 等待,会释放GIL锁
4 最终GIL还是回到我的手中
'''
import time
from threading import Thread, current_thread, Lock
moeny = 100
# 创建全局锁
mutex = Lock()
def stak():
global moeny
# 对于数据加个锁 保证每次都只能是一个线程处理数据 处理完毕后
# 加锁的另一种写法
# with mutex:
# tmp = moeny
# #time.sleep(0.1)
# print(current_thread().name)
# moeny = tmp -1
mutex.acquire()
tmp = moeny
time.sleep(0.1)
print(current_thread().name)
moeny = tmp - 1
mutex.release()
if __name__ == '__main__':
t_list = []
for i in range(1, 100):
t = Thread(target=stak)
t.start()
t_list.append(t)
# 保证每个线程对象都执行完毕后 再执行主线程 print(moeny)
for i in t_list:
i.join()
print(moeny)
问题:同一个进程下的多线程无法利用多核优势,是否有用?
计算场景 多进程比较快
io场景 多线程较快
'''
计算密集型 (进程比较快)
线程计算时间是 4.6426990032196045 s
进程计算时间是 1.8311121463775635 s
io密集型 (线程比较快)
线程计算时间是 2.007658004760742 s
进程计算时间是 2.2873289585113525
'''
## 计算密集型
from multiprocessing import Process
import time
import os
from threading import Thread
def work():
res = 1
for i in range(10000000):
res *= i
if __name__ == '__main__':
l = []
print(os.cpu_count())
start_time = time.time()
for i in range(1,12):
# t = Thread(target=work) # 线程计算时间是 4.6426990032196045 s
# t.start()
# l.append(t)
p = Process(target=work) # 进程计算时间是 1.8311121463775635 s
p.start()
l.append(p)
for i in l:
i.join()
run_time = time.time() - start_time
print(run_time)
# io 密集型
from multiprocessing import Process
import time
import os
from threading import Thread
def work():
time.sleep(2)
if __name__ == '__main__':
l = []
print(os.cpu_count())
start_time = time.time()
for i in range(1,12):
t = Thread(target=work) # 线程计算时间是 2.007658004760742 s
t.start()
l.append(t)
# p = Process(target=work) # 进程计算时间是 2.2873289585113525
# p.start()
# l.append(p)
for i in l:
i.join()
run_time = time.time() - start_time
print(run_time)
四. 锁机制 信号量 event事件
4.1 死锁和递归锁
1)死锁
当你知道锁的使用抢锁必须要释放锁,其实你在操作锁的时候也极其容易产生死锁现象(整个程序卡死 阻塞)
##死锁现象
import time
from threading import Thread, Lock
mutexA = Lock()
mutexB = Lock()
class Myis_thread(Thread):
def run(self):
self.fun1()
self.fun2()
def fun1(self):
mutexA.acquire()
print(f'我抢到A锁了 {self.name}') #线程1抢到A锁 其他线程等待
mutexB.acquire()
print(f'我抢到B锁了 {self.name}') #线程1抢到B锁 其他线程等待
mutexB.release()
mutexA.release() #线程1释放 A B锁 其他线程抢A锁
def fun2(self):
mutexB.acquire() #线程1抢到B锁 进入sleep2s 其他线程抢到A锁 准备抢B锁
print(f'我抢到B锁了 {self.name}')
time.sleep(2)
mutexA.acquire()
print(f'我抢到A锁了 {self.name}') #线程1睡眠结束后 开始抢A锁,但是A锁被其他线程抢了 并未释放
mutexA.release()
mutexB.release() # 线程1有B锁 需要A锁,其他线程有A锁 需要B锁,故而产生死锁现象
if __name__ == '__main__':
for i in range(10):
t = Myis_thread()
t.start()
'''
运行结果
Thread-1 抢到A锁
Thread-1 抢到B锁
Thread-1 抢到B锁
Thread-2 抢到A锁
'''
2)递归锁
mutexB = RLock()
"""
递归锁的特点
可以被连续的acquire和release
但是只能被第一个抢到这把锁执行上述操作
它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一
只要计数不为0 那么其他人都无法抢到该锁
# 将上述的
mutexA = Lock()
mutexB = Lock()
# 换成
mutexA = mutexB = RLock()
"""
import time
from threading import Thread, Lock,RLock
# mutexA = RLock()
# mutexB = RLock()
mutexA = mutexB = RLock()
class Myis_thread(Thread):
def run(self):
self.fun1()
self.fun2()
def fun1(self):
mutexA.acquire()
print(f'{self.name} 我抢到A锁了 ') #线程1抢到A锁 计时器+1 =1 其他进程不可强 等待
mutexB.acquire()
print(f'{self.name} 我抢到B锁了 ') #线程1抢到B锁 计时器+1 1+1=2 其他进程不可强 等待
mutexB.release() #线程1释放B锁 计时器-1 2-1=1 其他进程不可强 等待
mutexA.release() #线程1释放A锁 计时器-1 1-1=0 其他进进程可以抢(但是执行fun1 很多线程需要抢A锁)
def fun2(self):
mutexB.acquire()
print(f'{self.name} 我抢到B锁了 ') #线程1抢到B锁 其他线程不可抢
time.sleep(2)
mutexA.acquire()
print(f'{self.name} 我抢到A锁了 ') #线程1抢到A锁 其他线程不可抢
mutexA.release()
mutexB.release() #线程释放AB锁 退出 其他线程开始抢锁
if __name__ == '__main__':
for i in range(5):
t = Myis_thread()
t.start()
'''
运行结果
Thread-1 我抢到A锁了
Thread-1 我抢到B锁了
Thread-1 我抢到B锁了
Thread-1 我抢到A锁了
Thread-2 我抢到A锁了
Thread-2 我抢到B锁了
Thread-2 我抢到B锁了
Thread-2 我抢到A锁了
Thread-4 我抢到A锁了
Thread-4 我抢到B锁了
Thread-4 我抢到B锁了
Thread-4 我抢到A锁了
Thread-3 我抢到A锁了
Thread-3 我抢到B锁了
Thread-3 我抢到B锁了
Thread-3 我抢到A锁了
Thread-5 我抢到A锁了
Thread-5 我抢到B锁了
Thread-5 我抢到B锁了
Thread-5 我抢到A锁了
'''
4.2 信号量
sm = Semaphore(5)
import random
import time
from threading import Thread, Semaphore
sm = Semaphore(5) # 括号内写数字 写几就表示并发是几把锁
def task(name):
sm.release()
print(f'{name} is run')
time.sleep(random.randint(2,5))
sm.acquire()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task, args=(f'张宇宙 {i}',))
t.start()
4.3 Event事件
-
Event
-
event.set() 发送信号
-
event.wait() 等待信号
from threading import Thread, Event
import time
event = Event() # 造了一个红绿灯
def light():
print('红灯亮着的') #执行顺序1 线程1 执行到这里 sleep 3
time.sleep(3)
print('绿灯亮了')
# 告诉等待红灯的人可以走了
event.set() #执行顺序4 sleep 结束后 线程1发出信号 event.set()
print('等待信号才能走')
def car(name):
print('%s 车正在灯红灯'%name) #执行顺序2 等待过程中 线程1-5 执行到这里
event.wait() # 等待别人给你发信号 #执行顺序3 这里卡住了
print('%s 车加油门飙车走了'%name) #执行顺序5 线程1-5 收到信号后 继续往后执行
if __name__ == '__main__':
t = Thread(target=light)
t.start()
for i in range(5):
t = Thread(target=car, args=('%s'%i, ))
t.start()
'''
红灯亮着的
0 车正在灯红灯
1 车正在灯红灯
2 车正在灯红灯
3 车正在灯红灯
4 车正在灯红灯
绿灯亮了
等待信号才能走
1 车加油门飙车走了
4 车加油门飙车走了
2 车加油门飙车走了
0 车加油门飙车走了
3 车加油门飙车走了
'''
4.4 线程和消息队列
- PriorityQueue
import queue
# 优先级q 你可以给放入队列中的数据设置进出的优先级
q = queue.PriorityQueue(4)
q.put((10, '111'))
q.put((100, '222'))
q.put((0, '333'))
q.put((-5, '444'))
print(q.get()) # (-5, '444')
# put括号内放一个元祖 第一个放数字表示优先级
# 需要注意的是 数字越小优先级越高!!!
五. 进程池与线程池(重要)
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
5.1 需要掌握的
需要掌握的:以下三条
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor #引模块
pool = ProcessPoolExecutor(5) #创建线程池/进程池
pool.submit(task, i).add_done_callback(call_back) #使用进程池 运行 task函数传参数i #add_done_callback(call_back) 并打印结果
name.result() #打印返回值
def call_back(name):
print('>>>',name.result())
"""
无论是开设进程也好还是开设线程也好 是不是都需要消耗资源
只不过开设线程的消耗比开设进程的稍微小一点而已
我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!!
硬件的开发速度远远赶不上软件呐
我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它
"""
# 池的概念
"""
什么是池?
池是用来保证计算机硬件安全的情况下最大限度的利用计算机
它降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行
"""
"""
任务的提交方式
同步:提交任务之后原地等待任务的返回结果 期间不做任何事
异步:提交任务之后不等待任务的返回结果 执行继续往下执行
返回结果如何获取???
异步提交任务的返回结果 应该通过回调机制来获取
回调机制
就相当于给每个异步任务绑定了一个定时炸弹
一旦该任务有结果立刻触发爆炸
"""
5.2 线程池
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
# pool = ThreadPoolExecutor(5) # 池子里面固定只有五个线程
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
pool = ProcessPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
"""
池子造出来之后 里面会固定存在五个线程
这个五个线程不会出现重复创建和销毁的过程
池子造出来之后 里面会固定的几个进程
这个几个进程不会出现重复创建和销毁的过程
池子的使用非常的简单
你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
pool.shutdown() 等待线程池中所有的任务执行完毕之后再继续往下执行
t.result() 线程的返回结果
"""
def task(n):
print(n,os.getpid())
time.sleep(2)
return n**n
def call_back(n):
print('call_back>>>:',n.result())
if __name__ == '__main__':
# pool.submit(task, 1) # 朝池子中提交任务 异步提交
# print('主')
t_list = []
for i in range(20): # 朝池子中提交20个任务
# res = pool.submit(task, i) # <Future at 0x100f97b38 state=running>
res = pool.submit(task, i).add_done_callback(call_back)
# print(res.result()) # result方法 同步提交
# t_list.append(res)
# 等待线程池中所有的任务执行完毕之后再继续往下执行
# pool.shutdown() # 关闭线程池 等待线程池中所有的任务运行完毕
# for t in t_list:
# print('>>>:',t.result()) # 肯定是有序的
"""
程序有并发变成了串行
任务的为什么打印的是None
res.result() 拿到的就是异步提交的任务的返回结果
"""
5.3 进程池
- 和线程用法一样
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
pool = ProcessPoolExecutor(5)
# pool = ThreadPoolExecutor(5) #线程池
def task(n):
print(n,os.getpid())
time.sleep(2)
return n**n
def call_back(n):
print('call_back>>>:',n.result())
if __name__ == '__main__':
for i in range(20): # 朝池子中提交20个任务
pool.submit(task, i).add_done_callback(call_back)
六 协程编程
6.1 协程概念
- 上下文切换和 保存状态
"""
进程:资源单位
线程:执行单位
协程:这个概念完全是程序员自己意淫出来的 根本不存在
单线程下实现并发
我们程序员自己再代码层面上检测我们所有的IO操作
一旦遇到IO了 我们在代码级别完成切换
这样给CPU的感觉是你这个程序一直在运行 没有IO
从而提升程序的运行效率
多道技术
切换+保存状态
CPU两种切换
1.程序遇到IO
2.程序长时间占用
TCP服务端
accept
recv
代码如何做到
切换+保存状态
切换
切换不一定是提升效率 也有可能是降低效率
IO切 提升
没有IO切 降低
保存状态
保存上一次我执行的状态 下一次来接着上一次的操作继续往后执行
yield
"""
import time
# 串行执行计算密集型的任务 1.2372429370880127
def func1():
for i in range(10000000):
i + 1
def func2():
for i in range(10000000):
i + 1
start_time = time.time()
func1()
func2()
print(time.time() - start_time)
----
切换 + yield 2.1247239112854004
import time
def func1():
while True:
10000000 + 1
yield
def func2():
g = func1() # 先初始化出生成器
for i in range(10000000):
i + 1
next(g)
start_time = time.time()
func2()
print(time.time() - start_time)
6.2 gevent模拟协程IO
from gevent import monkey;monkey.patch_all()
import time
from gevent import spawn
"""
gevent模块本身无法检测常见的一些io操作
在使用的时候需要你额外的导入一句话
from gevent import monkey
monkey.patch_all()
又由于上面的两句话在使用gevent模块的时候是肯定要导入的
所以还支持简写
from gevent import monkey;monkey.patch_all()
"""
def heng():
print('哼')
time.sleep(2)
print('哼')
def ha():
print('哈')
time.sleep(3)
print('哈')
def heiheihei():
print('heiheihei')
time.sleep(5)
print('heiheihei')
start_time = time.time()
# ha()
# heng()
# heiheihei()
g1 = spawn(heng)
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join()
g2.join()
g3.join()
print(time.time() - start_time) #5.005933046340942 如果不然就是10s
6.3 协程实现TCP服务端并发
- 服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn
def communication(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
def server(ip, port):
server = socket.socket()
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
spawn(communication, conn) #监控io操作
if __name__ == '__main__':
g1 = spawn(server, '127.0.0.1', 8091) ##监控io操作
g1.join()
- 客户端
from threading import Thread, current_thread
import socket
import time
def x_client():
client = socket.socket()
client.connect(('127.0.0.1',8091))
n = 0
while True:
msg = '%s say hello %s'%(current_thread().name,n)
n += 1
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
if __name__ == '__main__':
for i in range(100):
t = Thread(target=x_client)
t.start()
'''
开启100个线程 去向单线程服务端发请求
'''
七. IO模型相关
7.1 IO模型概念
"""
我们这里研究的IO模型都是针对网络IO的
Stevens在文章中一共比较了五种IO Model:
* blocking IO 阻塞IO
* nonblocking IO 非阻塞IO
* IO multiplexing IO多路复用
* signal driven IO 信号驱动IO
* asynchronous IO 异步IO
由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。
"""
同步
异步(回调机制--回去结果)
7.2 阻塞IO模型
#服务端
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8099))
server.listen(5)
while True:
conn, addr = server.accept()
while True:
try:
data = conn.recv(1024)
if len(data) == 0:
break
print(data.decode('utf8'))
conn.send(data.upper())
except Exception as e:
print(e)
break
conn.close()
#客户端
import socket
import time
client = socket.socket()
client.connect(('127.0.0.1',8099))
n = 0
while True:
client.send(b'hello word')
data = client.recv(1024)
time.sleep(1)
n += 1
print(f'{n}' ,data.decode('utf8'))
# 在服务端开设多进程或者多线程 进程池线程池 其实还是没有解决IO问题
该等的地方还是得等 没有规避
只不过多个人等待的彼此互不干扰
7.3 非阻塞IO模型
- server.setblocking(False)
#服务端
import socket
import time
server = socket.socket()
server.bind(('127.0.0.1', 8081))
server.listen(5)
server.setblocking(False) #默认是 True 阻塞IO
# 将所有的网络阻塞变为非阻塞
r_list = []
del_list = []
while True:
try:
conn, addr = server.accept()
r_list.append(conn)
except BlockingIOError:
# time.sleep(0.1)
# print('列表的长度:',len(r_list))
# print('做其他事')
for conn in r_list:
try:
data = conn.recv(1024) # 没有消息 报错
if len(data) == 0: # 客户端断开链接
conn.close() # 关闭conn
# 将无用的conn从r_list删除
del_list.append(conn)
continue
conn.send(data.upper())
except BlockingIOError:
continue
except ConnectionResetError:
conn.close()
del_list.append(conn)
# 挥手无用的链接
for conn in del_list:
r_list.remove(conn)
del_list.clear()
7.4 IO多路复用
"""
当监管的对象只有一个的时候 其实IO多路复用连阻塞IO都比比不上!!!
但是IO多路复用可以一次性监管很多个对象
server = socket.socket()
conn,addr = server.accept()
监管机制是操作系统本身就有的 如果你想要用该监管机制(select)
需要你导入对应的select模块
select.select(read_list, [], []) 读 写 操作
"""
import socket
import select
server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)
read_list = [server]
while True:
r_list, w_list, x_list = select.select(read_list, [], [])
"""
帮你监管
一旦有人来了 立刻给你返回对应的监管对象
"""
# print(res) # ([<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>], [], [])
# print(server)
# print(r_list)
for i in r_list: #
"""针对不同的对象做不同的处理"""
if i is server:
conn, addr = i.accept()
# 也应该添加到监管的队列中
read_list.append(conn)
else:
res = i.recv(1024)
if len(res) == 0:
i.close()
# 将无效的监管对象 移除
read_list.remove(i)
continue
print(res)
i.send(b'heiheiheiheihei')
# 客户端
import socket
client = socket.socket()
client.connect(('127.0.0.1',8080))
while True:
client.send(b'hello world')
data = client.recv(1024)
print(data)
总结
"""
监管机制其实有很多
select机制 windows linux都有
poll机制 只在linux有 poll和select都可以监管多个对象 但是poll监管的数量更多
上述select和poll机制其实都不是很完美 当监管的对象特别多的时候
可能会出现 极其大的延时响应
epoll机制 只在linux有
它给每一个监管对象都绑定一个回调机制
一旦有响应 回调机制立刻发起提醒
针对不同的操作系统还需要考虑不同检测机制 书写代码太多繁琐
有一个人能够根据你跑的平台的不同自动帮你选择对应的监管机制
selectors模块
"""
7.5 异步IO模型 asyncio
"""
异步IO模型是所有模型中效率最高的 也是使用最广泛的
相关的模块和框架
模块:asyncio模块
异步框架:sanic tronado twisted
速度快!!!
"""
import threading
import asyncio
@asyncio.coroutine
def hello():
print('hello world %s'%threading.current_thread())
yield from asyncio.sleep(1) # 换成真正的IO操作
print('hello world %s' % threading.current_thread())
loop = asyncio.get_event_loop() ##拿到事件池
tasks = [hello(),hello()] #创建任务
loop.run_until_complete(asyncio.wait(tasks)) #运行任务 并全部结束后 继续往下执行
loop.close() #关闭
---
import threading
import asyncio
import time
async def hello():
print('hello world %s'%threading.current_thread())
# 换成真正的IO操作
print('hello world %s' % threading.current_thread())
loop = asyncio.get_event_loop() ##拿到事件池
tasks = [hello(),hello()] #创建任务
loop.run_until_complete(asyncio.wait(tasks)) #运行任务 并全部结束后 继续往下执行
loop.close() #关闭
写法
标签:__,name,Python,编程,并发,线程,time,print,import
From: https://www.cnblogs.com/saas-open/p/17797514.html