首页 > 其他分享 >Kafka知识总结之生产者简单使用

Kafka知识总结之生产者简单使用

时间:2022-12-19 14:00:33浏览次数:53  
标签:总结 生产者 分区 partition value Kafka topic record public


一. 测试环境搭建

引入依赖:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>

创建一个Bean,这里的配置后面会解析。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
* @author kirin.麒麟
* @version 1.0.0
* @classname Kafka
* @desc Kafka配置
* @date 2022/1/9 5:04 下午
*/
@Slf4j
@Configuration
public class KafkaConfig {

/**
* 生产者Bean
* @return
*/
@Bean
public KafkaProducer<String, String> kafkaProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092");
// 消息确认机制配置
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
// 多长时间发送一个批次
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
// 缓冲
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaProducer<>(properties);
}

}

发生消息:

@RestController
public class KafkaController {

@Autowired
private KafkaProducer<String, String> kafkaProducer;

@GetMapping("/send")
public String sendMsg() throws InterruptedException {
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "这是第" + i + "条消息");
kafkaProducer.send(record);
Thread.sleep(200);
}
return "success";
}

}

开启主题的监听:

# 单节点监听
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9092 --topic test-topic --from-beginning
# 集群节点监听
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9091,192.168.31.138:9092,192.168.31.138:9093 --topic test-topic --from-beginning

二. 消息发送模式

从第一部分可以知道,发送消息就是将消息和Topic封装一个​​ProducerRecord​​​对象,然后通过​​KafkaProducer​​​的​​send​​方法发送。

Kafka知识总结之生产者简单使用_kafka

这里有两种种方法,这些方法都是通过​​Future​​封装返回的,是可以拿到返回值的,其实都是异步执行的,第二个可以执行异步回调。

Kafka知识总结之生产者简单使用_生产者_02

异步发送:

ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
kafkaProducer.send(record);

同步发送:

因为这里使用​​Future​​​做返回,所以可以通过​​get()​​方法阻塞,相当于同步。

ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
Future<RecordMetadata> future = kafkaProducer.send(record);
RecordMetadata metadata = future.get();

异步回调发送:

回调就是在​​callback​​中执行业务

ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
kafkaProducer.send(record, (metadata, exception) -> {
// TODO
});

三. 生产者构建流程

这里简单看一下​​KafkaProducer​​的初始化过程,从构造方法开始:

Kafka知识总结之生产者简单使用_序列化_03

接着获取​​Client.id​​​,若没有配置则由是​​producer​​-递增的数字

this.producerConfig = config;
this.time = time;
// 获取事物ID
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
// 获取用户配置的client.id
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

设置生产者监控:

this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);

设置分区器​​paritioner​​​,这里可以设置自己的分区器,需要增加:​​partitioner.class​​,将自定义的分区器路径写入。

Kafka知识总结之生产者简单使用_序列化_04

设置key和value的序列化器:

Kafka知识总结之生产者简单使用_生产者_05


解析并实例化拦截器,这里可以自定义实现拦截器,过滤特定的消息。

Kafka知识总结之生产者简单使用_kafka_06


构建​​RecordAccumulator​​,这个类是用来存放消息

Kafka知识总结之生产者简单使用_拦截器_07


到​​RecordAccumulator​​​这个类里面看一下属性​​batches​​:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

它是一个 ​​ConcurrentMap​​​,​​key​​​ 是 ​​TopicPartition​​​ 类,代表一个 ​​topic​​​ 的一个 ​​partition​​​。​​value​​​ 是一个包含 ​​ProducerBatch​​​ 的双端队列。等待 ​​Sender​​​ 线程发送给 ​​broker​​。

创建守护线程​​sender​​,用来监听发送消息:

Kafka知识总结之生产者简单使用_拦截器_08

在​​KafkaThread​​的可以看出这个线程是守护线程:

Kafka知识总结之生产者简单使用_生产者_09

这里需要注意,​​KafkaProducer​​​的主线程是用来往​​RecordAccumulator​​​里面写消息,​​Sender​​​守护线程是用来读取消息并发送到​​Kafka​​中的。

四. 消息发送大体流程

上面的例子演示了消息的发送,这里简单看一下消息发送的大体流程:

Kafka知识总结之生产者简单使用_生产者_10


先从​​send​​函数开始分析

Kafka知识总结之生产者简单使用_序列化_11


首先方法会先进入拦截器集合​​ProducerInterceptors​​​ ,​​onSend​​​方法是遍历拦截器​​onSend​​​方法,拦截器的目的是将数据处理加工,​​Kafka​​​本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现接口​​ProducerInterceptor​​​并实现​​onSend​​方法。

