3.4、Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
测试Pipeline执行流程
服务端:
@Slf4j
public class TestPipelineServer {
public static void main(String[] args) throws InterruptedException {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("dcode", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("1,消息:{}", msg);
ctx.fireChannelRead(msg);
//和上面的是一个意思,调用下一个handler
//super.channelRead(ctx, msg);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("2,消息:{}", msg);
Student student = new Student();
student.setName(msg.toString());
ctx.fireChannelRead(student);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("3,消息:{}", msg);
channel.writeAndFlush("server msg...");
}
});
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("6");
super.write(ctx, msg, promise);
}
});
}
}).bind(8080).sync();
}
@Data
static class Student {
private String name;
}
}
客户端:
public class TestPipelineClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
}
})
.connect(new InetSocketAddress(8080))
.sync()
.channel();
channel.writeAndFlush("msg.....");
}
}
执行结果:
14:34:16.317 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 1,消息:msg.....
14:34:16.317 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 2,消息:msg.....
14:34:16.318 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 3,消息:TestPipelineServer.Student(name=msg.....)
14:34:16.318 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 6
14:34:16.318 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 5
14:34:16.318 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 4
总结:
1.Inbound处理器从上到下执行,Outbound处理器相反,
2.中断一个,就不会传递到下一个
3.Inbound中ctx也有wirteAndFlush方法,这个方法是从当前向前找出站Outbound。而channel的write方法是从最后找Outbound
此前测试handler顺序比较麻烦,netty提供了EmbeddedChannel方便测试
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3=new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4=new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1, h2, h3, h4);
//测试入站Inbound
//结果:1 2
embeddedChannel.writeInbound("test inbound");
//测试出站Outbound
//结果:4 3
embeddedChannel.writeOutbound("test outbound");
}
}
标签:Netty,Pipeline,void,ctx,pipeline,Handler,msg,new,public
From: https://www.cnblogs.com/jpymll/p/16821447.html