首页 > 编程语言 >Kafka源码分析(七)——Producer:Sender线程——Broker连接检查

Kafka源码分析(七)——Producer:Sender线程——Broker连接检查

时间:2024-06-06 20:29:45浏览次数:32  
标签:node Sender Producer Broker Selector 源码 NetworkClient 连接 channel

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

学习必须往深处挖,挖的越深,基础越扎实!

阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析


阶段4、深入jdk其余源码解析


阶段5、深入jvm源码解析

码哥源码部分

码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】

码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】

码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】

​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】

码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操作?】

码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】

码哥讲源码【谁再说Spring不支持多线程事务,你给我抽他!】

终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!

打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果当作最终结果的水货们请闭嘴】

上一章讲到,Sender线程会不断Loop循环,根据Batch是否就绪筛选出一批Broker。筛选完这些Broker后,还需要判断这些Broker的状态,比如连接是否已经成功建立,对于没有建立连接的Broker进行连接初始化,核心流程是在NetworkClient.ready()中完成的:

    // Sender.java
    
    // 3.检查与Broker的连接状态
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // 没有客户端与Broker没有成功建立连接,需要移除
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

本章,我就来讲解Broker筛选的底层原理。

一、整体流程

Broker筛选的核心逻辑是在NetworkClient.ready()中完成的,下面的方法首先判断Broker是否符合就绪条件,接着对于 从未建立过连接或断开连接超过一定时间 的Broker,尝试初始化连接:

    // NetworkClient.java
    
    public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
    
        // 1.判断Broker是否就绪
        if (isReady(node, now))
            return true;
    
        // 2.执行到这里,可能是从未建立过连接或断开连接超过一定时间,所以要初始化连接
        if (connectionStates.canConnect(node.idString(), now))
            initiateConnect(node, now);
        return false;
    }

1.1 就绪判断

我们先来看NetworkClient.isReady()方法,它会认为同时满足以下条件的Broker才处于就绪状态:

  1. MetadataUpdater元数据更新器当前不处于“即将更新元数据”的状态,也就是说,如果此时即将要更新元数据了,就不能发送请求,必须要等待元数据更新完成后才能发送请求;
  2. Broker已经建立连接,且连接状态正常。
    // NetworkClient.java
    
    public boolean isReady(Node node, long now) {
        // 当前没有准备更新元数据,且与Broker连接状态正常
        return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
    }

第一个判断——MetadataUpdater.isUpdateDue(),本质就是要断定 当前客户端不能处于“即将更新元数据”的状态 :

    // NetworkClient.DefaultMetadataUpdater.java
    
    public boolean isUpdateDue(long now) {
        // 当前不处于更新中状态 && 下一次更新时间为0
        return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
    }

第二个判断——NetworkClient.canSendRequest()方法包含了三部分判断逻辑,Broker必须满足以下三个条件才被认为是就绪的:

  1. Broker的连接状态是READY;
  2. Selector Channel已经就绪(NetworkClient底层采用了NIO通信机制);
  3. inFlightRequests 请求数未满,也就是说未响应的请求数不能超过参数 max.in.flight.requests.per.connection设置的值,默认为5个。
    // NetworkClient.java
    
    private boolean canSendRequest(String node) {
        return connectionStates.isReady(node) && selector.isChannelReady(node) 
            && inFlightRequests.canSendMore(node);
    }

1.2 建立连接

经过第一层筛选后,还有一种情况需要考虑,就是 客户端与Broker之间不存在连接 ,所以需要通过ClusterConnectionStates.canConnect()判断下是否属于这种情况,如果是则要建立连接:

    // ClusterConnectionStates.java
    
    public boolean canConnect(String id, long now) {
        // 与当前Broker的连接状态
        NodeConnectionState state = nodeState.get(id);
        if (state == null)    // null表示从未建立过连接
            return true;
        else
            // 连接为断开状态且已经超过一段时间没有重连(默认100ms),也算没有建立过连接
            return state.state == ConnectionState.DISCONNECTED && 
            now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
    }

