首页 > 其他分享 >【RPC高性能框架总结】5.高性能nio框架netty(中)

【RPC高性能框架总结】5.高性能nio框架netty(中)

时间:2023-03-19 11:05:05浏览次数:42  
标签:netty 框架 ChannelHandlerContext ctx 高性能 io Handler import


接上一篇《​​4.高性能nio框架netty(上)​​》
上一篇我们编写了使用Netty框架开发的客户端的启动类“NettyTestClient”以及业务处理类“NettyTestClientHandler”,本篇我们继续编写服务端的启动类“NettyTestServer”以及业务处理类“NettyTestServerHandler”。

1、服务端启动类

首先编写服务端的“NettyTestServer”启动类:

package cn.com.netty.test.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyTestServer {

private final int port;

public NettyTestServer(int port) {
this.port = port;
}

public void start() throws Exception{
EventLoopGroup eventLoopGroup = null;
try {
//server端引导类,来引导绑定和启动服务器;
ServerBootstrap serverBootstrap = new ServerBootstrap();
//连接池处理数据
eventLoopGroup = new NioEventLoopGroup();
//装配ServerBootstrap
serverBootstrap.group(eventLoopGroup)//多线程处理
//制定通道类型为NioServerSocketChannel,一种异步模式的可以监听新进来的TCP连接的通道
.channel(NioServerSocketChannel.class)
.localAddress("localhost",port)//设置InetSocketAddress让服务器监听某个端口以等待客户端连接。
.childHandler(new ChannelInitializer<Channel>(){//设置childHandler执行所有的连接请求
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new NettyTestServerHandler());//注册handler
}
});
//最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭
//因为使用sync方法,所以关闭操作也会被阻塞。
ChannelFuture channelFuture = serverBootstrap.bind().sync();
System.out.println("开始监听,端口为:"+channelFuture.channel().localAddress());
channelFuture.channel().closeFuture().sync();
}finally{
eventLoopGroup.shutdownGracefully().sync();
}

}

public static void main(String[] args) throws Exception {
new NettyTestServer(8888).start();
}
}

这里创建了一个服务端的引导类“ServerBootstrap”,该引导类是用来引导绑定和启动服务器的类。接着创建了一个EventLoopGroup类的对象的实现类“NioEventLoopGroup”,来处理事件,如接受新连接、接受数据、写数据等等;
然后引导类“ServerBootstrap”开始进行引导配置,这里分四步:
(1)首先将NioEventLoopGroup线程组注册到引导类的组配置中
(2)然后再去注册一个Channel管道,指定注册的管道类型为“NioServerSocketChannel”,该类为一种异步模式的可以监听新进来的TCP连接的通道。
(3)然后设置InetSocketAddress让服务器监听某个端口以等待客户端连接。
(4)最后设置childHandler执行所有的连接请求。

当服务端引导类ServerBootstrap完成上面四步的引导配置后,最后使用bind()方法来绑定服务器,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭。

2、客户端业务处理类

我们处理业务的相关逻辑,和客户端一样,也是在Handler中完成的。NettyTestServerHandler就是我们需要编写的业务逻辑处理类。

package cn.com.netty.test.server;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyTestServerHandler extends ChannelInboundHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server 读取数据......");
//读取数据
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("接收客户端数据:"+body);
//向客户端写数据
System.out.println("server向client发送数据");
String currentTime = new Date(System.currentTimeMillis()).toString();
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server 读取数据完毕...");
ctx.flush();//刷新后才将数据发出到SocketChannel
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

这里服务端的Handler业务处理类,同样继承了一个Handler父类,即ChannelInboundHandlerAdapter,该类是ChannelInboundHandler的一个简单实现,默认情况下不会做任何处理,只是简单的将操作通过fire*方法传递到ChannelPipeline中的下一个ChannelHandler中让链中的下一个ChannelHandler去处理。
继承ChannelInboundHandlerAdapter类后,这里重写了父类的3个方法:
(1)channelRead(ChannelHandlerContext ctx, Object msg)
该方法用于当Netty从socket读取到信息后,触发的方法。
(2)channelReadComplete(ChannelHandlerContext ctx)
该方法用于当Netty从socket读取不到信息,即信息接受并读取完毕后,和反馈前,触发的方法。
(3)exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
该方法在接收、读取过程中出现异常时,触发的方法。

