首页 > 其他分享 >Netty入门-Handler & Pipeline

Netty入门-Handler & Pipeline

时间:2022-10-24 14:55:31浏览次数:64  
标签:Netty Pipeline void ctx pipeline Handler msg new public

3.4、Handler & Pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品

测试Pipeline执行流程

服务端:

@Slf4j
public class TestPipelineServer {
    public static void main(String[] args) throws InterruptedException {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast("dcode", new StringDecoder(CharsetUtil.UTF_8));

                        pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.info("1,消息:{}", msg);
                                ctx.fireChannelRead(msg);
                                //和上面的是一个意思,调用下一个handler
                                //super.channelRead(ctx, msg);
                            }
                        });

                        pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.info("2,消息:{}", msg);
                                Student student = new Student();
                                student.setName(msg.toString());
                                ctx.fireChannelRead(student);
                            }
                        });

                        pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.info("3,消息:{}", msg);
                                channel.writeAndFlush("server msg...");
                            }
                        });

                        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.info("4");
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.info("5");
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.info("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                }).bind(8080).sync();
    }

    @Data
    static class Student {
        private String name;
    }
}

客户端:

public class TestPipelineClient {
    public static void main(String[] args) throws InterruptedException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                    }
                })
                .connect(new InetSocketAddress(8080))
                .sync()
                .channel();

        channel.writeAndFlush("msg.....");
    }
}

执行结果:

14:34:16.317 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 1,消息:msg.....
14:34:16.317 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 2,消息:msg.....
14:34:16.318 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 3,消息:TestPipelineServer.Student(name=msg.....)
14:34:16.318 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 6
14:34:16.318 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 5
14:34:16.318 [nioEventLoopGroup-2-2] INFO com.jpy.netty.pipeline.TestPipelineServer - 4

总结:

1.Inbound处理器从上到下执行,Outbound处理器相反,

2.中断一个,就不会传递到下一个

3.Inbound中ctx也有wirteAndFlush方法,这个方法是从当前向前找出站Outbound。而channel的write方法是从最后找Outbound

此前测试handler顺序比较麻烦,netty提供了EmbeddedChannel方便测试

@Slf4j
public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.info("1");
                super.channelRead(ctx, msg);
            }
        };

        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.info("2");
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h3=new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.info("3");
                super.write(ctx, msg, promise);
            }
        };

        ChannelOutboundHandlerAdapter h4=new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.info("4");
                super.write(ctx, msg, promise);
            }
        };

        EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1, h2, h3, h4);
        //测试入站Inbound
        //结果:1 2
        embeddedChannel.writeInbound("test inbound");
        //测试出站Outbound
        //结果:4 3
        embeddedChannel.writeOutbound("test outbound");
    }
}

标签:Netty,Pipeline,void,ctx,pipeline,Handler,msg,new,public
From: https://www.cnblogs.com/jpymll/p/16821447.html

相关文章

  • Netty入门-Hello World
    Netty入门1、Netty优势NettyVSNIO,工作量大,bug多需要自己构建协议解决TCP传输问题,如黏包,半包epoll空轮询导致cpu100%对API进行增强,ThreadLocal-->FastThreadLocal......
  • Netty Reactor模型
      1、netty抽象出两个线程池:BossGroup负责监听和建立连接;WorkerGroup负责网络IO的读写2、BossGroup和WorkerGroup类型都是NioEventLoopGroup,相当于一个事件......
  • 说说 Redis pipeline
    更多技术文章,请关注我的个人博客​www.immaxfang.com​​和小公众号​​Max的学习札记​​。Redis客户端和服务端之间是采用TCP协议进行通信的,是基于Request/Respons......
  • 说说 Redis pipeline
    更多技术文章,请关注我的个人博客www.immaxfang.com和小公众号Max的学习札记。Redis客户端和服务端之间是采用TCP协议进行通信的,是基于Request/Response这种一问......
  • 【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
    ......
  • 安卓中实现异步任务(3)——使用HandlerThread实现
    #安卓中实现异步任务(3)——使用HandlerThread实现问题背景上篇文章大致介绍了几种安卓汇总实现异步任务的方法,讲得比较简要,有朋友问到具体的实现方式,现在开始分列几篇文章......
  • 序列化器---netty
    packagecn.itcast.protocol;importcom.google.gson.*;importjava.io.*;importjava.lang.reflect.Type;importjava.nio.charset.StandardCharsets;/***用......
  • Transformers Pipelines
    pipelines是使用模型进行推理的一种很好且简单的方法。这些pipelines是从库中抽象出大部分复杂代码的对象,提供了一个简单的API,专门用于多个任务,包括命名实体识别、屏蔽语......
  • Netty的第一个例子
    importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOptio......
  • 通过netty把百度地图API获取的地理位置从Android端发送到Java服务器端
    本篇记录我在实现时的思考过程,写给之后可能遇到困难的我自己也给到需要帮助的人。写的比较浅显,见谅。在写项目代码的时候,需要把Android端的位置信息传输到服务器端,通过Ne......