首页 > 其他分享 >kafka producer生产消息发送到kafka的过程

kafka producer生产消息发送到kafka的过程

时间:2023-06-29 14:24:26浏览次数:33  
标签:node 发送到 producer partion batch send kafka now

1 KafkaProducer的几个重要成员变量

1)Partitioner 

  用来获取消息应该发往哪个分区

private final Partitioner partitioner;

 

2)ProducerMetadata 

  kafka元数据

private final ProducerMetadata metadata;

 

3)RecordAccumulator 

  消息累加器,储存生产者生产的消息,再从这里取出发向kafka

private final RecordAccumulator accumulator;

 

4) Sender

  实现了runnable,消息发送对象,作为参数传入下面的Thread

private final Sender sender;

 

5)Thread 

  消息发送的线程

private final Thread ioThread;

 

2 KafkaProducer的构造方法

  在构造方法中,关注下面代码

    1)创建了sender 

    2)创建了ioThread 

    3)ioThread.run启动了线程

  也就是说,在创建一个KafkaProducer对象的时候,会创建一个线程,并且跑起来,跑的是sender 里面的run方法。

  这个线程做什么事情,后面再说

    this.sender = newSender(logContext, kafkaClient, this.metadata);
       String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
       this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
       this.ioThread.start();

 

3 KafkaProducer.send

  下面,梳理send方法,只看主要的节点

 

3.1 对消息的key进行序列化

serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

 

3.2 对消息的value进行序列化

serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

 

3.3 获取消息应该发往哪个分区

int partition = partition(record, serializedKey, serializedValue, cluster);

 

1)首先判断消息是否指定了partion

  指定了,直接返回这个partion的id,没有指定,再去判断

return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

 

2)判断是否指定了key,没有指定key

  轮询

    存在连通的partion,在连通的partion里轮询

    不存在连通的partion,在不连通的partion里轮询

if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        }

 

3)指定了key,根据key的hash获取partion

else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

 

3.4 将消息放入消息累加器中accumulator.append

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);

  

3.4.1 RecordAccumulator-消息累加器的数据结构

  它会为每个topic的每个partion都维护一个队列。每个队列里面会有一个或者多个batch(是一个集合)来存储消息。

 

3.4.2  RecordAccumulator消息累加器的数据结构大小限制和相关参数

1)Batch

  默认大小是16K

  相关的可配置参数是BATCH_SIZE_CONFIG 

public static final String BATCH_SIZE_CONFIG = "batch.size";

  说明:

    (1)msg进来,大小小于一个batch的剩余大小,放进去

    (2)msg进来,大小大于一个batch的剩余大小,新创建一个batch放进去

    (3)msg大于16k,创建一个新的batch存放这个消息,也就是说,batch的大小在这种情况下可以大于配置的大小

 

2)RecordAccumulator

  默认大小是32M,满了消息就存不进去,就会阻塞

  可配置参数是BUFFER_MEMORY_CONFIG 

public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";

 

3)阻塞超时时间

  既然会阻塞,那么必然会有超时时间,RecordAcculator满了,消息放不进来,阻塞的超时时间,消息阻塞超过这个时间,发送失败

  可配置参数MAX_BLOCK_MS_CONFIG 

public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";

 

3.4.3 append方法

1)获取或创建队列

  根据partion去获取队列 

TopicPartition tp
Deque<ProducerBatch> dq = getOrCreateDeque(tp);

   存在,直接获取

   不存在,创建

   可以看到这个队列是ArrayDeque

Deque<ProducerBatch> d = this.batches.get(tp);
if (d != null)
    return d;
d = new ArrayDeque<>();

 

2)加锁tryAppend

  加锁是为了保证线程安全,因为多个线程持有同一个producer对象,调用send方法的话,就会存在线程安全问题

synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

 

3)进入tryAppend方法

  先去获取了Batch

    获取到了,尝试添加

    获取不到tryAppend方法返回null,再去创建Batch,再调用tryAppend。如何去创建这里就不做说明了。

ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());

 

4)获取到了batch之后,就把消息存进去

  如何存的细节就不做说明了

 

3.5 append方法结束后

  append方法结束后,send方法后面就没有做什么其它的事情了,后面就不细究了

  

