Kafka的生产者
1. 生产者的执行流程
生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程),其中,
- 在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称消息收集器)中。
- Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
我们从创建一个 KafkaProducer对象开始,它将创建一个ProducerRecord对象。这个对象是Kafka中的一个核心类、它代表生产者发送到Kafka服务器端的一个消息对象,即个Key-Value 的键值对。在ProducerRecord对象中,包含如下信息。
- Kafka 服务器端的主题名称(Topic Name)
- Topic 中可选的分区号
- 时间戳。
- 其他 Kev-Value 键值对。
其中,十分重要的就是Kafka服务器端的主题名称。
ProducerRecord创建成功后,需要经过拦截器、序列化器将其转换为字节数组,这样它们才能够在网络上传输,然后消息到达分区器。分区器的作用是根据发送过程中指定的有效的分区号,将ProducerRecord发送到该分区;如果没有指定Topic中的分区号,则会根据 Key进行 Hash 运算,将ProducerRecord 映射到一个对应的分区。
ProducerRecord 默认采用当前的时间,用户也在创建ProducerRecord 的时候提供一个时间戳。Kafka 最终使用的时间戳取决于Topic 的配置,而Topic 时间戳的配置主要有以下两种:
- CreateTime 表示使用生产者产生的时间戳作为Kafka最终的时间戳。
- LogAppendTime 表示生产者记录中的时间戳将在消息添加到其日志中时,由Kafka Broker重写。
ProducerRecord 在经过主线程后,最终由发送线程发送到Kafka服务器端。Kafka Broker在收到消息时会返回一个响应,如果写入成功,则返回一个RecordMetaData 对象,其中包含主题和分区信息,以及记录在分区里的偏移量,上面两种时间戳类型也会返回给用户。如果写入失败,则返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次后如果还是写入失败,就返回错误消息。
2. 自定义Kafka序列化器
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
/**
* @author strind
* @date 2024/10/29 15:32
* @description 自定义kafka序列化器
*/
public class ProducerSerializer implements Serializer<String> {
@Override
public byte[] serialize(String topic, String data) {
return data.getBytes(StandardCharsets.UTF_8);
}
}
3. 生产者的消息发送模式
Kafka 生产者的消息发送主要有三种模式:发后即忘(fire-and-forget)、同步模式(sync)及异步模式(async)。
发后即忘:只管向Kafka中发送消息而并不用关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。
同步模式:要实现同步的发送方式,可以利用返回的Future对象的阻塞等待Kafka的响应即可实现,直到消息发送成功。如果发生异常,就需要捕获异常并交由外层逻辑处理。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @author strind
* @date 2024/10/29 15:31
* @description 生产者发送消息
*/
public class SimpleKafkaProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port,ip:port");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String,String> producer = new KafkaProducer<>(properties);
// 发后即忘
producer.send(new ProducerRecord<>("test-topic","data"));
// 同步发送
// Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test-topic", "data"));
// RecordMetadata response = future.get();
// if (response != null){
// // do something
// }
producer.close();
}
}
异步模式:为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。一般在 send()方法中指定一个 Callback的回调函数,Kafka 在返回响应时调用该函数来实现异步的发送确认。
// 异步发送
producer.send(new ProducerRecord<>("test-topic","data"), (recordMetadata,exception)->{
// recordMetadata 与 exception是 互斥的,只有一个为null
if (exception == null){
// 发送成功
}else {
// 发送失败
}
});
4. 生产者的高级特性
4.1 分区机制
Kafka 的主题 Topic 是由分区组成的,而将Topic进行分区的主要目的就是提供负载均衡和容错的能力,以及实现系统的高伸缩性和高可用性。Kafka的消息组织方式实际上是三层结构:主题一分区一消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中保存多份。在创建Topic 的时候可以指定每个分区的副本数,用于支持分区中消息的容错。
不同的分区能够放置在Kafka集群中不同的节点上,而生产者和消费者在产生消息和消费消息的时候,也都是针对分区进行的,这样每个节点的机器都能独立执行各自分区的读写请求处理,并且还可以通过添加新的Kafka节点来增加整体系统的吞吐量。
Kafka 提供了如下的几种分区策略(分区策略是决定生产者将消息发送到哪个分区的算法):
-
默认分区策略:
- 如果记录中指定了分区,则可以直接使用。
- 如果记录中未指定分区,但指定了key值,则根据key 的 hash 值选择一个分区。
- 如果记录中未指定分区,也未指定key值,则以黏性分区策略选择一个分区。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if (keyBytes == null) { return this.stickyPartitionCache.partition(topic, cluster); } else { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
-
轮询分区策略:
如果 key 值为 null,并且使用了默认的分区器,Kafka会根据轮询策略将消息均匀地分布到各个分区上。public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int nextValue = this.nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { return Utils.toPositive(nextValue) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> { return new AtomicInteger(0); }); return counter.getAndIncrement(); }
-
黏性分区策略:
黏性分区策略就像黏住这个分区一样,只要这个分区没有被填满,就会尽可能地坚持使用该分区。这种策略首先会选择单个分区发送所有无key的消息,一旦这个分区已填满,黏性分区策略就会随机选择另一个分区。public int partition(String topic, Cluster cluster) { Integer part = (Integer)this.indexCache.get(topic); return part == null ? this.nextPartition(topic, cluster, -1) : part; } public int nextPartition(String topic, Cluster cluster, int prevPartition) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); Integer oldPart = (Integer)this.indexCache.get(topic); Integer newPart = oldPart; if (oldPart != null && oldPart != prevPartition) { return (Integer)this.indexCache.get(topic); } else { List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); Integer random; if (availablePartitions.size() < 1) { random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = random % partitions.size(); } else if (availablePartitions.size() == 1) { newPart = ((PartitionInfo)availablePartitions.get(0)).partition(); } else { while(newPart == null || newPart.equals(oldPart)) { random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = ((PartitionInfo)availablePartitions.get(random % availablePartitions.size())).partition(); } } if (oldPart == null) { this.indexCache.putIfAbsent(topic, newPart); } else { this.indexCache.replace(topic, prevPartition, newPart); } return (Integer)this.indexCache.get(topic); } }
-
散列分区策略:
如果键值不为 null,并且使用了默认的分区器,Kafka 会对键进行散列,然后根据散列值把消息映射到对应的分区上。
-
自定义分区策略:
只需实现接口org.apache.kafka.clients.producer.Partitioner,用户可以根据需要对数据使用不一样的分区策略,在自定义分区策略后,只需要在生产者的Properties中指定 ProducerConfig.PARTITIONER_CLASS_CONFIG 参数即可。
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @author strind
* @date 2024/10/29 16:01
* @description 分区策略
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
Integer num = (Integer) key;
if (num % 2 == 0){
return 0;
}
return 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
4.2 压缩机制
Kafka发送消息的时候,可以在生产者端和Broker端进行消息的压缩。在一般情况建议采用的压缩机制是:生产者端负责压缩;Broker端负责保持;消费者端负责解压。Kafka采用这样的压缩机制,主要是使用CPU的时间去换磁盘存储的空间,以及网络IO的传输量。这样的做法可以以较小的CPU开销带来更少的磁盘占用或更少的网络IO 传输。
在目前的Kafka版本中,支持GZIP、Snappy和 LZ4 三种压缩方式。
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
4.3 拦截器
在KafkaProducer的主线程中可以创建一个或多个Producerlnterceptors(拦截器)。拦截器在生产者端和消费者端均可设置,拦截器主要用于实现生产者端和消费者端的定制化控制逻辑。
对生产者而言,拦截器需要实现 org.apache.kafka.clients.producer.ProducerInterceptor接口。
下面对接口中的三个方法做出必要的解释:
-
onSend( )。
该方法将在KafkaProducer.send方法的主线程中执行。KafkaProducer 确保在消息被序列化以前,调用ProducerInterceptor.onSend方法。用户可以在该方法中对消息进行操作,但最好不要修改消息所属的Topic和分区,否则会影响目标分区的计算。 -
onAcknowledgement( )。
该方法会在消息被确认应答之前或消息发送失败时调用。如果生产者采用的是异步发送机制,该方法通常是在生产者回调逻辑被触发之前被调用的。需要注意的是,该方法运行在生产者的 IO 线程中,如果在该方法中放入很重的逻辑,会影响生产者的消息发送性能
-
close( )
关闭拦截器之前,可以将一些资源清理工作放在close 方法中。
需要注意的是,如果指定了多个连接器,生产者将按照指定顺序调用它们。如果拦截器中出现了异常,生产者会将异常的错误信息记录到错误日志中,而不是向上传递。
5. 生产者参数配置
Kafka 生产者端的配置参数,除了之前使用过的bootstrap.servers、key.serializer 和value.serializer 三个必须参数,还有很多可选的参数。
-
acks
这个参数用于指定 分区中必须有多少个副本成功接收到消息之后,才能向生产者返回这条消息写入是成功的。acks 参数的本质其实就是一个字符串。有三种类型的值。-
acks=1。
这是 acks 参数的默认值。Kafka的生产者将消息发送到Kafka的Broker 服务器端。只要 Topic 分区中的Leader 成功写入消息,就算该消息成功发送。
如果在生产者写入消息的过程中,Leader分区所在的Broker出现了宕机,将会造成
消息无法正常写入。在重新选举Leader 的过程中,生产者Producer 会受到一个服务器端返回的错误信息。生产者为了支持容错,避免消息的丢失,会尝试重新发送该消息。直至消息成功写入 Leader 分区。这里需要注意的是,如果消息已经成功写入Leader所在的分区,但还未同步至其他Follower 分区的时候,如果Leader分区所在的Broker 出现了宕机,这时候会造成写入Leader分区的消息丢失。所以在这种参数值的设置下,消息是可能丢失的。
-
acks=0。
在这种参数设置下,Kafka的生产者不需要等待任何服务器端的响应,所以这时Kafka 集群可以达到最大的吞吐量。如果消息从生产者发送到写入Kafka消息系统的过程中出现异常,比如Broker宕机,生产者将不会得到任何反馈信息,也不会重发消息,会导致消息丢失。 -
acks=-1 或acks=all。
在这种参数设置下,Kafka集群将达到最高的可靠性。生产者发送完消息后,需要等待Leader 分区和所有Follower分区都成功写入消息后,才返回给生产者一个成功写入的应答响应。
-
-
buffer.memory
Kafka 生产者的 Sender 线程在将消息发送到Kafka 服务器端之前,会把消息缓存到内存中,这个参数就决定了消息缓存的内存大小,其默认值是32MB。如果生产者产生消息的速度大于将消息发送到服务器端的速度,那么生产者将会被阻塞,并最终导致生产者抛出一个RecordTooLargeException的异常。
在实际的生产环境下,应该根据实际情况进行测试最终决定 buffer.memory 参数值的大小。
-
batch.size 。
当 Kafka 客户端将多个消息发送到同一个分区的时候,生产者为了减少客户端与服务器端的请求交互,会尝试将消息批量打包在一起,进行统一发送,这样有助于提升客户端和服务器端的性能。该配置的默认批次大小(以字节为单位)是16384 字节。如果消息的内存大小大于该参数的配置,将不会进行批量打包的过程。
通过提升 batch.size 的大小,可以允许更多数据缓冲在分区中,那么一次请求服务器端所发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是这样会造成大量内存的浪费。反过来如果减小 batch.size 的大小,则会系统地降低吞吐量。如果将 batch.size 设置为0,则批处理机制被禁用。所以需要在这里按照生产环境的发消息速率,调节不同的batch.size的大小。 -
compression.type。
该参数指定给到 Topic 中数据的压缩类型,其有效值的设置可以是标准的压缩方式,例如,'gzip'、'snappy’、"z4'、'zstd,同时该参数也可以是'uncompressed',在这种设置下消息数将不会被压缩。
-
client.id。
当生产者向服务器端发送请求时,传递给服务器端ID字符串。通过这个ID字符串Kafka服务器端就可以追踪请求的资源,其本质就是将生产者及其请求的资源进行逻辑上的隔离。 -
connections.max.idle.ms.
当生产者不再往服务器端发送消息时,这个参数用来决定关闭生产者连接的时间阈值,其默认值是 9min。 -
linger.ms.
该参数决定消息在由生产者发送到服务器端之前,在客户端延长发送的时间。通过这样的延时发送机制,可以将多个消息组合成一个批处理进行统一发送。从本质上讲,该参数与上面提到的 batch.size 参数类似。合理设置 batch.size 参数和 linger.ms 参数,将很好地利用 到Kafka 批处理机制。把linger.ms设置得太小了,比如默认就是0ms,或者设置为5ms,那可能导致 Batch虽然设置了 32KB,但是经常是还没凑够 32KB的数据,5ms之后就直接强制 Batch将数据发送出去,这会导致你的 Batch 形同虚设,一直凑不满数据。 -
max.block.ms.
该配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()将消息阻塞多长时间,此外也可能是因为缓冲区已满或元数据不可用,导致这些方法被阻止。在用户提供的序化程序或分区器中的锁定不会计入此超时,其默认值为60000ms。 -
max.request.size .
Kafka生产者能发送消息的最大值,默认值为1MB。此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这个参数涉及其他一些相关参数,比如务器 Broker 端的 message.max.bytes 参数,如果 message.max.bytes 参数设置为 10,而 max.request.size 设置为20,这时候就可以造成生产者报错。 -
retries 和 retry.backoff.ms .
如果生产者出现了异常,或者消息没有成功写入Kafka的服务器端,生产者可以配置重试的参数值,通过生产者端的内部重试机制来执行恢复,并不是直接将异常抛出。如果重试达到设定次数,生产者才会放弃重试并抛出异常。retries参数的默认值是0。同时生产者的重试还与retry,backof.ms参数有关,该参数用来设定两次重试之间的时间间隔其默认值是 100ms,从而避免无效的频繁重试。在配置retries 和 retry.backoft.ms之前,可以设定总重试时间要大于异常恢复时间,最好先估算一下异常恢复时间,避免过早放弃重试。 -
receive.buffer.bytes 。
这个参数用来设置socket接收消息缓冲区的大小,该缓存区大小的默认值32KB。如果将其值设置为-1,则使用操作系统的默认值。、 -
send.buffer.bytes.
这个参数用来设置socket发送消息缓冲区的大小,默认值为128KB。与receive.buffer.bytes 参数一样,如果将其值设置为-1,则使用操作系统的默认值。 -
request.timeout.ms.
消息由生产者发出后,该参数用于决定生产者等待请求响应的最长时间,其默认值为40s。如果响应的时间超过了该参数的设置,客户端将按照重试策略进行重试。注意,这个参数值需要比 Broker 端的参数replica.lag.time.max.ms值要大,这样可以减少因客户端重试引起的消息重复的概率。 -
reconnect.backoff.max.ms.
该参数表示Kaka客户端重连的最大时间。每次连接失败,重连时间都会成指数级增加,每次增加的时间会存在20%的随机浮动,以避免连接风暴。 -
reconnect.backoff.ms.
该参数表示Kafka客户端每次重连时候的间隔时间。 -
delivery.timeout.ms.
当生产者调用send方法后,该参数用于指定客户端等待发送成功或失败报告时,客户端等待时间的上限。这个时间上限包含以下几部分。- 一条消息在发送前的延时时间。
- 生产者等待服务器端Broker确认信息的等待时间,
- 失败时的重试时间
-
transaction.timeout.ms.
该参数表示生产者主动终止当前正在进行的操作之前,Kaka等待操作状态更新的最大时间,其默认值是1min。如果该值大于Broker 中max.transaction.timeout.ms 的设置,则请求失败,并报"InvalidTransactionTimeout"错误。 -
transactional.id.
在事务传递过程中该参数用于表示某个事务的ID。这样可以保证跨多个生产者会话时语义的可靠性。因为它允许客户端保证在开始任何新事务之前使用相同的Transactional ID的事务来完成。 -
max.in.flight.requests.per.connection .
该参数表示在消息被阻塞前,每个客户端上发送的未应答请求的最大数量,其默认值是5。注意,如果该参数值设置大于1,并且消息发送失败,则由于客户端的重试会增加消息重新排序的风险。 -
metadata.max.age.ms .
该参数表示当超过这个时间间隔时,系统就会更新元信息,其默认值5min。Kafka的元数据信息由ZooKeeper 维护,包含Topic 信息、副本信息、分区信息、Broker 信息。 -
metadata.max.idle.ms.
当Topic 处于空闲状态时,该参数用于控制生产者抓取 Topic 元信息的时间。