Kafka知识总结之生产者简单使用_生产者_12


接下来看一下​​doSend​​​方法,首先先判断守护线程​​sender​​​是否可用,接着判断要发送到​​topic​​​的​​metadata​​是否可用:

Kafka知识总结之生产者简单使用_拦截器_13


序列化​​key​​​和​​value​

Kafka知识总结之生产者简单使用_拦截器_14


计算​​record​​对应的​​partition​​的值,这里如果定义了分区器会使用自定义,也可以指定算法获取该值。下面会介绍具体分区器实现

int partition = partition(record, serializedKey, serializedValue, cluster);

接着像​​accumulator​​中追加数据:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

如果是新批次需要重新指定分区在追加数据:

Kafka知识总结之生产者简单使用_生产者_15


缓冲满了之后,唤醒sender线程发送消息:

Kafka知识总结之生产者简单使用_生产者_16


到这儿只是消息发送大体的流程,内部还有一大堆的代码,功力不够看着有点上头,等有时间在看吧!

五. 序列化器

消息在网络上传输,必须进行序列化转化为字节流。Kafka提供了默认的字符串序列化器​​StringSerializer​​​,除此之外还有​​ByteArray​​​、​​ByteBuffer​​​、​​Bytes​​​、​​Double​​​、​​Integer​​​、​​Long​​等。

在​​org.apache.kafka.common.serialization​​包下面可以看到默认实现的序列化器!

接下来看一下借助​​fastjson​​实现自定义序列化器:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserInfo {
private String name;
private Integer age;
}

// 序列化器
public class CustomizeSerializer implements Serializer<UserInfo> {
@Override
public byte[] serialize(String topic, UserInfo data) {
return JSON.toJSONBytes(data);
}
}

// 反序列化器
public class CustomizeDeserializer implements Deserializer<UserInfo> {
@Override
public UserInfo deserialize(String topic, byte[] data) {
return JSON.parseObject(data, UserInfo.class);
}
}

配置使用:

// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomizeSerializer.class);
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomizeDeserializer.class);

Kafka知识总结之生产者简单使用_拦截器_17

使用:

@Slf4j
@RestController
public class CustomizeController {

@Autowired
private KafkaProducer<String, UserInfo> kafkaProducer;

private static final String TOPIC_NAME = "long-topic";

@GetMapping("c_send")
public String send(@RequestParam(value = "size", defaultValue = "20") Integer size) {
for (int i = 0; i < size; i++) {
ProducerRecord<String, UserInfo> record = new ProducerRecord<>(TOPIC_NAME, 0, "user:" + i, new UserInfo("这是第" + i + "条消息", i));
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}

}

Kafka知识总结之生产者简单使用_kafka_18

六. 拦截器

拦截器的使用场景:

  • 按照某个规则过滤掉不符合要求的消息
  • 修改消息内容
  • 发送消相关统计

自定义拦截器,主要是实现​​ProducerInterceptor​​接口,这个接口中四个方法:

  • ​onSend​​​:该方法封装进​​KafkaProducer.send​​​方法中,即它运行在用户的主线程中。​​Producer​​​确保在消息被序列化以及计算分区前调用该方法。注意:用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的​​topic​​和分区,否则会影响目标分区的计算
  • ​onAcknowledgement​​​:该方法会在消息从​​RecordAccumulator​​​成功发送到​​Kafka Broker​​​之后,或者在发送过程中失败时调用。并且通常都是在​​producer​​​回调逻辑触发之前。注意:​​onAcknowledgement​​​运行在​​producer​​​的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢​​producer​​的消息发送效率
  • ​close​​:关闭interceptor,主要用于执行一些资源清理的工作
  • ​configure​​:获取配置信息和初始化数据时调用

实例:

public class CustomizeProducer implements ProducerInterceptor<String, UserInfo> {

@Override
public ProducerRecord<String, UserInfo> onSend(ProducerRecord<String, UserInfo> record) {
UserInfo value = record.value();
return value.getAge() % 2 == 0 ? new ProducerRecord<>(
record.topic(),
record.key(),
new UserInfo(value.getName(), value.getAge() + 100)) :
new ProducerRecord<>(
record.topic(),
record.key(),
new UserInfo(value.getName(), value.getAge() + 60));
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
}

使用自定义拦截器:

Kafka知识总结之生产者简单使用_生产者_19


注意:​​ProducerConfig.INTERCEPTOR_CLASSES_CONFIG​​对应的值是一个拦截器数组。

七. 分区器

7.1. 测试集群分区器

发送消息结果数据实体​​RecordMetadata​​,返回数据包好分区器、偏移量和主题。

Kafka知识总结之生产者简单使用_序列化_20

测试发动100条消息数据:

@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "100") Integer size) throws InterruptedException {
for (int i = 0; i < size; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "这是第" + i + "条消息");
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}

