首页 > 编程语言 >Netty 学习(十):ChannelPipeline源码说明

Netty 学习(十):ChannelPipeline源码说明

时间:2022-10-11 18:47:49浏览次数:94  
标签:Netty name newCtx ChannelPipeline handler 源码 String

Netty 学习(十):ChannelPipeline源码说明

作者: Grey

原文地址:

博客园:Netty 学习(十):ChannelPipeline源码说明

CSDN:Netty 学习(十):ChannelPipeline源码说明

ChannelPipeline可以看作一条流水线,原料(字节流)进来,经过加工,形成一个个Java对象,然后基于这些对象进行处理,最后输出二进制字节流。

ChannelPipeline 在创建 NioSocketChannel 的时候创建, 其默认实现是 DefaultChannelPipeline

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

ChannelPipeline 中保存了 Channel 的引用,且其中每个节点都是一个 ChannelHandlerContext 对象。每个 ChannelHandlerContext 节点都保存了执行器(即:ChannelHandler)。

ChannelPipeline里有两种不同的节点,一种是 ChannelInboundHandler,处理 inbound 事件(例如:读取数据流),还有一种是 ChannelOutboundHandler,处理 Outbound 事件,比如调用writeAndFlush()类方法时,就会调用该 handler。

添加 handler 的逻辑如下

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查是否有重复的 handler
            checkMultiplicity(handler);
            // 创建 节点
            newCtx = newContext(group, filterName(name, handler), handler);
            // 添加节点
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        // 回调用户方法
        callHandlerAdded0(newCtx);
        return this;
    }

如上代码,整个添加过程见注释说明,其实主要就是四步:

第一步:检查是否有重复的 handler,核心逻辑见

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                // 非共享的且添加过,就抛出异常,反之,如果一个 handler 支持共享,就可以无限次被添加到 ChannelPipeline 中
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

第二步:创建节点,即把 handler 包裹成 ChannelHandlerContext,核心逻辑如下

    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };
    private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
        if (context0(name) != null) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

注:Netty 使用 FastThreadLocal 变量来缓存 Handler 的类和名称的映射关系,在生成 name 的时候,首先看缓存中有没有生成过默认 name,如果没有生成,就调用generateName0()生成默认名称,加入缓存。

第三步:把 ChannelHandlerContext 作为节点添加到 pipeline 中,核心代码如下

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

其本质就是一个双向链表的插入节点过程,而且,ChannelPipeline 删除 ChannelHandler 的方法,本质就是把这个双向链表的某个节点删掉!

第四步:回调用户方法,核心代码如下

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            boolean removed = false;
            try {
                atomicRemoveFromHandlerList(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }
    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

其中ctx.callHandlerAdded();就是回调用户的handlerAdded方法,然后通过 CAS 方式修改节点的状态为 REMOVE_COMPLETE (说明该节点已经被移除),或者 ADD_COMPLETE (添加完成)。

完整代码见:hello-netty

本文所有图例见:processon: Netty学习笔记

更多内容见:Netty专栏

参考资料

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码

标签:Netty,name,newCtx,ChannelPipeline,handler,源码,String
From: https://www.cnblogs.com/greyzeng/p/16780213.html

相关文章

  • 阿里云openservices rocketmq消息队列消费消息底层源码分析
    mq消费源码依赖<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId></dependency>阿里云rocke......
  • HashMap实现原理及源码分析
    哈希表(hashtable)也叫散列表,是一种非常重要的数据结构,应用场景及其丰富,许多缓存技术(比如memcached)的核心其实就是在内存中维护一张大的哈希表,而HashMap的实现原理也常常出现......
  • 源码角度了解Skywalking之服务端OAP对Trace的处理
    源码角度了解Skywalking之服务端OAP对Trace的处理从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送......
  • 基于SSM+Vue汽车租赁管理系统Java车辆出租系统(源码调试+讲解+文档)
    ......
  • golang-set包的用法及源码解析
    Set是一种基本的数据结构,它具备确定性、互异性、无序性三个特点。因此,在开发过程中我们通常用它来判断一些数据的集合与另一个数据集合或者元素的包含关系。在大部分开发......
  • 3.0 Spring生命周期源码解析
    Spring最核心的功能之一就是创建对象(IOC)Bean的生命周期指:在spring中,一个Bean的生成和销毁的过程1.生成BeanDefinitionSpring启动先进行扫描,调用org.springframework.c......
  • Lua5.3源码解析
    2022-10-11,16点52 大概看了2个月不到的时间,坚持每天看lua设计与实现.pdf还有csdn上面lua的博客.然后自己debug研究.最后把细节加到注释里面.建议看这个项目时候......
  • 工厂方法在Spring源码中的运用
    我们都知道Spring中IOC是使用的工厂模式,但是对于实现细节就一知半解了,今天这篇文章就带大家解读Spring中是如何使用工厂模式的。在上篇文章中我们懂了什么是工厂模式,这篇文......
  • drf三大认证之频率类源码解析
    主要从SimpleRateThrottle的allow_request方法开始分析第一步1.查看SimpleRateThrottle的allow_requestifself.rateisNone:returnTrue#表示没......
  • 云转码源码|m3u8切片程序全开源
     什么是云转码? 云转码是完全在云中将视频文件转换为其他格式的过程。更具体地说,转码意味着从单个编码视频文件创建不同大小、分辨率和比特率的新文件。这种方法使广播......