3.6 send方法做的事情

  根据上面可以看出,send方法就是把消息被放入了RecordAccumulator

  并没有涉及到将消息发送给kafka

  

4 ioThread和sender

4.1 简介

  上面,已经知道producer的send方法只是把消息放入了RecordAccumulator,并没有发向kafka。

  实际上,发向kafka的操作是在ioThread这个线程进行的

  在producer创建的时候,这个线程就跑起来了,实际运行的是sender的run方法

  下面,我们来看run方法

 

4.2 run方法简介

1)循环

  一进run方法,就看见一个循环,不停地去调用runOnce方法

while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

 

2)进入runOnce方法,看到最重要的两行代码  

  snendProductData :发送数据

  client.poll :更新元数据,建立连接,把连接注册到多路复用器上,处理io时间

long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);

 

3)run方法图解

   下面再具体介绍sendProducerData和poll两个方法

 

4.3 sendProducerData

  下面,梳理下sendProducerData方法

 

1)获取kafka元数据

  包括节点信息,topic、partion等等信息

  获取元数据,如果元数据已经从节点获取到了,则这里拿到的是完整信息,否则只能获取到节点信息

Cluster cluster = metadata.fetch();

 

2)获取可发送消息的kafka节点

RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
Iterator<Node> iter = result.readyNodes.iterator();
while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

 

3)从消息累加器中获取各个partion的batchs

  key是partion的id

  value是partion对应的batch集合,这个batch集合称为一个包

  注意:这里有一个参数maxRequestSize

      指的是这个包的最大大小

       默认是1M

       相关可配置参数是MAX_REQUEST_SIZE_CONFIG

Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

  也就是说,从消息累加器中去获取各个partion的batchs,可能只能获取一部分,因为这个包是有大小限制的

 

4)发送数据

sendProduceRequests(batches, now);

 

5)进入sendProduceRequests方法

  遍历了batchs,一个包一个包的去发送

for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());

 

6)进入sendProduceRequest方法

  先构建了一个ClientRequest对象

  里面存储了要发送到的topic、partion还有消息等信息

for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);
        }
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                requestTimeoutMs, callback);

  然后,client.send(clientRequest, now);

 

7)进入client.send方法

  进入dosend方法

  它先做了个判断,是否能够发送消息到kafka

  因为生产者这一端最开始是不知道kafka的元数据信息的。只知道kafka的节点的ip端口信息。

  所以它需要先去获取到元信息后,建立连接,才能发送

  获取元数据连接连接的操作不在sendProducerData这个里面,而是在poll方法里面

if (!canSendRequest(nodeId, now))
                throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");

 

8)然后一直来到下面这个doSend方法里面

doSend(clientRequest, isInternalRequest, now, builder.build(version));

  看到下面代码

  在发送之前,创建了一个inFlightRequest,并且存入了inFlightRequests中

  这是用来向kafka记录发出了多少个请求,kafka还没有响应,这个请求数和一个配置有关MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

  默认是5,当发出还没有响应的请求数达到这个值,将会阻塞阻塞等待响应之后才会继续发送请求

InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        selector.send(send);

  

9)selector.send(send)

  最后,通过selector将包发向kafka

  至此,sendProducerData方法就走完了

 

4.4  client.poll

1)进入poll方法,就发现它尝试去更新元数据信息

long metadataTimeout = metadataUpdater.maybeUpdate(now);

 

2)进入maybeUpdate方法

  看Default的那个

  先去获取了最近的一个连通的Node

Node node = leastLoadedNode(now);

 

3)进入maybeUpdate(now, node)方法

  首先,去判断是否可以发送请求,如果可以,直接发送请求

if (canSendRequest(nodeConnectionId, now)) {
                Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();
                this.inProgressRequestVersion = requestAndVersion.requestVersion;
                MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
                log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                return defaultRequestTimeoutMs;
            }

  如果不行,去建立连接,更新元数据

if (connectionStates.canConnect(nodeConnectionId, now)) {
                // We don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node);
                initiateConnect(node, now);
                return reconnectBackoffMs;
            }

 

4)进入initiateConnect(node, now)方法

  先通过上面获取的最近的连通的那个node去建立连接,更新元数据

selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);

 

5)再回到poll方法

  maybeUpdate方法走完后
  调用了selector.poll
  这是去处理io事件
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

  进入selector.poll方法,看到下面代码

  select(timeout)这里传入了一个超时时间

  select等待事件的超时时间,超过时间不再阻塞

  可配置参数是REQUEST_TIMEOUT_MS_CONFIG

int numReadyKeys = select(timeout);
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

 

6) poll方法图解

 

 

标签:node,发送到,producer,partion,batch,send,kafka,now
From: https://www.cnblogs.com/jthr/p/17513840.html

相关文章

  • Kafka参数
     参数解释 brokerbroker.id=1每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumerslog.dirs=/tmp/kafka-logskafka数据的存放地址,多个地址的话用逗号分割/tmp/kafka-logs-1,/tmp/kafka-logs-2port=6667提供给客户端响应......
  • 从Kafka中学习高性能系统如何设计 | 京东云技术团队
    1前言相信各位小伙伴之前或多或少接触过消息队列,比较知名的包含RocketMQ和Kafka,在京东内部使用的是自研的消息中间件JMQ,从JMQ2升级到JMQ4的也是带来了性能上的明显提升,并且JMQ4的底层也是参考Kafka去做的设计。在这里我会给大家展示Kafka它的高性能是如何设计的,大家也可以学习相......
  • kafka
    1kafka及消息队列简介kafka及消息队列简介 2kafka单机安装和简单使用kafka单机安装和简单使用 3kafka集群搭建kafka集群搭建 4kafka常用命令kafka常用命令 5kafka术语及架构简介kafka术语及架构简介 6kafka高可用kafka高可用......
  • kafka常用命令
    启动kafkabin/kafka-server-start.shconfig/server.properties、后台启动加参数-daemonbin/kafka-server-start.sh-daemon../config/server.properties查看topic信息cd到kafka的安装位置,找到bin目录  单机/集群都可以配置参数,下面命令为查询集群的topic信息bin/kafka-top......
  • docker之kafka安装
    一、带zookeeper和UI版本version:'2'services:xbd-zk-1:image:bitnami/zookeeper:3.8.1restart:alwayscontainer_name:xbd-zk-1privileged:trueports:-2181:2181environment:-TZ=Asia/Shanghai-ZOO_P......
  • kafka ack机制
    1kafka基本架构kafka的partion分为leader和follow。leader参与允许,二follower仅作为备份。那么,leader和follower之间是怎么同步的呢 2leader和follower的同步Partition只有Leader是对外提供读写服务的也就是说,如果有一个客户端往一个Partition写入数据,......
  • 【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
    问题描述AzureEventHubs--Kafka生产者发送消息存在延迟接收和丢失问题,在客户端的日志中发现如下异常:2023-06-0502:00:20.467[kafka-producer-thread|producer-1]ERRORcom.deloitte.common.kafka.CommonKafkaProducer-messageId:9235f334-e39f-b429-227e-45cd30dd6486......
  • 【Azure 事件中心】Kafka 生产者发送消息失败的分析步骤
    问题描述AzureEventHubs--Kafka生产者发送消息存在延迟接收和丢失问题,在客户端的日志中发现如下异常:2023-06-0502:00:20.467[kafka-producer-thread|producer-1]ERRORcom.deloitte.common.kafka.CommonKafkaProducer-messageId:9235f334-e39f-b429-227e-45cd30dd......
  • 何时使用Kafka而不是RabbitMQ
    Kafka和RabbitMQ都是流行的开源消息系统,它们可以在分布式系统中实现数据的可靠传输和处理。Kafka和RabbitMQ有各自的优势和特点,它们适用于不同的场景和需求。本文将比较Kafka和RabbitMQ的主要区别,并分析何时使用Kafka而不是RabbitMQ。影响因素可扩展性:Kafka旨在......
  • linux-kafka
    kafka一、单点部署docker-compose创建参考地址https://gitee.com/jasonyin2020/docker-compose/tree/master1.下载kafka软件包[[email protected]~]#ll-rw-r--r--1rootroot103956099Apr1016:09kafka_2.13-3.2.1.tgz2.解压软件包[[email protected]~]#tarxf......