一. 测试环境搭建
引入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
创建一个Bean,这里的配置后面会解析。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
* @author kirin.麒麟
* @version 1.0.0
* @classname Kafka
* @desc Kafka配置
* @date 2022/1/9 5:04 下午
*/
@Slf4j
@Configuration
public class KafkaConfig {
/**
* 生产者Bean
* @return
*/
@Bean
public KafkaProducer<String, String> kafkaProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092");
// 消息确认机制配置
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
// 多长时间发送一个批次
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
// 缓冲
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaProducer<>(properties);
}
}
发生消息:
@RestController
public class KafkaController {
@Autowired
private KafkaProducer<String, String> kafkaProducer;
@GetMapping("/send")
public String sendMsg() throws InterruptedException {
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "这是第" + i + "条消息");
kafkaProducer.send(record);
Thread.sleep(200);
}
return "success";
}
}
开启主题的监听:
# 单节点监听
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9092 --topic test-topic --from-beginning
# 集群节点监听
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9091,192.168.31.138:9092,192.168.31.138:9093 --topic test-topic --from-beginning
二. 消息发送模式
从第一部分可以知道,发送消息就是将消息和Topic封装一个ProducerRecord
对象,然后通过KafkaProducer
的send
方法发送。
这里有两种种方法,这些方法都是通过Future
封装返回的,是可以拿到返回值的,其实都是异步执行的,第二个可以执行异步回调。
异步发送:
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
kafkaProducer.send(record);
同步发送:
因为这里使用Future
做返回,所以可以通过get()
方法阻塞,相当于同步。
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
Future<RecordMetadata> future = kafkaProducer.send(record);
RecordMetadata metadata = future.get();
异步回调发送:
回调就是在callback
中执行业务
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
kafkaProducer.send(record, (metadata, exception) -> {
// TODO
});
三. 生产者构建流程
这里简单看一下KafkaProducer
的初始化过程,从构造方法开始:
接着获取Client.id
,若没有配置则由是producer
-递增的数字
this.producerConfig = config;
this.time = time;
// 获取事物ID
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
// 获取用户配置的client.id
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
设置生产者监控:
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
设置分区器paritioner
,这里可以设置自己的分区器,需要增加:partitioner.class
,将自定义的分区器路径写入。
设置key和value的序列化器:
解析并实例化拦截器,这里可以自定义实现拦截器,过滤特定的消息。
构建RecordAccumulator
,这个类是用来存放消息
到RecordAccumulator
这个类里面看一下属性batches
:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
它是一个 ConcurrentMap
,key
是 TopicPartition
类,代表一个 topic
的一个 partition
。value
是一个包含 ProducerBatch
的双端队列。等待 Sender
线程发送给 broker
。
创建守护线程sender
,用来监听发送消息:
在KafkaThread
的可以看出这个线程是守护线程:
这里需要注意,KafkaProducer
的主线程是用来往RecordAccumulator
里面写消息,Sender
守护线程是用来读取消息并发送到Kafka
中的。
四. 消息发送大体流程
上面的例子演示了消息的发送,这里简单看一下消息发送的大体流程:
先从send
函数开始分析
首先方法会先进入拦截器集合ProducerInterceptors
,onSend
方法是遍历拦截器onSend
方法,拦截器的目的是将数据处理加工,Kafka
本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现接口ProducerInterceptor
并实现onSend
方法。
接下来看一下doSend
方法,首先先判断守护线程sender
是否可用,接着判断要发送到topic
的metadata
是否可用:
序列化key
和value
计算record
对应的partition
的值,这里如果定义了分区器会使用自定义,也可以指定算法获取该值。下面会介绍具体分区器实现
int partition = partition(record, serializedKey, serializedValue, cluster);
接着像accumulator
中追加数据:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
如果是新批次需要重新指定分区在追加数据:
缓冲满了之后,唤醒sender线程发送消息:
到这儿只是消息发送大体的流程,内部还有一大堆的代码,功力不够看着有点上头,等有时间在看吧!
五. 序列化器
消息在网络上传输,必须进行序列化转化为字节流。Kafka提供了默认的字符串序列化器StringSerializer
,除此之外还有ByteArray
、ByteBuffer
、Bytes
、Double
、Integer
、Long
等。
在org.apache.kafka.common.serialization
包下面可以看到默认实现的序列化器!
接下来看一下借助fastjson
实现自定义序列化器:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserInfo {
private String name;
private Integer age;
}
// 序列化器
public class CustomizeSerializer implements Serializer<UserInfo> {
@Override
public byte[] serialize(String topic, UserInfo data) {
return JSON.toJSONBytes(data);
}
}
// 反序列化器
public class CustomizeDeserializer implements Deserializer<UserInfo> {
@Override
public UserInfo deserialize(String topic, byte[] data) {
return JSON.parseObject(data, UserInfo.class);
}
}
配置使用:
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomizeSerializer.class);
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomizeDeserializer.class);
使用:
@Slf4j
@RestController
public class CustomizeController {
@Autowired
private KafkaProducer<String, UserInfo> kafkaProducer;
private static final String TOPIC_NAME = "long-topic";
@GetMapping("c_send")
public String send(@RequestParam(value = "size", defaultValue = "20") Integer size) {
for (int i = 0; i < size; i++) {
ProducerRecord<String, UserInfo> record = new ProducerRecord<>(TOPIC_NAME, 0, "user:" + i, new UserInfo("这是第" + i + "条消息", i));
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
}
六. 拦截器
拦截器的使用场景:
- 按照某个规则过滤掉不符合要求的消息
- 修改消息内容
- 发送消相关统计
自定义拦截器,主要是实现ProducerInterceptor
接口,这个接口中四个方法:
-
onSend
:该方法封装进KafkaProducer.send
方法中,即它运行在用户的主线程中。Producer
确保在消息被序列化以及计算分区前调用该方法。注意:用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic
和分区,否则会影响目标分区的计算 -
onAcknowledgement
:该方法会在消息从RecordAccumulator
成功发送到Kafka Broker
之后,或者在发送过程中失败时调用。并且通常都是在producer
回调逻辑触发之前。注意:onAcknowledgement
运行在producer
的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer
的消息发送效率 -
close
:关闭interceptor,主要用于执行一些资源清理的工作 -
configure
:获取配置信息和初始化数据时调用
实例:
public class CustomizeProducer implements ProducerInterceptor<String, UserInfo> {
@Override
public ProducerRecord<String, UserInfo> onSend(ProducerRecord<String, UserInfo> record) {
UserInfo value = record.value();
return value.getAge() % 2 == 0 ? new ProducerRecord<>(
record.topic(),
record.key(),
new UserInfo(value.getName(), value.getAge() + 100)) :
new ProducerRecord<>(
record.topic(),
record.key(),
new UserInfo(value.getName(), value.getAge() + 60));
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
使用自定义拦截器:
注意:ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
对应的值是一个拦截器数组。
七. 分区器
7.1. 测试集群分区器
发送消息结果数据实体RecordMetadata
,返回数据包好分区器、偏移量和主题。
测试发动100条消息数据:
@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "100") Integer size) throws InterruptedException {
for (int i = 0; i < size; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "这是第" + i + "条消息");
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
如果没有指定分区器系统会使用默认的默认的分区器:DefaultPartitioner
(在ProducerConfig
中有定义)。该分区器首先会随机选择一个分区,并尽可能一直使用该分区,直到该分区批次(默认是16k
)已满或者已完成(linger.ms
配置的时间已到),Kafka会再随机选择一个和上一个分区不同的分区进行使用。
关于计算分区器的值有下面这几种情况:
- 指明
partition
的情况下,直接将指明的值作为partition
的值
@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "20") Integer size) throws InterruptedException {
for (int i = 0; i < size; i++) {
// 设置0分区发送数据
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 0, "key:" + i, "这是第" + i + "条消息");
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
- 没有指明
partition
值但有key
的情况下,将key
和hash
值与topic
的partition
数进行取余得到partition
的值
Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
-
partition
和key
都不存在的情况下,通过 stickyPartitionCache
的 partition
方法计算出分区。
7.1. 默认分区器
系统里面支持如下几种:
-
DefaultPartitioner
:默认分区策略,如果key
也不存在,则会对可用分区进行轮询,如果没有指定分区,且存在key
值,则会根据key
的hash
进行取模来选择分区。(需要注意这里和以前的版本实现不一样)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
这里看看一下StickyPartitionCache
,这里面的具体实现:
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}
public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}
indexCache
是一个 ConcurrentHashMap
对象,对应的是 Topic -> Partition
的映射,如果该值不存在则调用 nextPartition
方法选择一个分区并缓存。
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// 但是分区是空或者与prevPartition相同
if (oldPart == null || oldPart == prevPartition) {
// 获取主题对应partition
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// partition数量小于1
if (availablePartitions.size() < 1) {
// 获取一个线程安全的随机数
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
// 取余返回新的partition
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
// partition数量等于1,返回对应的parition
newPart = availablePartitions.get(0).partition();
} else {
// partition数量大于1
while (newPart == null || newPart.equals(oldPart)) {
// 生成的新partition不为null,并且与当前的partion不相同
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// 如果当前的partition是null,将partition缓冲更新
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
// 否则替换旧的partition
indexCache.replace(topic, prevPartition, newPart);
}
// 最后返回主题对应的partition
return indexCache.get(topic);
}
// 最后返回主题对应的partition
return indexCache.get(topic);
}
这个过程其实比较简单的,这里其实也是粘性分区策略实现方式
-
UniformStickyPartitioner
:粘性分区策略
参看DefaultPartitioner
-
RoundRobinPartitioner
:轮询策略
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题对应的partition
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size(); // 数量
int nextValue = nextValue(topic); // 主题对应的新值
// 获取主题对应的可用的partition
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
// 计算新的partition值
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// 没有可用的分区,给一个不可用的分区
return Utils.toPositive(nextValue) % numPartitions;
}
}
// 将主题对应的值+1
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
7.2. 自定义分区器
自定义CustomizePartition
类,并且实现Partitioner
接口,重写partition
和close
方法
public class CustomizePartition implements Partitioner {
/**
* Compute the partition for the given record.
*
* @param topic 主题名
* @param key key值,不存在则是null
* @param keyBytes 序列化的key,不存在则是null
* @param value value值,不存在则是null
* @param valueBytes 系列化的value,不存在则是null
* @param cluster 集群信息
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}
/**
* This is called when partitioner is closed.
*/
@Override
public void close() {}
/**
* Configure this class with the given key-value pairs
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {}
}
使用的时候只需要在配置文件中指定一下就可以:
八. 生产者核心配置参数
-
acks
:发送应答,默认值是1。 -
batch.size
:批量发送大小,默认是16384(16k) -
bootstrap.servers
:服务器地址,多个服务器地址用逗号分割开 -
buffer.memory
:生产者最大可用的缓冲,默认是33554432(32M) -
client.id
:生产者ID,默认是“” -
compression.type
:压缩类型,默认是producer(未压缩)
,还有其他的配置类型:gzip(压缩率高,适合高内内存和CPU)/snappy(适合带宽敏感,压缩力度大)/lz4/sztd
-
retries
:失败重试次数,默认整型的最大值(2147483647
) -
retry.backoff.ms
:重试阻塞时间 -
delivery.timeout.ms
:传输时间 -
connections.max.idle.ms
:关闭空闲连接时间,默认是540000
-
enable.idempotence
:开启幂等,默认是false, -
max.in.flight.request.per.connection
:单个连接上发送的未确认请求的最大连接数,默认是5 -
interceptor.classes
:拦截器,默认是无拦截器 -
key.seriailzer
:key的序列化器,默认是无 -
value.seriailzer
:value的序列化器,默认是无 -
linger.ms
:发送延迟时间,默认是0 -
max.block.ms
:阻塞时间,默认是一分钟(60000) -
max.request.size
:最大请求字节大小,默认是1M(1048576) -
metric.reporters
:自定义指标报告器 -
partitioner.class
:自定义分区器 -
request.timeout.bytes
:请求超时时间,默认30000 -
receive.buffer.bytes
:读取数据时使用TCP接收缓冲区(SO_RCVBUF)的大小,默认值32k(32768)。如果值是-1,将使用OS默认值 -
send.buffer.bytes
:发送数据时使用的TCP发送缓冲去(SO_SEDBUF)的大小,默认是是128k(131072)。如果值是-1,将使用OS的默认值