首页 > 编程语言 >《Java源码分析》:Java NIO 之 Selector

《Java源码分析》:Java NIO 之 Selector

时间:2024-12-23 09:57:25浏览次数:7  
标签:Java NIO 服务器端 Selector 源码 selector buf socketChannel 客户端

 作者简介:大家好,我是码炫码哥,前中兴通讯、美团架构师,现任某互联网公司CTO,兼职码炫课堂主讲源码系列专题


代表作:《jdk源码&多线程&高并发》,《深入tomcat源码解析》,《深入netty源码解析》,《深入dubbo源码解析》,《深入springboot源码解析》,《深入spring源码解析》,《深入redis源码解析》等


联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬。码炫课堂的个人空间-码炫码哥个人主页-面试,源码等

Java NIO 之 Selector

这篇博文将来记录下自己Java NIO中另一个重要的东西—-Selector。

关于Selector确实不太好理解,自己也看了很多博客,到目前为止,自己对Selector还是没有弄清楚,感觉模模糊糊的。最主要的可能是没有找到对应的应用场景,不知道在上面地方能够用到Selector。

本着学习的态度,自己也来尝试通过写Demo来理解下Selector,于是就有了这篇博文。

Selector例子

本应用设想的场景是这样的:

在服务器端:

1、首先将ServerSocketChannel注册在Selector中,并为此通道注册SelectionKey.OP_ACCEPT事件。

2、当有客户端连接进来之后,将SocketChannel注册到Selector中,并为此通道注册SelectionKey.OP_READ事件,便于在服务器端接收到客户端发送过来的数据。

在客户端:

1、首先将SocketChannel注册在Selector中,并为此通道注册SelectionKey.OP_CONNECT事件。

2、当客户端和服务器端连接成功之后,将其注册到Selector中,并为此通道注册SelectionKey.OP_READ事件,便于在客户端接收到服务器点发送过来的数据。

最后,实现客户端和服务器端之间的交互。

这个Demo具体的交互工程如下
客户端和服务器端建立连接后,客户端第一次发送 1,第二个发送2,第N次发送N;服务器端每接收一个消息就给客户端发送一个“收到消息”。

写了下代码,发现并没有实现如上的功能,有一定的问题,问题还没有找出来,明天继续来找。

