首页 > 编程语言 >Netty学习——源码篇9 Handler其他处理与异步处理

Netty学习——源码篇9 Handler其他处理与异步处理

时间:2024-04-01 16:00:50浏览次数:23  
标签:Netty GenericFutureListener listener listeners Handler Promise Override 源码 Futur

1 ChannelHandlerContext

        每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext,并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系如下图:

2 Channel的声明周期

        Netty有一个简单但强大的状态模型,能完美映射到ChannelInboundHandler的各个方法。如下表所示是Channel生命周期四个不同的状态。

         一个Channel正常的生命周期如下图所示。随着状态发生变化产生相应的事件。这些事件被转发到ChannelPipeline中的ChannelHandler来触发相应的操作。

3 ChannelHandler常用的API

        先看一个Netty中整个Handler体系的类关系图。

        Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有类型的父类是ChannelHandler, ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法,如下表

        Netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。 ChannelHandlerAdapter实现了父类的所有方法,主要功能就是将请求从一个ChannelHandler往下传递到下一个ChannelHandler,直到全部ChannelHandler传递完毕。也可以直接继承于ChannelHandlerAdapter,然后重写里面的方法。

4 ChannelInboundHandler

        ChannelInboundHandler还提供了一些在接收数据或Channel状态改变时被调用的方法。下面是ChannelInboundHandler的一些方法。

5 异步处理Future

        java.util.concurrent.Future是Java原生API中提供的接口,用来记录异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,知道任务完成再返回。

        Netty扩展了Java的Future,在Future的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用get方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();


    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);


    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> sync() throws InterruptedException;

    
    Future<V> syncUninterruptibly();

    
    Future<V> await() throws InterruptedException;

 
    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

  
    boolean await(long timeoutMillis) throws InterruptedException;

   
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

   
    boolean awaitUninterruptibly(long timeoutMillis);


    V getNow();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

        ChannelFuture接口有扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时和一个Channel进行绑定。

public interface ChannelFuture extends Future<Void> {

    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

  
    boolean isVoid();
}

6 异步执行Promise

        Promise接口也是Future的扩展接口,它表示一种可写的Future,可以自定义设置异步执行的结果。


/**
 * Special {@link Future} which is writable.
 */
public interface Promise<V> extends Future<V> {

    
    Promise<V> setSuccess(V result);

 
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a failure. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

        ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,既可以写异步执行结果,又具备了监听者的功能,是Netty实际编程中使用的表示异步执行的接口。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    @Override
    Channel channel();

    @Override
    ChannelPromise setSuccess(Void result);

    ChannelPromise setSuccess();

    boolean trySuccess();

    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise sync() throws InterruptedException;

    @Override
    ChannelPromise syncUninterruptibly();

    @Override
    ChannelPromise await() throws InterruptedException;

    @Override
    ChannelPromise awaitUninterruptibly();

    /**
     * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
     */
    ChannelPromise unvoid();
}

        DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promise实例。Netty使用addListener方法来回调异步执行的结果。DefaultPromise的addListener()方法的代码如下

    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
        }
    }
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

        从上述代码中可以看到,DefaultChannelPromise会判断异步任务执行的状态,如果执行完毕就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听者的回调函数。

        再来看监听者的接口,其实就是一个方法,即等待异步任务执行完毕后,获得Future结果,执行回调的逻辑,代码如下。

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

标签:Netty,GenericFutureListener,listener,listeners,Handler,Promise,Override,源码,Futur
From: https://blog.csdn.net/geminigoth/article/details/136986097

相关文章

  • 【包远程安装运行】:SpringBoot实现水果蔬菜产品展示小程序源码+指导运行视频教程
    今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的蔬菜水果的小程序系统,该系统分为小程序和后台系统。小程序功能有:产品展示、关于我们、联系我们、产品详情等。后台功能有:蔬菜水果管理、联系我们、关于我们、网站设置等。该小程序功能不多,适合......
  • 【包远程安装运行】:SpringBoot+Mysql企业人事考勤考核管理系统源码+运行视频+包运行+
    今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的在线考核考勤管理系统,该系统除脚手架功能外具体的功能如下:管理员实现的功能有数据维护,考核日期管理,考核管理,测评表管理,统计;干部实现的功能有干部管理,考核日期管理,部门考核,干部个人考核。如果感兴......
  • 【课程设计/实训作业】python学生成绩管理系统源码
    项目介绍一直想做一款学生成绩管理系统,看了很多优秀的开源项目但是发现没有合适的。于是利用空闲休息时间开始自己写了一套管理系统。学习过程中遇到问题可以咨询评论。在线体验http://score.gitapp.cn/(账号:admin123密码:admin123)源码地址https://github.com/geeeeeee......
  • 基于springboot实现大学生入学审核系统项目【项目源码+论文说明】计算机毕业设计
    基于springboot实现大学生入学审核系统演示摘要随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了大学生入学审核系统的开发全过程。通过分析大学生入学审核系统管理的不足,创建了一个计算机管理大学生入学审核系统的方案。文......
  • 基于springboot实现学生读书笔记共享平台系统项目【项目源码+论文说明】计算机毕业设
    基于springboot实现学生读书笔记共享平台系统演示摘要本论文主要论述了如何使用JAVA语言开发一个读书笔记共享平台,本系统将严格按照软件开发流程进行各个阶段的工作,采用B/S架构,面向对象编程思想进行项目开发。在引言中,作者将论述读书笔记共享平台的当前背景以及系统开发......
  • 【附源码】JAVA计算机毕业设计智慧外贸平台(springboot+mysql+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在全球经济一体化的时代背景下,智慧外贸平台的建设已成为提升我国外贸竞争力、优化国际贸易环境的关键一环。随着信息技术的迅猛发展,传统的外贸模式已......
  • 【附源码】JAVA计算机毕业设计智慧物流管理系统(springboot+mysql+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着信息化时代的快速发展,物流行业作为现代经济体系的重要支柱,正面临着前所未有的机遇与挑战。传统的物流管理方式已难以适应现代社会的需求,智能化、......
  • 【附源码】JAVA计算机毕业设计智慧小饭桌(springboot+mysql+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着社会的快速发展和科技的进步,人们对教育的关注和要求也在不断提高。在当今快节奏的生活中,许多家庭面临着孩子午餐难以解决的问题,尤其是对于那些工......
  • 【附源码】JAVA计算机毕业设计智慧型居民小区物业管理(springboot+mysql+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着城市化进程的加速和居民生活水平的提高,智慧型居民小区逐渐成为现代城市发展的一个重要方向。在这样的背景下,传统物业管理模式已难以满足居民日益......
  • 全新客服系统源码 - 开源客服系统 - 工单系统接入
    客服系统可谓是一款经久不衰的产品,产生时期要追溯至曾经电视购物盛行的前夕,在电商大时代被各种身怀绝技的产品经理再一次的革新与发展。从最初简单的用户信息记录、工单跟踪,到后期的服务数据量化、沟通质量指标化、智慧应答,一步步在走向全机器智能客服的时代。将成型的客服系统......