首页 > 编程语言 >javaNIO创建tcp服务器时的重要点

javaNIO创建tcp服务器时的重要点

时间:2023-05-05 18:44:40浏览次数:61  
标签:javaNIO java buffer tcp selector int ByteBuffer 服务器时 channel

在使用NIO创建非阻塞tcp服务器时,几个容易出现问题的点,如下代码注释所示:

package net.yury.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author yury757
 * 非阻塞网络IO的优点在于:
 * 1、write和read操作不会阻塞线程,若网络双方同时有IO需求,则可以进行IO读写,否则读写会返回的长度是0
 * 2、在非阻塞IO中,若遇到一个很大的IO操作占用很长时间,会导致影响其他客户端的IO,非阻塞IO可以根据需要将一次大的IO根据bytebuffer大小拆分为多次小的IO,这样不影响其他客户端。所以bytebuffer在NIO中极为重要
 *      需要分多次IO是因为操作系统中也有缓冲区,不可能说想分配多大的bytebuffer,就可以一次性读取或写入这么大的bytebuffer
 */
public class NIOServer {
    public final static int BUFFER_SIZE = 128;
    private int port;
    private AtomicInteger count = new AtomicInteger(0);

    public NIOServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws IOException {
        NIOServer nioServer = new NIOServer(8080);
        nioServer.start();
    }