实现代码如下:

        package selector;
    
        import java.io.IOException;
        import java.net.InetSocketAddress;
        import java.nio.ByteBuffer;
        import java.nio.channels.SelectionKey;
        import java.nio.channels.Selector;
        import java.nio.channels.ServerSocketChannel;
        import java.nio.channels.SocketChannel;
        import java.util.Iterator;
        import java.util.Set;
    
        public class NIOServer {
    
            //通道选择器
            private Selector selector;
            public NIOServer(){
                try {
                    selector = Selector.open();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            /*
             * 函数功能:服务器端开始监听,看是否有客户端连接进来
             * */
            private void listen() throws IOException {
                System.out.println("server running....");
                while(true){
                    // 当注册事件到达时,方法返回,否则该方法会一直阻塞 
                    selector.select();
                    // 获得selector中选中的相的迭代器,选中的相为注册的事件 
                    Set<SelectionKey> set = selector.selectedKeys();
                    Iterator<SelectionKey> ite = set.iterator();
                    while(ite.hasNext()){
                        SelectionKey selectionKey = (SelectionKey) ite.next();
                        // 删除已选的key 以防重负处理  
                        ite.remove(); 
                        if(selectionKey.isAcceptable()){//如果有客户端连接进来
                            //先拿到这个SelectionKey里面的ServerSocketChannel。
                            ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
                            // 获得和客户端连接的通道
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            System.out.println("有客户端连接到服务器!!!");
                            socketChannel.configureBlocking(false);//将此通道设置为非阻塞
                            //服务器端向客户端发送数据
        //                  ByteBuffer buf = ByteBuffer.allocate(40);
        //                  buf.put("hello client!!!".getBytes());
        //                  socketChannel.write(buf);
                            socketChannel.write(ByteBuffer.wrap(new String("hello client!").getBytes()));
                            //为了接收客户端发送过来的数据,需要将此通道绑定到选择器上,并为该通道注册读事件  
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
                        else if(selectionKey.isReadable()){//客户端发送数据过来了
                            //先拿到这个SelectionKey里面的SocketChannel。
                            SocketChannel socketChannel = (SocketChannel)selectionKey.channel();           
    
                            //接收来自于客户端发送过来的数据
                            ByteBuffer buf = ByteBuffer.allocate(20);
                            int len = 0;
                            System.out.println("服务器端接收到的数据为:");
                            while((len=socketChannel.read(buf))!=-1){
    
        //                      while(buf.hasRemaining()){
        //                          System.out.println(buf.get()+" ");
        //                      }
                                byte[] receData = buf.array();
                                String msg = new String(receData).trim();
                                System.out.println("接收来自客户端的数据为:"+msg);
                                buf.clear();
                            }
    
                            //服务器端向客户端发送"确定信息"
    
                            buf.put("收到信息".getBytes());
                            while(buf.hasRemaining()){
                                socketChannel.write(buf);
                            }   
                            //buf.clear();
                        }
                    }
    
                }
    
            }
            /*
             * 函数功能:初始化serverSocketChannel来监听指定的端口是否有新的TCP连接,
             * 并将serverSocketChannel注册到selector中
             * */
            private void init(int port) {
                try {
                    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                    //serverSocketChannel监听指定端口
                    serverSocketChannel.socket().bind(new InetSocketAddress(port));
                    serverSocketChannel.configureBlocking(false);//设置为非阻塞模式
    
                    /*
                     * 将serverSocketChannel注册到selector中,并为该通道注册selectionKey.OP_ACCEPT事件  
                     * 注册该事件后,当事件到达的时候,selector.select()会返回,  如果事件没有到达selector.select()会一直阻塞
                     * */
                    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            public static void main(String[] args) throws IOException {
                NIOServer server = new NIOServer();
                server.init(9999);
                server.listen();
            }
    
        }

客户端

        package selector;
    
        import java.io.IOException;
        import java.net.InetSocketAddress;
        import java.nio.ByteBuffer;
        import java.nio.channels.SelectionKey;
        import java.nio.channels.Selector;
        import java.nio.channels.SocketChannel;
        import java.util.Iterator;
        import java.util.Set;
    
        public class NIOClient {
    
            private Selector selector;
    
            public NIOClient() throws IOException {
                this.selector =Selector.open();
            }
    
            private void init(String address,int port) throws IOException{
                //客户端,首先有一个SocketChannel
                SocketChannel socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);//将此通道设置为非阻塞模式
                //连接
                socketChannel.connect(new InetSocketAddress(address,port));
    
                //将SocketChannel注册到selector中,并为该通道注册SelectionKey.OP_CONNECT
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
    
            }
    
            public static void main(String[] args) throws IOException {
                NIOClient client = new NIOClient();
                client.init("localhost",9999);
                client.connect();
    
            }
    
            private void connect() throws IOException {
                int data = 1;
                while(true){
                    selector.select();//
                    Set<SelectionKey> set = selector.selectedKeys();
                    Iterator<SelectionKey> ite = set.iterator();
    
                    while(ite.hasNext()){
                        SelectionKey selectionKey = (SelectionKey) ite.next();
                        ite.remove(); //删除已选的key,以防重复处理
                        if(selectionKey.isConnectable()){//看是否有连接发生
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            //如果正在连接,则完成连接
                            if(socketChannel.isConnectionPending()){
                                socketChannel.finishConnect();
                            }
                            socketChannel.configureBlocking(false);//设置为非阻塞模式
                            //给服务器端发送数据
                            System.out.println("客户端连接上了服务器端。。。。");
        //                  ByteBuffer buf = ByteBuffer.allocate(128);
        //                  buf.put("hello server.....".getBytes());
        //                  socketChannel.write(buf);
                            socketChannel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));
                            //为了接收来自服务器端的数据,将此通道注册到选择器中
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
                        else if(selectionKey.isReadable()){
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            //接收来自于服务器端发送过来的数据
                            ByteBuffer buf = ByteBuffer.allocate(10);
                            int len = 0;
                            System.out.println("客户端接收到的数据为:");
                            while((len=socketChannel.read(buf))!=-1){
        //                      while(buf.hasRemaining()){
        //                          System.out.println(buf.get()+" ");
        //                      }
                                byte[] receData = buf.array();
                                String msg = new String(receData).trim();
                                System.out.println("接收来自服务器端的数据为:"+msg);
                                buf.clear();
                            }
                            //发送数据
                            buf.put((data+"").getBytes());
                            while(buf.hasRemaining()){
                                socketChannel.write(buf);
                            }
    
                            data++;
                        }           
    
    
                    }
    
                }
    
            }
    
        }

Selector例子(修正)

在上面的例子中,就如昨天所说,还存在一点问题,客户端和服务器端还不能够正常的交互。

通过debug发现产生问题的代码如下:

                        int len = 0;
                        while((len=socketChannel.read(buf))!=-1){
    
                            byte[] receData = buf.array();
                            String msg = new String(receData).trim();
                            System.out.println("接收来自客户端的数据为:"+msg);
                            buf.clear();
                        }

发现在上面的while循环中是一个死循环,是不能够退出的。也就是说,socketChannel管道中的数据并不是读一次这个数据就没有了,而是还存于管道中,即使服务器端只发送一次数据给客户端,客户端通过如上的while循环的代码来读取数据是不会结束的。

将上面的代码更改为如下的代码即可:即采用更大的空间只读一次

                        //接收来自于客户端发送过来的数据
                        ByteBuffer buf = ByteBuffer.allocate(128);
                        socketChannel.read(buf);
                        byte[] receData = buf.array();
                        String msg = new String(receData).trim();
                        System.out.println("接收来自客户端的数据为:"+msg);
                        buf.clear();

修正这个问题之后基本就客户端和服务器端基本就能够交互了

但是由于我这个Demo想完成的任务为:客户端和服务器端建立连接后,客户端第一次发送 1,第二个发送2,第N次发送N;服务器端每接收一个消息就给客户端发送一个“收到消息”。

但是发现服务器端收到的消息并不是1,2,3.。。。

查找原因发现客户端发送数据不能使用如下的代码:

                        //发送数据
                        buf.put((data+"").getBytes());//data初始值为1
                        while(buf.hasRemaining()){
                            socketChannel.write(buf);
                        }
                        data++;

换成如下的代码就可以正常工作了

                socketChannel.write(ByteBuffer.wrap(new String(data+"").getBytes()));
                data++;

小结

对Selector不怎么熟悉,模仿别人的代码来实现客户端和服务器端之间的交互,还存在一些问题。

标签:Java,NIO,服务器端,Selector,源码,selector,buf,socketChannel,客户端
From: https://blog.csdn.net/smart_an/article/details/144656690

相关文章

  • Java 项目实战:基于 Spring Boot、MySQL、MyBatis、Redis、Nginx 与 Vue 的电力企业业
    1.项目概述1.1项目背景在电力企业中,员工需要不断提升专业知识和技能,以确保电力系统的安全、稳定运行。传统的培训和考核方式存在效率低、资源浪费等问题。为了满足电力企业对员工培训和考核的需求,提高培训效果和考核效率,降低成本,开发一个功能完善、易于使用的电力企业业务考试......
  • 深入探讨 Java 的现代特性:从虚拟线程到模块化系统
    随着技术不断进步,Java也在不断演进,融入了许多新的特性和改进。作为目前世界上最流行的编程语言之一,Java在性能、并发、模块化和开发效率方面都作出了显著的提升。在这篇博客中,我们将探讨一些当前Java领域中比较流行的技术和特性,包括虚拟线程、模块系统、记录类(RecordClas......
  • SAAS版 财务系统 云会计财务源码
     现代企业的财务管理面临着越来越多的挑战,包括复杂的会计规范、繁琐的报表填写和高昂的人力成本。为了解决这些问题,我们开发了云会计财务源码,为企业提供全面、高效的财务管理解决方案。云会计财务源码是一款基于云技术的财务管理系统,具备以下特点:智能化:通过人工智能技术,自......
  • SAAS版 财务系统 云会计财务源码
    现代企业的财务管理面临着越来越多的挑战,包括复杂的会计规范、繁琐的报表填写和高昂的人力成本。为了解决这些问题,我们开发了云会计财务源码,为企业提供全面、高效的财务管理解决方案。云会计财务源码是一款基于云技术的财务管理系统,具备以下特点:智能化:通过人工智能技术,自动......
  • 基于Java健身房管理系统设计与实现 毕业设计源码15390
    摘 要随着人们生活水平的日益提高,健身已经成为了很多人生活中不可或缺的一部分。为了满足人们对健身的需求,各种健身房也应运而生。然而,传统的健身房管理方式存在诸多问题,如信息管理混乱、客户体验差等。为了解决这些问题,提高健身房的管理效率和服务质量,我们设计并实现了一套......
  • 基于SpringBoot+Vue的美发门店管理系统设计与实现毕设(文档+源码)
    目录一、项目介绍二、开发环境三、功能介绍四、核心代码五、效果图六、源码获取:         大家好呀,我是一个混迹在java圈的码农。今天要和大家分享的是一款基于SpringBoot+Vue的美发门店管理系统,项目源码请点击文章末尾联系我哦~目前有各类成品毕设JavaWeb......
  • java核心基础 第五章 线程
    核心概念计算机程序计算机程序是存储在硬盘上的一个文件,例如你经常用的浏览器Chrome,它在电脑硬盘上是一个chrome.exe的文件,你双击它就可以运行。这个文件里存储这个这个程序运行时需要的所有指令和数据。进程进程是一个计算机程序运行的容器,进程的概念是由操作系统提供的,一......
  • 基于SpringBoot+Vue的码头船只货柜试管理系统设计与实现毕设(文档+源码)
    目录一、项目介绍二、开发环境三、功能介绍四、核心代码五、效果图六、源码获取:         大家好呀,我是一个混迹在java圈的码农。今天要和大家分享的是一款基于SpringBoot+Vue的码头船只货柜管理系统,项目源码请点击文章末尾联系我哦~目前有各类成品毕设JavaW......
  • 宠物食品销售微信小程序的设计与实现 毕业设计源码08727
    摘 要针对本课题来说,主要是以大数据及互联网作为基础,以微信小程序作为平台,充分利用目前的信息化基础设施,将传统的宠物食品销售模式与网络商城之间进行有效的集合,这样形成一种全新的电子商务模式,当然这个也是对传统的电子商务的一种全新的突破。本课题对具体的选题的背景......
  • Java学习,方法覆盖
    Java方法覆盖是面向对象编程中的一个重要概念,它允许子类提供一个特定实现,该实现将覆盖(或重写)父类中已有方法。通过方法覆盖,子类可以自定义或扩展从父类继承的行为。方法重载与方法覆盖区别:方法重载(Overloading):两个方法的方法名相同,但参数不一致,可以说一个方法是另一个方法......