首页 > 其他分享 >Netty(四)NIO多线程优化

Netty(四)NIO多线程优化

时间:2023-11-13 15:11:18浏览次数:35  
标签:Netty NIO register worker boss selector 线程 var 多线程

Netty(四)NIO多线程优化


​ 前面的代码都只有一个选择器,没有充分利用多核CPU,因此可以分两组选择器

  • boss:单线程配一个选择器,专门处理accept事件,不负责数据的读写
  • worker:创建CPU核心数的线程,每个线程配一个选择器,轮流处理read事件
image-20231031160258298

1 多线程问题分析

  • 关键是这一部分的代码,需要保证register在worker线程启动之前执行,因为如果worker线程先执行的话就会导致在selector.select()处堵塞,从而导致使用了这个worker selector的、运行在boss线程的register也会被堵塞无法完成socketChannel的注册,进而worker线程也一直无法关注到读事件从而被一直堵塞

    	worker.register(sc); // 创建selector 启动worker-0线程	监听read事件
    	sc.register(worker.selector, SelectionKey.OP_READ, null); // boss线程,注册连接得到的socketChannel搭到创建的selector上,这行代码先执行的话会堵塞在select() 
    
  • 虽然这么写boss线程先执行不会出现这个问题,但是在第一个客户端连接之后,boss线程处于select()堵塞的状态,此时再有新客户端连接的话,仍然会因为select()方法而导致影响register出现上面的问题

​ 服务器MultiThreadServer

    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");

        var ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        var boss = Selector.open();
        var sscKey = ssc.register(boss, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        var worker = new Worker("worker-0");

        while (true) {
            boss.select();
            var iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                var key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    var channel = (ServerSocketChannel) key.channel();
                    var sc = channel.accept();
                    sc.configureBlocking(false);

                    log.debug("connected {}", sc.getRemoteAddress());
                    log.debug("before register {}", sc.getRemoteAddress());

                    worker.register(); // 创建selector 启动worker-0线程
                    log.debug("{} uses selector", Thread.currentThread().getName());
                    sc.register(worker.selector, SelectionKey.OP_READ, null); // boss线程
                    log.debug("after register {}", sc.getRemoteAddress());

                }
            }
        }

    }

