首页 > 其他分享 >6.NIO-selector详解

6.NIO-selector详解

时间:2022-10-12 13:36:01浏览次数:81  
标签:NIO buffer next 详解 事件 ByteBuffer selector channel

1.4.2、selector

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 只能用于网络IO,文件IO不可用,因为FileChannel没有阻塞模式
  • 如果不用selector多路复用,线程一直在做无用功。selector能够
    • 有可连接事件才去连接
    • 有可读事件才去读取
      • 有可写事件才去写入
        • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

常用方法

//1.获取selector
Selector selector = Selector.open();

//2.注册事件,前提channel必须配置为非阻塞毛事
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);

channel可绑定的事件类型

  • connect - 客户端连接成功时触发
  • accept - 服务器端成功接受连接时触发
  • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
  • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
//1.监听channel事件,
int count = selector.select();//阻塞直到事件发生
int count = selector.select(long timeout);//阻塞直到事件发生获取超时
int count = selector.selectNow();//非阻塞,不管有无事件发生,立即返回,自己根据返回值查看知否有事件
@Slf4j
//selector
public class SocketSelectorServerTest {
    public static void main(String[] args) throws IOException {
        //1.获取selector
        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(5888));

        //2.将channel注册到selector中
        SelectionKey key = ssc.register(selector, 0, null);

        //key只关注accept事件
        key.interestOps(SelectionKey.OP_ACCEPT);

        log.debug("register key...{}", key);
        while (true) {
            //在事件未处理时,不阻塞
            //3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
            //select在事件未处理时,他不会阻塞,事件发生要么处理,要么取消,不能置之不理
            selector.select();

            //4.处理事件,SelectionKeys里包含了所有发生的事件
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                iterator.remove();//问题1解决
                log.debug("key...{}", next);
                //5.区分事件类型
                if (next.isAcceptable()) {//如果是accept
                    //1.处理事件
                    ServerSocketChannel channel = (ServerSocketChannel) next.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    SelectionKey scKey = sc.register(selector, 0, null);
                    log.debug("{}", sc);
                    scKey.interestOps(SelectionKey.OP_READ);
                } else if (next.isReadable()) {//如果是read
                    try {
                        SocketChannel channel = (SocketChannel) next.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        int read = channel.read(buffer);
                        if (-1 == read) {//问题3解决
                            next.cancel();
                        }else {
                            buffer.flip();
                            ByteBufferUtil.debugRead(buffer);
                        }
                    } catch (IOException e) {//问题2解决
                        e.printStackTrace();
                        next.cancel();//客户端关闭了,需要取消事件
                    }
                }
            }
        }
    }
}

事件发生后,要么处理,要么取消(cancel),不能置之不理

问题1:启动服务后,启动客户端,debug模式发送消息后,空指针异常

image-20221010131532032

原因:selector会在事件发生后,向集合中存入key,但不会删除,等到发送消息的时间后,循环获取到的第一个key已经处理过的,这个key不带时间的,所以47行accep方法返回的是null

解决:迭代器中调用remove移除

问题2:客户异常关闭后,服务器报客户单关闭异常,服务器就停止了。

解决:try catch捕获异常,有异常是,调用cancel()

问题3:客户端正常关闭后(channel.close()),服务端还有问题,因为正常断开,会产生一个读事件,不在异常里面,捕获不了

解决:通过channel.read(buffer);的返回值判断,如果-1,表示断开,调用cancel

消息边界问题

如果bytebuffer长度超过了客户端数据的长度,就有消息边界问题。解决方法主要有三种:

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

服务端

@Slf4j
//处理消息边界问题
public class SelectorMsgServerTest {
    public static void main(String[] args) throws IOException {
        //1.获取selector
        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(5888));

        //2.将channel注册到selector中
        SelectionKey key = ssc.register(selector, 0, null);

        //key只关注accept事件
        key.interestOps(SelectionKey.OP_ACCEPT);