NetworkClient.initiateConnect()建立连接,这里其实用到了设计模式里的 状态模式 ,通过状态机来修改连接的不同状态:

    // NetworkClient.java
    
    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            // 通过状态机修改连接状态
            this.connectionStates.connecting(nodeConnectionId, now);
            // 通过底层NIO组件建立连接
            selector.connect(nodeConnectionId,
                             new InetSocketAddress(node.host(), node.port()),
                             this.socketSendBuffer,
                             this.socketReceiveBuffer);
        } catch (IOException e) {
            connectionStates.disconnected(nodeConnectionId, now);
            metadataUpdater.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }

另外还要注意,Selector是Kafka客户端自己的NIO组件,底层建立的都是Socket连接。Selector在建立连接时,在底层初始化一个SocketChannel组件,然后把它注册到Selector上,监听它建立连接的事件。

二、初始化连接

上述Broker筛选过程中,如果客户端与某个Broker没有建立连接,就会通过Selector组件去建立连接。本节,我就对Selector这个NIO网络通信组件作一个讲解。

2.1 NetworkClient构造

我们在初始化KafkaProducer时,会创建网络通信组件 NetworkClient :

    // kafkaProducer.java
    
    NetworkClient client = new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
        this.metadata, clientId,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs, time, true);

可以看到,它的内部封装了一个Selector对象,这个就是Kafka客户端最核心的通信组件:

    public class NetworkClient implements KafkaClient {
        private final Selectable selector;
        //...
    }

Selector内部封装了Java NIO核心组件——java.nio.channels.Selector,并且建立了Broker ID和KafkaChannel的映射关系:

    public class Selector implements Selectable {
        private final java.nio.channels.Selector nioSelector;
        private final Map<String, KafkaChannel> channels;
    
        //...
    }

KafkaChannel底层又封装了java.nio.channels.SocketChannel,所以本质就是利用了Java NIO来完成底层的网络通信,整个结构是下面这样的:

2.2 注册监听事件

我们再来看Selector是如何与Broker建立连接的,本质就是些Java NIO的代码,熟悉NIO编程的童鞋对这块应该不会陌生:

    // Selector.java
    
    private final Map<String, KafkaChannel> channels;    //Channel缓存
    
    public void connect(String id, InetSocketAddress address, 
                        int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);
    
        // 创建一个SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        // 非阻塞模式
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        // keepalive = true,表示这是一个保活连接
        socket.setKeepAlive(true);
        // 发送缓冲区
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        // 接受缓冲区
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
        // 设置TcpNoDelay=true,表示不对数据包进行拼接,而是立即发送
        socket.setTcpNoDelay(true);
    
        // 尝试建立连接,非阻塞模式下会立即返回,connected=false
        boolean connected;
        try {
            connected = socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
    
        // 向java.nio.channels.Selector注册SocketChannel,监听OP_CONNECT事件
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    
        // 构造一个KafkaChannel
        KafkaChannel channel;
        try {
            channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        } catch (Exception e) {
            try {
                socketChannel.close();
            } finally {
                key.cancel();
            }
            throw new IOException("Channel could not be created for socket " + socketChannel, e);
        }
        // 关联KafkaChannel和SelectionKey
        key.attach(channel);
        // 缓存Channel,也就是每一个Broker对应一个KafkaChannel
        this.channels.put(id, channel);
    
        //...
    }

上述代码的目的就是创建SocketChannel,注册到Selector上,然后通过SelectionKey关联,可以用下面这张图描述:

有几个需要特别关注的地方:

  • SocketChannel配置为“非阻塞”模式时,连接建立是异步的,Selector会监听SocketChannel上的OP_CONNECT事件,当有该事件发生时会获取到一个对应的SelectionKey,所以真正建立连接的地方在Selector.pool()方法中,这个后面会讲;
  • SocketChannel的keepalive,设置keepalive=true之后,如果一段时间内客户端和服务端没有任何通信,客户端就会发送一个探测包,根据探测包的结果保持连接、重新连接或者断开连接;
  • SocketChannel的tcpNoDelay,这个参数当设置false时表示开启Nagle算法,就会把一些小数据包收集起来,组装成一个大包一次性发送出去。所以,如果设置为true,就代表不对数据包进行组装,立马发送;
  • Selector会缓存KafkaChannel,每个Broker一个KafkaChannel,代表着客户端与这个Broker之间的连接。KafkaChannel内部封装了一个TransportLayer对象,而TransportLayer又封装了Java NIO原生的SocketChannel和SelectionKey。

