(一)IO模型简介
- 目前我们眼睛的IO都是基于网络IO的
- Stevens在文章中一共比较了五种IO Model:
- blocking IO 阻塞IO模型
- nonblocking IO 非阻塞IO模型
- IO multiplexing IO多路复用模型
- signal driven IO ---(忽略)
- asynchronous IO 异步IO模型
- 由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。
- 当一个read操作发生时,该操作会经历两个阶段:
- 1、等待数据准备 (Waiting for the data to be ready)
- 2、将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
- 同步
- 在同步模式下,一个组件必须等待另一个组件完成某个操作,然后再进行下一个操作。
- 异步
- 在异步模式下,组件之间互不干扰,它们可以独立地执行任务。当一个组件完成任务后,它可以直接通知另一个组件继续执行操作。
- 阻塞
- 在阻塞模式下,一个进程或线程在执行某个操作时,如果该操作没有完成,它将立即停止一切操作,并等待操作完成后再继续进行后续操作。
- 非阻塞
- 在非阻塞模式下,一个进程或线程在执行某个操作时,即使该操作没有完成,也会立即返回执行下一个操作,不会出现停滞等待的情况。 如果下一个操作需要依赖于上一个还未完成的操作,它将定期检查上一个操作是否完成,保证这些未完成的操作不会影响后续任务的执行。
- 常见的网络IO
- accept
- recv
- recvfrom
(二)阻塞IO模型
(1)TCP/UDP协议模型
(2)TCP 经典IO
- 客户端
from socket import *
client = socket()
IP = '127.0.0.1'
PORT = 8083
client.connect((IP, PORT))
while True:
msg_to_server = 'hello world'
msg_to_server = msg_to_server.encode('utf-8')
client.send(msg_to_server)
msg_from_server = client.recv(1024)
msg_from_server = msg_from_server.decode('utf-8')
print(msg_from_server)
- 服务端
from socket import *
server = socket()
IP = '127.0.0.1'
PORT = 8082
server.bind((IP, PORT))
server.listen(5)
while True:
conn, addr = server.accept()
while True:
try:
msg_from_client = conn.recv(1024)
if len(msg_from_client) == 0:
break
print(msg_from_client.decode('utf8'))
conn.send(msg_from_client.upper())
except Exception as e:
print(e)
break
conn.close()
- 问题:服务端只能接待一个客户端
(3)解决方案
- 在服务端开始多进程或多线程,进程池线程池
- 但是还没在根本上解决IO问题
- 因为在服务端 虽然解决了 请求数据这部分的IO 但是没有从根本上解决 向内核要数据和拷贝数据给客户端这个IO
(二)非阻塞IO模型
- 将所有的阻塞操作变为非阻塞
(1)TCP 非阻塞经典IO
- 服务端
from socket import *
import time
server = socket()
IP = '127.0.0.1'
PORT = 8083
server.bind((IP, PORT))
server.listen(5)
# 将所有的网络阻塞 变为 非阻塞
# BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。 ---- 表示服务端并没有立即建立链接,导致了资源无法接收
server.setblocking(False)
conn_list = []
del_conn_list = []
while True:
try:
conn, addr = server.accept()
conn_list.append(conn)
# 检测进程资源利用错误
except BlockingIOError as e:
print(f'当前无阻塞,可进行其他工作')
for conn in conn_list:
try:
data_from_client = conn.recv(1024) # 没有消息则报错
if len(data_from_client) == 0:
# 如果接收到的消息为 0 则表示当前链接无效:关闭当前链接
conn.close() # (客户端断开连接)
# 将无用的conn链接从链接列表中删除掉
del_conn_list.append(conn) # (将无用的链接放到无用的链接列表里)
continue
# 链接对象正常:反馈消息给客户端
conn.send(data_from_client.upper())
# 检测进程资源利用错误
except BlockingIOError as e:
# 查看其他链接对象是否有消息发过来
continue
# 检测到客户端断开链接错误
except ConnectionError as e:
# 断开链接
conn.close()
# 将无效的链接添加到废弃列表里
del_conn_list.append(conn)
# 回收无用的链接对象
for conn in del_conn_list:
# 在有效列表内删除无效的链接对象
conn_list.remove(conn)
# 清空无效链接列表
del_conn_list.clear()
- 客户端
from socket import *
client = socket()
IP = '127.0.0.1'
PORT = 8083
client.connect((IP, PORT))
while True:
msg_to_server = 'hello world'
msg_to_server = msg_to_server.encode('utf-8')
client.send(msg_to_server)
msg_from_server = client.recv(1024)
msg_from_server = msg_from_server.decode('utf-8')
print(msg_from_server)
(2)总结
- 虽然非阻塞 IO 看起来十分优秀
- 但是该模型会长时间占用 CPU 而不处理数据(大部分资源都浪费在反复发起请求 请求数据上)
- 实际应用中不会考虑使用非阻塞 IO 模型
- 占用资源严重
(三)IO多路复用
(1)什么是IO多路复用
- 当监管的对象只有一个的时候,其实IO多路复用连阻塞IO的功能都比不上
- 但是IO多路复用一次性可以监管多个对象
- server = socket.socket()
- conn, addr = socket.accept()
- 这个监管机制是由操作系统本身就存在的(select)
- 如果使用需要导入相关的模块(select模块)
(2)基于 select模块解决 IO 阻塞
- 客户端
from socket import *
import select
server = socket()
IP = '127.0.0.1'
PORT = 8083
server.bind((IP, PORT))
server.listen(5)
# 将阻塞模型变为非阻塞模型
server.setblocking(False)
# 将监管目标(谁来发起链接)添加到监管列表
read_list = [server]
while True:
r_list, w_list, x_list = select.select(read_list, [], []) # 检测server端
# 返回值(server对象,[],[]) : ([<socket.socket fd=408, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8083)>], [], [])
# 一旦有链接发起,直接拿到监管链接
# 循环拿到每一个发起的链接对象
for i in r_list:
'''
针对不同的对象进行分类处理
'''
# 如果是服务对象就将其添加到服务列表内,再次确认
if i is server:
# 拿到每一个链接对象
conn, addr = i.accept()
# 将链接对象添加到监管机制中
read_list.append(conn)
else:
# 如果是链接请求就拿到客户端发起的数据
msg_from_client = i.recv(1024)
if len(res) == 0:
# 关闭无效链接
i.close()
# 将无效链接从列表中移除
read_list.remove(i)
continue
msg_from_client = msg_from_client.decode('utf-8')
print(msg_from_client)
# 反馈信息给客户端
i.send(msg_from_client.upper())
- 服务端
from socket import *
client = socket()
IP = '127.0.0.1'
PORT = 8083
client.connect((IP, PORT))
while True:
msg_to_server = 'hello world'
msg_to_server = msg_to_server.encode('utf-8')
client.send(msg_to_server)
msg_from_server = client.recv(1024)
msg_from_server = msg_from_server.decode('utf-8')
print(msg_from_server)
(3)总结
- 监管机制有很多
- select机制
- Windows/Mac 都有
- poll机制
- 只有Linux有
- poll 和 select 机制都可以监管多个对象,但是 poll监管的对象更多
上述两个机制都不是很完美
当监管的对象特别多的时候,可能会出现机器大的延迟响应
- epoll机制
- 只在 Linux 机制有
- 给每个监管对象都绑定了一个回调机制
- 一旦有响应,回调机制会立刻发起提醒
针对不同的操作系统,需要考虑不同的监管机制,太过麻烦
因此有了一个新的模块(selectors模块)
根据平台的不同而自动选择不同的机制
(四)异步IO
- 异步IO 是所有模型中效率最高的,也是使用最广泛的
- 由于涉及到调用操作系统底层,我们只能通过C语言代码才能模拟出效果
- 但是,有相关大佬已经帮我们将相关的方法进行了封装
- 模块
- asyncio 模块
- 异步框架
- sanic
- tronado
- twisted
(1)asyncio 模块安装
pip install asyncio
(2)方法示例
- 单线程实现并发效果
import threading
import asyncio
async def hello():
print(f'hello world statr:>>{threading.current_thread()}')
await asyncio.sleep(1) # 真正的 IO 操作 ---- 交互时操作 / 响应式操作
print(f'hello world end:>>>{threading.current_thread()}')
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# 单线程实现并发效果
# hello world statr:>><_MainThread(MainThread, started 8005963776)>
# hello world statr:>><_MainThread(MainThread, started 8005963776)>
# hello world end:>>><_MainThread(MainThread, started 8005963776)>
# hello world end:>>><_MainThread(MainThread, started 8005963776)>
(五)IO总结
- 四个IO模型对比