首页 > 编程语言 >Java 三种IO实现一个简单聊天室

Java 三种IO实现一个简单聊天室

时间:2023-09-05 14:12:40浏览次数:26  
标签:聊天室 Java ByteBuffer buffer read IO var new

目录

Java 三种IO实现一个简单聊天室

在 Java 平台, IO有三种模型.

以 TCP 套接字为例三种不同模型实现简单聊天室服务端和客户端.

启动服务端后, 启动多个客户端, 相互之间可以发消息.

同步阻塞IO

因为同步阻塞IO不能同时做到 accept 客户端连接和从客户端连接 read 数据, 必须使用额外的线程.

主线程负责 accept 客户端连接, 建立连接后再新建线程运行读取.

public static void syncIoChatServer() throws IOException {
  var serverSocket = new ServerSocket();
  serverSocket.bind(new InetSocketAddress(8080));
  // 维护所有客户端连接
  var clientSockets = new ArrayList<Socket>();
  var welcome = "Welcome to chat room!\n".getBytes();
  while (!Thread.interrupted()) {
    // 等待客户端连接
    var client = serverSocket.accept();
    // 连接后加入列表
    clientSockets.add(client);
    // 直接启动新线程进行与客户端的读写
    new Thread(
        () -> {
          try {
            client.getOutputStream().write(welcome);
            // 读缓存, 如果输入太多会截断
            // 可以设计成按行读取也可以设计一套协议
            var buffer = new byte[1024];
            var read = client.getInputStream().read(buffer);
            while (read > 0) {
              // 读入的消息格式化处理后发到其它客户端
              var message =
                  DateTimeFormatter.ISO_LOCAL_TIME.format(
                      LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS))
                  + " "
                  + client.getRemoteSocketAddress()
                  + "\n\t"
                  + new String(buffer, 0, read);
              for (var iter = clientSockets.iterator(); iter.hasNext(); ) {
                var clientSocket = iter.next();
                if (clientSocket.isClosed()) {
                  iter.remove();
                  continue;
                }
                if (clientSocket != client) {
                  clientSocket.getOutputStream().write(message.getBytes());
                }
              }
              read = client.getInputStream().read(buffer);
            }
          } catch (IOException ignore) {
          }
        }).start();
  }
}

客户端需要读取服务端发送的消息, 同时从命令行读取要发送的消息, 必须使用额外的线程.

private static void syncIoClient() throws IOException {
  var socket = new Socket();
  socket.connect(new InetSocketAddress("localhost", 8080));
  new Thread(
      () -> {
        try {
          var scanner = new Scanner(System.in);
          while (!Thread.interrupted()) {
            var input = scanner.nextLine();
            if (input.equals("exit")) {
              socket.close();
              return;
            }
            socket.getOutputStream().write(input.getBytes());
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      })
      .start();
  while (!Thread.interrupted()) {
    var inputStream = socket.getInputStream();
    var buffer = new byte[1024];
    var read = inputStream.read(buffer);
    while (read > 0) {
      System.out.println(new String(buffer, 0, read));
      read = inputStream.read(buffer);
    }
  }
}

同步非阻塞IO

自1.5开始, Java 提供了 Non-blocking IO, 可以以非阻塞的方式操作IO.

在 NIO 中, acceptread 是非阻塞的, 调用时 IO 设备未就绪时不会阻塞, 而是直接返回.

同时, NIO还提供了 Selector, Selector是阻塞的, 但可以批量注册 Channel , 当 Channel 注册的操作就绪时, 就返回就绪的数量.

非阻塞IO可以使用单线程完成所有任务.

public static void syncNioChatServer() throws IOException {
  var serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8080));
  serverSocketChannel.configureBlocking(false);
  var selector = Selector.open();
  // 注册accept
  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  // 保存所有客户端连接
  var clientChannels = new ArrayList<SocketChannel>();
  // 共享buffer, 因为是单线程, 可以直接使用一个buffer不用担心并发问题
  var buffer = ByteBuffer.allocate(1024);
  var welcome = "Welcome to chat room!\n".getBytes();
  while (!Thread.interrupted()) {
    try {
      // 当有注册在Selector上的Channel有事件发生时
      if (selector.select() > 0) {
        var iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
          var selectedKey = iterator.next();
          if (selectedKey.isAcceptable()) {
            // channel() 返回发生对应事件的Channel
            var serverChannel = (ServerSocketChannel) selectedKey.channel();
            var clientChannel = serverChannel.accept();
            clientChannel.configureBlocking(false);
            clientChannels.add(clientChannel);
            clientChannel.register(selector, SelectionKey.OP_READ);
            buffer.clear();
            buffer.put(welcome);
            buffer.flip();
            clientChannel.write(buffer);
          } else if (selectedKey.isReadable()) {
            var socketChannel = (SocketChannel) selectedKey.channel();
            buffer.clear();
            var read = socketChannel.read(buffer);
            if (read > 0) {
              buffer.flip();
              var message =
                  DateTimeFormatter.ISO_LOCAL_TIME.format(
                          LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS))
                      + " "
                      + socketChannel.getRemoteAddress()
                      + "\n\t"
                      + read(buffer);
              for (var iter = clientChannels.iterator(); iter.hasNext(); ) {
                var clientChannel = iter.next();
                if (!clientChannel.isConnected()) {
                  iter.remove();
                  continue;
                }
                if (clientChannel != socketChannel) {
                  buffer.clear();
                  buffer.put(message.getBytes());
                  buffer.flip();
                  clientChannel.write(buffer);
                }
              }
            }
          }
          iterator.remove();
        }
      }
    } catch (Exception e) {
    }
  }
}

