首页 > 编程语言 >javaNIO多线程worker实现

javaNIO多线程worker实现

时间:2023-05-05 20:33:52浏览次数:38  
标签:javaNIO java worker next 线程 import 多线程 channel

boss线程负责接收连接,worker线程负责处理IO事件。

package net.yury.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 多线程优化NIO服务器
 * 使用一个线程专门负责接收连接,并将连接上的SocketChannel注册到selector上,称为boss线程
 * 使用另外一两个线程专门处理IO事件,称为worker线程
 */
public class MultiThreadNIOServer {
    public final static int BUFFER_SIZE = 128;
    private int port;
    private int workerSize;

    public MultiThreadNIOServer(int port, int workerSize) {
        this.port = port;
        this.workerSize = workerSize;
    }

    public static void main(String[] args) throws IOException {
        MultiThreadNIOServer server = new MultiThreadNIOServer(8080, 2);
        server.start();
    }

    public void start() throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress("0.0.0.0", port));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT, null);
        List<Worker> workerList = new ArrayList<>();
        for (int i = 0; i < workerSize; i++) {
            Worker worker = new Worker("nio-worker-" + i);
            workerList.add(worker);
        }
        int index = 0;
        System.out.println("server start!");

        // boss线程其实就是这个main线程,也可以单独使用一个线程
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                if (next.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel)next.channel();
                    SocketChannel sc = channel.accept();
                    System.out.println("[boss] new connection, channel: " + sc);
                    sc.configureBlocking(false);
                    Worker worker = workerList.get(index);
                    worker.receive(sc, SelectionKey.OP_READ, null);
                    index++;
                    if (index >= workerList.size()) {
                        index = 0;
                    }
                }
            }
        }
    }

    /**
     * worker线程,一个实例就是一个线程
     */
    static class Worker implements Runnable {
        private Selector workerSelector;
        private Thread thread;
        private String name;

        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        public Worker(String name) throws IOException {
            this.name = name;
            this.thread = new Thread(this, this.name);
            this.workerSelector = Selector.open();
            this.thread.start();
        }

        public void receive(SocketChannel channel, int ops, Object att) {
            queue.add(() -> {
                try {
                    channel.register(this.workerSelector, ops, att);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            workerSelector.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    workerSelector.select();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                Runnable task = queue.poll();
                if (task != null) {
                    task.run();
                }
                Iterator<SelectionKey> iterator = workerSelector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey next = iterator.next();
                    if (next.isReadable()) {
                        SocketChannel channel = (SocketChannel)next.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(MultiThreadNIOServer.BUFFER_SIZE);
                        int size;
                        try {
                            size = channel.read(buffer);
                        }catch (IOException ex) {
                            ex.printStackTrace();
                            next.cancel();
                            continue;
                        }
                        if (size == -1) {
                            next.cancel();
                            continue;
                        }
                        buffer.flip();
                        String s = StandardCharsets.UTF_8.decode(buffer).toString();
                        System.out.println("[" + this.name + "] client send: " + s);
                        try {
                            channel.register(workerSelector, ~SelectionKey.OP_READ & SelectionKey.OP_WRITE, s);
                        } catch (ClosedChannelException e) {
                            e.printStackTrace();
                            next.cancel();
                        }
                    }else if (next.isWritable()) {
                        SocketChannel channel = (SocketChannel)next.channel();
                        String s = (String)next.attachment();
                        String response = "你好, you content is: " + s;
                        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
                        try {
                            while (buffer.hasRemaining()) {
                                channel.write(buffer);
                            }
                            channel.register(workerSelector, ~SelectionKey.OP_WRITE & SelectionKey.OP_READ, null);
                        } catch (IOException e) {
                            e.printStackTrace();
                            next.cancel();
                        }
                    }else {
                        next.cancel();
                    }
                }
            }
        }
    }
}

标签:javaNIO,java,worker,next,线程,import,多线程,channel
From: https://www.cnblogs.com/yury757/p/17375297.html

相关文章

  • javaNIO创建tcp服务器时的重要点
    在使用NIO创建非阻塞tcp服务器时,几个容易出现问题的点,如下代码注释所示:packagenet.yury.nio;importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.ByteBuffer;importjava.nio.CharBuffer;importjava.nio.channels.*;importjava.nio.charse......
  • C语言多线程
    线程按照其调度者可以分为用户级线程和核心级线程两种 用户级线程主要解决的是上下文切换的问题,它的调度算法和调度过程全部由用户自行选择决定,在运行时不需要特定的内核支持; 我们常用基本就是用户级线程,所以就只总结一下POSIX提供的用户级线程接口; 基本线程操作相关的函数: 1......
  • Java 网络编程 —— 创建多线程服务器
    一个典型的单线程服务器示例如下:while(true){Socketsocket=null;try{//接收客户连接socket=serverSocket.accept();//从socket中获得输入流与输出流,与客户通信...}catch(IOExceptione){e.printStackTr......
  • 只有含有i/o时多线程才会比单线程快
    importthreadingimporttimeind=3defsingle():  forjinrange(ind):    foriinrange(5000):      w=2*i      #print(w)defs():  foriinrange(5000):    w=2*i    #print(w)defmulti(): ......
  • 多线程解决数据安全问题
      只需要再引发安全问题的部分加lock就行。加锁的话其他进程不能访问的。 ......
  • 多线程在毕设中的应用
    已经对多线程并发有了一个基本的理解。现在要将多线程用于毕设中,多线程的作用是将循环用作并发处理,有一定的速度提速作用。目前需要了解毕设程序在哪方面需要进行多线程修改。python的multiprocessing库去利用多核的机器,threading运行时会释放GIL锁,可以让I/O并行。 ......
  • 多线程对全局变量修改和单线程对全局变量修改
    线程过多了,不会比单线程快多少。因为程序可能花时间在线程的转换上了。importthreadingimporttimeind=4dan_sum=0multi_sum=0defsingle():  forjinrange(ind):    foriinrange(5000):      print(i*2)      globaldan_s......
  • python编写多线程程序并测速
    importthreadingimporttimeind=2defsingle():  forjinrange(ind):    foriinrange(5000):      print(i*2)defs():  foriinrange(5000):    print(i*2)defmulti():  threa=[]  forjinrange(ind): ......
  • 对多线程的一点理解
     电脑是8核的。 ......
  • 从CPU的视角看 多线程代码为什么那么难写!
      当我们提到多线程、并发的时候,我们就会回想起各种诡异的bug,比如各种线程安全问题甚至是应用崩溃,而且这些诡异的bug还很难复现。我们不禁发出了灵魂拷问“为什么代码测试环境运行好好的,一上线就不行了?”。为了解决线程安全的问题,我们的先辈们在编程语言中引入了各种各样新名......