首页 > 编程语言 >基于Java NIO 写的一个简单版 Netty 服务端

基于Java NIO 写的一个简单版 Netty 服务端

时间:2024-04-02 18:22:23浏览次数:40  
标签:Netty Java NIO void key msg HandlerContext public channel

A Simple Netty Based On JAVA NIO

基于Java NIO 写的一个简单版 Netty 服务端

前置知识

NIO

  • NIO 一般指 同步非阻塞 IO,同样用于**描述程序访问数据方式 **的还有BIO(同步阻塞)、AIO(异步非阻塞)
  • 同步异步指获取结果的方式,同步为主动去获取结果,不管结果是否准备好,异步为等待结果准备好的通知
  • 阻塞非阻塞是线程在结果没有到来之前,是否进行等待,阻塞为进行等待,非阻塞则不进行等待
  • NIO 主动地去获取结果,但是在结果没有准备好之前,不会进行等待。而是通过一个 多路复用器 管理多个通道,由一个线程轮训地去检查是否准备好即可。在网络编程中,多路复用器通常由操作系统提供,Linux中主要有 select、poll、epoll。同步非阻塞指线程不等待数据的传输,而是完成后由多路复用器通知,线程再将数据从内核缓冲区拷贝到用户空间内存进行处理。

Java NIO

  • 基于 NIO 实现的网络框架,可以用少量的线程,处理大量的连接,更适用于高并发场景。于是,Java提供了NIO包提供相关组件,用于实现同步非阻塞IO
    • 核心三个类Channel、Buffer、Selector。Channel代表一个数据传输通道,但不进行数据存取,有Buffer类进行数据管理,Selector为一个复用器,管理多个通道

Bytebuffer

  • 该类为NIO 包中用于操作内存的抽象类,具体实现由HeapByteBuffer、DirectByteBuffer两种
  • HeapByteBuffer为堆内内存,底层通过 byte[ ] 存取数据
  • DirectByteBuffer 为堆外内存,通过JDK提供的 Unsafe类去存取;同时创建对象会关联的一个Cleaner对象,当对象被GC时,通过cleaner对象去释放堆外内存

各核心组件介绍

NioServer

为启动程序类,监听端口,初始化Channel

  • 下面为NIO模式下简单服务端处理代码
// 1、创建服务端Channel,绑定端口并配置非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
serverSocketChannel.configureBlocking(false);

