首页 > 其他分享 >《Netty权威指南》之I/O基础入门

《Netty权威指南》之I/O基础入门

时间:2022-12-02 22:01:49浏览次数:39  
标签:指南 Netty java 入门 线程 客户端 import null socket


I/O基础入门:

  • ​​1. Java的 I/O演进​​
  • ​​2. NIO入门​​
  • ​​2.1 BIO通信模型(同步阻塞式I/O )​​
  • ​​2.2 伪异步I/O编程​​

1. Java的 I/O演进

从JDK1.0到JDK1.3,Java的I/O类库都很原始,很多Unix网络编程中的概念或者接口在I/O类库中都没有体现,例如:​​Pipe​​​,​​Channel​​​,​​Buffer​​​和​​Selector​​​等。2002年发布JDK1.4时,NIO以​​JSR-51​​​的身份正式随JDK发布。它新增了​​java.nio​​包 提供了很多进行异步I/O 开发的API和类库,主要的类和接口如下:

  • 进行异步I/O操作的缓冲区​​ByteBuffer​​等;
  • 进行异步I/O操作的管道​​Pipe​​;
  • 进行各种I/O操作(同步或者异步)的​​Channel​​​,包括​​ServerSocketChannel​​​和​​SocketChannel​​;
  • 多种字符集的编码能力和解码能力;
  • 实现非阻塞I/O操作的多路复用器​​Selector​​;
  • 基于流行的​​Perl​​实现的正则表达式类库;
  • 文件通道FileChannel;

2017年7月28日,JDK1.7正式发布,亮点包括将原来的I/O类库进行了升级,被称为​​NIO2.0​​​,​​NIO2.0​​​由​​JSR-203​​演进而来,主要提供了如下三个方面的改进:

  • 提供能够批量获取文件属性的API,这些API具有平台无关性,不与特性的文件系统相耦合,另外它还提供了标准文件系统的SPI,供各个服务商扩展实现。
  • 提供AIO功能,支持基于文件的异步I/O操作和针对网络套接字的异步操作。
  • 完成​​JSR-51​​定义的通道功能,包括对配置和多播数据报的支持等。

2. NIO入门

2.1 BIO通信模型(同步阻塞式I/O )

该模型通常由一个独立的Acceptor线程负责监听客户端的连接,它收到客户端的连接请求之后,为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的​​一请求一应答​​通信模式。

《Netty权威指南》之I/O基础入门_.net


该模型最大的问题是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程数和客户端并发访问数呈1:1的正比关系。当线程数膨胀之后,系统性能急剧下降,并随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题。

下面看demo:​​时间服务器程序​

首先是创建​​TimeServerHandler.java​

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 同步阻塞式I/O
* @author linmengmeng
* @date 2020年3月20日下午2:21:27
*/
public class TimeServerHandler implements Runnable {

private static Logger log = LoggerFactory.getLogger(TimeServer.class);

private Socket socket;

public TimeServerHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String currentTime = null;
String body = null;
while (true) {
//读取输入流,如果读取到输入流的尾部,则返回null,退出循环
body = in.readLine();
System.out.println("body:" + body);
if (body == null) {
//客户端退出时,服务端的应答线程会受到一个body为null的响应,此应答线程也会自动结束
break;
}
log.info("The Time Server receive order : {}", body);
//判断请求消息是否是‘QUERY TIME ORDER’ 如果是则获取当前时间,并通过PrintWriter的println函数发送给客户端
currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
out.println(currentTime);
}
} catch (Exception e) {
if (in != null) {
try {
log.info("in will close......");
in.close();
} catch (IOException e2) {
e2.printStackTrace();
}
}
if (out != null) {
log.info("out will close......");
out.close();
out = null;
}
if (socket != null) {
try {
log.info("socket will close......");
this.socket.close();
} catch (Exception e2) {
e2.printStackTrace();
}
this.socket = null;
}
}
}
}

服务端代码:TimeServer.java

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
* 同步阻塞式I/O服务端
* @author linmengmeng
* @date 2020年3月20日下午2:18:34
*/
public class TimeServer {

private static Logger log = LoggerFactory.getLogger(TimeServer.class);

@Autowired
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
log.info("The Time Server is start in port : {}" , port);
Socket socket = null;
//通过无线循环来监听客户端的连接,如果没有客户端接入,则主线程会一直阻塞在ServerSocket的accept操作上
while (true) {
socket = serverSocket.accept();
//开启新的线程处理客户端的请求
new Thread(new TimeServerHandler(socket)).start();
}
} finally {
if (serverSocket != null) {
log.info("The TimeServer close");
serverSocket.close();
serverSocket = null;
}
}
}
}