    public void start() throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress("0.0.0.0", port));

        ssc.register(selector, SelectionKey.OP_ACCEPT, null);
        System.out.println("server start!");

        while (true) {
            // 阻塞selector,当有事件发生时才会唤醒
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                // 注意:selector不会自动对已经处理完的selectionKey做删除操作,因此这里一定要移除这个selectionKey,不然下次又会对这个selectionKey做处理
                iterator.remove();
                if (next.isAcceptable()) {
                    System.out.println("accept event happen, selectionKey: " + next);
                    ServerSocketChannel channel = (ServerSocketChannel) next.channel();
                    SocketChannel sc = channel.accept();
                    System.out.println("new connection: " + sc);
                    sc.configureBlocking(false);
                    sc.register(selector, SelectionKey.OP_READ, null);
                } else if (next.isReadable()) {
                    System.out.println("read event happen, selectionKey: " + next);
                    SocketChannel channel = (SocketChannel) next.channel();
                    // 注意:无论如何一定要对selectionKey做处理,要么read,要么cancel
                    // 客户端没有调用close,程序直接退出时,会报异常java.io.IOException: 远程主机强迫关闭了一个现有的连接。
                    // 客户端正常调用close时,会触发一个读事件,并且read返回-1
                    try {
                        channelReadLengthProtocol(channel, selector, next);
                    } catch (IOException ex) {
                        ex.printStackTrace();
                        // 注意:当报错时,认为客户端异常断开了,因此要将这个selectionKey取消掉
                        next.cancel();
                    }
                } else if (next.isWritable()) {
                    System.out.println("write event happen, selectionKey: " + next);
                    SocketChannel channel = (SocketChannel)next.channel();
                    channelWrite(channel, selector, next);
                }
            }
        }
    }

    public void channelRead(SocketChannel channel, Selector selector, SelectionKey key) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(2);
        StringBuilder sb = new StringBuilder();
        // 注意:这里需要处理半包和粘包问题
        // 半包:一个数据包被拆分到多个buffer中,读取时没有将这些buffer组合在一起或组合的方式错误导致数据异常。
        // 解决方法1:使用attachment边扩容边多次读取,或在attachment中使用bytebuffer数组,可以避免bytebuffer拷贝;解决方法2:使用自定义协议,如channelReadLengthProtocol
        // 粘包:一个buffer中包含多个数据包,多个数据包混淆在一起,无法辨别哪个是哪个
        // 下面这个代码没有解决半包问题,在遇到中文时肯定会有乱码
        while (true) {
            int read = channel.read(buffer);
            if (read == -1) {
                // 注意:-1表示客户端正常断开,因此这里要将这个selectionKey取消掉
                key.cancel();
                return;
            } else if (read == 0) {
                /**
                 * 注意:返回值为0有三种情况:
                 * 1、由于操作系统缓冲区满了等其他原因,socketChannel中读到数据
                 * 2、byteBuffer的 position = limit
                 * 3、客户端发送数据完毕
                 */
                break;
            } else {
                // 注意:需要对buffer进行读操作时,一定要flip
                buffer.flip();
                String s = StandardCharsets.UTF_8.decode(buffer).toString();
                sb.append(s);
                // 注意:要重新对buffer进行写操作时,一定要clear,或者compact
                buffer.clear();
            }
        }
        String s = sb.toString();
        System.out.println("client send: " + s);
        channel.register(selector, ~SelectionKey.OP_READ & SelectionKey.OP_WRITE, s);
    }

    public void channelWrite(SocketChannel channel, Selector selector, SelectionKey key) throws IOException {
        String s = (String)key.attachment();
        ByteBuffer buffer = StandardCharsets.UTF_8.encode("你好, count: " + count.getAndIncrement() + ", your content length is " + s.length());
        // 注意:StandardCharsets.UTF_8.encode和ByteBuffer.wrap方法返回的buffer都是已经切到了读模式,不需要在flip,再flip一次反而把所有数据都弄丢了
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        channel.register(selector, ~SelectionKey.OP_WRITE & SelectionKey.OP_READ);
    }

    /**
     * 这是一个基于长度协议,并使用attachment分批读取的方式处理半包问题的解决方案
     * 主要解决办法就是在传输内容前加一个4字节数字表示剩余内容体的byte长度,然后服务端分批读取直到读取到对应长度的byte即可。
     * 客户端也需要按照这种协议去发送内容,否则可能会产生问题
     * @param channel
     * @param selector
     * @param key
     * @throws IOException
     */
    public void channelReadLengthProtocol(SocketChannel channel, Selector selector, SelectionKey key) throws IOException {
        Object attachment = key.attachment();
        AttachByteBuffer attachByteBuffer;
        if (attachment == null) {
            // 获取长度
            ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
            int size = 0;
            do {
                int tmp = channel.read(sizeBuffer);
                if (tmp == -1) {
                    key.cancel();
                    return;
                } else {
                    size += tmp;
                }
            } while (size != 4);
            // 获取内容的长度
            sizeBuffer.flip();
            int length = buffer2Int(sizeBuffer, 0);
            if (length <= 0) {
                System.out.println("unsupported content length: " + length);
                key.cancel();
                return;
            }

            // 初始化
            attachByteBuffer = new AttachByteBuffer(length, ByteBuffer.allocate(BUFFER_SIZE));
            key.attach(attachByteBuffer);
        }else {
            attachByteBuffer = (AttachByteBuffer)attachment;
        }

        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int read = channel.read(buffer);
        System.out.println("read size: " + read);
        if (read == -1) {
            key.cancel();
            return;
        }
        attachByteBuffer.put(buffer);

        // 如果已经读完了,则取消读事件,注册写事件
        if (attachByteBuffer.getCurrentSize() == attachByteBuffer.getRequireSize()) {
            ByteBuffer completeBuffer = attachByteBuffer.getBuffer();
            completeBuffer.flip();
            String s = StandardCharsets.UTF_8.decode(completeBuffer).toString();
            System.out.println("client send length: " + s.length());
            channel.register(selector, ~SelectionKey.OP_READ & SelectionKey.OP_WRITE, s);
        }
    }

    /**
     * 读取bytebuffer中的4个字节,转int
     * 小端字节序
     * @param buffer
     * @param start
     * @return
     */
    public int buffer2Int(ByteBuffer buffer, int start) {
        int res = 0;
        for (int i = 0; i < 4; i++) {
            res += ((buffer.get(start + i) & 0xff) << (i * 8));
        }
        return res;
    }
}

class AttachByteBuffer {
    private int requireSize;
    private int currentSize;
    private ByteBuffer buffer;