下面简述一下上面的channelRead方法的具体逻辑:
首先从socket读取到的信息到channelRead方法时是一个Object类型,我们将其转换为ByteBuf类型,便于数据的读取。然后新建一个byte数组,用于接受buf缓冲区的字节流信息,使用Bytebuf的readBytes方法,将缓冲区的字节流读取到新建的byte数组中。后面就是转换为String将信息打印到控制台中。之后服务端需要向客户端反馈信息,此时构建一个当前时间的字符串,然后使用Unpooled工具,将该时间字符串转换为字节流之后,写入到Buf缓冲区中,然后使用ChannelHandlerContext上下文对象,将该Bytebuf写出(如果有下一个Inbound就写到写一个,没有就输出)。

flush刷新操作放在了channelReadComplete方法中,因为channelReadComplete是在channelRead方法执行后才执行的,所以这里相当于直接在channelRead方法最后进行了flush。

3.测试

上一篇我们完成了客户端的启动类和业务处理类的编写,本篇完成了服务端的启动类和业务处理类的编写,一个简单的netty交互模块就完成了,下面我们进行测试。

首先我们启动服务端监听类NettyTestServer:

【RPC高性能框架总结】5.高性能nio框架netty(中)_Netty


可以看到服务端已经启动,并且正在监听8888端口。然后我们启动客户端的服务类,进行服务的连接:

【RPC高性能框架总结】5.高性能nio框架netty(中)_Netty_02


可以看到,客户端成功连接了服务器,发送了相关请求,并接收到了服务端发来的当前时间的信息。

而服务端接收到客户端的请求后,进行数据的读取(反序列化),并且也向客户端发送了响应信息:

【RPC高性能框架总结】5.高性能nio框架netty(中)_fireChannelRead_03


当然我们再起一个客户端进行数据的发送也是可以的(因为服务端依然在监听端口):

【RPC高性能框架总结】5.高性能nio框架netty(中)_fireChannelRead_04


服务端会有另一个响应信息:

【RPC高性能框架总结】5.高性能nio框架netty(中)_fireChannelRead_05

至此我们编写了一个很简单的,单一请求的Netty客户端以及服务端代码。可以发现我们使用Netty的API模板代码去写交互逻辑,非常简洁和快速。
上面的服务端只实现了一个Handler,我们下面来实现多个Handler进行数据的处理。

4.多Handler处理

我们将服务端启动类NettyTestServer进行改造,来实现多个Handler进行数据的处理。我们只需要修改serverBootstrap进行childHandler处理器类装配的iniChannel方法没在Channel的pipeline管道中绑定多个Handler:

.childHandler(new ChannelInitializer<Channel>(){//设置childHandler执行所有的连接请求
@Override
protected void initChannel(Channel ch) throws Exception {
//注册两个InboundHandler,执行顺序为注册顺序,所以应该是NettyInboundHandler1、NettyInboundHandler2
//注册两个OutboundHandler,执行顺序为注册顺序的逆序,所以应该是NettyOutboundHandler1、NettyOutboundHandler2
ch.pipeline().addLast(new NettyInboundHandler1())
.addLast(new NettyOutboundHandler1())
.addLast(new NettyOutboundHandler2())
.addLast(new NettyInboundHandler2());
}
});

这里我们分别注册了四个Handler,分别是NettyInboundHandler1、NettyOutboundHandler1、NettyOutboundHandler2和NettyInboundHandler2。Inbound主要负责处理socketChannel的read操作(解码、业务处理),而Outbound主要处理socketChannel的write操作(编码、报文发送)。

Inbound和Outbound是有不同的执行顺序的:

(1)首先不论注册顺序,接受信息后,总是先执行所有的Inbound,再执行所有的Outbound。这是因为socketChannel接受到信息后,先触发read()的IO,再触发write的IO。
(2)对于InboundHandler,它的执行顺序为注册顺序。
(3)对于OutboundHandler,它的执行顺序为注册顺序的逆序。
创建这四个Handler:

【RPC高性能框架总结】5.高性能nio框架netty(中)_fireChannelRead_06


其中InboundHandler继承了ChannelInboundHandlerAdapter父类,而OutboundHandler继承了ChannelOutboundHandlerAdapter父类。

在NettyInboundHandler1中:

package cn.com.netty.test.server;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyInboundHandler1 extends ChannelInboundHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("----InboundHandler1----");
//通知执行下一个InboundHandler
ctx.fireChannelRead(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("InboundHandler1读取数据完毕...");
ctx.flush();//刷新后才将数据发出到SocketChannel
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

这里channelRead方法中,InboundHandler接受到数据后,使用上下文对象ChannelHandlerContext的fireChannelRead方法,将接收到的msg信息传递到下一个InboundHandler。在NettyInboundHandler2中执行和之前NettyTestServerHandler一样的处理:

package cn.com.netty.test.server;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyInboundHandler2 extends ChannelInboundHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("----InboundHandler2----");
//读取数据
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("接收客户端数据:"+body);
//向客户端写数据
System.out.println("server向client发送数据");
String currentTime = new Date(System.currentTimeMillis()).toString();
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server 读取数据完毕...");
ctx.flush();//刷新后才将数据发出到SocketChannel
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

这里值得注意的是,虽然代码一样,但是因为这里有Handler链需要执行,所以在ctx.write(resp)方法写出后,不是直接发送给客户端,而是向OutHandler处理类中走。在NettyOutboundHandler1中:

package cn.com.netty.test.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class NettyOutboundHandler1 extends ChannelOutboundHandlerAdapter{

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("----OutboundHandler1----");
ByteBuf buf = (ByteBuf)msg;//上层传递过来的数据
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("上层Handler传递过来的信息为:"+body);
String body2 = "now dateTime is:"+body;//拼接了新的信息
ByteBuf resp = Unpooled.copiedBuffer(body2.getBytes());
ctx.write(resp);
ctx.flush();
}
}

在该类中,重写了write方法,向外写出数据。这里首先拿到上一层Handler传递过来的数据,转换为字符串,并拼接了新的信息,向下一个Handler进行write并刷新。在NettyOutboundHandler2中:

package cn.com.netty.test.server;

import java.net.InetAddress;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class NettyOutboundHandler2 extends ChannelOutboundHandlerAdapter{

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("----OutboundHandler2----");
ByteBuf buf = (ByteBuf)msg;//上层传递过来的数据
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("上层Handler传递过来的信息为:"+body);
InetAddress address = InetAddress.getLocalHost();//获取本地IP地址对象
String body2 = body+"\n the server's ip is:"+address.getHostAddress();//拼接了新的信息
ByteBuf resp = Unpooled.copiedBuffer(body2.getBytes());
ctx.write(resp);
ctx.flush();
}

}

在该类中将拼装好的信息后面再添加当前服务器的ip地址后,最终写出该数据。

客户端依然使用我们之前写好的客户端。下面进行测试:

首先启动服务端进行监听操作:

【RPC高性能框架总结】5.高性能nio框架netty(中)_Netty_07


然后启动客户端:

【RPC高性能框架总结】5.高性能nio框架netty(中)_Netty_08


此时可以看到客户端发送请求后,收到了服务端反馈的当前时间信息,不仅有Inbound构造的当前时间信息,还有Outbound拼接的“now dateTime is:”以及IP地址信息。

在服务端,我们可以通过控制台看到这4个Handler的执行顺序:

【RPC高性能框架总结】5.高性能nio框架netty(中)_Netty_09


可以看到,执行顺序分别为InboundHandler1、InboundHandler2、OutboundHandler2和OutboundHandler1,符合之前我们所提到的,先执行InboundHandler后执行OutboundHandler,以及InboundHandler顺序执行,OutboundHandler逆序执行的规则。

其实整个Handler的执行方式,和我们之前画的Netty框架图的业务逻辑处理部分是一样的:

【RPC高性能框架总结】5.高性能nio框架netty(中)_fireChannelRead_10


这里首先执行In Handler的处理器链,然后逆序执行Out Handler的处理器链,最后将处理好的反馈信息写出(当然也可以选择不反馈)。

这里需要注意一点的是,OutboundHandler在注册的时候,一定要放在最后一个InboundHandler的前面,不然就无法传递到OutboundHandler中。这是因为InboundHandler执行了ctx.write(),而处理器链后面又没有注册OutboundHandler的情况下,InboundHandler会直接将数据写出去,而不会写到前面的OutboundHandler中。

5.总结

在使用Handler的过程中,需要注意:
(1)ChannelInboundHandler之间的传递,通过调用ctx.fireChannelRead(msg)实现;调用ctx.write()将传递到ChannelOutboundHandler。
(2)ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。
(3)流水线pipeline中outhandler不能放在最后,否则不生效。
(4)Handler的消费处理放在最后一个处理。

参考:
传智播客《2017零基础大数据》教学视频

标签:netty,框架,ChannelHandlerContext,ctx,高性能,io,Handler,import
From: https://blog.51cto.com/u_16012040/6130995

相关文章