客户端代码:TimeClient.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 同步阻塞式I/O客户端
* @author linmengmeng
* @date 2020年3月20日下午2:19:05
*/
public class TimeClient {

private static Logger log = LoggerFactory.getLogger(TimeClient.class);

public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
//客户端通过PrintWriter向服务端发送‘QUERY TIME ORDER’指令,然后通过BufferedReader的readLine读取响应
out.println("QUERY TIME ORDER");
log.info("Send Order 2 Server success");
String resp = in.readLine();
log.info("Now is : {}", resp);
//客户端到此就结束了,程序会自动退出
} catch (Exception e) {
// TODO: handle exception
}finally {
if (out != null) {
log.info("out will close......");
out.close();
out = null;
}
if (in != null) {
try {
log.info("in will close......");
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if (socket != null) {
try {
log.info("socket will close......");
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}

分别执行服务端和客户端,客户端执行结束会自动退出,服务端仍然在监听客户端的连接,执行结果如下:

服务端执行结果:

[2020-03-20 15:29:00,831] [INFO] [TimeServer.java:33] [com.lin.test.netty.book.two.TimeServer.main] [main]: The Time Server is start in port : 8080
body:QUERY TIME ORDER
[2020-03-20 15:29:07,969] [INFO] [TimeServerHandler.java:45] [com.lin.test.netty.book.two.TimeServer.run] [Thread-1]: The Time Server receive order : QUERY TIME ORDER
body:null

客户端执行结果:

[2020-03-20 15:29:07,980] [INFO] [TimeClient.java:39] [com.lin.test.netty.book.two.TimeClient.main] [main]: Send Order 2 Server success
[2020-03-20 15:29:08,012] [INFO] [TimeClient.java:41] [com.lin.test.netty.book.two.TimeClient.main] [main]: Now is : Fri Mar 20 15:29:07 CST 2020
[2020-03-20 15:29:08,013] [INFO] [TimeClient.java:47] [com.lin.test.netty.book.two.TimeClient.main] [main]: out will close......
[2020-03-20 15:29:08,017] [INFO] [TimeClient.java:53] [com.lin.test.netty.book.two.TimeClient.main] [main]: in will close......
[2020-03-20 15:29:08,048] [INFO] [TimeClient.java:62] [com.lin.test.netty.book.two.TimeClient.main] [main]: socket will close......

为了改进​​一线程一连接​​​模型,后来又演进出了一种通过线程池或者消息队列实现一个或者多线程处理N个客户端的模型,由于它的底层仍然采用同步阻塞I/O,所以被称为​​伪异步​​。

2.2 伪异步I/O编程

通过一个线程池来处理多个客户端的请求接入,形成​​客户端个数M:线程池最大线程数N​​的比例关系,其中M可以远远大于N。通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发导致线程耗尽。

当有新的客户端接入时,将客户端的​​Socket​​​封装成一个​​Task​​(该任务实现了java.lang.Runnable接口)投递到后端的线程池中处理。由于线程池可以设置消息队列的大小和线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

《Netty权威指南》之I/O基础入门_客户端_02


首先创建一个时间处理器处理类的线程池,​​TimeServerHandlerExecutePool.java​

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 时间处理器处理类的线程池
* @author linmengmeng
* @date 2020年3月20日下午4:19:25
*/
public class TimeServerHandlerExecutePool {

private ExecutorService executor;

public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
}

public void execute(Runnable task) {
executor.execute(task);
}
}

然后对服务端代码进行改造

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
* 使用线程池处理服务端请求的伪异步I/O时间器服务端
* @author linmengmeng
* @date 2020年3月20日下午4:20:11
*/
public class TimeServer2 {


private static Logger log = LoggerFactory.getLogger(TimeServer.class);

@Autowired
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
log.info("The Time Server is start in port : {}" , port);
Socket socket = null;

//创建I/O任务线程池
TimeServerHandlerExecutePool singleExcecutor = new TimeServerHandlerExecutePool(50, 10000);

//通过无线循环来监听客户端的连接,如果没有客户端接入,则主线程会一直阻塞在ServerSocket的accept操作上
while (true) {
socket = serverSocket.accept();

//将请求Socket封装成Task,然后调用线程池的execute方法执行,避免每个请求接入都创建新线程
singleExcecutor.execute(new TimeServerHandler(socket));
}
} finally {
if (serverSocket != null) {
log.info("The TimeServer close");
serverSocket.close();
serverSocket = null;
}
}
}
}

同步I/O读取输入流时使用的是​​socket.getInputStream()​​​方法,返回一个​​InputStream​​​对象,看下​​InputStream​​​的​​read​​方法:

《Netty权威指南》之I/O基础入门_客户端_03


可以看到Socket的输入流进行读取操作的时候,它会一直阻塞下去,直到有数据可读或者数据读取完毕或者发生异常。这意味着当对方发送请求或者应答消息比较缓慢,或者网络传输比较慢时,读取输入流的一方的通信线程将被长时间阻塞。


标签:指南,Netty,java,入门,线程,客户端,import,null,socket
From: https://blog.51cto.com/linmengmeng/5907619

相关文章