目录
Java 三种IO实现一个简单聊天室
在 Java 平台, IO有三种模型.
以 TCP 套接字为例三种不同模型实现简单聊天室服务端和客户端.
启动服务端后, 启动多个客户端, 相互之间可以发消息.
同步阻塞IO
因为同步阻塞IO不能同时做到 accept
客户端连接和从客户端连接 read
数据, 必须使用额外的线程.
主线程负责 accept
客户端连接, 建立连接后再新建线程运行读取.
public static void syncIoChatServer() throws IOException {
var serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8080));
// 维护所有客户端连接
var clientSockets = new ArrayList<Socket>();
var welcome = "Welcome to chat room!\n".getBytes();
while (!Thread.interrupted()) {
// 等待客户端连接
var client = serverSocket.accept();
// 连接后加入列表
clientSockets.add(client);
// 直接启动新线程进行与客户端的读写
new Thread(
() -> {
try {
client.getOutputStream().write(welcome);
// 读缓存, 如果输入太多会截断
// 可以设计成按行读取也可以设计一套协议
var buffer = new byte[1024];
var read = client.getInputStream().read(buffer);
while (read > 0) {
// 读入的消息格式化处理后发到其它客户端
var message =
DateTimeFormatter.ISO_LOCAL_TIME.format(
LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS))
+ " "
+ client.getRemoteSocketAddress()
+ "\n\t"
+ new String(buffer, 0, read);
for (var iter = clientSockets.iterator(); iter.hasNext(); ) {
var clientSocket = iter.next();
if (clientSocket.isClosed()) {
iter.remove();
continue;
}
if (clientSocket != client) {
clientSocket.getOutputStream().write(message.getBytes());
}
}
read = client.getInputStream().read(buffer);
}
} catch (IOException ignore) {
}
}).start();
}
}
客户端需要读取服务端发送的消息, 同时从命令行读取要发送的消息, 必须使用额外的线程.
private static void syncIoClient() throws IOException {
var socket = new Socket();
socket.connect(new InetSocketAddress("localhost", 8080));
new Thread(
() -> {
try {
var scanner = new Scanner(System.in);
while (!Thread.interrupted()) {
var input = scanner.nextLine();
if (input.equals("exit")) {
socket.close();
return;
}
socket.getOutputStream().write(input.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
})
.start();
while (!Thread.interrupted()) {
var inputStream = socket.getInputStream();
var buffer = new byte[1024];
var read = inputStream.read(buffer);
while (read > 0) {
System.out.println(new String(buffer, 0, read));
read = inputStream.read(buffer);
}
}
}
同步非阻塞IO
自1.5开始, Java 提供了 Non-blocking IO, 可以以非阻塞的方式操作IO.
在 NIO 中, accept
和 read
是非阻塞的, 调用时 IO 设备未就绪时不会阻塞, 而是直接返回.
同时, NIO还提供了 Selector
, Selector
是阻塞的, 但可以批量注册 Channel
, 当 Channel
注册的操作就绪时, 就返回就绪的数量.
非阻塞IO可以使用单线程完成所有任务.
public static void syncNioChatServer() throws IOException {
var serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
var selector = Selector.open();
// 注册accept
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 保存所有客户端连接
var clientChannels = new ArrayList<SocketChannel>();
// 共享buffer, 因为是单线程, 可以直接使用一个buffer不用担心并发问题
var buffer = ByteBuffer.allocate(1024);
var welcome = "Welcome to chat room!\n".getBytes();
while (!Thread.interrupted()) {
try {
// 当有注册在Selector上的Channel有事件发生时
if (selector.select() > 0) {
var iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
var selectedKey = iterator.next();
if (selectedKey.isAcceptable()) {
// channel() 返回发生对应事件的Channel
var serverChannel = (ServerSocketChannel) selectedKey.channel();
var clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannels.add(clientChannel);
clientChannel.register(selector, SelectionKey.OP_READ);
buffer.clear();
buffer.put(welcome);
buffer.flip();
clientChannel.write(buffer);
} else if (selectedKey.isReadable()) {
var socketChannel = (SocketChannel) selectedKey.channel();
buffer.clear();
var read = socketChannel.read(buffer);
if (read > 0) {
buffer.flip();
var message =
DateTimeFormatter.ISO_LOCAL_TIME.format(
LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS))
+ " "
+ socketChannel.getRemoteAddress()
+ "\n\t"
+ read(buffer);
for (var iter = clientChannels.iterator(); iter.hasNext(); ) {
var clientChannel = iter.next();
if (!clientChannel.isConnected()) {
iter.remove();
continue;
}
if (clientChannel != socketChannel) {
buffer.clear();
buffer.put(message.getBytes());
buffer.flip();
clientChannel.write(buffer);
}
}
}
}
iterator.remove();
}
}
} catch (Exception e) {
}
}
}
private static String read(ByteBuffer buffer) {
var limit = buffer.limit();
var read = new byte[limit];
buffer.get(read);
return new String(read);
}
private static void syncNioClient() throws IOException {
var socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
// 先connect之后再配置非阻塞, 否则还要轮询是否已经建立连接
socketChannel.configureBlocking(false);
var buffer = ByteBuffer.allocate(1024);
while (!Thread.interrupted()) {
buffer.clear();
if (socketChannel.read(buffer) > 0) {
buffer.flip();
var message = new String(buffer.array(), 0, buffer.limit());
System.out.println(message);
}
// 这样读取不会阻塞
if (System.in.available() > 0) {
var input = new byte[System.in.available()];
System.in.read(input);
if (new String(input).trim().equals("exit")) {
socketChannel.close();
return;
}
buffer.clear();
buffer.put(input);
buffer.flip();
socketChannel.write(buffer);
}
}
}
异步IO
异步IO与其他模式有很大不同, 异步IO必须分配一个线程池, 在 IO 就绪时, JVM内部的线程会调用线程池执行回调.
public static void asyncNioChatServer() throws IOException, InterruptedException {
var asynchronousChannelGroup =
AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool());
var asynchronousServerSocketChannel =
AsynchronousChannelProvider.provider()
.openAsynchronousServerSocketChannel(asynchronousChannelGroup);
asynchronousServerSocketChannel.bind(new InetSocketAddress(8080));
var asynchronousSocketChannels = new ArrayList<AsynchronousSocketChannel>();
var welcome = "Welcome to chat room!\n".getBytes();
asynchronousServerSocketChannel.accept(
ByteBuffer.allocate(1024),
new CompletionHandler<>() {
@Override
public void completed(AsynchronousSocketChannel currentChannel, ByteBuffer buffer) {
// accept下一次连接, 并为每个连接注册分配buffer
asynchronousServerSocketChannel.accept(ByteBuffer.allocate(1024), this);
asynchronousSocketChannels.add(currentChannel);
buffer.clear();
buffer.put(welcome);
buffer.flip();
currentChannel.write(
buffer,
buffer,
new CompletionHandler<>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.clear();
currentChannel.read(
buffer,
buffer,
new CompletionHandler<>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
try {
if (result > 0) {
buffer.flip();
var message =
DateTimeFormatter.ISO_LOCAL_TIME.format(
LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS))
+ " "
+ currentChannel.getRemoteAddress()
+ "\n\t"
+ read(buffer);
// 读取的数据发送给其他客户端
for (var iter = asynchronousSocketChannels.iterator();
iter.hasNext(); ) {
var asynchronousSocketChannel = iter.next();
if (!asynchronousSocketChannel.isOpen()) {
iter.remove();
continue;
}
if (asynchronousSocketChannel != currentChannel) {
asynchronousSocketChannel.write(
ByteBuffer.wrap(message.getBytes()));
}
}
buffer.clear();
// 接着从channel读取数据
currentChannel.read(buffer, buffer, this);
}
} catch (Exception ignore) {
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {}
});
asynchronousChannelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
private static void asyncIoClient() throws Exception {
var asynchronousSocketChannel =
AsynchronousChannelProvider.provider()
.openAsynchronousSocketChannel(
AsynchronousChannelGroup.withFixedThreadPool(1, Executors.defaultThreadFactory()));
// 通过get阻塞, 否则还要嵌套一层回调
asynchronousSocketChannel.connect(new InetSocketAddress("localhost", 8080)).get();
var readBuffer = ByteBuffer.allocate(1024);
asynchronousSocketChannel.read(
readBuffer,
readBuffer,
new CompletionHandler<>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
var message = read(readBuffer);
System.out.println(message);
attachment.clear();
asynchronousSocketChannel.read(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {}
});
var scanner = new Scanner(System.in);
var writeBuffer = ByteBuffer.allocate(1024);
while (!Thread.interrupted()) {
if (scanner.hasNext()) {
var input = scanner.nextLine();
if (input.equals("exit")) {
asynchronousSocketChannel.close();
return;
}
writeBuffer.clear();
writeBuffer.put(input.getBytes());
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer);
}
}
}
标签:聊天室,Java,ByteBuffer,buffer,read,IO,var,new
From: https://www.cnblogs.com/wymc/p/17679403.html