        //log.debug("register key...{}", key);
        while (true) {
            //在事件未处理时,不阻塞
            //3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
            //select在事件未处理时,他不会阻塞,事件发生要么处理,要么取消,不能置之不理
            selector.select();

            //4.处理事件,SelectionKeys里包含了所有发生的事件
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                iterator.remove();//问题1解决
                //log.debug("key...{}", next);
                //5.区分事件类型
                if (next.isAcceptable()) {//如果是accept
                    //1.处理事件
                    ServerSocketChannel channel = (ServerSocketChannel) next.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);

                    //att 就是附件的意思,可以为每个channel添加一个buffer作为附件,在扩容中间共享buffer
                    ByteBuffer buffer = ByteBuffer.allocate(16); // attachment附件
                    SelectionKey scKey = sc.register(selector, 0, buffer);
                    //log.debug("{}", sc);
                    scKey.interestOps(SelectionKey.OP_READ);
                } else if (next.isReadable()) {//如果是read
                    try {
                        SocketChannel channel = (SocketChannel) next.channel();
                        ByteBuffer buffer = (ByteBuffer) next.attachment();
                        int read = channel.read(buffer);
                        if (-1 == read) {//问题3解决
                            next.cancel();
                        } else {
                            split(buffer);
                            //需要扩容
                            if (buffer.position() == buffer.limit()) {
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                buffer.flip();
                                newBuffer.put(buffer); // 0123456789abcdef3333\n
                                next.attach(newBuffer);
                            }
                        }
                    } catch (IOException e) {//问题2解决
                        e.printStackTrace();
                        next.cancel();//客户端关闭了,需要取消事件
                    }
                }
            }
        }
    }

    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            // 找到一条完整消息
            if (source.get(i) == '\n') {
                int length = i + 1 - source.position();
                // 把这条完整消息存入新的 ByteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                // 从 source 读,向 target 写
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                ByteBufferUtil.debugAll(target);
            }
        }
        source.compact(); // 0123456789abcdef  position 16 limit 16
    }
}

客户端

public class MsgClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 5888));
        SocketAddress address = sc.getLocalAddress();
        sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
        sc.write(Charset.defaultCharset().encode("-xxx0123456789abcdef3333\n"));
        System.in.read();
    }
}

结果

+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [5]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 0a                                  |0123.           |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [37], limit: [37]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 34 35 36 37 38 39 61 62 63 64 65 66 2d 78 78 78 |456789abcdef-xxx|
|00000010| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef|
|00000020| 33 33 33 33 0a                                  |3333.           |
+--------+-------------------------------------------------+----------------+

ByteBuffer 大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

标签:NIO,buffer,next,详解,事件,ByteBuffer,selector,channel
From: https://www.cnblogs.com/jpymll/p/16784216.html

相关文章

  • 7.NIO-selector-写入内容过多
    1.4.3、写入内容过多服务端publicclassWriteServer{publicstaticvoidmain(String[]args)throwsIOException{ServerSocketChannelssc=ServerS......
  • 1.1.NIO-三大组件
    1、NIO基础non-blockingio非阻塞io1.1、三大组件1.1.1、Channel&Bufferchannel类似于stream,它就是读写数据的双向通道,可以从channel将数据读入buffer,也可以将buffer......
  • #打卡不停更#在开鸿智谷NiobeU4移植lvgl并实现ADC按键状态显示
    本文主要分享在开鸿智谷NiobeU4开发板移植lvgl经验,并实现按键按下sw4显示SW4Pressed松开显示SW4Release,整理踩坑经验分享如下。1.移植准备开鸿智谷NiobeU4开发板Openhar......
  • 详解商业智能“前世今生”,“嵌入式BI”到底是如何产生的?
    嵌入式分析是使任何应用程序或用户更容易获得数据分析和商业智能的技术。商业智能是通过分析业务数据辅助决策获取数据背后的0信息。商业智能软件和技术包含了报表查询,OL......
  • Java基础(三)| switch、循环及Random详解
    ⭐本专栏旨在对JAVA的基础语法及知识点进行全面且详细的讲解,完成从0到1的java学习,面向零基础及入门的学习者,通过专栏的学习可以熟练掌握JAVA编程,同时为后续的框架学习,进阶开......
  • DelayQueue详解
    DelayQueue介绍【1】DelayQueue是一个支持延时获取元素的阻塞队列,内部采用优先队列PriorityQueue存储元素,同时元素必须实现Delayed接口;在创建元素时可以指定多......
  • Entity Framework教程-代码优先开发方式详解(Code First Development)
    更新记录转载请注明出处:2022年10月12日发布。2022年10月9日从笔记迁移到博客。EFCore代码优先开发方式详解(CodeFirstDevelopment)说明记得先安装EF包,再使用记......
  • 通信协议——IIC详解
    I2C协议物理层原理总体特征电气限制协议层起始和停止条件数据有效性响应/应答寻址读数据写数据单片机通讯软件模拟硬件外设(一)物理层1.原理......
  • 通信协议——SPI详解
    SPI协议(一)简介SPI(SerialPeripheraInterface)是串行外设接口的缩写。特点有:一种高速的、全双工、同步的串行通信总线;采用主从方式工作;一般有一个主设备和一个或者......
  • LinkedBlockingDeque详解
    LinkedBlockingDeque介绍【1】LinkedBlockingDeque是一个基于链表实现的双向阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,可以看做无界队列,但也可以设置容......