首页 > 编程语言 >Kafka源码分析(六)——Producer:Sender线程——Batch筛选

Kafka源码分析(六)——Producer:Sender线程——Batch筛选

时间:2024-06-06 20:30:00浏览次数:29  
标签:Sender Producer Partition Broker long batch 源码

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

学习必须往深处挖,挖的越深,基础越扎实!

阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析


阶段4、深入jdk其余源码解析


阶段5、深入jvm源码解析

码哥源码部分

码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】

码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】

码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】

​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】

码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操作?】

码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】

码哥讲源码【谁再说Spring不支持多线程事务,你给我抽他!】

终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!

打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果当作最终结果的水货们请闭嘴】

KafkaProducer通过组件RecordAccumulator将消息按照批次缓存后,就立马返回了,也就是说消息的发送是 异步 的。那么,到底是谁负责从缓存中获取各个batch消息,然后通过网络组件发送给Broker呢?

没错,就是Sender线程。本章,我就来讲解Sender线程是如何完成消息发送的。

注:前面章节我讲解过KafkaProdcer的初始化流程,我们应该已经知道Sender本质是一个Runnable任务,实际创建的线程是“KafkaThread”,我这里习惯叫做”Sender线程“。

一、整体流程

我们先来回顾下Sender线程的整体处理流程,然后再逐节分析内部的细节。Sender线程启动后,会在一个Loop循环中不断执行run方法:

    // Sender.java
    
    void run(long now) {
        // Cluster包含了元数据
        Cluster cluster = metadata.fetch();
    
        // 1.获取已经有消息就绪的Partition对应的Broker
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
        // 2.如果有Partition对应的元数据都没拉取到,就标识一下,后续需要尝试拉取元数据
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }
    
        // 3.检查与Broker的连接状态
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            // 没有建立好连接就移除
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }
    
        // 4.按照Broker维度,对Partition进行分组
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                                                                         this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
    
        // 5.剔除超时的batch(默认60s)
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    
        sensors.updateProduceRequestMetrics(batches);
    
        // 6.计算超时时间
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
    
        // 7.创建ClientReqeust对象,包括了多个Batch,形成一个请求
        sendProduceRequests(batches, now);
    
        // 8.负责真正的网络请求发送
        this.client.poll(pollTimeout, now);
    }

我来解释下上述的各个步骤:

  1. 首先,获取Cluster对象,里面包含了缓存的Broker集群的元数据信息(也可能没有元数据);

  2. 接着,需要判断哪些Broker上的Partition已经准备好发送数据了,比如:

    • 已经有写满的batch(16kb)的Partition;
    • batch创建时间已经超过了linger.ms的Partition。
  3. 第二步筛选完后,可能有一些Partition不知道Leader信息,对于这种情况,后面需要重新拉取元数据;

  4. 接着,检查这些数据已经就绪的Broker,看看客户端是否已经与它们建立了长连接,如果没有则建立连接;

  5. 接着,还需要对数据重新按照Broker维度分组:<Integer, List<RecordBatch>>,一个Broker可以对应多个Partition的Batch;

  6. 剔除超时的batch(默认60s),也就是说如果有batch在内存缓冲区里停留超过60s,就丢弃掉;

  7. 然后,针对每个要发送的Broker,创建一个ClientReqeust对象,里面包括了多个Batch,形成一个请求,后面会将它发送给Broker;

  8. 最后,通过 NetWorkClient 网络通信组件,发送实际的网络I/O通信请求,同时也读取响应结果;

以上就是Sender线程运行的整体流程,接下来我们分步骤来看内部实现细节。

二、就绪Batch筛选

首先来看RecordAccumulator是如何筛选哪些Broker上的Partition已经准备好发送数据了:

    // Sender.java
    
    // 1.获取已经可以发送消息的Partition
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

核心就是RecordAccumulator的ready方法,该方法会筛选出就绪的Leader Partition所在的Broker节点,ReadyCheckResult本质是一个包装Bean对象,封装了以下内容:

    public final static class ReadyCheckResult {
        // 就绪Partition Leader所在的Broker
        public final Set<Node> readyNodes;
        // 下一次进行就绪筛选要等待的时间
        public final long nextReadyCheckDelayMs;
        // 未知Leader的Topic
        public final Set<String> unknownLeaderTopics;
    }

2.1 流程

