参考:
NIO 详解(Java):https://juejin.cn/post/6844903605669986317
ByteBuffer(内存缓冲区):https://blog.csdn.net/u010430495/article/details/86087154
1、NIO基础
Java IO 是阻塞的,当用户进行数据读写时,首先会由系统去等待数据准备(查看内核空间中有没有数据),等待内核数据准备好后再将数据从内核区拷贝到进程中(用户区),整个读写操作会一直阻塞到数据准备完或目标可写为止。Java NIO 是非阻塞的,每一次数据读写都会立刻返回,并且将可读/可写 的数据写入到缓冲区,即使没有数据调用也会立刻返回不会对缓冲区进行操作。
2、缓冲区
2.1 Buffer类及实现
Java中缓冲区主要是 Buffer 抽象类,如下:
Buffer类定义了缓冲区的一些基本变量: 当前位置(position)、容量(capacity)、最大读取限制(limit)、标记(mark)等,读取时position<=limit<=capacity。
针对每一种数据类型都有对应的Buffer抽象类对应,比如 IntBuffer、ShortBuffer、CharBuffer、FloatBuffer、DoubleBuffer、LongBuffer、ByteBuffer。
其中 IntBuffer 继承自 Buffer抽象类,同时实现了 Comparable接口,它的实现子类为 HeapIntBuffer。名字看起来为堆缓冲区类,实际上使用父类的数组来保存数据,并且将父类的所有操作都实现了。
缓冲区写操作:
IntBuffer buf = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5}); //利用数组初始化IntBuffer
IntBuffer buf2 = IntBuffer.allocate(3); //创建指定容量的IntBuffer
buf.put(1); //put操作,插入同时会修改position、limit
buf.put(199);
缓冲区读操作:
//1. flip操作,刚写完进行一次flip,重置下读取上下限
public final flip() {
limit = position; //将读取上限设置为最后写入的位置
position = 0; //重置初始读取位置
mark = -1;
return this;
}
//2. 底层数组可以通过array() 方法直接获取
public final int[] array {
if (hb == null)
throw new UnsupportedOperationException();
if (isReadOnly)
throw new ReadOnlyBufferException();
return hb; //直接返回底层数组
}
//实现举例
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4});
System.out.println(Arrays.toString(buffer.array())); // 1 2 3 4
}
//3. 通过mark()标记位置,然后通过reset()跳转到标记位置进行读取
其它操作:
public abstract IntBuffer compact() - 压缩缓冲区
public IntBuffer duplicate() - 复制缓冲区,会直接创建一个新的数据相同的缓冲区
public abstract IntBuffer slice() - 划分缓冲区,将原本的容量大小的缓冲区划分为更小的
public final Buffer rewind() - 重绕缓冲区,就是把position归零,将mark变为-1
public final Buffer clear() - 将缓冲区清空,所有变量回到最初状态
2.2 直接缓冲区的实现
前面一直使用的是堆缓冲区,也就是说实际上数据是保存在一个数组中的,实际上占用的是堆内存。还有另外一种方式是采用直接缓冲区,也就是申请堆外内存进行数据保存,采用操作系统本地的IO,相比堆缓冲区会快一些。
非直接缓冲区是需要经过一个 copy的阶段(从内核空间copy到用户空间);直接缓冲区不需要copy阶段,可以理解为内存映射文件。
下面通过文件复制操作对比直接缓冲区和非直接缓冲区:
// 2.FileChannel实现文件复制,采用堆缓冲区
FileChannel inChannel, outChannel;
FileInputStream fis = null;
FileOutputStream fos = null;
inChannel = outChannel = null;
try {
// 生成通道
fis = new FileInputStream("D:\\language_workplace\\java_workplace\\NIO\\src\\1.jpg");
fos = new FileOutputStream("D:\\language_workplace\\java_workplace\\NIO\\src\\2.jpg");
inChannel = fis.getChannel();
outChannel = fos.getChannel();
// 指定缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 数据复制
while (inChannel.read(buffer) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
System.out.println(e.getMessage());
}
// 3.FileChannel 实现文件复制,采用内存映射文件
FileChannel inChannel, outChannel;
inChannel = outChannel = null;
inChannel = FileChannel.open(Paths.get("D:\\language_workplace\\java_workplace\\NIO\\src\\1.jpg"), StandardOpenOption.READ);
outChannel = FileChannel.open(Paths.get("D:\\language_workplace\\java_workplace\\NIO\\src\\2.jpg"), StandardOpenOption.READ, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
//内存映射文件
MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
//直接对缓冲区进行数据读写操作
byte[] dst = new byte[inMappedBuf.limit()];
inMappedBuf.get(dst);
outMappedBuf.put(dst);
3、通道
channel 接口有许多实现类,其子接口包括 ReadableByteChannel、WritableByteChannel,不过通道一般不会区分出写通道和读通道,它会将读写功能整合到一起,实现了一个 ByteChannel接口。
通道实现文件复制还可以利用通道直接实现,实例如下:
3.1 文件传输 FileChannel
通过FileChannel 可以很方便实现文件传输、复制、截断。传输时可以采取直接内存或者非直接内存进行数据传输。
// 对文件进行截断
RandomAccessFile f = new RandomAccessFile("D:\\language_workplace\\java_workplace\\NIO\\src\\test.txt", "rw");
FileChannel channel = f.getChannel();
//截断文件
channel.truncate(10);
ByteBuffer buffer = ByteBuffer.allocate(20);
channel.read(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.remaining()));
//直接通过通道传输数据
FileOutputStream out = new FileOutputStream("test2.txt");
FileInputStream in = new FileInputStream("test1.txt");
FileChannel inChannel = in.getChannel(); //获取test.txt文件的通道
inChannel.transferTo(0, inChannel.size(), out.getChannel()); //直接将test文件通道数据传给test2文件
//当需要部分编辑文件时,可以通过MappedByteBuffer类实现,将文件映射到内存中进行编辑,采用直接缓冲区
try(RandomAccessFile f = new RandomAccessFile("test.txt", "rw");
FileChannel channel = f.getChannel()){
//通过map方法映射文件的某一段内容,创建MappedByteBuffer对象
//比如这里就是从第四个字节开始,映射10字节内容到内存中
//注意这里需要使用MapMode.READ_WRITE模式,其他模式无法保存数据到文件
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 4, 10);
//我们可以直接对在内存中的数据进行编辑,也就是编辑Buffer中的内容
//注意这里写入也是从pos位置开始的,默认是从0开始,相对于文件就是从第四个字节开始写
//注意我们只映射了10个字节,也就是写的内容不能超出10字节了
buffer.put("yyds".getBytes());
//编辑完成后,通过force方法将数据写回文件的映射区域
buffer.force();
}
3.2 文件锁 FileLock
创建一个跨进程的文件锁来防止多个进程同时修改文件时对资源的争抢操作,这样保证了同一时间只有一个进程能够修改它,解决了多进程间的文件同步,保证了安全性。但是注意它是进程级别的,而不是线程级别的。
//创建RandomAccessFile对象,并拿到Channel
try {
RandomAccessFile f = new RandomAccessFile("D:\\language_workplace\\java_workplace\\NIO\\src\\test.txt", "rw");
FileChannel channel = f.getChannel();
FileLock lock = channel.lock(0, 6, false); //表示锁住了从0开始的6个字节的数据,false表示独占锁
System.out.println(new Date() + " 已获取到文件锁!");
Thread.sleep(5000); //假设要处理5秒钟
System.out.println(new Date() + " 操作完毕,释放文件锁!");
lock.release();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
独占锁表示文件加锁后只有一个进程可以对文件进行读写操作,其它进程都会被阻塞,等待锁释放;
共享锁表示文件加锁后可以有多个进程对文件进行读操作,但是不能同时进行写操作。
除了 channel.lock()
方式对文件上锁外,还可以通过 channel.tryLock()
进行非阻塞上锁,获取锁失败立即返回null ,而不是等待。
4、多路复用网络通信
4.1 传统阻塞 I/O 网络通信
利用socket 建立起TCP进行网络通信
//客户端
public class Client {
public static void main(String[] args) {
try {
//客户端发送数据
Socket socket = new Socket("localhost", 8080);
Scanner sc = new Scanner(System.in);
OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream());
System.out.println("请输入要发送给服务端的内容:");
String text = sc.nextLine();
writer.write(text + '\n');
writer.flush();
System.out.println("数据已发送"+text);
//客户端接收数据
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println("收到服务器返回:"+reader.readLine());
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
//服务器端
public class Server {
public static void main(String[] args) {
try {
//服务端接收数据
ServerSocket server = new ServerSocket(8080);
System.out.println("正在等待客户端连接....");
Socket socket = server.accept();
System.out.println("客户端已连接,IP地址为:"+socket.getInetAddress().getHostAddress());
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println("接收到客户端数据:");
System.out.println(reader.readLine());
//服务端发送数据
OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream());
writer.write("已收到!");
writer.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
}
利用通道进行阻塞通信,通道结合缓冲区实现数据的读写操作。注意服务器端需要同时接收多个客户端连接,所以需要将 accept 后的操作都放在 while 循环中,针对每一个成功连接的用户,新起一个线程进行具体的数据操作处理。
//客户端
public class ClientChannel {
public static void main(String[] args) {
try {
SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
Scanner sc = new Scanner(System.in);
System.out.println("已连接到服务端!");
System.out.println("请输入要发送给服务端的内容:");
String text = sc.nextLine();
//向通道中写入数据
channel.write(ByteBuffer.wrap(text.getBytes()));
//从通道中读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.limit()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
//public class ServerChannel {
public static void main(String[] args) {
try {
ServerSocketChannel channel = ServerSocketChannel.open();
//绑定到8080端口
channel.bind(new InetSocketAddress(8080));
while (true) {
//accept方法阻塞等待用户连接
SocketChannel socketChannel = channel.accept();
//获取通道远端地址
System.out.println("客户端已连接,IP地址为:"+socketChannel.getRemoteAddress());
//具体的操作处理新起一个线程进行处理
new Thread(()->{
try {
//缓冲区接收数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
buffer.flip();
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.limit()));
//返回客户端消息
socketChannel.write(ByteBuffer.wrap("已收到!".getBytes()));
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.2 选择器与 I/O 多路复用
我们发现利用传统的 socket 进行网络通信,需要针对每一个用户新起一个线程进行单独处理,这样看起来很合理,但是如果要和用户保持持续通信,就不能摧毁这些线程,而是需要一直保留连接,就会导致白白占用资源不被使用。利用 NIO 提供的多路复用编程模型可以解决这一问题。
服务器不再是单纯的 accept 阻塞等待客户端连接,而是通过 Selector 选择器根据客户端状态不断轮询,只有当客户端在对应状态时,比如(read、write、accept)时,才真正的创建线程进行处理,这样就不会一直阻塞等待某个客户端的 IO 操作了。
通过这种方式,甚至单线程都能做到高效的复用,最典型的例子就是Redis了,因为内存的速度非常快,多线程上下文的开销就会显得有些拖后腿,还不如直接单线程简单高效,这也是为什么Redis单线程也能这么快的原因。
选择器是当某一个状态已经就绪时才进行处理,Selector 的实现方案有多种:
- select:当连接出现某个状态时就知道已经就绪了,但是不知道具体是哪个连接就绪,每次调用都线型遍历所有连接,时间复杂度为 O(n),并且存在最大连接数限制。
- poll:同 select,但是底层采用链表,所以没有最大连接数的限制。
- epoll:采取事件通知的方式,当某个连接就绪,能够直接进行精准通知(这是因为内核实现中 epoll 是根据每个 fd 上的回调函数实现的,只要就绪就会调用回调函数,实现精准通知,但是只有 Linux 支持这种方式),事件复杂度为 O(1)。
//采用选择器后的服务端
public class Server {
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
//使用选择器必须选择非阻塞方式,这样才不会像I/O那样卡在accept()那里,而是直接进行下一步操作
serverSocketChannel.configureBlocking(false);
/*将选择器注册到通道上,后面是需要监听的事件,多个事件通过 | 连接
SelectionKey.OP_ACCEPT --- 连接事件,表示服务器监听到了客户端连接,服务器可以接收这个连接了
SelectionKey.OP_CONNECT --- 连接就绪事件,表示客户端与服务器的连接建立成功
SelectionKey.OP_READ --- 读就绪事件,表示通道中已经有可读的数据,可以执行读操作了
SelectionKey.OP_WRITE --- 写就绪事件,表示可以向通道中写入数据了
*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//无限循环等待客户端网络操作,没有操作会阻塞
int count = selector.select();
System.out.println("总共监听到"+count+"个事件");
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//根据不同事件类型执行不同操作
if(key.isAcceptable()) {
//处理accept事件
SocketChannel channel = serverSocketChannel.accept(); //已经准备好处理accept
System.out.println("客户端已连接,IP地址为:"+channel.getRemoteAddress());
//连接建立完毕,注册读选择器
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} else if(key.isReadable()) {
//处理read事件
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (channel.read(buffer) > 0) {
buffer.flip();
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
}
//返回消息
channel.write(ByteBuffer.wrap("已收到!".getBytes()));
}
//处理完毕后移除选择器,否则下次还有
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//采用选择器后的客户端
public class Client {
public static void main(String[] args) {
try {
SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
Scanner sc = new Scanner(System.in);
System.out.println("已连接到服务器");
while (true) {
System.out.println("请输入要发送给服务端的内容:");
String text = sc.nextLine();
//直接向通道中写入数据,真舒服
channel.write(ByteBuffer.wrap(text.getBytes()));
System.out.println("已发送!");
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
buffer.flip();
System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.3 Reactor模式优化
Reactor 模式主要由 Reactor 和处理资源池这两个核心部分组成,主要负责如下事情:
- Reactor 负责监听和分发事件,事件类型包括连接事件、读写事件;
- 处理资源池负责处理事件,比如 read -> 业务逻辑处理 -> send;
Reactor 模式灵活多变,可以针对不同的业务场景灵活变换:
- 单 Reactor 单进程 / 线程;
- 单 Reactor 多进程 / 线程;
- 多 Reactor 单进程 / 线程;(复杂并且相对第一种没有性能优势,实际运用不多)
- 多 Reactor 多进程 / 线程;
Reactor 对象作用是监听和分发事件;Acceptor 对象作用是建立连接;Handler 对象作用是处理业务;
4.3.1 单 Reactor 单线程
单 Reactor 单线程方案:
- Reactor 对象通过 select 监听事件,接收到事件后通过 dispatch 进行分发,具体分发给 Acceptor 对象还是 Handler 对象要看具体的事件类型;
- 如果是建立连接的事件,交由 Acceptor 对象进行处理,Acceptor 对象通过 accept 方法获取连接,并创建一个 Handler 对象来处理后续的响应事件;
- 如果不是建立连接事件,则交由当前连接对应的 Handler 对象来进行响应;
- Handler 对象通过 read -> 业务处理 -> send 流程来完成完整的业务流程。
//Acceptor类
public class Acceptor implements Runnable{
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel channel) {
this.selector = selector;
this.serverSocketChannel = channel;
}
@Override
public void run() {
try {
SocketChannel channel = serverSocketChannel.accept();
System.out.println("客户端已连接,IP地址为:"+channel.getRemoteAddress());
channel.configureBlocking(false);
//注册时创建好对应的Handler,这样在Reactor中分发时就可以直接调用Handler
channel.register(selector, SelectionKey.OP_READ, new Handler(channel));
} catch (Exception e) {
e.printStackTrace();
}
}
}
//Handler类
public class Handler implements Runnable{
private final SocketChannel channel;
public Handler(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
channel.write(ByteBuffer.wrap("已收到!".getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
//Reactor类
public class Reactor implements Runnable, Closeable {
private final ServerSocketChannel serverSocketChannel;
private final Selector selector;
public Reactor() throws IOException {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
}
@Override
public void run() {
try {
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
//注册selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, serverSocketChannel));
//监听事件并选择处理
while (true) {
int count = selector.select();
System.out.println("监听到 "+count+" 个事件");
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
this.dispatch(iterator.next());
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//分发监听到的事件
private void dispatch(SelectionKey key) {
Object att = key.attachment();
if (att instanceof Runnable) { //Acceptor、Handler 均实现了Runnable接口,这里可以统一调用
((Runnable)att).run();
}
}
@Override
public void close() throws IOException {
serverSocketChannel.close();
selector.close();
}
}
//测试
public class User {
public static void main(String[] args) {
try {
Reactor reactor = new Reactor();
reactor.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.3.2 单 Reactor 多线程
注意到上面自始至终都只有一个线程来处理,如果面对大量请求,一个线程是不够用的。单线程的 reactor 模式并没有解决 IO 和 CPU 处理速度不匹配的问题,因此多线程 reactor 模式引入线程池的概念,把耗时的 IO 操作交给线程池去处理,处理完毕后再同步到 selectionKey中。
public class Handler implements Runnable{
private final SocketChannel channel;
//采用线程池方式处理工作线程
private static final ExecutorService POOL = Executors.newFixedThreadPool(10);
public Handler(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
try {
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
buffer.flip();
//在数据读取出来后,将数据处理交给线程池来执行
POOL.submit(()->{
try {
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
channel.write(ByteBuffer.wrap("已收到!".getBytes()));
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.3.3 多 Reactor 多线程
高并发时,一个 Reactor 处理多个来自客户端的请求会显得有些乏力,不妨将 Reactor 做成一主多从的形式,主 Reactor 只负责 Accept 操作,而从 Reactor 进行其它操作。这样从 reactor 能够根据服务器负荷,新增多个从 reactor 进行处理。
处理流程:
- 主Reactor TCPReactor 监听到事件,分发给 Acceptor 或者 TCPHandler;
- Acceptor 内仅处理 Accept 事件,其余事件处理由从Reactor TCPSubReactor 监听并处理;Acceptor 同时需要处理唤醒阻塞的 Selector 并给通道注册 Read 事件;
- TCPSubReactor 需要注意当一个监听中阻塞住的 selector 由于 Acceptor 需要注册新的 IO 事件到该 selector 上时,Acceptor 会调用 selector 的 wakeup() 函数唤醒阻塞住的 selector 以注册新 IO 事件进行后续监听;但是TCPSubReactor 中循环调用 selector.select() 的线程可能因为循环太快导致 selector 被唤醒后在 IO 事件成功注册前被调用 selector.select() 而阻塞住。因此需要设置一个 flag 变量来控制 selector.select() 的执行。只有当新 IO 事件完全注册成功之后才能让 TCPSubReactor 线程执行。
- 对于读写事件分发给 TCPHandler 采取线程池进行处理;事件处理包括 READING -> WORKING -> WRITING -> READING 流程。处理各种事件时需要根据不同的状态采取相应的措施,这里采用设计模式中的状态模式进行实现。HandlerState 作为接口,有ReadState、WorkState、WriteState 三种处理模式,它们分别实现了 changeState、handle 两个方法。
//TCPReactor 主Reactor
public class TCPReactor implements Runnable{
private final ServerSocketChannel serverSocketChannel;
private final Selector selector;
public TCPReactor(int port) throws IOException {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
//绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
//注册ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocketChannel));
}
@Override
public void run() {
//不断监听事件
while (!Thread.interrupted()) {
System.out.println("MainReactor waiting for new event on port:"+serverSocketChannel.socket().getLocalPort()+"...");
try {
if (selector.select() == 0) {
continue;
}
} catch (IOException e) {
e.printStackTrace();
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
dispatch(iterator.next());
iterator.remove();
}
}
}
private void dispatch(SelectionKey key) {
Object att = key.attachment();
if (att instanceof Runnable) {
((Runnable)att).run();
}
}
}
//TCPSubReactor 从Reactor
public class TCPSubReactor implements Runnable{
private final ServerSocketChannel serverSocketChannel;
private final Selector selector;
private boolean restart = false;
int num;
public TCPSubReactor(Selector selector, ServerSocketChannel serverSocketChannel, int num) {
this.serverSocketChannel = serverSocketChannel;
this.selector = selector;
this.num = num;
}
@Override
public void run() {
while (!Thread.interrupted()) {
System.out.println("wating for restart");
while (!Thread.interrupted() && !restart) { //线程中断或者重启前保持运行
try {
if (selector.select() == 0) {
continue;
}
} catch (IOException e) {
e.printStackTrace();
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
dispatch(iterator.next());
iterator.remove();
}
}
}
}
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment());
if (r != null) {
r.run();
}
}
//设置重启标志,为了避免当Acceptor调用selector的wakeup()函数唤醒阻塞的selector时,立即循环调用selector.select()造成的阻塞,此时还没来得及注册新事件就已经阻塞了
public void setRestart(boolean restart) {
this.restart = restart;
}
}
//Acceptor 接收监听的请求进行处理
public class Acceptor implements Runnable{
private final ServerSocketChannel serverSocketChannel;
private final int cores = Runtime.getRuntime().availableProcessors(); //获取CPU核心数
private final Selector[] selectors = new Selector[cores];
private int selIdx = 0; //当前可用的TCPSubReactor索引
private TCPSubReactor[] r = new TCPSubReactor[cores]; //subReactor线程
private Thread[] t = new Thread[cores];
public Acceptor(ServerSocketChannel serverSocketChannel) throws IOException {
this.serverSocketChannel = serverSocketChannel;
//创建多个TCPSubReactor、Selector线程
for (int i = 0; i < cores; i++) {
selectors[i] = Selector.open();
r[i] = new TCPSubReactor(selectors[i], serverSocketChannel, i);
t[i] = new Thread(r[i]);
t[i].start();
}
}
@Override
public synchronized void run() {
try {
SocketChannel channel = serverSocketChannel.accept(); //accept阻塞接收客户端连接
System.out.println(channel.socket().getRemoteSocketAddress().toString()
+ " is connected.");
if (channel != null) {
channel.configureBlocking(false);
r[selIdx].setRestart(true); //注册新事件前需要暂停Seletor.select()操作
selectors[selIdx].wakeup(); //唤醒阻塞住的Selector
SelectionKey sk = channel.register(selectors[selIdx], SelectionKey.OP_READ);//注册READ事件并返回该通道的Key
selectors[selIdx].wakeup();
r[selIdx].setRestart(false); //重启线程
sk.attach(new TCPHandler(sk, channel)); //给key绑定TCPHandler对象
if (++selIdx == selectors.length) {
selIdx = 0;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// TCPHandler 多线程处理READ、WRITE等类型事件
public class TCPHandler implements Runnable{
private final SelectionKey sk;
private final SocketChannel sc;
private static final int THREAD_COUNTING = 10;
//自定义线程池
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
//状态模式实现Handler
HandlerState state;
public TCPHandler(SelectionKey sk, SocketChannel sc) {
this.sk = sk;
this.sc = sc;
state = new ReadState(); //初始状态设置为READING
pool.setMaximumPoolSize(32); //设置线程池最大线程数
}
@Override
public void run() {
try {
state.handle(this, sk, sc, pool);
} catch (Exception e) {
System.out.println("[Warning!] A client has been closed.");
closeChannel();
}
}
public void closeChannel(){
try {
sk.cancel();
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void setState(HandlerState state) {
this.state = state;
}
}
// HandlerState 处理接口
public interface HandlerState {
public void changeState(TCPHandler handler);
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc, ThreadPoolExecutor pool) throws IOException;
}
// ReadState 处理 READING 状态
public class ReadState implements HandlerState{
private SelectionKey sk;
@Override
public void changeState(TCPHandler handler) {
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc, ThreadPoolExecutor pool) throws IOException {
this.sk = sk;
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder msg = new StringBuilder();
while (sc.read(buffer) > 0) {
buffer.flip();
String str = new String(buffer.array(), 0, buffer.limit());
System.out.println(sc.socket().getRemoteSocketAddress().toString() + ">" + str);
msg.append(str);
}
h.setState(new WorkState()); //改变状态 READING -> WORKING,接着进行数据逻辑处理
pool.execute(new WorkThread(h, msg.toString()));
}
//逻辑处理函数
synchronized void process(TCPHandler h, String str) {
// do process(decode, logically process, encode)..
h.setState(new WriteState()); // 改变状态(WORKING -> WRITEING)
this.sk.interestOps(SelectionKey.OP_WRITE); //注册新事件
this.sk.selector().wakeup(); // 唤醒阻塞住的selector
}
//工作线程
class WorkThread implements Runnable {
TCPHandler h;
String str;
public WorkThread(TCPHandler h, String str) {
this.h = h;
this.str = str;
}
@Override
public void run() {
process(h, str);
}
}
}
// WorkState 处理 WORKING 状态
public class WorkState implements HandlerState{
@Override
public void changeState(TCPHandler handler) {
handler.setState(new WriteState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc, ThreadPoolExecutor pool) {
}
}
// WriteState 处理 WRITING 状态
public class WriteState implements HandlerState{
@Override
public void changeState(TCPHandler handler) {
handler.setState(new ReadState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc, ThreadPoolExecutor pool) throws IOException {
String response = "Your message has sent to " + sc.socket().getLocalSocketAddress().toString() + "\r\n";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) { //响应client字符串
sc.write(buffer);
}
h.setState(new ReadState()); //处理完后状态 WRITING -> READING
sk.interestOps(SelectionKey.OP_READ); //通过key改变通道注册的事件
sk.selector().wakeup(); //使一个阻塞住的selector操作立即返回
}
}
// 测试主类
public class Main {
public static void main(String[] args) {
try {
TCPReactor reactor = new TCPReactor(1333);
new Thread(reactor).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
线程池:
https://juejin.cn/post/6983213662383112206
https://juejin.cn/post/6844904002363080717
https://juejin.cn/post/6844904113952522253
Reactor 模式讲解:
https://blog.csdn.net/qq_46284579/article/details/125126028
负载均衡算法:
https://juejin.cn/post/6992210674780749837#comment
IO多路复用,主从Reactor:
https://blog.csdn.net/yehjordan/article/details/51026045
标签:netty,Java,NIO,buffer,System,selector,new,public,channel From: https://www.cnblogs.com/istitches/p/17616979.html