首页 > 编程语言 >Netty学习——源码篇7 Pipeline的事件传播机制1

Netty学习——源码篇7 Pipeline的事件传播机制1

时间:2024-03-27 15:59:26浏览次数:30  
标签:Netty Pipeline ChannelHandlerContext void ctx Exception 源码 link throws

        上篇:Netty学习——源码篇6 Pipeline设计原理 已经知道AbstractChannelHandlerContext中有Inbound和Outbound两个boolean变量,分别用于识别Context所对应的Handler的类型。

        1、Inbound为true时,表示其对应的ChannelHandler是ChannelInboundHandler的子类。

        2、Outbound为true时,表示其对应的ChannelHandler是ChannelOutboundHandler的子类。

        这两个属性到底有什么作用呢?还要从ChannelPipeline的事件传播类型说起。Netty中的传播事件可以分为两种:Inbound事件和Outbound事件。以下是Netty官网针对这两个事件的说明。

         由上可以看出,Inbound和Outbound事件的流向是不一样的,Inbound事件的流向是从下至上的,而Outbound恰好相反,是从下到上。并且Inbound方法是通过调用相应的ChannelHandlerContext.fireIN_EVT()方法来传递的,而Outbound方法是通过ChannelHandlerContext的fireChannelRegister()调用会发送一个ChannelRegistered的Inbound给下一个ChannelHandlerContext,而ChannelHandlerContext的bind()方法调用时会发送一个bind的Outbound事件给下一个ChannelHandlerContext。

        Inbound事件传播方法代码如下:

public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

        Outbound事件传播方法的代码如下:

public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Called once a bind operation is made.
     *
     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
     * @param localAddress  the {@link SocketAddress} to which it should bound
     * @param promise       the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception    thrown if an error accour
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a connect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
     * @param remoteAddress     the {@link SocketAddress} to which it should connect
     * @param localAddress      the {@link SocketAddress} which is used as source on connect
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a disconnect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error accour
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

        可以发现,Inbound类似于事件回调(响应请求的事件),而Outbound类似于主动触发(发起请求的事件)。注意,如果捕获了一个事件,并且想让这个事件继续传递下去,需要调用Context对应的fireXXX()方法。

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void channelActive(ChannelHandlerContext ctx)  throws Exception{
        System.out.println("连接成功");
        ctx.fireChannelActive();
    }
    
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception{
        System.out.println("客户端关闭");
        ctx.close(promise);
    }
}

        如上面的代码所示,MyInboundHandler收到了一个channelActive事件,它在处理后,如果希望将事件继续传播下去,那么需要接着调用ctx.fireChannelActive()方法。

        下面用一个代码案例了解一下Pipeline的传播机制。分别编写InboundHandlerA、InboundHandlerB、InboundHandlerC和OutboundandlerA、OutboundandlerB、OutboundandlerC类。

public class InboundHandlerA extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        System.out.println("InboundHandlerA");
        ctx.fireChannelRead(msg);
    }
}
public class InboundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        System.out.println("InboundHandlerB");
        ctx.fireChannelRead(msg);
    }
}
public class InboundHandlerC extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
        System.out.println("InboundHandlerC");
        ctx.fireChannelRead(msg);
    }
}

        以上三个类都调用了ctx.fireChannelRead()方法向下传播。

public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerA write");
        ctx.write(msg,promise);
    }
}

public class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerB write");
        ctx.write(msg,promise);
    }

    @Override
    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception{
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                ctx.channel().write("hello");
            }
        },3, TimeUnit.SECONDS);
    }
}

public class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerC write");
        ctx.write(msg,promise);
    }
}

        上面的三个类都调用了ctx.write()方法。下面编写测试代码,来了解其传播顺序。先编写服务端代码。PipelineServer类主要完成Pipeline的注册工作,代码如下:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class PipelineServer {
    public void start(int port) throws Exception{
        NioEventLoopGroup bossGroup= new NioEventLoopGroup();
        NioEventLoopGroup workerGroup= new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception{
                            //InboundHandler的执行顺序,应该是 A B C
                            ch.pipeline().addLast(new InboundHandlerA());
                            ch.pipeline().addLast(new InboundHandlerB());
                            ch.pipeline().addLast(new InboundHandlerC());
                            
                            //Outbound的执行顺序应该是C B A
                            ch.pipeline().addLast(new OutboundHandlerA());
                            ch.pipeline().addLast(new OutboundHandlerB());
                            ch.pipeline().addLast(new OutboundHandlerC());
                        }
                    }).option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        
    }

    public static void main(String[] args) throws Exception{
        PipelineServer server = new PipelineServer();
        server.start(8080);
    }
}

        PipelineClient类,与服务端建立连接并向服务端发送数据,代码如下:

