一个简单的服务器,分别使用 BIO、NIO、AIO 实现
BIO 阻塞IO 每个连接单独起一个线程
点击查看代码
package org.example.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class BIOServer extends Thread {
private ServerSocket serverSocket;
public int getPort() {
return serverSocket.getLocalPort();
}
public void run() {
try {
serverSocket = new ServerSocket(0);
Executor executor = Executors.newFixedThreadPool(8);
while (true) {
Socket socket = serverSocket.accept();
RequestHandler requestHandler = new RequestHandler(socket);
executor.execute(requestHandler);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException {
BIOServer server = new BIOServer();
server.start();
try (Socket client = new Socket(InetAddress.getLocalHost(), server.getPort())) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
bufferedReader.lines().forEach(s -> System.out.println(s));
}
}
}
/**
 * Simplified per-connection handler: it reads nothing from the client and only sends a fixed
 * greeting line, then closes the connection.
 */
class RequestHandler extends Thread {
    private final Socket socket;

    /**
     * @param socket the accepted client connection; it is closed when {@link #run()} finishes
     */
    RequestHandler(Socket socket) {
        this.socket = socket;
    }

    /**
     * Sends {@code "Hello world!"} followed by a line separator and closes the socket.
     *
     * <p>Failures are reported on the console rather than propagated to the caller.
     */
    @Override
    public void run() {
        // The socket itself is a managed resource so it is closed even when
        // getOutputStream() throws before the writer exists.
        try (Socket connection = socket;
                PrintWriter out = new PrintWriter(connection.getOutputStream())) {
            out.println("Hello world!");
            // PrintWriter never throws on write failures; checkError() flushes and reports them.
            if (out.checkError()) {
                System.err.println(
                        "Failed to send greeting to " + connection.getRemoteSocketAddress());
            }
        } catch (IOException | RuntimeException e) {
            e.printStackTrace();
        }
    }
}
NIO 非阻塞IO 多路复用 一个线程可以处理多个连接
点击查看代码
package org.example.nio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
/**
 * Non-blocking I/O (NIO) demo server.
 *
 * <p>A single thread registers a {@link ServerSocketChannel} bound to port 8888 with a
 * {@link Selector} and answers every accepted connection with a fixed greeting.
 */
public class NIOServer extends Thread {
    /** Released once {@link #run()} has either finished setting up the listener or failed. */
    private final java.util.concurrent.CountDownLatch ready =
            new java.util.concurrent.CountDownLatch(1);

    /**
     * Binds the server channel and runs the select loop until an {@link IOException} occurs.
     *
     * <p>The exception is printed; the selector and the server channel are closed on exit.
     */
    @Override
    public void run() {
        // Create the Selector and the Channel.
        try (Selector selector = Selector.open();
                ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
            serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
            serverSocket.configureBlocking(false);
            // Register with the Selector and declare the operation we are interested in.
            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            ready.countDown();
            while (true) {
                selector.select(); // Blocks until at least one channel is ready.
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // Keys must be removed by hand or they stay in the selected set.
                    iter.remove();
                    // Only the accept-ready server channel may be cast and accepted from.
                    if (key.isValid() && key.isAcceptable()) {
                        sayHelloWorld((ServerSocketChannel) key.channel());
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Also reached when setup failed: never leave a waiting caller blocked forever.
            ready.countDown();
        }
    }

    /**
     * Accepts one pending connection, sends the greeting and closes the connection.
     *
     * @param server the accept-ready server channel
     * @throws IOException if accepting or writing fails
     */
    private void sayHelloWorld(ServerSocketChannel server) throws IOException {
        try (SocketChannel client = server.accept()) {
            // Per the ServerSocketChannel API, a non-blocking accept() returns null when no
            // connection is pending (e.g. the peer already went away).
            if (client == null) {
                return;
            }
            client.write(Charset.defaultCharset().encode("Hello world!"));
        }
    }

    /** Blocks until the server thread has attempted to set up its listener. */
    private void awaitReady() {
        try {
            ready.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the server to start", e);
        }
    }

    /**
     * Starts the server, connects one client to port 8888 and prints what the server sends.
     *
     * @param args unused
     * @throws IOException if the client cannot connect or read
     */
    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.start();
        // Connecting before the server has bound the port would be refused.
        server.awaitReady();
        try (Socket client = new Socket(InetAddress.getLocalHost(), 8888)) {
            BufferedReader bufferedReader =
                    new BufferedReader(new InputStreamReader(client.getInputStream()));
            bufferedReader.lines().forEach(System.out::println);
        }
    }
}
AIO 异步IO 基于事件和回调触发 一个线程可以服务多个连接
点击查看代码
package org.example.aio;
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
/**
 * Asynchronous I/O (AIO) demo server.
 *
 * <p>Opens an {@link AsynchronousServerSocketChannel} on {@code localhost:8888} inside a channel
 * group backed by a cached thread pool and registers an {@link AcceptCompletionHandler} for
 * incoming connections.
 */
public class AIOServer extends Thread {
    /** Released once {@link #run()} has either bound the server channel or failed to do so. */
    private final java.util.concurrent.CountDownLatch bindAttempted =
            new java.util.concurrent.CountDownLatch(1);

    private AsynchronousServerSocketChannel server;

    /**
     * Sets up the server channel, registers the accept handler and then parks this thread.
     *
     * @throws RuntimeException wrapping the {@link IOException} if any setup step fails
     */
    @Override
    public void run() {
        String host = "localhost";
        int port = 8888;
        // The channel group manages the resources shared by its channels.
        AsynchronousChannelGroup group = null;
        try {
            group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);
            server = AsynchronousServerSocketChannel.open(group);
            // Configure the socket through setOption.
            server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            server.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
            // Bind to the given host and port.
            server.bind(new InetSocketAddress(host, port));
        } catch (IOException e) {
            // Do not leave a half-initialised group (and any channel opened in it) behind.
            if (group != null) {
                try {
                    group.shutdownNow();
                } catch (IOException cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
            }
            throw new RuntimeException(e);
        } finally {
            // Reached on success and on failure so a waiting caller is always released.
            bindAttempted.countDown();
        }
        System.out.println("Listening on " + host + ":" + port);
        // Print the provider.
        System.out.println("Channel Provider : " + server.provider());
        // Wait for a connection; the handler runs once the accept operation completes.
        server.accept(null, new AcceptCompletionHandler(server));
        // accept() above returns immediately, so this thread is parked explicitly.
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /** Blocks until the server thread has attempted to bind its channel. */
    private void awaitBindAttempt() {
        try {
            bindAttempted.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the server to start", e);
        }
    }

    /**
     * Starts the server, connects one client to {@code localhost:8888} and prints what it receives.
     *
     * @param args unused
     * @throws IOException if the client cannot connect or read
     */
    public static void main(String[] args) throws IOException {
        AIOServer server = new AIOServer();
        server.start();
        // Connecting before the server has bound the port would be refused.
        server.awaitBindAttempt();
        try (Socket client = new Socket("localhost", 8888)) {
            BufferedReader bufferedReader =
                    new BufferedReader(new InputStreamReader(client.getInputStream()));
            bufferedReader.lines().forEach(System.out::println);
        }
    }
}
/**
 * Completion handler for {@code accept} on an {@link AsynchronousServerSocketChannel}.
 *
 * <p>The first type argument is the result type (the accepted channel), the second is the type of
 * the attachment passed to {@code accept}.
 */
class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
    private final AsynchronousServerSocketChannel channel;

    /**
     * @param channel the server channel on which the next {@code accept} is issued after each
     *     completed one
     */
    public AcceptCompletionHandler(AsynchronousServerSocketChannel channel) {
        this.channel = channel;
    }

    /**
     * Greets the newly accepted client and issues the next {@code accept} on the server channel.
     *
     * @param result the accepted client channel; closed once the greeting has been written
     * @param attachment the attachment given to {@code accept}; not used
     */
    @Override
    public void completed(AsynchronousSocketChannel result, Object attachment) {
        System.out.println("有新客户的连接....");
        try {
            sendGreetingThenClose(result);
        } finally {
            // Always wait for the next connection, whatever happened to this client.
            channel.accept(null, this);
        }
    }

    /**
     * Reports a failed {@code accept}.
     *
     * @param exc the reason the accept operation failed
     * @param attachment the attachment given to {@code accept}; not used
     */
    @Override
    public void failed(Throwable exc, Object attachment) {
        // Print the failure itself; its cause is frequently null and would hide the reason.
        System.out.println("Server failed...." + exc);
    }

    /**
     * Writes the greeting asynchronously and closes the client only after the write has finished.
     * Closing right after initiating the write could abort it before any byte is sent.
     */
    private static void sendGreetingThenClose(AsynchronousSocketChannel client) {
        ByteBuffer greeting = Charset.defaultCharset().encode("Hello world!");
        client.write(greeting, greeting, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer written, ByteBuffer pending) {
                if (pending.hasRemaining()) {
                    // Partial write: keep going with the rest of the buffer.
                    client.write(pending, pending, this);
                } else {
                    closeAndReport(client);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer pending) {
                exc.printStackTrace();
                closeAndReport(client);
            }
        });
    }

    /** Closes the client channel, printing (not propagating) any failure to do so. */
    private static void closeAndReport(AsynchronousSocketChannel client) {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}