    public AttachByteBuffer(int requireSize, ByteBuffer buffer) {
        this.requireSize = requireSize;
        this.currentSize = 0;
        this.buffer = buffer;
    }

    public int getRequireSize() {
        return requireSize;
    }

    public void setRequireSize(int requireSize) {
        this.requireSize = requireSize;
    }

    public ByteBuffer getBuffer() {
        return buffer;
    }

    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    public int getCurrentSize() {
        return currentSize;
    }

    public void setCurrentSize(int currentSize) {
        this.currentSize = currentSize;
    }

    public synchronized void put(ByteBuffer newBuffer) {
        newBuffer.flip();
        int size = newBuffer.limit() - newBuffer.position();
        // 如果buffer不够存储,则扩容
        if (buffer.limit() - buffer.position() < size) {
            ByteBuffer buffer2 = ByteBuffer.allocate(buffer.limit() + NIOServer.BUFFER_SIZE * 2);
            buffer.flip();
            buffer2.put(buffer);
            // 把旧的buffer替换了,旧的buffer已经没用了
            buffer = buffer2;
        }
        // 将新读取的数据写入到attachment中的buffer中
        this.currentSize += size;
        buffer.put(newBuffer);
    }
}

简易客户端如下:

package net.yury.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/**
 * @author yury757
 */
public class NIOClient {
    public static void main(String[] args) throws IOException {
        client2();
    }

    public static void client1() throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        StringBuilder str = new StringBuilder();
        for (int i = 0; i < 500; i++) {
            str.append("你");
        }
        byte[] content = str.toString().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.wrap(content);
        while (buffer.hasRemaining()) {
            sc.write(buffer);
        }

        StringBuilder sb = new StringBuilder();
        buffer = ByteBuffer.allocate(2);
        while (sc.read(buffer) > 0) {
            buffer.flip();
            sb.append(StandardCharsets.UTF_8.decode(buffer));
            buffer.clear();
        }
        System.out.println("server response is: " + sb.toString());
        sc.close();
    }

    public static void client2() throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        StringBuilder str = new StringBuilder();
        for (int i = 0; i < 50000; i++) {
            str.append("你");
        }
        byte[] content = str.toString().getBytes(StandardCharsets.UTF_8);
        int size = content.length;
        ByteBuffer buffer = ByteBuffer.allocate(size + 4);
        byte[] sizeByteArray = int2Byte(size);
        // 先写长度,再写内容
        buffer.put(sizeByteArray);
        buffer.put(content);

        buffer.flip();
        while (buffer.hasRemaining()) {
            int temp = sc.write(buffer);
            System.out.println("send size: " + temp);
        }

        // 接收端一样处理,省略
        StringBuilder sb = new StringBuilder();
        buffer = ByteBuffer.allocate(2);
        while (sc.read(buffer) > 0) {
            buffer.flip();
            sb.append(StandardCharsets.UTF_8.decode(buffer));
            buffer.clear();
        }
        System.out.println("server response is: " + sb.toString());
        sc.close();
    }

    /**
     * int转byte数组
     * 小段字节序
     * @param number
     * @return
     */
    public static byte[] int2Byte(int number) {
        byte[] res = new byte[4];
        for (int i = 0; i < 4; i++) {
            res[i] = (byte)(number & 0xff);
            number = number >> 8;
        }
        return res;
    }
}

Selector.select() 方法在以下情况不阻塞:

  • 事件发生时:accept、read、write、客户端正常/异常关闭

  • linux下,NIO的bug发生时,不阻塞;java在linux下的Selector实现有一个bug。

  • 调用Selector.wakeup()时

  • 调用Selector.close()时

  • Selector所在的线程调用了interrupt时

标签:javaNIO,java,buffer,tcp,selector,int,ByteBuffer,服务器时,channel
From: https://www.cnblogs.com/yury757/p/17375076.html