private static String read(ByteBuffer buffer) {
  var limit = buffer.limit();
  var read = new byte[limit];
  buffer.get(read);
  return new String(read);
}
private static void syncNioClient() throws IOException {
  var socketChannel = SocketChannel.open();
  socketChannel.connect(new InetSocketAddress("localhost", 8080));
  // 先connect之后再配置非阻塞, 否则还要轮询是否已经建立连接
  socketChannel.configureBlocking(false);
  var buffer = ByteBuffer.allocate(1024);
  while (!Thread.interrupted()) {
    buffer.clear();
    if (socketChannel.read(buffer) > 0) {
      buffer.flip();
      var message = new String(buffer.array(), 0, buffer.limit());
      System.out.println(message);
    }
    // 这样读取不会阻塞
    if (System.in.available() > 0) {
      var input = new byte[System.in.available()];
      System.in.read(input);
      if (new String(input).trim().equals("exit")) {
        socketChannel.close();
        return;
      }
      buffer.clear();
      buffer.put(input);
      buffer.flip();
      socketChannel.write(buffer);
    }
  }
}

异步IO

异步IO与其他模式有很大不同, 异步IO必须分配一个线程池, 在 IO 就绪时, JVM内部的线程会调用线程池执行回调.

public static void asyncNioChatServer() throws IOException, InterruptedException {
  var asynchronousChannelGroup =
      AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool());
  var asynchronousServerSocketChannel =
      AsynchronousChannelProvider.provider()
          .openAsynchronousServerSocketChannel(asynchronousChannelGroup);
  asynchronousServerSocketChannel.bind(new InetSocketAddress(8080));
  var asynchronousSocketChannels = new ArrayList<AsynchronousSocketChannel>();
  var welcome = "Welcome to chat room!\n".getBytes();
  asynchronousServerSocketChannel.accept(
      ByteBuffer.allocate(1024),
      new CompletionHandler<>() {
        @Override
        public void completed(AsynchronousSocketChannel currentChannel, ByteBuffer buffer) {
          // accept下一次连接, 并为每个连接注册分配buffer
          asynchronousServerSocketChannel.accept(ByteBuffer.allocate(1024), this);
          asynchronousSocketChannels.add(currentChannel);
          buffer.clear();
          buffer.put(welcome);
          buffer.flip();
          currentChannel.write(
              buffer,
              buffer,
              new CompletionHandler<>() {
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                  buffer.clear();
                  currentChannel.read(
                      buffer,
                      buffer,
                      new CompletionHandler<>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                          try {
                            if (result > 0) {
                              buffer.flip();
                              var message =
                                  DateTimeFormatter.ISO_LOCAL_TIME.format(
                                          LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS))
                                      + " "
                                      + currentChannel.getRemoteAddress()
                                      + "\n\t"
                                      + read(buffer);
                              // 读取的数据发送给其他客户端
                              for (var iter = asynchronousSocketChannels.iterator();
                                  iter.hasNext(); ) {
                                var asynchronousSocketChannel = iter.next();
                                if (!asynchronousSocketChannel.isOpen()) {
                                  iter.remove();
                                  continue;
                                }
                                if (asynchronousSocketChannel != currentChannel) {
                                  asynchronousSocketChannel.write(
                                      ByteBuffer.wrap(message.getBytes()));
                                }
                              }
                              buffer.clear();
                              // 接着从channel读取数据
                              currentChannel.read(buffer, buffer, this);
                            }
                          } catch (Exception ignore) {
                          }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {}
                      });
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {}
              });
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {}
      });
  asynchronousChannelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