public class PipelineClient {
    public void connect(String host,int port) throws Exception{
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE,true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new ClientInhandler());
                }
            });
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        }finally {
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception{
        PipelineClient client = new PipelineClient();
        client.connect("127.0.0.1",8080);
    }
}

        ClientHandler类,完成向服务端发送数据的动作,代码如下:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientInhandler extends ChannelInboundHandlerAdapter {

    //读取服务端的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        System.out.println("clientInHandler.channelRead");
        ByteBuf result = (ByteBuf) msg;
        byte[] result1 = new byte[result.readableBytes()];
        result.readBytes(result1);
        result.release();
        ctx.close();
        System.out.println("server infomation:" + new String(result1));
    }

    //当连接建立的时候向服务端发送消息,channelActive 事件在连接建立的时候会被触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
        System.out.println("ClientHandler.channelActive");
        String msg = "are you ok";
        ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
        encoded.writeBytes(msg.getBytes());
        ctx.write(encoded);
        ctx.flush();
    }
}

        接下来运行测试代码,分别启动PipelineServer和PipelineClient,得到的运行结果如下图:

        从运行结果上看,Handler的传播顺序:从Inbound开始顺序执行,然后从Outbound逆序执行。 

标签:Netty,Pipeline,ChannelHandlerContext,void,ctx,Exception,源码,link,throws
From: https://blog.csdn.net/geminigoth/article/details/136857460

相关文章

  • 【全开源】JAVA海外短剧国际版源码支持H5+Android+IOS_博纳软云
    &nbsp;&nbsp;&nbsp;在数字化快速发展的今天,海外短剧市场日益繁荣,成为了全球娱乐界的新宠。为了满足广大用户的需求,我们推出了一款基于JAVA开发的海外短剧国际版源码,支持H5、Android和IOS三大平台,让您轻松进军海外短剧市场。这款源码采用了JAVA语言进行开发,具有高度的可定制......
  • 【全开源】JAVA海外短剧国际版源码支持H5+Android+IOS_博纳软云
    &nbsp;&nbsp;&nbsp;在数字化快速发展的今天,海外短剧市场日益繁荣,成为了全球娱乐界的新宠。为了满足广大用户的需求,我们推出了一款基于JAVA开发的海外短剧国际版源码,支持H5、Android和IOS三大平台,让您轻松进军海外短剧市场。这款源码采用了JAVA语言进行开发,具有高度的可定制......
  • 【全开源】JAVA海外短剧国际版源码支持H5+Android+IOS_博纳软云
    &nbsp;&nbsp;&nbsp;在数字化快速发展的今天,海外短剧市场日益繁荣,成为了全球娱乐界的新宠。为了满足广大用户的需求,我们推出了一款基于JAVA开发的海外短剧国际版源码,支持H5、Android和IOS三大平台,让您轻松进军海外短剧市场。这款源码采用了JAVA语言进行开发,具有高度的可定制......
  • Java版企业电子招标采购系统源码——鸿鹄电子招投标系统的技术特点
    在数字化时代,采购管理也正经历着前所未有的变革。全过程数字化采购管理成为了企业追求高效、透明和规范的关键。该系统通过SpringCloud、SpringBoot2、Mybatis等先进技术,打造了从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通过待办消息、招标公告、中......
  • Java版直播商城免 费 搭 建:平台规划与常见营销模式,电商源码、小程序、三级分销及详解
    【saas云平台】打造全行业全渠道全场景的saas产品,为经营场景提供一体化解决方案;门店经营区域化、网店经营一体化,本地化、全方位、一站式服务,为多门店提供统一运营解决方案;提供丰富多样的营销玩法覆盖所有经营场景,助力商家成功;系统稳定压倒一切,让商家经营有保障,消费者购物更放心......
  • 在线租房网站毕业设计源码
    目录一、程序介绍:三、文档目录:四、运行截图:五、数据库表:六、代码展示:七、更多学习目录:八、互动留言一、程序介绍:文档:开发技术文档、参考LW、答辩PPT,部分项目另有其他文档开发环境:eclipse/idea丨navicat12丨mysql5.7丨tomcat9.0/7.0丨JDK1.8丨maven3.25配套工具:涉......
  • 【附源码】Node.js毕业设计公办小学网络报名系统(Express)
    本系统(程序+源码)带文档lw万字以上  文末可获取本课题的源码和程序系统程序文件列表系统的选题背景和意义选题背景:在当今信息化社会,网络技术已经深入到各个领域,教育领域也不例外。传统的公办小学报名方式主要是线下排队、人工登记,这种方式不仅效率低下,而且容易出现错误。......
  • 精品单页个人导航HTML源码
    源码介绍简约大气精品单页导航-可自行修改其他页面,源码由HTML+CSS+JS组成,记事本打开源码文件可以进行内容文字之类的修改,双击html文件可以本地运行效果,也可以上传到服务器里面下载地址精品单页个人导航HTML源码......
  • [附源码]计算机毕业设计基于大数据的身体健康管理平台(JSP+java+springmvc+mysql+MyBa
    本项目包含程序+源码+数据库+LW+调试部署环境,文末可获取一份本项目的java源码和数据库参考。项目文件图项目介绍随着健康意识的提升和大数据技术的发展,基于大数据的身体健康管理平台成为现代社会追求高质量生活的一个新趋势。这种平台通过收集和分析用户的健康数据,提供个性......
  • [附源码]计算机毕业设计文档管理系统(JSP+java+springmvc+mysql+MyBatis)
    本项目包含程序+源码+数据库+LW+调试部署环境,文末可获取一份本项目的java源码和数据库参考。项目文件图项目介绍随着信息化时代的到来,企业和机构在日常运营中产生大量文档资料。有效的文档管理系统(DMS)对于保障信息的组织、存储、检索和安全至关重要。一个先进的DMS能够帮助......