我们再来看RecordAccumulator.ready()的处理流程,它的核心思路就是:

  1. 遍历缓存的map<TopicPartition, Deque<RecordBatch>>,针对每一个Partition,获取队列头的batch,检查该batch 是否符合就绪条件 ;
  2. 如果符合就绪条件,就把Partition对应的Leader Partition所在的Broker加入结果集;
  3. 如果不符合就绪条件,就计算出下一次处理的时间nextReadyCheckDelayMs,那么Sender线程后续会等待该时间之后再来检查。
    // RecordAccumulator.java
    
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        // 定义各类返回对象
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();
    
        // 判断BufferPool可用内存是否已经耗尽
        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();
            // Leader Partition
            Node leader = cluster.leaderFor(part);
            synchronized (deque) {
                // 不存在Leader,说明要重新拉取元数据
                if (leader == null && !deque.isEmpty()) {
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                    // 从队列头获取batch,因为入队是从队尾入的
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        // 判断是否有就绪的Batch
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            // 计算下一次ready check的时间
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }

上述流程中,还需要关注的一点是:判断缓冲池BufferPool是否已经耗尽。当BufferPool没用可用空间时,条件等待队列 waiters 不为空:

    // BufferPool.java
    
    private final Deque<Condition> waiters;
    
    public int queued() {
        lock.lock();
        try {
            return this.waiters.size();
        } finally {
            lock.unlock();
        }
    }

2.2 筛选条件

那么,哪些batch才算是已经就绪的batch呢?判断逻辑在如下代码中,每个变量我都加了详尽注释:

    // RecordAccumulator.java
    
    // 是否属于重试batch
    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
    
    // 表示这个batch目前已经等待了多久:waitedTimeMs = 当前时间 - 上一次重试时间,默认情况下batch.lastAttemptMs为创建时间
    long waitedTimeMs = nowMs - batch.lastAttemptMs;
    
    // 表示这个batch最多要等待多久才被发送:timeToWaitMs = 重试时间间隔 或 最大可逗留时间,
    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
    
    // 表示这个batch的剩余等待时间
    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
    
    // 表示batch空间是否已经满了
    boolean full = deque.size() > 1 || batch.isFull();
    
    // 表示batch是否到期,到期就是就绪了
    boolean expired = waitedTimeMs >= timeToWaitMs;
    boolean sendable = full || expired || exhausted || closed || flushInProgress();
    
    if (sendable && !backingOff) {
        // 把就绪batch所在的Leader Partition加入readyNodes
        readyNodes.add(leader);
    }

从上述代码可以看出,以下情况就会认为已经就绪:

  1. BufferPool缓冲区满了,默认32MB;
  2. Batch空间满了,默认16KB;
  3. RecordAccumulator已经关闭,因为当客户端关闭时,必须立马把内存缓冲区中的Batch发送出去;
  4. Batch的缓存时间超过了linger.ms

三、总结

本章,我对Sender线程Loop处理流程中, 就绪Batch消息筛选 的底层原理进行了讲解。我们需要主要关注的是,在哪些情况下,缓冲区中的消息会被立马发送。

标签:Sender,Producer,Partition,Broker,long,batch,源码
From: https://blog.csdn.net/smart_an/article/details/139509104

相关文章

  • Kafka源码分析(七)——Producer:Sender线程——Broker连接检查
    作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬学习必须往深处挖,挖的越深,基础越扎实!阶段1、深入多线程阶段2、深入多线程设计模式阶段3、深入juc源码解析阶段4、深入jdk其余源码解析......
  • 电子合同签署小程序,最新源码分享
    获取最新源码 添加我为微信好友一合通源码是一款开源的在线电子合同签署工具,其背后由重庆弈联数聚科技有限公司开发,从公司的商业化产品“一合通”中剥离出来。以下是关于一合通源码的详细介绍:核心功能:智能合同模板:提供智能模板、智能起草、智能审批功能,帮助企业迅速高效......
  • 个人向 godot 源码阅读 - 3 - MainLoop 以及 2D 视口
    3-MainLoop以及2D视口godot默认的主循环类型为SceneTree,在之上则承载了godot中的重要概念之一节点树.SceneTree的源文件位于scene/main/scene_tree.cpp,SceneTree默认将会在Main::start()函数中被创建,然后被设置到OS的mainloop上,现在让我们来看看Scen......
  • 个人向 godot 源码阅读 - 2 - 入口点
    2-入口点由于godot是一个跨平台的引擎,所以就需要不可避免的对不同平台的入口点进行封装,在windows上godot的WinMain入口点的定义位于platform/windows/godot_windows.cpp中.它所做的仅仅是调用到传统的C入口点:在C入口点中则环绕了SEH异常造成的崩溃处理......
  • 个人向 godot 源码阅读 - 1 - 获取 / 编译源码
    1-获取/编译源码获取最常见的方法便是从GitHub上拉取,不过鉴于国内访问GitHub速度之慢,可以采取个折中的方案:先从Gitee上拉取:gitclonehttps://gitee.com/mirrors/godot然后更改remote地址:gitremoteset-urloriginhttps://github.com/godotengine/god......
  • 【爬虫+数据清洗+数据可视化】Python分析“淄博烧烤”热门事件-全流程附源码
    目录一、背景介绍二、爬虫代码2.1展示爬取结果2.2爬虫代码讲解三、可视化代码3.1读取数据3.2数据清洗3.3可视化3.3.1IP属地分析-柱形图3.3.2评论时间分析-折线图3.3.3点赞数分布-箱线图3.3.4评论内容-情感分布饼图3.3.5评论内容-词云图四、技术总结五、演示视频六、转载......
  • JAVA计算机毕业设计基于的儿童疫苗预约系统(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着科技的进步和人们健康意识的增强,儿童疫苗接种已成为保障儿童健康成长的重要措施。然而,传统的疫苗预约方式往往存在诸多不便,如预约流程繁琐、信息......
  • JAVA计算机毕业设计基于的高校党务管理系统(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着高校党建工作的不断深入和发展,党务管理面临着越来越多的挑战。传统的党务管理方式往往依赖于纸质记录和人工操作,效率低下且容易出错。为了提高党......
  • JAVA计算机毕业设计基于的畅游旅游网(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在信息化和全球化的大背景下,旅游业作为现代服务业的重要组成部分,正在经历着前所未有的变革。畅游旅游网作为一个集旅游信息、服务、交易于一体的综合......
  • JAVA计算机毕业设计基于的仓库管理系统(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着信息技术的迅猛发展和企业规模的不断扩大,仓库管理作为企业运营中不可或缺的一环,面临着日益复杂和多样化的挑战。传统的仓库管理模式依赖于人工操......