private static void asyncIoClient() throws Exception {
  var asynchronousSocketChannel =
      AsynchronousChannelProvider.provider()
          .openAsynchronousSocketChannel(
              AsynchronousChannelGroup.withFixedThreadPool(1, Executors.defaultThreadFactory()));
  // 通过get阻塞, 否则还要嵌套一层回调
  asynchronousSocketChannel.connect(new InetSocketAddress("localhost", 8080)).get();
  var readBuffer = ByteBuffer.allocate(1024);
  asynchronousSocketChannel.read(
      readBuffer,
      readBuffer,
      new CompletionHandler<>() {
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
          attachment.flip();
          var message = read(readBuffer);
          System.out.println(message);
          attachment.clear();
          asynchronousSocketChannel.read(attachment, attachment, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {}
      });
  var scanner = new Scanner(System.in);
  var writeBuffer = ByteBuffer.allocate(1024);
  while (!Thread.interrupted()) {
    if (scanner.hasNext()) {
      var input = scanner.nextLine();
      if (input.equals("exit")) {
        asynchronousSocketChannel.close();
        return;
      }
      writeBuffer.clear();
      writeBuffer.put(input.getBytes());
      writeBuffer.flip();
      asynchronousSocketChannel.write(writeBuffer);
    }
  }
}

标签:聊天室,Java,ByteBuffer,buffer,read,IO,var,new
From: https://www.cnblogs.com/wymc/p/17679403.html

相关文章

  • The 2022 ICPC Asia Nanjing Regional Contest
    链接:https://codeforces.com/gym/104128A.Stop,YesterdayPleaseNoMore#include"bits/stdc++.h"usingnamespacestd;usingi64=longlong;voidsolve(){intn,m,k;cin>>n>>m>>k;strings;cin>>......
  • 【JAVA基础】基本数据类型
    Java数据类型简介Java语言中的数据类型分为两大类,分别是基本类型和引用类型。程序中需要处理许多数据,对于不同数据都有其对应的数据类型,其实就是在内存中开辟一个存储空间来存放数据,不同数据所开辟的内存大小也会不一样。Java基本数据类型Java基本类型共有八种,基本类型可以分为......
  • The 2022 ICPC Asia Hangzhou Regional Programming Contest
    The2022ICPCAsiaHangzhouRegionalProgrammingContestNoBugNoGame  #include<bits/stdc++.h>usingnamespacestd;#defineendl"\n"#defineintlonglongtypedeflonglongll;constintN=3e3+10;intf[N][N][2];signedm......
  • P5812 [IOI2019] 天桥
    优化建图,首先分几种情况讨论。假设当前的桥\(l,r,h\)。起点和终点是\(S,T\)。第一种情况:\(S\leql<r\leqT\)。容易发现如果要从这条天桥中间上这条天桥,一定经过\(l\)或\(r\),不如直接走上去。所以只用保留\((l,h),(r,h)\)和他们往下的一个其他天桥与该楼的交点,这个交......
  • Java JDK安装 - AdoptOpenJDK(CentOS 7 + AdoptOpenJDK 8)
    Linux系统-部署-运维系列导航 关于JVM、JRE与JDK  1.JVM(JavaVirtualMechinal)Java虚拟机,是整个java实现跨平台的最核心的部分,负责解释执行字节码文件,是可运行java字节码文件的虚拟计算机。当使用Java编译器编译Java程序时,生成的是与平台无关的字节码,这些字节码只......
  • Java JDK安装 - OracleJDK(CentOS 7 + OracleJDK 8u201)
    Linux系统-部署-运维系列导航 关于JVM、JRE与JDK  1.JVM(JavaVirtualMechinal)Java虚拟机,是整个java实现跨平台的最核心的部分,负责解释执行字节码文件,是可运行java字节码文件的虚拟计算机。当使用Java编译器编译Java程序时,生成的是与平台无关的字节码,这些字节码只......
  • nginx location配置规则与经验
    Linux系统-部署-运维系列导航 文档说明nginx使用过程中,配置最多的,最难以理解的,也是最容易出问题的,就是location块级指令,本文旨在将location相关配置规范以及使用经验,搜集汇总,便于需要时查看。特别说明:本文详细内容大部分为网络搜集整理,旨在提供一条学习路线,让我们有条理地阅......
  • 【ToolChains】CLion(VS2019) + CMake + Vcpkg 的使用
    参考博客:https://blog.51cto.com/u_15075510/4201238http://t.csdn.cn/pADDUhttps://zhuanlan.zhihu.com/p/454233496https://blog.csdn.net/weixin_43803955/article/details/123544106Vcpkg概述Vcpkg是微软社区开发的一个跨平台的C++包管理工具。它旨在解决C++......
  • 无涯教程-JavaScript - DVARP函数
    描述DVARP函数通过使用列表或数据库中符合您指定条件的记录的字段(列)中的数字,基于整个总体计算总体的方差。语法DVARP(database,field,criteria)争论Argument描述Required/Optionaldatabase组成列表或数据库的单元格范围。数据库是相关数据的列表,其中相关信息......
  • 无涯教程-JavaScript - DVAR函数
    描述DVAR函数使用与指定条件相匹配的列表或数据库的列中的数字,根据样本估算总体的方差。语法DVAR(database,field,criteria)争论Argument描述Required/Optionaldatabase组成列表或数据库的单元格范围。数据库是相关数据的列表,其中相关信息的行是记录,数据的列......