如果没有指定分区器系统会使用默认的默认的分区器:​​DefaultPartitioner​​​(在​​ProducerConfig​​​中有定义)。该分区器首先会随机选择一个分区,并尽可能一直使用该分区,直到该分区批次(默认是​​16k​​​)已满或者已完成(​​linger.ms​​配置的时间已到),Kafka会再随机选择一个和上一个分区不同的分区进行使用。

Kafka知识总结之生产者简单使用_kafka_21


关于计算分区器的值有下面这几种情况:

  • 指明​​partition​​的情况下,直接将指明的值作为​​partition​​的值
@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "20") Integer size) throws InterruptedException {
for (int i = 0; i < size; i++) {
// 设置0分区发送数据
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 0, "key:" + i, "这是第" + i + "条消息");
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
  • 没有指明​​partition​​值但有​​key​​的情况下,将​​key​​和​​hash​​值与​​topic​​的​​partition​​数进行取余得到​​partition​​的值
Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  • ​partition​​​和​​key​​都不存在的情况下,通过 ​​stickyPartitionCache​​ 的 ​​partition​​ 方法计算出分区。

7.1. 默认分区器

系统里面支持如下几种:

Kafka知识总结之生产者简单使用_拦截器_22

  • ​DefaultPartitioner​​​:默认分区策略,如果​​key​​也不存在,则会对可用分区进行轮询,如果没有指定分区,且存在​​key​​值,则会根据​​key​​的​​hash​​进行取模来选择分区。(需要注意这里和以前的版本实现不一样)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

这里看看一下​​StickyPartitionCache​​,这里面的具体实现:

private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}

public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}

​indexCache​​​ 是一个 ​​ConcurrentHashMap​​​ 对象,对应的是 ​​Topic -> Partition​​​ 的映射,如果该值不存在则调用 ​​nextPartition​​ 方法选择一个分区并缓存。

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// 但是分区是空或者与prevPartition相同
if (oldPart == null || oldPart == prevPartition) {
// 获取主题对应partition
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// partition数量小于1
if (availablePartitions.size() < 1) {
// 获取一个线程安全的随机数
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
// 取余返回新的partition
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
// partition数量等于1,返回对应的parition
newPart = availablePartitions.get(0).partition();
} else {
// partition数量大于1
while (newPart == null || newPart.equals(oldPart)) {
// 生成的新partition不为null,并且与当前的partion不相同
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// 如果当前的partition是null,将partition缓冲更新
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
// 否则替换旧的partition
indexCache.replace(topic, prevPartition, newPart);
}
// 最后返回主题对应的partition
return indexCache.get(topic);
}
// 最后返回主题对应的partition
return indexCache.get(topic);
}

这个过程其实比较简单的,这里其实也是粘性分区策略实现方式

  • ​UniformStickyPartitioner​​​:粘性分区策略
    参看​​​DefaultPartitioner​
  • ​RoundRobinPartitioner​​:轮询策略
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题对应的partition
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size(); // 数量
int nextValue = nextValue(topic); // 主题对应的新值
// 获取主题对应的可用的partition
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
// 计算新的partition值
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// 没有可用的分区,给一个不可用的分区
return Utils.toPositive(nextValue) % numPartitions;
}
}

// 将主题对应的值+1
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}

7.2. 自定义分区器

自定义​​CustomizePartition​​​类,并且实现​​Partitioner​​​接口,重写​​partition​​​和​​close​​方法