​ Worker selector类

	static class Worker implements Runnable {
        private String name;
        private Thread thread;
        private Selector selector;

        private boolean start = false;

        public Worker(String name) {
            this.name = name;
        }

        public void register() throws IOException {
            if (!this.start) {
                this.thread = new Thread(this, this.name);
                this.selector = Selector.open();
                this.thread.start();
                this.start = true;
            }
        }

        @Override
        public void run() {
            while(true) {
                try {
                    log.debug("{} uses selector", Thread.currentThread().getName());
                    selector.select();
                    var iter = this.selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        var key = iter.next();
                        iter.remove();
                        if(key.isReadable()) {
                            var buffer = ByteBuffer.allocate(16);
                            var sc = (SocketChannel)key.channel();
                            sc.read(buffer);
                            debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

2 netty源码针对该问题的解决思路(使用堵塞队列+selector wakeup)

  • 上面的代码中,在触发accept事件后,得到连接的socketChannel并将其注册到worker selector上,其实还是在boss线程上执行的,因此需要先将这个注册的步骤放到worker启动的线程里面

  • 总结上面的问题,就是在多线程环境下,可能会出现线程执行selector.select()方法被堵塞,导致其他线程无法进行register也就是channel无法注册同一个selector的情况

  • 因此需要将两个操作都放到worker线程,一般会使用堵塞队列完成线程间的通信:

            private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    
            public Worker(String name) {
                this.name = name;
            }
    
            public void register(SocketChannel sc) throws IOException {
                if (!this.start) {
                    this.thread = new Thread(this, this.name);
                    this.selector = Selector.open();
                    this.thread.start();
                    this.start = true;
                }
    
                this.queue.add(() -> {
                    try {
                        sc.register(this.selector, SelectionKey.OP_READ, null); // boss线程
                    } catch (ClosedChannelException e) {
                        throw new RuntimeException(e);
                    }
                });
                
                selector.wakeup();
            }
    
  • 这样,每当出现需要进行新客户端连接(也就是需要需要在selector上注册socketChannel的时候),就会在堵塞队列中添加需要注册socketChannel的任务,然后通过wakeup方法唤醒selector,最后worker线程就会从阻塞队列中取到任务执行注册,避免了worker线程注册的时候select方法堵塞导致boss线程无法注册的情况

3 多worker进行负载均衡优化

  • 类似使用一个worker数组实现一个负载均衡,每次出现一个连接就去轮流使用worker数组的一个worker

  • 代码如下:

        public static void main(String[] args) throws IOException {
            Thread.currentThread().setName("boss");
    
            var ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(8080));
    
            var boss = Selector.open();
            var sscKey = ssc.register(boss, 0, null);
            sscKey.interestOps(SelectionKey.OP_ACCEPT);
    
    
            var workers = new Worker[3];
            for(var i = 0; i < workers.length; i++) {
                workers[i] = new Worker("worker-" + i);
            }
            AtomicInteger count = new AtomicInteger();
    
            while (true) {
                boss.select();
                var iter = boss.selectedKeys().iterator();
                while (iter.hasNext()) {
                    var key = iter.next();
                    iter.remove();
                    if (key.isAcceptable()) {
                        var channel = (ServerSocketChannel) key.channel();
                        var sc = channel.accept();
                        sc.configureBlocking(false);
    
                        log.debug("connected {}", sc.getRemoteAddress());
                        log.debug("before register {}", sc.getRemoteAddress());
    
                        var index = count.get();
                        workers[index].register(sc); // 创建selector 启动worker-0线程
                        index = (index + 1) % workers.length;
                        count.set(index);
    
                        log.debug("after register {}", sc.getRemoteAddress());
    
                    }
                }
            }
    
        }
    
  • 一般情况下会设置线程数为cpu的核心数:

            log.debug("cpu core nums:{}", Runtime.getRuntime().availableProcessors());
            var workers = new Worker[Runtime.getRuntime().availableProcessors()];
    

    注意:Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,因此会去拿物理cpu的个数,而不是docker被分配的cpu个数

    这个问题直到jdk10才解决,使用jvm参数UseContainerSupport配置可以默认开启获取容器配置

标签:Netty,NIO,register,worker,boss,selector,线程,var,多线程
From: https://www.cnblogs.com/tod4/p/17829208.html

相关文章

  • Netty(三)网络编程
    Netty(三)网络编程1阻塞和非阻塞堵塞:在没有数据可读的时候,包括数据复制的过程,线程必须堵塞等待,不会占用CPU但是线程相当于闲置在单线程下,两个堵塞的方法会相互影响,必须使用多线程,32位JVM一个线程320K,64位JVM一个线程1024K,为了减少线程数,需要采用线程池技术但是即便使用了线......
  • 每个.NET开发都应掌握的C#多线程知识点
    上篇文章讲述了C#特性(Attribute)知识点,本文将介绍多线程的知识点。多线程编程是现代软件开发中的重要组成部分,它能够充分利用多核处理器,提高应用程序的性能和响应性。C#作为.NET开发的主要语言,提供了强大的多线程支持。本文将介绍C#多线程知识点,帮助.NET开发者更好地应对多线程编程......
  • Ubuntu+Minio对象存储+pm2进程管理
    Minio是一个go编写的高性能对象存储服务,它兼容AmazonS3API。无论是静态网站的托管,还是数据存储分析,亦或是数据的备份与恢复等多种场景下,都可以为我们提供解决方案。ubuntu安装Minio对象存储服务:1.打开终端,并执行以下命令下载Minio文件。可以从Minio的官方网站获取最新的下载......
  • 多线程锁
    常见锁介绍synchronized锁的八中情况packagecom.shaonian.juc.more_thread_lock;importjava.util.concurrent.TimeUnit;classPhone{publicstaticsynchronizedvoidsendSMS()throwsException{//停留4秒TimeUnit.SECONDS.sleep(4);......
  • 【.NET】多线程:自动重置事件与手动重置事件的区别
    在多线程编程中,如果每个线程的运行不是完全独立的。那么,一个线程执行到某个时刻需要知道其他线程发生了什么。嗯,这就是所谓线程同步。同步事件对象(XXXEvent)有两种行为:1、等待。线程在此时会暂停运行,等待其他线程发出信号才继续(等你约);2、发出信号。当前线程发出信号,其他正在等待......
  • SQL union all 不去重
    创建表(牛客sql108改)DROPTABLEIFEXISTS`Products`;CREATETABLEIFNOTEXISTS`Products`(prod_nameVARCHAR(255)NOTNULLCOMMENT'产品名称');INSERTINTO`Products`VALUES('a'),('b'),('c');DROPTABLEIFEXISTS`......
  • Redis6.0使用多线程是怎么回事?
    Redis不是说用单线程的吗?怎么6.0成了多线程的?Redis6.0的多线程是用多线程来处理数据的读写和协议解析,但是Redis执行命令还是单线程的。这样做的⽬的是因为Redis的性能瓶颈在于⽹络IO⽽⾮CPU,使⽤多线程能提升IO读写的效率,从⽽整体提⾼Redis的性能。为什么命令执行为什么不采用多线......
  • 如何解决多线程下的共享对象问题?分布式系统又该如何应对?
    嗨,各位小米粉丝们!欢迎来到小米带你飞的微信公众号!今天我们要聊的话题可是程序员们都头疼的大问题哦——多线程情况下的对象共用问题,以及在分布式系统中的应对策略!小米要给大家详细解读一下,让你的技术面试不再被问倒!多线程中,如何解决对象共用问题?首先,我们得先了解多线程带来的挑战。......
  • java怎么实现对指定进行多线程访问的效果
    要使用Java实现对特定网站(例如"http://xkrj5.com")的多线程访问,你可以采用以下步骤:创建一个线程类:这个类将负责执行HTTP请求。使用线程池:这可以更有效地管理多个线程。执行HTTP请求:使用Java的网络库(如 HttpURLConnection 或第三方库如ApacheHttpClient)。下面是......
  • C++11 并发编程基础(一):并发、并行与C++多线程
    C++11标准在标准库中为多线程提供了组件,这意味着使用C++编写与平台无关的多线程程序成为可能,而C++程序的可移植性也得到了有力的保证。另外,并发编程可提高应用的性能,这对对性能锱铢必较的C++程序员来说是值得关注的。1.何为并发并发指的是两个或多个独立的活动在同一时段内发生......