八、AIO编程
8.1 AIO编程简介
8.1.1 AIO编程概述
AIO也叫异步非阻塞,JDK1.7之后的新特性,AIO引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
与NIO模型不同,读写操作为例,只需直接调用read和write的API即可,这方法都是异步的对于读操作,当有流可读时,系统会将可读的流传入到read方法的缓冲区,并通知应用程序读写都是异步的,完成之后会主动调用回调函数;
AIO需要操作系统的支持,在Linux内核2.6版本之后增加了对真正异步IO的实现。Java从JDK1.7之后支持AIO,JDK1.7新增一些与文件/网络IO相关的一些API,称之为NIO2.0或者称之为AIO(Asynchronous IO)。AIO最大的特征提供了异步功能,对于socket网络通信和文件IO都是起作用的。
8.1.2 AIO与其他IO的区别
AIO是异步非阻塞IO,AIO实现了真正意义上的异步处理。
- 异步:当在处理一个请求的同时,还可以同时处理其他请求,并且当请求处理完毕时(开始响应数据),基于事件回调的方式将数据响应给调用者,调用者不需要再次来到提供者这边读取数据,而是直接读取调用者中用户空间中的数据,完完全全的做到了异步处理;
- 非阻塞:在AIO中,调用任何的请求连接、读取提供者数据等方法都不会造成当前线程阻塞,而是将请求告诉给了服务提供者,自己可以去处理其他任务;服务提供者接收到请求后,实现自身的业务逻辑处理,然后将数据数据发送给调用者;
BIO(同步阻塞):
NIO(同步非阻塞):
AIO(同步非阻塞):
- BIO同步阻塞:到理发店理发,理发店人多,就一直等理发师,知道轮到自己理发
- NIO同步非阻塞:到理发店理发,发现理发店人很多,就先告诉理发师说一会再来,自己先去干其它事情,一会再回来看看是否轮到了自己
- AIO异步非阻塞:给理发师打电话,让理发师上门服务,自己干其它事情,理发师自己来家里理发
8.2 实现AIO编程
在Java中,实现AIO编程的主要有AsynchronousFileChannel、AsynchronousServerSocketChannel、AsynchronousSocketChannel等三个异步通道类;使用read、accept等阻塞方法时,异步通道则是基于事件回调的方式来处理阻塞问题;
即:当调用阻塞方法时不再阻塞当前线程,并且当数据提供者响应数据时将,数据请求方将会自动将操作系统内核所获取到的数据复制到用户空间,供用户提供者使用;
8.2.1 AIO基于文件编程
1)基于Future-读
- 示例代码:
package com.aio.demo01_asynchronousFileChannel;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;
/**
* @author lscl
* @version 1.0
* @intro:
*/
public class Demo01_通过Future读取数据 {
public static void main(String[] args) throws Exception {
Path path = Paths.get("001.txt");
// 创建一个异步的FileChannel对象,设置为读模式
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);
// 创建Buffer来接收AsynchronousFileChannel的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 进行异步读取
Future<Integer> future = fileChannel.read(buffer, 0);
/*
如果isDone()返回true,代表内核空间已经准备好了数据,并且已经将内核的数据拷贝到用户空间了
返回false,则代表内核空间暂时没有数据,或者数据还没有从内核拷贝到用户空间
*/
while (!future.isDone()) ;
// limit=position,position=0
buffer.flip();
System.out.println(new String(buffer.array()));
// position=0,limit=capacity
buffer.clear();
// 可以获取到读取的字节数
Integer count = future.get();
System.out.println("读取完毕,共读取到【" + count + "】个字节");
}
}
2)基于CompletionHandler-读
- 示例代码:
package com.aio.demo01_asynchronousFileChannel;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
/**
* @author lscl
* @version 1.0
* @intro:
*/
public class Demo02_通过CompletionHandler读取数据 {
public static void main(String[] args) throws Exception {
Path path = Paths.get("001.txt");
// 创建一个异步的FileChannel对象,设置为读模式
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 进行异步读取
fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
/**
* 读取完毕时调用的方法
* @param result: 读取的字节数量
* @param attachment: 附着的信息,该参数等于fileChannel调用read时传递的buffer(第三个参数的buffer)
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
// true
System.out.println(buffer == attachment);
// limit=position,position=0
buffer.flip();
System.out.println(new String(buffer.array()));
// position=0,limit=capacity
buffer.clear();
}
/**
* 读取失败时调用的方法
* @param exc: 出现的异常
* @param attachment: 外面传递的那个buffer对象
*/
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
System.out.println("程序执行完毕...");
}
}
3)基于Future-写
- 示例代码:
package com.aio.demo01_asynchronousFileChannel;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;
/**
* @author lscl
* @version 1.0
* @intro:
*/
public class Demo03_通过Future写取数据 {
public static void main(String[] args) throws Exception {
Path path = Paths.get("001.txt");
// 创建一个AsynchronousFileChannel,对与该类来说,每次运行不会清空文件,而是每次运行都会把本次的内容覆盖文件之前的内容
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(
path,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING // 如果文件里面有内容那么就清空数据
);
ByteBuffer buffer = ByteBuffer.wrap("abc".getBytes());
Future<Integer> future = fileChannel.write(buffer, 0);
System.out.println("写出【" + future.get() + "】个字节");
fileChannel.close();
}
}
4)基于CompletionHandler-写
- 示例代码:
package com.aio.demo01_asynchronousFileChannel;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
/**
* @author lscl
* @version 1.0
* @intro:
*/
public class Demo04_通过CompletionHandler写取数据 {
public static void main(String[] args) throws Exception {
Path path = Paths.get("001.txt");
// 创建一个AsynchronousFileChannel,对与该类来说,每次运行不会清空文件,而是每次运行都会把本次的内容覆盖文件之前的内容
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(
path,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING // 如果文件里面有内容那么就清空数据
);
ByteBuffer buffer = ByteBuffer.wrap("abc".getBytes());
fileChannel.write(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("写出【" + result + "】个字节");
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
System.out.println("写入完毕....");
// 让程序不要这么快结束(等待completed回调函数执行)
Thread.sleep(100);
}
}
8.2.2 AIO基于网络编程
1)服务端
- 示例代码:
package com.aio.demo01_asynchronousFileChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* @author lscl
* @version 1.0
* @intro:
*/
public class Demo05_基于服务器的AIO通信模式_服务端 {
public static void main(String[] args) throws Exception {
// 创建一个异步的ServerSocketChannel
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
// 绑定IP和端口
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8888));
/*
接收一个客户端
该方法不会阻塞当前线程,而是基于事件回调的形式,当接收到客户端时执行回调函数把接收到的客户端传递进去
*/
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
/**
* 接收成功执行的回调
* @param socketChannel: 客户的Channel
* @param attachment: 添加的附着信息
*/
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
/*
使用AsynchronousSocketChannel进行读取
该方法并不会阻塞当前线程,而是基于事件回调的形式,当客户端有数据可读时,触发回调函数
*/
socketChannel.read(buffer, null, new CompletionHandler<>() {
@Override
public void completed(Integer result, Object attachment) {
// limit=position,position=0
buffer.flip();
try {
System.out.println("接收到了来自客户端【" + socketChannel.getRemoteAddress() + "】发送的信息: " + new String(buffer.array()));
} catch (IOException exception) {
exception.printStackTrace();
}
// position=0,limit=capacity
buffer.clear();
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
}
/**
* 接收失败的回调
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
// 让程序不会结束
System.in.read();
}
}
2)客户端
- 示例代码:
package com.aio.demo01_asynchronousFileChannel;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* @author lscl
* @version 1.0
* @intro:
*/
public class Demo06_基于服务器的AIO通信模式_客户端 {
public static void main(String[] args) throws Exception {
// 获取一个异步的SocketChannel
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
/*
连接到服务器
该方法不会阻塞,
*/
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888), null, new CompletionHandler<Void, Object>() {
/**
* 连接服务器成功执行的方法
* @param result
* @param attachment
*/
@Override
public void completed(Void result, Object attachment) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("hello".getBytes());
// limit=position,position=0
buffer.flip();
// 将数据发送到服务器
socketChannel.write(buffer);
// position=0,limit=capacity
buffer.clear();
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
// 让程序不会结束
System.in.read();
}
}