Kafka客户端的 NetworkClient 、 Selector 、 KafkaChannel 、 ConnectStates ,这些底层的通信组件是极为值得我们去研究的。因为Kakfa本身作为一个工业级的中间件,经历了无数线上环境的淬炼。所以,如果我们自己的公司要自研底层NIO通信组件,Kafka是一个现成的参考。

2.3 轮询处理

Selector会监听SocketChannel上的OP_CONNECT事件,真正建立连接的操作是通过Selector.poll方法完成的:

在Sender线程的主流程的最后一步会调用NetworkClient.poll()

    // Sender.java
    
    void run(long now) {
        //...
    
        // 8.负责真正的网络请求发送
        this.client.poll(pollTimeout, now);
    }

NetworkClient.poll()首先会根据元数据更新标识,判断是否要更新集群元数据,需要的话就通过 MetadataUpdater 更新;接着,调用Selector.poll()方法轮询SocketChannel上的所有SelectionKey,根据不同事件类型做处理:

    // NetworkClient.java
    
    public List<ClientResponse> poll(long timeout, long now) {
        // 1.判断是否需要更新元数据,需要的话就更新
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
    
        // 2.轮询SocketChannel的事件,有相应的事件(建立连接、准备读、准备写)发生就进行处理
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
    
        // 3.处理响应
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
    
        // 4.触发回调函数
        for (ClientResponse response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
        return responses;
    }

我们重点看Selector.poll(),它的核心逻辑在内部的pollSelectionKeys方法中:

    // Selector.java
    
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
    
        // 1.各类缓存数据清理
        clear();
        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;
    
        // 2.判断是否已经有就绪Channel
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            // 重点在这里:根据不同事件进行处理
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
    
        addToCompletedReceives();
    
        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    
        // 3.关闭长时间空闲的连接
        maybeCloseOldestConnection(endSelect);
    }

Selector.pollSelectionKeys()就是从底层的java.nio.channels.Selector获取注册的SelectionKey:

    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        // 遍历SelectionKey
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            // 1.获取关联的KafkaChannel
            KafkaChannel channel = channel(key);
    
            sensors.maybeRegisterConnectionMetrics(channel.id());
    
            // 2.idleExpiryManager内部保存了一个LRU Map,用来跟踪剔除最近最少使用的连接
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);
    
            try {
    
                // 3.key.isConnectable:发生了可建立连接的事件
                if (isImmediatelyConnected || key.isConnectable()) {
                    // 尝试建立连接
                    if (channel.finishConnect()) {
                        // 将连接添加到缓存
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    } else
                        continue;
                }
    
                // 2.已经连接上,但连接还没完全就绪
                if (channel.isConnected() && !channel.ready())
                    // 完成三次握手
                    channel.prepare();
    
                // 3.连接已经建立,且发生了可读事件
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }
    
                // 4.连接已经建立,且发生了可写事件
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
    
                // 5.对于无效Key,关闭对应的Channel
                if (!key.isValid())
                    close(channel, true);
            } catch (Exception e) {
                //...
                close(channel, true);
            }
        }
    }

2.4 建立连接

当SelectionKey上发生可建立连接的事件时,客户端最终会通过KafkaChannel.finishConnect()方法完成连接的建立:

    // Selector.java
    
    if (isImmediatelyConnected || key.isConnectable()) {
        if (channel.finishConnect()) {    // 真正建立连接的地方
            this.connected.add(channel.id());
            this.sensors.connectionCreated.record();
        } else
            continue;
    }
    // KafkaChannel.java
    
    public boolean finishConnect() throws IOException {
        // 调用了内部的TransportLayer组件,里面又封装了SocketChannel
        return transportLayer.finishConnect();
    }

