文章目录
- 一、写在最前面
- 1、误区
- 2、IO模型分类
- 3、概念再梳理(重点)
- 二、BIO(Blocking IO)
- 1、客户端
- 2、服务端
- 3、效果展示
- 4、总结
- 三、NIO(NonBlocking IO)
- 1、客户端
- 2、服务端
- 3、效果展示
- 4、总结
- 四、多路复用(Multiplexing)
- 1、客户端
- 2、服务端
- 3、效果展示
- 4、总结
- 五、Netty(基于多路复用IO)
- 1、客户端
- 2、服务端
- 3、效果展示
- 4、总结
- 六、AIO(Asynchronous IO)
- 1、客户端
- 2、服务端
- 3、效果展示
- 4、总结
一、写在最前面
1、误区
在我阅读了网上很多关于NIO和多路复用的文章,发现很多文章的语言存在歧义。如果不信,请你回答下列问题:
1、请问有几种IO模型
答:BIO、NIO、多路复用、信号驱动IO、异步IO
2、请问你如何理解NIO
答:你可能会说对应的select、poll甚至是epoll概念,然后引申到Netty
3、请问你如何理解多路复用
答:你就会把多路复用的基本概念说一遍,甚至还可以引申到redis的多路复用机制
4、请问NIO和多路复用IO的区别是什么
答:。。。
这是发生在我周围的例子,有人甚至给我的回答是——多路复用就是NIO,我也不能说他错,也不能说他对。
2、IO模型分类
主要是网上的博客表述有歧义,导致很多人理不清这几种IO模型的关系。回到第一个问题,有几种IO模型。
在《UNIX网络编程卷1》的第六章对IO模型给BIO、NIO、多路复用、信号驱动IO、异步IO五种IO模型给出了如下的定义:
3、概念再梳理(重点)
不知道你发现没有,NIO中根本没有提及select、poll甚至是epoll,反而是在多路复用中提到了他们。NIO的定义只包含了一个空轮询,会不断的去获取数据。
那为什么网上很多博客还在说NIO含有select呢?
这里一定要区分,我们所说的IO模型,是从网络层面来说的,而很多博客中的NIO是Java规范里面的。这就好比线程有多少种状态?在操作系统层面,你可以说有新建、可运行、运行、阻塞、死亡这五种。但在Java层面,你就会回答New(新创建)、Runnable(可运行)、Blocked(被阻塞)、Waiting(等待)、Tilme waiting(计时等待)、Terminated(已终止)这六种。
同理如果你问的是Java层面的NIO,那么我们就可以要理解为是java.nio这个包下面的代码,这里就可以说出我们NIO的三大组件——Buffer、Channel、Selector。即Java里面的NIO(NIO包下面的代码)就是封装了多路复用的机制。同理Netty是基于网络层面的多路复用机制,是基于Java层面的NIO。
二、BIO(Blocking IO)
1、客户端
@Slf4j
public class BioClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 9099);
// 向服务端发送数据
socket.getOutputStream().write("Hello BioServer".getBytes());
socket.getOutputStream().flush();
log.info("向服务端发送数据结束");
byte[] bytes = new byte[1024];
//接收服务端回传的数据
socket.getInputStream().read(bytes);
log.info("接收到服务端返回的数据:{}", new String(bytes));
socket.close();
}
}
2、服务端
@Slf4j
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9099);
while (true) {
log.info("等待客户端连接");
//阻塞方法
Socket socket = serverSocket.accept();
log.info("客户端连接成功");
new Thread(() -> {
try {
handler(socket);
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}).start();
}
}
private static void handler(Socket socket) throws IOException {
byte[] bytes = new byte[1024];
log.info("开始读取数据");
// 接收客户端的数据,阻塞方法,没有数据可读时就阻塞
int read = socket.getInputStream().read(bytes);
log.info("读取数据完毕");
if (read != -1) {
log.info("接收到客户端的数据:{}", new String(bytes, 0, read));
}
// 向outputStream中回写数据
socket.getOutputStream().write("Hello BioClient".getBytes());
socket.getOutputStream().flush();
}
}
3、效果展示
根据测试结果,我们不难发现,BIO的服务端是阻塞获取新请求。结合代码,没接收到一个新请求,就开启一个线程读取对应的数据。
服务端:
客户端:
4、总结
在BIO模式下,会存在一下问题:
问题一:只能以阻塞的形式读取每一次的请求。
问题二:如果客户端发送的数据为0,那么服务端在读取数据的时候也会阻塞读取,直到获取到数据才会向下执行,即下面这行代码也是阻塞的。
int read = socket.getInputStream().read(bytes);
BIO升级为每接收到一个连接,就开启一个线程
问题三:并发请求10W个连接,此时就会开启10W个线程,资源耗费极大
问题四:和问题二相同,如果其中一个线程迟迟没有获取到数据,这就会导致部分线程卡死
BIO再次升级,升级为一个线程池的使用
问题五:虽然线程池能够解决线程开启和释放的性能损耗,但是依然会出现问题四
BIO 方式适用于连接数目比较小且固定的架构, 这种方式对服务器资源要求比较高, 但程序简单易理解
三、NIO(NonBlocking IO)
1、客户端
@Slf4j
public class NioClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8099));
sc.write(Charset.defaultCharset().encode("Hello NioServer"));
log.info("发送数据成功");
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = sc.read(buffer);
if (read > 0) {
log.info("读取到服务端返回的 {} 长度的数据", read);
}
sc.write(Charset.defaultCharset().encode("Hello NioServer2"));
log.info("再次发送数据成功");
System.in.read(); // 阻塞线程
}
}
2、服务端
@Slf4j
public class NioServer {
public static void main(String[] args) throws IOException, InterruptedException {
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);// accept不会阻塞
ssc.bind(new InetSocketAddress(8099));
List<SocketChannel> channelList = new ArrayList<>();
while (true) {
log.info("开始接收请求");
SocketChannel accept = ssc.accept();
if (accept != null) {
log.info("获取到对应的连接" + accept);
accept.configureBlocking(false); // read不会阻塞
channelList.add(accept);
}
for (SocketChannel channel : channelList) {
log.info("开始读取客户端的数据", channel);
int read = channel.read(buffer);
if (read > 0) {
buffer.flip();
log.info("读取到 {} 长度的数据", read);
ByteBuffer retBuf = ByteBuffer.wrap("Hi NioClient".getBytes());
TimeUnit.SECONDS.sleep(5); // 阻塞线程,便于观察控制台
channel.write(retBuf);
buffer.clear();
log.info("数据读取完成", channel);
}
}
}
}
}
3、效果展示
NIO区别于BIO最大的特点就是是否是阻塞获取请求(一定要区分开Java的NIO包和NIO这个概念)。NIO会不断的轮询获取请求,即控制台中不断的轮询打印开始接受请求,如果接收到数据,就会像控制台那样,直接读取对应的数据。结尾的报错是因为我关闭了客户端的连接
服务端:
客户端:
4、总结
对然NIO不再像BIO那样会阻塞线程,造成网络堵塞,但是它不停的去轮询线程,会造成CPU空转,这也是我们不想看到的。如果大量连接不涉及写数据,但此时这个线程轮询获取写入数据的操作就是无意义的。
四、多路复用(Multiplexing)
1、客户端
多路复用IO的代码稍微要复杂一点,是因为它引入了一个Selector组件,我们常听到的Reactor模式、select、poll、epoll等,都是多路复用IO机制里面的名词,本文侧重案例测试,对这些概念不再进行讲解。
多路复用客户端代码结构为:
- 初始化对应的与服务端的连接,这里的初始化包含初始化channel和Selector,并且完成对应事件和channel的绑定关系
- 开启连接,内部逻辑为依靠selector进行监听对应的事件(这就是我们所说的Reactor事件处理机制),如果监听到对应的连接信息,就根据连接中的事件进行判定,然后再执行不同的业务逻辑
@Slf4j
public class MultioClient {
private Selector selector;
public static void main(String[] args) throws IOException {
MultioClient client = new MultioClient();
client.initClient("127.0.0.1", 9099);
client.connect();
}
/**
* 获得一个Socket通道,并对该通道做一些初始化的工作
*
* @param ip 连接的服务器的ip
* @param port 连接的服务器的端口号
*/
public void initClient(String ip, int port) throws IOException {
// 获得一个Socket通道
SocketChannel channel = SocketChannel.open();
// 设置通道为非阻塞
channel.configureBlocking(false);
// 获得一个通道管理器
this.selector = Selector.open();
// 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
// 用channel.finishConnect() 才能完成连接
log.info("第1步:客户端初始化,发起连接,触发连接事件请求");
channel.connect(new InetSocketAddress(ip, port));
// 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件(连接事件)。
channel.register(selector, SelectionKey.OP_CONNECT);
}
/**
* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
*/
public void connect() throws IOException {
// 轮询访问selector
while (true) {
selector.select();
// 获得selector中选中的项的迭代器
Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 删除已选的key,以防重复处理
it.remove();
// 连接事件发生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel();
// 如果正在连接,则完成连接
if (channel.isConnectionPending()) {
channel.finishConnect();
}
// 设置成非阻塞
channel.configureBlocking(false);
// 在这里可以给服务端发送信息哦
ByteBuffer buffer = ByteBuffer.wrap("Hello MultServer".getBytes());
log.info("第3步:向服务端发送数据");
channel.write(buffer);
// 在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
channel.register(this.selector, SelectionKey.OP_READ); // 获得了可读的事件
} else if (key.isReadable()) {
log.info("第6步:读取返回的数据");
read(key);
}
}
}
}
/**
* 处理读取服务端发来的信息事件
*
* @param key
*/
public void read(SelectionKey key) throws IOException {
//和服务端的read方法一样
// 服务器可读取消息:得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
if (len != -1) {
log.info("客户端收到信息:" + new String(buffer.array(), 0, len));
}
}
}
2、服务端
多路复用服务端代码逻辑:
- 同样的先初始化出channel和selector,然后完成事件的注册。需要区分的是客户端的channel是SocketChannel,服务端的channel是ServerSocketChannel。
- 同样的依靠Reactor事件处理机制去监听对应的请求中的事件,然后根据事件的类型做出不同的判定
@Slf4j
public class MultioServer {
public static void main(String[] args) throws IOException {
// 创建一个在本地端口进行监听的服务Socket通道.并设置为非阻塞方式
ServerSocketChannel ssc = ServerSocketChannel.open();
// 必须配置为非阻塞才能往selector上注册,否则会报错,selector模式本身就是非阻塞模式
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(9099));
// 创建一个选择器selector
Selector selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
log.info("等待事件发生——");
// 轮询监听channel里的key,select是阻塞的,accept()也是阻塞的
int select = selector.select();
log.info("有事件发生——");
// 有客户端请求,被轮询监听到
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
//删除本次已处理的key,防止下次select重复处理
it.remove();
handle(key);
}
}
}
/**
* @param key 对应的事件key
*/
private static void handle(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
log.info("第2步:有客户端[连接事件]发生");
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// NIO非阻塞体现:此处accept方法是阻塞的,但是这里因为是发生了连接事件,所以这个方法会马上执行完,不会阻塞
// 处理完连接请求不会继续等待客户端的数据发送
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 通过Selector监听Channel时对读事件感兴趣
sc.register(key.selector(), SelectionKey.OP_READ);
} else if (key.isReadable()) {
log.info("第4步:有客户端数据[可读事件]发生");
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// NIO非阻塞体现:首先read方法不会阻塞,其次这种事件响应模型,当调用到read方法时肯定是发生了客户端发送数据的事件
int len = sc.read(buffer);
if (len != -1) {
log.info("读取到客户端发送的数据,可进行相应的业务处理:",new String(buffer.array(), 0, len));
}
// 向客户端写入数据
ByteBuffer bufferToWrite = ByteBuffer.wrap("Hello MultClient".getBytes());
sc.write(bufferToWrite);
// 会触发key.isWritable() 或 key.isReadable()的条件
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
} else if (key.isWritable()) {
log.info("第5步:有客户端数据[写入事件]发生了");
SocketChannel sc = (SocketChannel) key.channel();
// NIO事件触发是水平触发
// 使用Java的NIO编程的时候,在没有数据可以往外写的时候要取消写事件,
// 在有数据往外写的时候再注册写事件
key.interestOps(SelectionKey.OP_READ);
}
}
}
3、效果展示
代码执行逻辑为:
- 首先客户端会初始化,然后去连接服务端
- 服务端接收到对应的连接事件后
- 然后客户端向服务端写入数据
- 此时服务端接收到客户端发送过来的连接,触发可读事件(key.isReadable())
- 读取完数据之后,服务端准备向客户端写入数据,此时触发服务端的可写事件,没有数据可以写入的时候取消写事件
- 最终客户端收到服务端写会的数据
客户端:
服务端:
4、总结
多路复用其实是一个相对完美的解决方案,即采用了Reactor模型,当对应的读、写、连接等事件触发的时候,再触发对应的接收请求的逻辑,然后完成后续的业务代码。既不会造成空轮询无效请求,也不会阻塞网络。
正式基于种种特点,Netty就在该IO模型的基础上进行封装。
五、Netty(基于多路复用IO)
1、客户端
Netty客户端代码如下,代码基本骨架都差不多
- 船舰对应的group,你可以理解为用来接收请求和分配工作线程的线程池组
- 创建对应的channel
- 创建需要对数据进行处理的handler。handler分为两大类:ChannelInboundHandler和ChannelOutboundHandler,你可以理解为是对管道中数据入口方向和出口方向数据的处理。
- 连接对应的服务端地址
- sync阻塞住线程,连接好后再继续向下执行,否则就是异步进行连接
@Slf4j
public class netty客户端 {
public static void main(String[] args) throws InterruptedException, IOException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder()); // inputStream
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 接收响应消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("第4步:netty客户端接收到服务端返回的数据为:{}", msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("第1步:客户端启动触发事件");
ctx.writeAndFlush("Hello NettyServer");
}
});
}
})
.connect(new InetSocketAddress("localhost", 8099))
.sync();
}
}
2、服务端
服务端代码格式和客户端差不多,最开始创建的Bootstrap对象变成了ServerBootstrap。同样的,核心逻辑在initChannel方法里面。
- StringEncoder和StringDecoder为一个编解码器,我调用的writeAndFlush方法是直接写入的字节,需要在服务端和客户端分别对该字节进行相同格式的编解码工作,才能够收到对应的数据信息
@Slf4j
public class netty服务端 {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("第2步:服务端接收消息:{}", msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("第3步:服务端接收到消息后进行业务处理");
ctx.channel().writeAndFlush("Hello NettyClient");
}
});
}
})
.bind(8099);
}
}
3、效果展示
服务端:
客户端:
4、总结
当你熟悉了Netty代码的编写风格之后,你会发现,我们想要对数据进行处理,只需要编写对应的ChannelHandler,其他的额外的关于数据的解析、读取等繁琐操作都被Netty封装了,即开发人员能够更加专注于业务开发,而非投身于繁琐的字节解析工作中。
想了解Netty一些细节概念的,可以参考:浅谈Netty机制
六、AIO(Asynchronous IO)
1、客户端
对应的Channel变成了AsynchronousSocketChannel
@Slf4j
public class AIOClient {
public static void main(String... args) throws Exception {
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000)).get();
socketChannel.write(ByteBuffer.wrap("Hello AioServer".getBytes()));
ByteBuffer buffer = ByteBuffer.allocate(512);
Integer len = socketChannel.read(buffer).get();
if (len != -1) {
log.info("客户端收到信息:" + new String(buffer.array(), 0, len));
}
}
}
2、服务端
@Slf4j
public class AIOServer {
public static void main(String[] args) throws Exception {
final AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
try {
// 再此接收客户端连接,如果不写这行代码后面的客户端连接连不上服务端
serverChannel.accept(attachment, this);
log.info(socketChannel.getRemoteAddress().toString());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
log.info(new String(buffer.array(), 0, result));
socketChannel.write(ByteBuffer.wrap("Hello AioClient".getBytes()));
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}
3、效果展示
演示的效果也很好理解,就是我们常规的理解的异步获取到数据。
客户端:
服务端:
4、总结
异步非阻塞, 由操作系统完成后回调通知服务端程序启动线程去处理, 一般适用于连接数较多且连接时间较长的应用,JDK7 开始支持。
在Linux系统上,AIO的底层实现仍使用Epoll,没有很好实现AIO,因此在性能上没有明显的优势,而且被JDK封装了一层不容易深度优化,Linux上AIO还不够成熟。Netty是异步非阻塞框架,Netty在JDK的NIO上做了很多异步的封装。
BIO | NIO | AIO | |
IO模型 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 |
编程难度 | 简单 | 复杂 | 复杂 |
可靠性 | 差 | 好 | 好 |
吞吐量 | 低 | 高 | 高 |