Kafka 生产者Producer
原理
producer和consumer过去直接与Zookeeper连接,以获得这些信息。
现在Kafka已经脱离了这种耦合,从0.8版和0.9版开始,客户端直接从Kafka brokers那里获取元数据信息,集群中的每个 broker 都会缓存所有主题的分区副本信息,元数据同步可以通过配置metadata.max.age.ms参数(默认五分钟)定时刷新元数据(注:如果在定时请求的时间间隔内发生的分区副本的选举,则意味着原来缓存的信息可能已经过时了,此时还有可能会收到 Not a Leader for Partition 的错误响应,这种情况下客户端会再次向Controller【与Zookeeper数据一致性由Controller完成】发出元数据请求,然后刷新本地缓存),有了元数据信息后,客户端就知道了leader副本所在的 broker,之后直接将读写请求发送给对应的 broker 即可。
如下图
发送消息流程:
- 发送到kafka的数据会封装为ProducerRecord对象,包含topic、partition、key、value信息;
- 调用send()方法后,将数据序列化为字节数组,分区器Partitioner会根据ProducerRecord 对象的键来计算一个分区
- 当消息达到一个批次设定的量(消息放在缓冲区中),通过网络发送到不同的主题,不同的分区;
- 如果消息成功写入 Kafka,就返回一 个
RecordMetaData
对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 则告知生产者尝试重新发送消息,达到最大重试次数就抛出异常,其中重试次数可以在配置message.send.max.retries中指定。
下面看一下详细流程图:
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。
-
主线程中由 KafkaProducer 创建消息,然后通过拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。
-
Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer. ProducerInterceptor
接口。ProducerInterceptor
接口中包含3个方法:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息。
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的I/O线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
close() 方法主要用于在关闭拦截器时执行一些资源的清理工作。
序列化器
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。
生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如 StringSerializer,而消费者使用了另一种序列化器,比如 IntegerSerializer,那么是无法解析出想要的数据的。
序列化器都需要实现org.apache.kafka.common.serialization.Serializer
接口,此接口有3个方法:
public void configure(Map<String, ?> configs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()
configure() 方法用来配置当前类,在创建 KafkaProducer 实例的时候调用的,主要用来确定编码类型。serialize() 方法用来执行序列化操作。而 close() 方法用来关闭当前的序列化器。
分区器
消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
,它实现了 org.apache.kafka.clients.producer.Partitioner
接口,这个接口中定义了2个方法,具体如下所示。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
-
partition() 方法用来计算分区号,返回值为 int 类型。partition() 方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。
-
close() 方法在关闭分区器的时候用来回收一些资源。
自定义的分区器,只需同 DefaultPartitioner 一样实现 Partitioner 接口即可。由于每个分区下的消息处理都是有顺序的,我们可以利用自定义分区器实现在某一系列的key都发送到一个分区中,从而实现有序消费。
生产者在向broker发送消息时是怎么确定向哪一个broker发送消息?
- 生产者客户端会向任一个broker发送一个元数据请求(MetadataRequest),获取到每一个分区对应的leader信息,并缓存到本地。
- 生产者在发送消息时,会指定partion或者通过key得到到一个partion,然后根据partion从缓存中获取相应的leader信息。
RecordAccumulator 消息累加器
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列。
消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。
InFlightRequests
请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<NodeId, Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区, Deque< ProducerBatch>> 的保存形式转变成 <Node, List< ProducerBatch> 的形式,其中 Node 表示 Kafka 集群的 broker 节点。
KafkaProducer 要将此消息追加到指定主题的某个分区所对应的 leader 副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后 KafkaProducer 需要知道目标分区的 leader 副本所在的 broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka。
所以这里需要一个转换,对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区。
客户端缓存池技术
当我们应用程序调用kafka客户端 producer发送消息的时候,在kafka客户端内部,会把属于同一个topic分区的消息先汇总起来,形成一个batch。真正发往kafka服务器的消息都是以batch为单位的。
kafka是用java语言编写的(新版本大部分都是用java实现的了),如果是使用new一个空间然后赋值给一个引用,释放的时候把引用置为null等JVM GC处理就可以了。在并发量比较高的时候就会频繁的进行GC。我们都知道GC的时候有个stop the world,尽管最新的GC技术这个时间已经非常短,依然有可能成为生产环境的性能瓶颈。
针对上述容易出现GC的问题,Kafka客户端内部实现了一个非常优秀的机制,就是 缓冲池的机制(类似于数据库连接池,线程池等的池化技术)。
即首先开辟初始化一些内存块做为缓冲池,每个batch其实都对应了缓冲池中的一个内存空间,发送完消息之后,batch不再使用了,此时这个batch底层的内存空间不是交给JVM去垃圾回收,而是把内存块归还给缓冲池。
如果一个缓冲池里的内存资源都占满了,暂时没有内存块了,怎么办呢?很简单,阻塞写入,不停的等待,直到有内存块释放出来,然后再继续写入消息。
缓冲池的机制原理如下图: