并发编程
1. 任务调度算法介绍
1.1 进程的调度问题
# CPU 调度进程
# 什么是调度算法
# 要想多个进程交替运行
# 操作系统必须对这些进程进行调度
# 这个调度也不是随即进行的,而是需要遵循一定的法则
# 由此就有了进程的调度算法。
# 先来先服务算法
# (1)理论
# 先来先服务(FCFS)调度算法是一种最简单的调度算法
# (2)适用场景
# 该算法既可用于作业调度,也可用于进程调度。
# FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。
# 由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(进程)。
# 短作业优先调度算法
# (1)理论
# 短作业(进程)优先调度算法(SJ/PF)是指对短作业或短进程优先调度的算法
# (2)适用场景
# 该算法既可用于作业调度,也可用于进程调度。
# 但其对长作业不利;
# 不能保证紧迫性作业(进程)被及时处理;
# 作业的长短只是被估算出来的。
# 时间片轮转法
# (1)理论
# 时间片轮转(Round Robin,RR)法的基本思路是让每个进程在就绪队列中的等待时间与享受服务的时间成比例。
# 在时间片轮转法中,需要将CPU的处理时间分成固定大小的时间片
# 例如,几十毫秒至几百毫秒。
# 如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度。
# 同时,进程调度程序又去调度当前就绪队列中的第一个进程。
# 显然,轮转法只能用来调度分配一些可以抢占的资源。
# 这些可以抢占的资源可以随时被剥夺,而且可以将它们再分配给别的进程。CPU是可抢占资源的一种。
# (2)适用场景
# 由于作业调度是对除了CPU之外的所有系统硬件资源的分配,其中包含有不可抢占资源,所以作业调度不使用轮转法。
# 在轮转法中,时间片长度的选取非常重要。
# 首先,时间片长度的选择会直接影响到系统的开销和响应时间。
# 如果时间片长度过短,则调度程序抢占处理机的次数增多。
# 这将使进程上下文切换次数也大大增加,从而加重系统开销。
# 反过来,如果时间片长度选择过长
# 例如,一个时间片能保证就绪队列中所需执行时间最长的进程能执行完毕
# 则轮转法变成了先来先服务法。
# 时间片长度的选择是根据系统对响应时间的要求和就绪队列中所允许最大的进程数来确定的。
# 在轮转法中,加入到就绪队列的进程有3种情况:
# 一种是分给它的时间片用完,但进程还未完成,回到就绪队列的末尾等待下次调度去继续执行。
# 另一种情况是分给该进程的时间片并未用完,只是因为请求I/O或由于进程的互斥与同步关系而被阻塞。当阻塞解除之后再回到就绪队列。
# 第三种情况就是新创建进程进入就绪队列。
# 如果对这些进程区别对待,给予不同的优先级和时间片从直观上看,可以进一步改善系统服务质量和效率。
# 例如,我们可把就绪队列按照进程到达就绪队列的类型和进程被阻塞时的阻塞原因分成不同的就绪队列,每个队列按FCFS原则排列,各队列之间的进程享有不同的优先级,但同一队列内优先级相同。
# 这样,当一个进程在执行完它的时间片之后,或从睡眠中被唤醒以及被创建之后,将进入不同的就绪队列。
# 多级反馈队列
# (1)理论
# 前面介绍的各种用作进程调度的算法都有一定的局限性。
# 如短进程优先的调度算法,仅照顾了短进程而忽略了长进程,而且如果并未指明进程的长度,则短进程优先和基于进程长度的抢占式调度算法都将无法使用。
# 而多级反馈队列调度算法则不必事先知道各种进程所需的执行时间,而且还可以满足各种类型进程的需要,因而它是目前被公认的一种较好的进程调度算法。
# (2)调度算法的实施过程
# 在采用多级反馈队列调度算法的系统中,调度算法的实施过程如下所述。
# [1]为多个就绪队列设置优先级
# 应设置多个就绪队列,并为各个队列赋予不同的优先级。
# 第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。
# 该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。
# 例如,第二个队列的时间片要比第一个队列的时间片长一倍,……,第i+1个队列的时间片要比第i个队列的时间片长一倍。
# [2]新进程等待调用
# 当一个新进程进入内存后
# 首先将它放入第一队列的末尾,按FCFS原则排队等待调度。
# 当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;
# 如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按FCFS原则等待调度执行;
# 如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去,当一个长作业(进程)从第一队列依次降到第n队列后,在第n 队列便采取按时间片轮转的方式运行。
# [3]按顺序调度队列
# 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;
# 仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。
# 如果处理机正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列)
# 则此时新进程将抢占正在运行进程的处理机
# 即由调度程序把正在运行的进程放回到第i队列的末尾
# 把处理机分配给新到的高优先权进程。
2.并发串行并行
- 串行,按照顺序一个个执行
- 并发,感觉上是并行,其实是穿行
- 并行,同时运行
- 并行一定是并发,但是并发不一定是并行
3.同步/异步/阻塞/非阻塞
- 同步
# 就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。
# 按照这个定义,其实绝大多数函数都是同步调用。
# 但是一般而言,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。
- 异步
# 异步的概念和同步相对。
# 当一个异步功能调用发出后,调用者不能立刻得到结果。
# 当该异步功能完成后,通过状态、通知或回调来通知调用者。
# 如果异步功能用状态来通知
# 那么调用者就需要每隔一定时间检查一次。
# 如果是使用通知的方式效率则很高
# 因为异步功能几乎不需要做额外的操作,至于回调函数,其实和通知没太多区别。
- 阻塞
# 阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。
# 函数只有在得到结果之后才会将阻塞的线程激活。
# 有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。
# 对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
# 同步调用
# apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态)
# 阻塞调用
# 当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。
- 非阻塞
# 非阻塞和阻塞的概念相对应
# 指在不能立刻得到结果之前也会立刻返回
# 同时该函数不会阻塞当前线程。
# 同步和异步针对的是函数/任务的调用方式
# 同步是指当一个进程发起一个函数调用的时候一直等待结果的返回
# 异步是指当一个进程发起一个函数调用的时候,不需要一直等待结果的返回而且是可以继续其他的任务
# 可以通过其他手段得到这个异步进程的结果
# 阻塞和非阻塞针对的是进程或线程(已经跑起来的函数/任务)
# 阻塞遇到了一定的IO,需要事件处理,这时候就会将进程或线程挂起
# 非阻塞,跟阻塞相反,没有遇到IO
4.进程的创建和状态
- 进程是计算机中资源分配的最小单元;一个进程中可以有多个线程,同一个进程中的线程共享资源
- 进程与进程之间则是相互隔离
- Python中通过多进程可以利用CPU的多核优势,计算密集型操作适用于多进程
# 主要分为4种形式创建新的进程
# 通用系统创建新进程的4种形式
# (1)系统初始化
# (2)进程中开启子进程
# (3)交互式请求
# (4)批处理作业的初始化
# 什么是进程的状态
# 进程状态反映进程执行过程的变化。
# 这些状态随着进程的执行和外界条件的变化而转换。
# 三态模型
# 在三态模型中,进程状态分为三个基本状态,
# 即运行态,
# 就绪态,
# 阻塞态。
# 五态模型
# 在五态模型中,进程分为
# 新建态,终止态,运行态,就绪态,阻塞态
5.创建多进程的方式
# 【1】Process类的参数介绍
# Process([group [, target [, name [, args [, kwargs]]]]])
# group参数未使用,值始终为None
# target 是指我们要开启多线程的哪个函数名
# args 多线程对应的函数中需要传入的参数
# 【2】Process类的方法介绍
# p.start():启动进程,并调用该子进程中的p.run()
# p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
# p.is_alive():如果p仍然运行,返回True
# p.join([timeout]):
# 主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。
# timeout是可选的超时时间
# 需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
# 多进程的创建方式
# 导入模块
import multiprocessing
def run_task(i):
print(f'当前的参数为 :>>>> {i}')
# 创建方式一
def main_first():
for i in range(1, 10):
# 创建进程对象
task = multiprocessing.Process(target=run_task, args=(i,))
# 启动进程
task.start()
class MyProcess(multiprocessing.Process):
def __init__(self, i):
super().__init__()
self.i = i
def run(self) -> None:
run_task(i=self.i)
# print(f'这是谁 :>>>> {self} | 参数为 :>>>> {self.i}')
def main_second():
for i in range(1, 10):
task = MyProcess(i=i)
task.start()
if __name__ == '__main__':
# main_first()
main_second()
6.进程之间的数据是相互隔离的
- 进程是资源分配的最小单元,每个进程中都维护自己独立的数据,不共享
import multiprocessing
number = 99
def run_task(i):
global number
number += 1
print(f'这是子进程 {i} :>>>> {number}')
def normal_main():
run_task(i=1)
run_task(i=2)
run_task(i=3)
# 这是子进程 1 :>>>> 100
# 这是子进程 2 :>>>> 101
# 这是子进程 3 :>>>> 102
def main_first():
for i in range(1, 5):
task = multiprocessing.Process(target=run_task, args=(i,))
task.start()
if __name__ == '__main__':
main_first()
# 这是子进程 1 :>>>> 100
# 这是子进程 2 :>>>> 100
# 这是子进程 3 :>>>> 100
# 这是子进程 4 :>>>> 100
7.多进程实现TCP服务端并发
【1】服务端
import socket
from socket import SOL_SOCKET, SO_REUSEADDR
import multiprocessing
server = socket.socket()
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 9999))
server.listen()
def talk(conn, addr):
while True:
try:
msg_from_client = conn.recv(1024)
msg_from_client = msg_from_client.decode('utf8')
print(f'这是来自 :>>> {addr} | 信息是 :>>>> {msg_from_client}')
msg_from_client = msg_from_client.upper()
conn.send(msg_from_client.encode('utf8'))
except Exception as e:
print(f'这是来自 :>>> {addr} | 错误是 :>>>> {e}')
break
def main_multyprocess():
while True:
conn, addr = server.accept()
task = multiprocessing.Process(target=talk, args=(conn, addr))
task.start()
if __name__ == '__main__':
main_multyprocess()
【2】客户端
import socket
client = socket.socket()
client.connect(('127.0.0.1', 9999))
while True:
msg = input("请输入小写字母:>>> ").strip()
if not msg: continue
client.send(msg.encode('utf8'))
msg_from_server = client.recv(1024)
print(f'这是来自 服务端的数据 :>>>>{msg_from_server.decode("utf8")}')
8.Process类的 join 方法
import multiprocessing
import time
def run_task(i):
print(f"这是参数 {i} 开始!")
# 模拟IO阻塞
time.sleep(2)
print(f"这是参数 {i} 结束!")
def timer(func):
def inner(*args, **kwargs):
start_time = time.time()
res = func(*args, **kwargs)
print(f"当前程序 {func.__name__} 总耗时 :>>>> {time.time() - start_time} s")
return res
return inner
# @timer
def main_first():
for i in range(1, 5):
# 创建进程对象
p_task = multiprocessing.Process(target=run_task, args=(i,))
# 启动进程对象
p_task.start()
# 没有 join 方法的时候,处于并行状态
# 先将主进程启动起来,但是没有按照顺序拿到子进程的结果
# @timer
def main_second():
for i in range(1, 5):
# 创建进程对象
p_task = multiprocessing.Process(target=run_task, args=(i,))
# 启动进程对象
p_task.start()
# 在启动后立马加上 join ---> 串行
p_task.join()
def main_third():
# 创建一个进程的空列表
task_list = []
for i in range(1, 5):
# 创建进程对象
p_task = multiprocessing.Process(target=run_task, args=(i,))
# 启动进程对象
p_task.start()
# 将启动后的子进程扔到列表里
task_list.append(p_task)
# 逐个遍历每一个子进程
for p_task in task_list:
# 启动阻塞
p_task.join()
# 我先启动主进程
# 接着启动每一个子进程
# 主进程等待所有子进程结束后再结束子进程
# 在 main 函数里面启动多进程
if __name__ == '__main__':
start_time = time.time()
# 主进程 开始时间--运行程序--结束时间
# main_first() # 总耗时 :>>>> 0.08754086494445801 s
# main_second() # 总耗时 :>>>> 8.487617254257202 s
main_third() # 总耗时 :>>>> 2.18369460105896 s
print(f"总耗时 :>>>> {time.time() - start_time} s")
9.多进程io模版
# 【一】导入模块 multiprocessing
import multiprocessing
import time
# 【二】有一个可能会产生IO阻塞的进程
def run_task(i):
print(f"子进程开始 {i}")
# 模拟IO 阻塞,实际上是没有的
time.sleep(2)
print(f"子进程结束 {i}")
# 【三】开始多进程
def main():
# 【1】创建一个空列表:存储每一个启动的子进程
task_list = []
# 【2】模拟有很多个进程
for i in range(1, 5):
# (1)创建进程对象
process_obj = multiprocessing.Process(target=run_task, args=(i,))
# (2)启动进程
process_obj.start()
# (3)加入到进程列表中
task_list.append(process_obj)
# 【3】遍历每一个子进程,等待IO结束
for process_obj in task_list:
process_obj.join()
# 【四】启动多进程,将多进程放在 主函数 main 下面
if __name__ == '__main__':
main()
10. 方法补充
import multiprocessing
from multiprocessing import current_process
import time
import os
# 进程号
# 【1】什么是进程号?
# 计算机会给每一个运行的进程分配一个PID号
# 【2】如何查看
# Windows系统 CMD 命令行 tasklist 即可查看
# Mac系统 终端运行 ps aux 即可查看
# 【3】如何根据指定进程号查看进程
# Mac系统
# 终端运行 ps aux|grep PID 即可查看
# Windows系统
# CMD 命令行 tasklist |findstr PID 即可查看
def run_task(i):
print(f"这是参数 {i} 开始!")
# 【1】查看当前进程的ID
print(f"这是参数 {i} 的进程ID current_process() :>>> {current_process().pid}!")
print(f"这是参数 {i} 的进程ID os.getpid() :>>> {os.getpid()}!")
# 【2】查看父进程的ID
print(f"这是参数 {i} 的父进程ID os.getpid() :>>> {os.getppid()}!")
# 模拟IO阻塞
time.sleep(2)
print(f"这是参数 {i} 结束!")
def main_third():
# 创建一个进程的空列表
task_list = []
for i in range(1, 5):
# 创建进程对象
p_task = multiprocessing.Process(target=run_task, args=(i,))
# 启动进程对象
p_task.start()
# 先启动进程才能杀死当前进程
if i == 2:
p_task.terminate()
# 将启动后的子进程扔到列表里
task_list.append(p_task)
# 逐个遍历每一个子进程
for p_task in task_list:
print(f'当前进程 join 之前 是否存活 :>>>> {p_task.is_alive()}')
# 启动阻塞
p_task.join()
print(f'当前进程 join 之后 是否存活 :>>>> {p_task.is_alive()}')
# 在 main 函数里面启动多进程
if __name__ == '__main__':
start_time = time.time()
print(f'这是主进程的ID :>>>> {current_process().pid}')
main_third()
print(f"总耗时 :>>>> {time.time() - start_time} s")
11.僵尸进程和孤儿进程
# 【一】僵尸进程
# 对于子进程来说:子进程死亡的时候,但是他的这部分资源却没有被回收掉
# 这种现象对于这个死掉的子进程来说就是僵尸进程
# 【二】孤儿进程
# 子进程对于父进程来说:父进程死亡,子进程也应该跟这个死亡,父进程死了,但是子进程没死,
# init 进程 接收掉所有父进程死亡而子进程未死亡的继承,负责回收掉子进程的资源
# 【三】僵尸进程的危害大于孤儿进程
# 僵尸进程会占用大部分的资源,并且这部分资源没人来处理
# 孤儿进程虽然父进程没了,但是 init 进程会将这部分资源释放掉
12.守护进程
# 【一】守护进程
# 守护进程 (daemon) 是在计算机系统启动时就已经运行,并且一直在后台运行的一类特殊进程。
# 它们通常不与用户直接交互,也不接受标准输入和输出,而是在后台执行某种任务或提供某种服务。
# 守护进程往往是由系统管理员手动启动的,它们可以在系统启动时自动启动,一直运行在后台,直到系统关闭或被停止。
# 常见的守护进程包括网络服务 (如 web 服务器、邮件服务器、 ftp 服务器等)、日志记录系统 (如系统日志服务、应用程序日志服务等) 等。
# 守护进程通常在后台运行,不需要用户交互,并且有较高的权限,因此编写守护进程需要特别注意安全性和稳定性。
# 【二】父进程死亡了,但是子进程未死亡
'''
import multiprocessing
import time
def task(i):
print(f'总管 :>>>> {i} :>>>> 存活')
time.sleep(2)
print(f'总管 :>>>> {i} :>>>> 死亡')
if __name__ == '__main__':
name = 'serein'
print(f'当前皇帝 {name} :>>>> 执掌江山!')
process = multiprocessing.Process(target=task, args=('formerly',))
process.start()
print(f'当前皇帝 {name} :>>>> 寿终正寝!')
# 当前皇帝 serein :>>>> 执掌江山!
# 当前皇帝 serein :>>>> 寿终正寝!
# 总管 :>>>> formerly :>>>> 存活
# 总管 :>>>> formerly :>>>> 死亡
'''
import multiprocessing
import time
def task(i):
print(f'总管 :>>>> {i} :>>>> 存活')
time.sleep(2)
print(f'总管 :>>>> {i} :>>>> 死亡')
if __name__ == '__main__':
name = 'serein'
print(f'当前皇帝 {name} :>>>> 执掌江山!')
process_formerly = multiprocessing.Process(target=task, args=('formerly',))
# 守护进程 , 要放在 start 之前加
# 给哪个子进程加上守护进程,哪个子进程就会随着主进程死亡而死亡
process_formerly.daemon = True
process_formerly.start()
print(f'当前皇帝 {name} :>>>> 寿终正寝!')
# 当前皇帝 serein :>>>> 执掌江山!
# 当前皇帝 serein :>>>> 寿终正寝!
13.进程锁(互斥锁)
import json
# 【一】什么是互斥锁
# 互斥锁(Mutex)是一种用于多进程编程中控制对共享资源访问的机制。
# 其作用是保证在同一时刻只有一个线程在访问共享资源,从而避免多个线程同时读写数据造成的问题。
# 互斥锁的基本原理是在对共享资源进行访问前加锁,使得其他线程无法访问该资源,当访问完成后再解锁,使得其他线程可以进行访问。
# 通过这种方式,可以保证同一时间只有一个线程在执行关键代码段,从而保证了数据的安全性。
# 需要注意的是,互斥锁会带来一些额外的开销,
from multiprocessing import Process, Lock
import time
import os
# 【二】多个进程共享同一打印终端
# 【1】未加锁
'''
def work(lock):
# 进去之前 把门锁起来
lock.acquire()
print(f'这是进程 :>>>> {os.getpid()} 进程开始')
# 模拟 IO 操作
time.sleep(2)
print(f'这是进程 :>>>> {os.getpid()} 进程结束')
# 走之后把锁放开
lock.release()
if __name__ == '__main__':
# 声明一把锁
lock = Lock()
for i in range(1, 5):
process = Process(target=work, args=(lock,))
process.start()
# 总结 : 虽然加锁让我们的程序变成了串行,但是对于整个程序来说,安全
# 以时间换空间
'''
# 【三】多个进程共享同一个文件
'''
BASE_DIR = os.path.dirname(__file__)
file_name = 'data.json'
file_path = os.path.join(BASE_DIR, file_name)
# 【一】初始化票数
def save_data(data=None):
if not data:
data = {'ticket_number': 2}
with open(file_path, mode='w') as fp:
json.dump(data, fp)
# 【二】获取票数信息
def get_ticket_number():
with open(file_path, 'r') as fp:
data = json.load(fp)
return data
# 【三】查看票数
def search_ticket_number(name):
ticket_data = get_ticket_number()
print(f'当前用户 {name} 正在查询余票 {ticket_data.get("ticket_number")} ')
# 【四】买票
def buy_ticket(name):
# 获取票数信息
ticket_data = get_ticket_number()
ticket_number = ticket_data.get("ticket_number")
# 模拟网络延迟
time.sleep(2)
# 【2】买票
if ticket_number > 0:
# 把票买走,减少库存
ticket_data['ticket_number'] -= 1
# 存到文件里
save_data(data=ticket_data)
print(f'当前用户 :>>>> {name} 购票成功!')
else:
print(f"当前用户 :>>>> {name} 已无余票!")
def main(name):
# 【1】先让他查票
search_ticket_number(name=name)
# 【2】买票
buy_ticket(name=name)
if __name__ == '__main__':
save_data()
task_list = []
for i in range(1, 5):
process_obj = Process(target=main, args=(i,))
process_obj.start()
task_list.append(process_obj)
# join 等待
for process_obj in task_list:
process_obj.join()
'''
BASE_DIR = os.path.dirname(__file__)
file_name = 'data.json'
file_path = os.path.join(BASE_DIR, file_name)
# 【一】初始化票数
def save_data(data=None):
if not data:
data = {'ticket_number': 2}
with open(file_path, mode='w') as fp:
json.dump(data, fp)
# 【二】获取票数信息
def get_ticket_number():
with open(file_path, 'r') as fp:
data = json.load(fp)
return data
# 【三】查看票数
def search_ticket_number(name):
ticket_data = get_ticket_number()
print(f'当前用户 {name} 正在查询余票 {ticket_data.get("ticket_number")} ')
# 【四】买票
def buy_ticket(name):
# 获取票数信息
ticket_data = get_ticket_number()
ticket_number = ticket_data.get("ticket_number")
# 模拟网络延迟
time.sleep(2)
# 【2】买票
if ticket_number > 0:
# 把票买走,减少库存
ticket_data['ticket_number'] -= 1
# 存到文件里
save_data(data=ticket_data)
print(f'当前用户 :>>>> {name} 购票成功!')
else:
print(f"当前用户 :>>>> {name} 已无余票!")
def main(name, lock):
# 【1】先让他查票
search_ticket_number(name=name)
# 买票前加锁
lock.acquire()
# 【2】买票
buy_ticket(name=name)
# 买票后释放
lock.release()
if __name__ == '__main__':
save_data()
lock = Lock()
task_list = []
for i in range(1, 5):
process_obj = Process(target=main, args=(i, lock))
process_obj.start()
task_list.append(process_obj)
# join 等待
for process_obj in task_list:
process_obj.join()
标签:__,task,name,队列,编程,并发,进程,之多,ticket
From: https://www.cnblogs.com/Formerly/p/17971135