底层就是调用Java NIO的SocketChannel.finishConnect()

    public boolean finishConnect() throws IOException {
        // 建立连接
        boolean connected = socketChannel.finishConnect();
        if (connected)
            // 如果连接已经建立,则对于当前Channel,后续不关注OP_CONNECT和OP_READ事件
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        return connected;
    }

三、总结

本章,我对客户端检查Broker连接状态的底层原理进行了讲解,核心思路就是 检查当前客户端与哪些待筛选的Broker还没有建立TCP长连接 ,如果没有建立连接,Kafka Producer就会在底层通过Java NIO的封装组件尝试建立连接。

标签:node,Sender,Producer,Broker,Selector,源码,NetworkClient,连接,channel
From: https://blog.csdn.net/smart_an/article/details/139509293

相关文章

  • 电子合同签署小程序,最新源码分享
    获取最新源码 添加我为微信好友一合通源码是一款开源的在线电子合同签署工具,其背后由重庆弈联数聚科技有限公司开发,从公司的商业化产品“一合通”中剥离出来。以下是关于一合通源码的详细介绍:核心功能:智能合同模板:提供智能模板、智能起草、智能审批功能,帮助企业迅速高效......
  • 个人向 godot 源码阅读 - 3 - MainLoop 以及 2D 视口
    3-MainLoop以及2D视口godot默认的主循环类型为SceneTree,在之上则承载了godot中的重要概念之一节点树.SceneTree的源文件位于scene/main/scene_tree.cpp,SceneTree默认将会在Main::start()函数中被创建,然后被设置到OS的mainloop上,现在让我们来看看Scen......
  • 个人向 godot 源码阅读 - 2 - 入口点
    2-入口点由于godot是一个跨平台的引擎,所以就需要不可避免的对不同平台的入口点进行封装,在windows上godot的WinMain入口点的定义位于platform/windows/godot_windows.cpp中.它所做的仅仅是调用到传统的C入口点:在C入口点中则环绕了SEH异常造成的崩溃处理......
  • 个人向 godot 源码阅读 - 1 - 获取 / 编译源码
    1-获取/编译源码获取最常见的方法便是从GitHub上拉取,不过鉴于国内访问GitHub速度之慢,可以采取个折中的方案:先从Gitee上拉取:gitclonehttps://gitee.com/mirrors/godot然后更改remote地址:gitremoteset-urloriginhttps://github.com/godotengine/god......
  • 【爬虫+数据清洗+数据可视化】Python分析“淄博烧烤”热门事件-全流程附源码
    目录一、背景介绍二、爬虫代码2.1展示爬取结果2.2爬虫代码讲解三、可视化代码3.1读取数据3.2数据清洗3.3可视化3.3.1IP属地分析-柱形图3.3.2评论时间分析-折线图3.3.3点赞数分布-箱线图3.3.4评论内容-情感分布饼图3.3.5评论内容-词云图四、技术总结五、演示视频六、转载......
  • JAVA计算机毕业设计基于的儿童疫苗预约系统(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着科技的进步和人们健康意识的增强,儿童疫苗接种已成为保障儿童健康成长的重要措施。然而,传统的疫苗预约方式往往存在诸多不便,如预约流程繁琐、信息......
  • JAVA计算机毕业设计基于的高校党务管理系统(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着高校党建工作的不断深入和发展,党务管理面临着越来越多的挑战。传统的党务管理方式往往依赖于纸质记录和人工操作,效率低下且容易出错。为了提高党......
  • JAVA计算机毕业设计基于的畅游旅游网(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在信息化和全球化的大背景下,旅游业作为现代服务业的重要组成部分,正在经历着前所未有的变革。畅游旅游网作为一个集旅游信息、服务、交易于一体的综合......
  • JAVA计算机毕业设计基于的仓库管理系统(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着信息技术的迅猛发展和企业规模的不断扩大,仓库管理作为企业运营中不可或缺的一环,面临着日益复杂和多样化的挑战。传统的仓库管理模式依赖于人工操......
  • 基于java ssm vue mysql校园短期闲置资源置换平台系统(源码+lw+部署文档+讲解等)
    前言......