public class CustomizePartition implements Partitioner {
/**
* Compute the partition for the given record.
*
* @param topic 主题名
* @param key key值,不存在则是null
* @param keyBytes 序列化的key,不存在则是null
* @param value value值,不存在则是null
* @param valueBytes 系列化的value,不存在则是null
* @param cluster 集群信息
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}

/**
* This is called when partitioner is closed.
*/
@Override
public void close() {}

/**
* Configure this class with the given key-value pairs
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {}
}

使用的时候只需要在配置文件中指定一下就可以:

Kafka知识总结之生产者简单使用_生产者_23

八. 生产者核心配置参数

  • ​acks​​:发送应答,默认值是1。
  • ​batch.size​​:批量发送大小,默认是16384(16k)
  • ​bootstrap.servers​​:服务器地址,多个服务器地址用逗号分割开
  • ​buffer.memory​​:生产者最大可用的缓冲,默认是33554432(32M)
  • ​client.id​​:生产者ID,默认是“”
  • ​compression.type​​​:压缩类型,默认是​​producer(未压缩)​​​,还有其他的配置类型:​​gzip(压缩率高,适合高内内存和CPU)/snappy(适合带宽敏感,压缩力度大)/lz4/sztd​
  • ​retries​​​:失败重试次数,默认整型的最大值(​​2147483647​​)
  • ​retry.backoff.ms​​:重试阻塞时间
  • ​delivery.timeout.ms​​:传输时间
  • ​connections.max.idle.ms​​​:关闭空闲连接时间,默认是​​540000​
  • ​enable.idempotence​​:开启幂等,默认是false,
  • ​max.in.flight.request.per.connection​​:单个连接上发送的未确认请求的最大连接数,默认是5
  • ​interceptor.classes​​:拦截器,默认是无拦截器
  • ​key.seriailzer​​:key的序列化器,默认是无
  • ​value.seriailzer​​:value的序列化器,默认是无
  • ​linger.ms​​:发送延迟时间,默认是0
  • ​max.block.ms​​:阻塞时间,默认是一分钟(60000)
  • ​max.request.size​​:最大请求字节大小,默认是1M(1048576)
  • ​metric.reporters​​:自定义指标报告器
  • ​partitioner.class​​:自定义分区器
  • ​request.timeout.bytes​​:请求超时时间,默认30000
  • ​receive.buffer.bytes​​:读取数据时使用TCP接收缓冲区(SO_RCVBUF)的大小,默认值32k(32768)。如果值是-1,将使用OS默认值
  • ​send.buffer.bytes​​:发送数据时使用的TCP发送缓冲去(SO_SEDBUF)的大小,默认是是128k(131072)。如果值是-1,将使用OS的默认值


标签:总结,生产者,分区,partition,value,Kafka,topic,record,public
From: https://blog.51cto.com/luckyqilin/5952268

相关文章

  • 嵌入式:堆栈寻址、相对寻址与ARM指令总结
    堆栈寻址堆栈是一种数据结构,按先进后出(FirstInLastOut,FILO)的方式工作,使用一个称作堆栈指针(SP)的专用寄存器(R13)指示当前的操作位置,堆栈指针总是指向栈顶。当堆栈指针指向最......
  • TCPIP命令总结
    TCPIP命令总结1、网络设备的几种模式?1)用户模式刚刚进入就是用户模式2)特权模式执行:enable或者en3)全局模式执行:config terminal或者conf t4)接口模式interface f0/0  ......
  • Kafka数据可靠性探究
    概述Kafka作为商业级消息中间件,消息的可靠性保障是非常重要。那Kafka是怎么保障消息的可靠性的呢?上图是Kafka的消息发送基础架构,一条消息的完整生命周期是:生产者发送消息至K......
  • 我的2022年-总结、感悟、碎碎念
    又到年底了,总结下2022吧,今年还是蛮多收获和感悟的,感觉越发活的通透了些,有些事情我们无法把握,有些事情我们能把握。淡然面对无法把握的,积极把握能把我的。工作工作上面今......
  • Spring Security 使用总结
    暑假的时候在学习了SpringSecurity并成功运用到了项目中。在实践中摸索出了一套结合json+jwt(jsonwebtoken)+SpringBoot+SpringSecurity技术的权限方案趁......
  • [编程基础] Python随机数生成模块总结
    date:2020-06-2421:05:32+0800tags:-编程基础-PythonPython随机数生成模块教程演示如何在Python中生成伪随机数。1介绍1.1随机数字生成器随机数生成器(......
  • 第十周总结
    Django请求生命周期流程图1.路由层、视图层、模板层、模型层、组件、BBS项目2.django路由层1.路由匹配:'''当我们输入一个地址时,接口后面的/不用输入,也可以自动跳转......
  • 二分类模型评价指标-总结
    knitr::opts_chunk$set(echo=TRUE)  介绍评价二分类模型的一些指标。1.混淆矩阵预测为正类预测为负类实际为正类TPFN实际为负类FPTN符号标记:TP—将正类预测为正类数......
  • 周末总结12.18
    目录django路由层django请求生命周期流程图(*****)1.路由匹配2.正则匹配3.正则匹配的无名有名分组反向解析基本使用动态路由的反向解析路由分发视图层必会三板斧JsonRespons......
  • 【博学谷学习记录】超强总结,用心分享|接口加解密介绍
    一、介绍在做接口测试的时候,如果是外部用户直接能看到我们的参数,可能会造成接口不安全,比如直接用明文的参数请求接口,把参数自己定义,脏数据就会存到我们的数据库中,严重......