// 2、创建多路复用器selector,并将channel注册到多路复用器上
// 不能直接调用channel的accept方法,因为属于非阻塞,直接调用没有新连接会直接返回
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 3、循环处理多路复用器的IO事件
while(true){

    // 3.1、select属于阻塞的方法,这里阻塞等待1秒
    // 如果返回0,说明没有事件处理
    if (selector.select(1000) == 0){
        System.out.println("服务器等待了1秒,无IO事件");
        continue;
    }
    // 3.2、遍历事件进行处理
    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while(iterator.hasNext()){
        SelectionKey key = iterator.next();
        // accept事件,说明有新的客户端连接
        if (key.isAcceptable()){
            // 新建一个socketChannel,注册到selector,并关联buffer
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
            System.out.println("客户端连接:"+socketChannel.getRemoteAddress());
        }
        // read事件 (内核缓冲区的数据准备好了)
        if(key.isReadable()){
            SocketChannel channel = (SocketChannel)key.channel();
            ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
            try {
              // 将数据写进buffer
                int readNum = channel.read(byteBuffer);
                if (readNum == -1){
                    System.out.println("读取-1时,表示IO流已结束");
                    channel.close();
                    break;
                }
                // 打印buffer
                byteBuffer.flip();
                byte[] bytes = new byte[readNum];
                byteBuffer.get(bytes, 0, readNum);
                System.out.println("读取到数据:" + new String(bytes));
            } catch (IOException e) {
                System.out.println("读取发生异常,广播socket");
                channel.close();
            }

        }
        // write事件 (操作系统有内存写出了)
        if (key.isWritable()){
            SocketChannel channel = (SocketChannel)key.channel();
            // 读取read时暂存数据
            byte[] bytes = (byte[])key.attachment();
            if (bytes != null){
                System.out.println("可写事件发生,写入数据: " + new String(bytes));
                channel.write(ByteBuffer.wrap(bytes));
            }
            // 清空暂存数据,并切换成关注读事件
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
        iterator.remove();
    }
}

EventLoop

处理 Channel 中数据的读写

  • 在上面的Server中,大量并发时单线程地处理读写事件会导致延迟,因此将读写处理抽取出来,可利用多线程实现高并发
  • 一个EventLoop会关联一个selector,只会处理这个selector上的Channel
public class EventLoop2 implements Runnable{


    private final Thread thread;
    /**
     * 复用器,当前线程只处理这个复用器上的channel
     */
    public Selector selector;
    /**
     * 待处理的注册任务
     */
    private final Queue<Runnable> queue = new LinkedBlockingQueue<>();

    /**
     * 初始化复用器,线程启动
     * @throws IOException
     */
    public EventLoop2() throws IOException {
        this.selector = SelectorProvider.provider().openSelector();
        this.thread = new Thread(this);
        thread.start();
    }

    /**
     * 将通道注册给当前的线程处理
     * @param socketChannel
     * @param keyOps
     */
    public void register(SocketChannel socketChannel,int keyOps){
        // 将注册新的socketChannel到当前selector封装成一个任务
        queue.add(()->{
            try {
                MyChannel myChannel = new MyChannel(socketChannel, this);
                SelectionKey key = socketChannel.register(selector, keyOps);
                key.attach(myChannel);
            } catch (Exception e){
                e.printStackTrace();
            }
        });
        // 唤醒阻塞等待的selector线程
        selector.wakeup();
    }

    /**
     * 循环地处理 注册事件、读写事件
     */
    @Override
    public void run() {
        while (!thread.isInterrupted()){
            try {
                int select = selector.select(1000);
                // 处理注册到当前selector的事件
                if (select == 0){
                    Runnable task;
                    while ((task = queue.poll()) != null){
                        task.run();
                    }
                    continue;
                }
                // 处理读写事件
                System.out.println("服务器收到读写事件,select:" + select);
                processReadWrite();

            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理读写事件
     * @throws Exception
     */
    private void processReadWrite() throws Exception{
        System.out.println(Thread.currentThread() + "开始监听读写事件");
        // 3.2、遍历事件进行处理
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while(iterator.hasNext()){
            SelectionKey key = iterator.next();
            MyChannel myChannel = (MyChannel)key.attachment();
            if(key.isReadable()){
                // 将数据读进buffer
                myChannel.doRead(key);
            }
            if (key.isWritable()){
                myChannel.doWrite(key);
            }
            iterator.remove();
        }
    }
}

EventloopGroup

一组EventLoop,轮训地为eventLoop分配Channel

public class EventLoopGroup {
    private EventLoop2[] children = new EventLoop2[1];

    private AtomicInteger idx = new AtomicInteger(0);

    public EventLoopGroup() throws IOException {
        for (int i = 0; i < children.length; i++){
            children[i] = new EventLoop2();
        }
    }

    public EventLoop2 next(){
        // 轮训每一个children
        return children[idx.getAndIncrement() & (children.length - 1)];
    }

    public void register(SocketChannel channel,int ops){
        next().register(channel,ops);
    }
}

Channel

封装了SocketChannel 和 Pipline,将从Channel读写的消息,沿着Pipline上的节点进行处理

  • 在上面EventLoop中,注册Channel到对应的Selector前,会进行封装,将自定义的Channel放在读写事件触发时会返回的SelectionKey里面
  • 同时提供了数据读写处理方法,读写事件触发时调用该方法,数据会沿着pipline上去处理
public class MyChannel {

    private SocketChannel channel;

    private EventLoop2 eventLoop;

    private Queue<ByteBuffer> writeQueue;

    private PipLine pipLine;

    /**
     * 一个channel关联一个eventLoop、一个pipLine、一个socketChannel、一个writeQueue
     * @param channel
     * @param eventLoop
     */
    public MyChannel(SocketChannel channel, EventLoop2 eventLoop) {
        this.channel = channel;
        this.eventLoop = eventLoop;
        this.writeQueue = new ArrayDeque<>();
        this.pipLine = new PipLine(this,eventLoop);
        this.pipLine.addLast(new MyHandler1());
        this.pipLine.addLast(new MyHandler2());
    }

    /**
     * 读事件处理
     * @param key
     * @throws Exception
     */
    public void doRead(SelectionKey key) throws Exception{
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int readNum = channel.read(buffer);
            if (readNum == -1){
                System.out.println("读取-1时,表示IO流已结束");
                channel.close();
                return;
            }
            // 转成可读状态
            buffer.flip();
            // 消息放入pipLine,交给头节点, 头节点开始传递
            pipLine.headContext.fireChannelRead(buffer);

        } catch (IOException e) {
            System.out.println("读取发生异常,广播socket");
            channel.close();
        }
    }

    /**
     * 真正地写出数据,关注写事件后,会触发
     * @param key
     * @throws IOException
     */
    public void doWrite(SelectionKey key) throws IOException{
        ByteBuffer buffer;
        while ((buffer =writeQueue.poll()) != null){
            channel.write(buffer);
        }
        // 回复读取状态
        key.interestOps(SelectionKey.OP_READ);

    }

    /**
     * 写出到队列
     * @param msg
     */
    public void doWriteQueue(ByteBuffer msg){
        writeQueue.add(msg);
    }

    /**
     * 从最后一个节点进行写出,写出到头节点是调用doWriteQueue
     * @param msg
     */
    public void write(Object msg){
        this.pipLine.tailContext.write(msg);
    }

    /**
     * 从最后一个节点进行flush,写出到头节点时调用doFlush
     */
    public void flush(){
        this.pipLine.tailContext.flush();
    }

    /**
     * 关注写事件,才能进行真正地写出
     */
    public void doFlush(){
        this.channel.keyFor(eventLoop.selector).interestOps(SelectionKey.OP_WRITE);
    }

}

Handler 和 HandlerContext

handler 接口定义了可以扩展处理的消息,由开发人员实现具体的处理

handlerContext 类封装了handler的实现类,将handler的上一个节点和下一个节点,让消息可以延者链表传递

public interface Handler {

    /**
     * 读取数据处理
     * @param ctx
     * @param msg
     */
    void channelRead(HandlerContext ctx,Object msg);

    /**
     * 写出数据
     * @param ctx
     * @param msg
     */
    void write(HandlerContext ctx,Object msg);

    /**
     * 刷下数据
     * @param ctx
     */
    void flush(HandlerContext ctx);
}
public class HandlerContext {

    private Handler handler;

    MyChannel channel;

    HandlerContext prev;

    HandlerContext next;

    public HandlerContext(Handler handler, MyChannel channel) {
        this.handler = handler;
        this.channel = channel;
    }

    /**
     * 读消息的传递,从头节点开始往后传
     * @param msg
     */
    public void fireChannelRead(Object msg){
        HandlerContext next = this.next;
        if (next != null){
            next.handler.channelRead(next,msg);
        }
    }

    /**
     * 从尾节点开始往前传
     * @param msg
     */
    public void write(Object msg){
        HandlerContext prev = this.prev;
        if (prev != null){
            prev.handler.write(prev,msg);
        }
    }

    /**
     * 从尾节点开始往前传
     */
    public void flush(){
        HandlerContext prev = this.prev;
        if (prev != null){
            prev.handler.flush(prev);
        }
    }
}

Pipline

本质是链表,包含了头尾节点的HandlerContext,提供方法给开发人员加节点

public class PipLine {

    private MyChannel channel;

    private EventLoop2 eventLoop;

    public HandlerContext headContext;

    public HandlerContext tailContext;

    public PipLine(MyChannel channel, EventLoop2 eventLoop) {
        this.channel = channel;
        this.eventLoop = eventLoop;
        PipHandler headHandler = new PipHandler();
        this.headContext = new HandlerContext(headHandler,channel);
        PipHandler tailHandler = new PipHandler();
        this.tailContext = new HandlerContext(tailHandler,channel);
        // 构建链表
        this.headContext.next = this.tailContext;
        this.tailContext.prev = this.headContext;
    }

    public void addLast(Handler handler){
        HandlerContext curr = new HandlerContext(handler, channel);

        // 连接在倒数第二个后面
        HandlerContext lastButOne = this.tailContext.prev;
        lastButOne.next = curr;
        curr.prev = lastButOne;

        // 连接在最后一个前面
        curr.next = tailContext;
        tailContext.prev = curr;

    }

    public static class PipHandler implements Handler{

        @Override
        public void channelRead(HandlerContext ctx, Object msg) {
            System.out.println("接收"+(String) msg +"进行资源释放");
        }

        @Override
        public void write(HandlerContext ctx, Object msg) {
            System.out.println("写出"+msg.toString());
        }

        @Override
        public void flush(HandlerContext ctx) {
            System.out.println("flush");
        }
    }
}

标签:Netty,Java,NIO,void,key,msg,HandlerContext,public,channel
From: https://www.cnblogs.com/gg12138/p/18111254

相关文章

  • java,postgresql,python中各种数据类型的占位长度,取值范围
    Java数据类型Java中的数据类型分为两类:基本数据类型和引用数据类型。基本数据类型数据类型占位长度取值范围byte1字节-128127short2字节-3276832767int4字节-21474836482147483647long8字节-92233720368547758089223372036854775807float4字节1.4E-453.4028235E38double8字节4.......
  • Mybatis 中 javaType 和 jdbcType 对应关系
      JDBCType             JavaType     CHAR                  String  VARCHAR              String  LONGVARCHAR                Str......
  • [附源码]JAVA计算机毕业设计电子市场计算机配件报价系统(源码+开题)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着信息技术的快速发展和普及,计算机作为现代人日常生活和工作中的重要工具,其配件市场的需求日益增长。电子市场作为连接供应商与消费者的桥梁,在推动......
  • [附源码]JAVA计算机毕业设计电子商城购物系统(源码+开题)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网技术的迅猛发展和普及,网络购物已成为现代人生活中不可或缺的一部分。电子商城购物系统作为网络购物的重要载体,为企业提供了一个全新的销售......
  • 【附源码】计算机毕业设计音乐豆瓣(java+springboot+mysql+mybatis+论文)
    本系统(程序+源码)带文档lw万字以上  文末可领取本课题的JAVA源码参考系统程序文件列表系统的选题背景和意义音乐豆瓣是一个以音乐为主题的社交网站,用户可以在网站上分享自己喜欢的音乐、评论和推荐音乐作品,还可以与其他用户进行交流和互动。音乐豆瓣的目的是为了让更多的......
  • 【附源码】计算机毕业设计玉龙湾小区网站(java+springboot+mysql+mybatis+论文)
    本系统(程序+源码)带文档lw万字以上  文末可领取本课题的JAVA源码参考系统程序文件列表系统的选题背景和意义玉龙湾小区作为一个大型的综合性社区,拥有众多的住户和商铺。为了更好地满足社区居民的需求,提高社区管理的效率和质量,建立一个专门的网站是非常必要的。这个网站可......
  • Java版企业电子招标采购系统源码——鸿鹄电子招投标系统的技术特点
      在数字化时代,采购管理也正经历着前所未有的变革。全过程数字化采购管理成为了企业追求高效、透明和规范的关键。该系统通过SpringCloud、SpringBoot2、Mybatis等先进技术,打造了从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通过待办消息、招标公告......
  • 【Java代码审计】SSRF篇
    【Java代码审计】SSRF篇1.SSRF漏洞2.Java-SSRF漏洞常见接口3.SSRF漏洞演示URLConnectionURLConnection绕过4.SSRF修复白名单方式过滤方式通用预防SSRF方法1.SSRF漏洞SSRF是Server-SideRequestForge的英文首字母缩写,中文意思是服务器端请求伪造。Web应用程序......
  • 【javaWeb & 功能介绍第一篇】阿里云OSS文件上传
    文件上传文件上传存储文件本地存储云服务阿里云文件上传文件上传是将本地的图片,视频,音频等文件上传到服务器,供其他用户浏览或下载的过程文件上传在项目中应用十分广泛,我们经常发微博,发微信都用到了文件上传的功能在前端的开发之中,如果需要文件上传功能,则必须在......
  • 基于java的ssm电影后台管理系统
    MVC设计模式可以将这些对象、显示、控制分离以提高软件的的灵活性和复用性,MVC结构可以使程序具有对象化的特征,也更容易维护。在需要改进和个性化定制界面及用户交互的同时,不需要重新编写业务逻辑,达到减少编码的时间,提高代码复用性 系统思维导图系统环境和技术使用编译器:In......