相关文章

  • 【C# TCP】
    https://www.cnblogs.com/yilezhu/p/12045018.html  https://blog.51cto.com/u_11990719/3112576 https://blog.csdn.net/lgj123xj/article/details/129868074https://blog.51cto.com/u_11990719/3112576 传输层TCP这里的TCP指的是传输层TCP,双方约定好协议内容,通过Soc......
  • 练习——简单的TcpCS了解基本概念
    packagecom.net;importjava.io.IOException;importjava.io.OutputStream;importjava.net.*;//客户端@SuppressWarnings({"all"})publicclassTCPClient_{publicstaticvoidmain(String[]args){Socketsocket=null;OutputS......
  • 河北稳控科技多通道振弦传感器无线采集仪发送数据到 TCP 服务器及远程修改参数
    河北稳控科技多通道振弦传感器无线采集仪发送数据到TCP服务器及远程修改参数 1、发送数据到TCP服务器参数配置(下列参数位于【参数配置】区域内的【自动模式参数】和【GPRS】面板内)数据发送方式:GPRSTCP数据包协议:字符串1.0TCP相关的其它参数可不进行配置,使用我们......
  • 多通道振弦传感器无线采集仪发送数据到 TCP 服务器及远程修改参数
     多通道振弦传感器无线采集仪发送数据到TCP服务器及远程修改参数1、发送数据到TCP服务器参数配置(下列参数位于【参数配置】区域内的【自动模式参数】和【GPRS】面板内)数据发送方式:GPRSTCP数据包协议:字符串1.0TCP相关的其它参数可不进行配置,使用我们已经为设备......
  • TCP三次握手/四次挥手详解
    2009-11-2611:23TCP三次握手/四次挥手详解(转)TCP(TransmissionControlProtocol)传输控制协议TCP是主机对主机层的传输控制协议,提供可靠的连接服务,采用三次握手确认建立一个连接:位码即tcp标志位,有6种标示:SYN(synchronous建立联机)ACK(acknowledgement确认)PSH......
  • TCP三次握手和四次挥手详解
    在三次握手发送的数据包中有两个ACK值(Acknowledgement),人们习惯一个大写,一个小写来加以区分。其实ACK也好,ack也好,只不过是个代号而已,叫他张三也行,叫他李四也没事,没有任何影响,因为咱们不会改动那个东西。就算是把名字记反了,对咱们也没有任何影响,大家知道三次握手的数据包里有这么......
  • CS144 计算机网络 Lab3:TCP Sender
    前言在Lab2中我们实现了TCPReceiver,负责在收到报文段之后将数据写入重组器中,并回复给发送方确认应答号。在Lab3中,我们将实现TCP连接的另一个端点——发送方,负责读取ByteStream(由发送方上层应用程序创建并写入数据),并将字节流转换为报文段发送给接收方。代码实现TCPSe......
  • 30张图说清楚 TCP 协议
    大家好,我是风筝前两天分享了20张图说清楚IP协议今天,继续来网管的自我修养之TCP协议,这可是除IP协议外另一个核心协议了。TCP协议是网络传输中至关重要的一个协议,它位于传输层。向上支持FTP、TELNET、SMTP、DNS、HTTP等常见的应用层协议,向下要与网络层的IP协议相互配合,实现......
  • Redis WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/s
    RedisWARNING:TheTCPbacklogsettingof511cannotbeenforcedbecause/proc/sys/net/core/somaxconnissettothelowervalueof128. 内核参数默认128,对于负载很大的服务是不够的。改为2048或者更大echo2048> /proc/sys/net/core/somaxconn  系统重启后失效v......
  • Install Tcpping on Linux
    Tcpping 介绍 测试网络延迟最常用方法是使用ping工具,它使用ICMP协定。在某些情况下ICMP是被防火墙阻挡,这使得Ping在这情况下是无法使用的。此时为了能够继续监控的话,就必需使用TCP/UDP的方式,TCPPING为更容易绕过普通的防火墙规则的第3层测试工具。这样的一个第3层的测试工具TC......