目录
九、Kafka集群中的controller、rebalance、HW
一、消息队列
消息队列(Message Queue,简称MQ),具体地说,是为了解决通信问题。
-
同步:顺序执行,存在性能和稳定性的问题;
-
问题一:系统开销大,响应时间长;
-
问题二:执行过程中需要保证每个服务的顺利执行,用户体验较差;
-
-
异步:通过消息队列,进行异步处理;
-
优势一:极大提高系统的吞吐量;
-
优势二:即使执行失败,也可以使用分布式事务来保证最终是成功的(最终一致性);
-
二、流派分类:
主流MQ分为以下几个:
-
KafKa:目前性能最好、速度最快;
-
RocketMQ:阿里根据Kafka封装,功能性更强;
-
RabbitMQ:功能性强,模式多;
-
ZeroMQ:看重的是MQ的通信能力,基于Socket封装。
功能性区分:
-
有Broker:通常有一台服务器作为Broker,所有消息都通过它中转。生产者将消息发送给Broker,Broker则把消息主动推送给消费者。
-
重Topic:Kafka、JMS(ActiveMQ),在消息队列中,Topic必须存在。
-
轻Topic:RabbitMQ(AMQP),Topic只是其中的一种中转模式。
-
-
无Broker:ZeroMQ,认为MQ是一种更高级的Socket,是为了解决通信问题。故ZeroMQ被设计为了一个库,而不是中间件,ZeroMQ做的事情就是封装出了一套类似Socket的API去完成发送、读取消息。
三、Kafka基本介绍
-
官网地址:Apache Kafka
-
依赖准备:
-
安装JDK:jdk1.8 +;
-
安装Zookeeper:自己安装,或者使用Kafka安装包内自带的也可以;
-
-
Kafka安装:
-
官网下载:当前版本为:kafka_2.13-2.8.0.tgz;
-
解压缩:
tar -xzvf kafka_2.13-2.8.0.tgz
; -
目录说明:
-
bin:二进制启动文件;
-
config:相关配置文件;
-
libs:依赖的jar包;
-
licenses:许可证文件;
-
logs:某些日志文件;
-
site-docs:压缩文档参考;
-
-
修改配置文件(config目录内):
-
zookeeper.properties
:修改日志目录,改为自定义路径; -
server.properties
:# 单节点使用默认值,集群修改为唯一id broker.id=0 # 监听的kafka服务ip地址 listeners=PLAINTEXT://localhost:9092 # 消息存储日志文件 log.dirs=D://IT/Kafka/tmp/kafka-logs # 连接的zk服务器ip地址 zookeeper.connect=localhost:2181
-
-
启动测试(win10启动):
-
启动zk服务器:创建脚本启动,命名为
zookeeper.bat
,并运行;start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties"
-
启动Kafka服务器:创建脚本启动,命名为
kafka.bat
,并运行;start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server.properties"
-
校验是否启动成功:进入zk-Cli,查看是否有kafka的
broker.id
节点;ls /brokers/ids
-
-
-
Kafka基本概念:
名称 解释 Broker 消息中间件处理节点,一个Kafka节点就是一个Broker,一个或多个组成一个Kafka集群 Topic Kafka根据Topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个Topic Producer 消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端 -
创建Topic:
-
创建Topic:
./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo1
-
修改Topic:
# 修改分区数 ./kafka-topics.bat --zookeeper localhost:2181 -alter --partitions 3 --topic demo1
-
查询所有的Topic:
./kafka-topics.bat --list --zookeeper localhost:2181
-
查询具体某个Topic详细信息:
./kafka-topics.bat --zookeeper localhost:2181 --topic demo1 --describe
-
-
发送消息:
使用Kafka自带的生产者命令客户端,既可以从本地文件读取内容,也可以在命令行直接输入消息。默认情况下,每一行都是一条独立的消息。发送消息时,需要指定发送到具体哪个Kafka服务器和Topic名称;
./kafka-console-producer.bat --broker-list localhost:9092 --topic demo1
-
消费消息:
-
从头开始消费:消费全部消息。
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic demo1
-
从最新的消息开始消费(默认):从最后一条消息的偏移量 + 1开始消费。
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo1
-
注意:
-
消息可被存储,且是顺序结构,通过偏移量offset来描述消息的有序性;
-
消息消费时可以指定偏移量进行描述消费的消息位置;
-
-
-
消费组:多个消费者可以组成一个消费组。
-
查询所有的消费组:
./kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
-
查询某个消费组中具体信息:
./kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group demoGroup1
-
current-offset:当前消费组已消费偏移量;
-
log-end-offset:消息总量(最后一条消息的偏移量);
-
lag:积压消息总量;
-
-
-
单播消息:一个生产者,一个消费组(Group)。同一消费组中,只有一个消费者能收到Topic中的消息。
-
消费组:
-
消费者一:
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup --topic demo1
-
消费者二:
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup --topic demo1
-
-
-
多播消息:一个生产者,多个消费组(Group)。每个消费组中,只有一个消费者能收到Topic中的消息。
-
消费组一
demoGroup1
:./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup1 --topic demo1
-
消费组二
demoGroup2
:./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup2 --topic demo1
-
四、主题和分区的概念
-
主题Topic:
-
topic是一个逻辑的概念,Kafka通过Topic将消息进行分类,不同的topic将被订阅的消费组进行消费。
-
当topic中的消息非常多,由于消息会保存在日志中,导致内存占用过大,由此提出分区Partition的概念。
-
-
分区Partition:
通过partition将一个topic中的消息进行分区存储。好处如下:
-
分区存储,可以解决存储问题过大的问题;
-
提升了消息日志读写的吞吐量,读和写可以在多个分区中同时进行;
-
-
消息日志文件分析(kafka-logs):
-
0000000000.log
:文件中保存的就是消息内容; -
__consumer_offsets-x
文件夹:Kafka内部默认会创建50个__consumer_offsets-x
分区(0-49),目的是为了存放消费者消费某个主题Topic的偏移量。当消费者消费完成后就会把偏移量上报给对应的主题Topic进行保存。目的是为了提升当前的Topic的并发性。-
消费完成提交至对应分区的计算方式:hash(消费组ID) % __consumer_offsets分区总数;
-
提交到主题内容的
Key
的方式:消费组ID + topic名称 + __consumer_offsets分区号; -
提交到主题内容的
Value
的方式:当前offset的值;
-
-
0000000000.index
:日志文件的索引; -
0000000000.timeindex
:日志文件按时间点的索引; -
日志文件,默认保存周期为七天,到期后自动删除。
-
五、Kafka集群
-
集群搭建(三个Broker):
-
创建三个
server.properties
文件:-
第一个
server0.properties
:broker.id=0 listeners=PLAINTEXT://localhost:9092 log.dirs=D://IT/Kafka/tmp/kafka-logs-0 zookeeper.connect=localhost:2181
-
第二个
server1.properties
:broker.id=1 listeners=PLAINTEXT://localhost:9093 log.dirs=D://IT/Kafka/tmp/kafka-logs-1 zookeeper.connect=localhost:2181
-
第三个
server2.properties
:broker.id=2 listeners=PLAINTEXT://localhost:9094 log.dirs=D://IT/Kafka/tmp/kafka-logs-2 zookeeper.connect=localhost:2181
-
-
启动三个Broker:
start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server0.properties" start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server1.properties" start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server2.properties"
-
使用zkCli客户端检测是否启动成功:
ls /brokers/ids # 启动成功显示三个broker的id
-
-
副本(replication-factor)的概念:
-
副本:是为主题中的分区创建的备份,一般来说,有几个Broker就应该创建几个副本。其中,会选举出一个副本作为
leader
,其他的则是follower
。-
leader:Kafka的读写操作,都发生在leader上。leader负责把数据同步至其他的follower,当leader宕机时,将会进行主从选举,选出一个新的leader。
-
follower:介绍leader的同步数据;
-
isr:可以同步和已同步的节点会存入 ISR 集合中,主从选举从 ISR 集合内选出leader。当节点的性能较差时,ISR 集合将会踢出该节点。
-
-
-
集群消费问题:
-
向集群发送消息:
./kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic demo1
-
从集群中消费消息:
./kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --consumer-property group.id=demoGroup --topic demo1
-
分区消费的细节:
-
一个分区partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性。多个分区partition的多个消费者的顺序性无法保证(后续有方法可以保证)。
-
分区partition的数量决定了消费组中的消费者数量,建议消费组中的消费者数量不要超过分区partition的数量,防止多出的消费者消费不到消息造成资源的浪费。
-
消费者宕机时,将会触发 rebalance 机制,选出其他消费者进行消费该分区的消息。
-
-
六、kafka-clients之生产者
-
引入依赖(版本号与Kafka版本一致):
<!--kafka-clients--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
简单测试:
-
创建一个Topic:
./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-test-topic
-
启动三台Broker作为集群
-
代码测试:
public class ProducerTest { private static final String TOPIC_NAME = "my-test-topic"; private static final String BOOTSTRAP_SERVER = "localhost:9092,localhost:9093,localhost:9094"; private static final String KEY = "test:"; private static final String VALUE = "This is a test for Kafka-producer!"; private static final Properties prop = new Properties(); static { // Kafka集群IP prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER); // ACK:消息持久化配置(-1、0、1) prop.put(ProducerConfig.ACKS_CONFIG, "1"); // 重试次数配置 prop.put(ProducerConfig.RETRIES_CONFIG, "3"); // 重试间隔配置,单位:ms prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300"); // 缓冲区大小,32Mb prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32); // 发送消息大小,16Kb prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 16); // 未能拉取到16Kb数据,间隔 10ms 后,立即发送 prop.put(ProducerConfig.LINGER_MS_CONFIG, 10); // key的序列化方式 prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value的序列化方式 prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); } @Test public void test1() throws Exception { // 1.创建消息生产者对象 Producer<String, String> producer = new KafkaProducer<>(prop); for (int i = 0; i < 6; i++) { // 2.封装消息载体对象 // 分区计算方式:hash(key) % partitionNum,即:key的哈希值 取余 分区的总数 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE); // 自定义发送的分区位置 // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2, KEY + UUID.randomUUID(), VALUE); // 3.同步发送消息 Future<RecordMetadata> future = producer.send(record); // 4.获取消息发送成功后的元数据 RecordMetadata metadata = future.get(); System.out.println("主题Topic名称:" + metadata.topic()); System.out.println("保存partition分区的位置:" + metadata.partition()); System.out.println("总消息数(offset偏移量):" + metadata.offset()); } producer.close(); } @Test public void test2() { // 1.创建消息生产者对象 Producer<String, String> producer = new KafkaProducer<>(prop); // 2.封装消息载体对象 // 分区计算方式:hash(key) % partitionNum,即:key的哈希值 取余 分区的总数 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE); // 自定义发送的分区位置 // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2, KEY + UUID.randomUUID(), VALUE); // 3.异步发送消息 producer.send(record, (data, exception) -> { // 4.消息发送异常,要做的事 if (exception != null) { System.out.println("发送失败,异常原因:" + exception.getMessage()); } // 5.消息发送成功要做的事 if (data != null) { System.out.println("主题Topic名称:" + data.topic()); System.out.println("保存partition分区的位置:" + data.partition()); System.out.println("总消息数(offset偏移量):" + data.offset()); } }); // 延时等待,异步消息进程执行结束 ThreadUtils.sleep(1000); producer.close(); } }
-
同步发送消息:如果发送消息后没有收到 ack,生产者将阻塞等待 3 秒,之后会进行重试,重试三次后仍然失败,则抛出异常。执行慢,但是不会丢失消息;
-
异步发送消息:生产者发送完成后就可以执行后续业务,broker收到消息后,进行异步调用生产者提供的 callback 回调方法。当网络异常时可能出现回调方法未执行的问题,即消息丢失;
-
-
-
消息持久化机制参数(ACK配置,同步发送时用到 ACK 配置)
-
acks=0
:发送消息后无需等待broker进行确认,就能发送下一条。性能最高,消息最容易丢失; -
acks=1
(默认值):至少需要等待 leader 成功写入本地日志,才能发送下一条。如果此时 leader 宕机,则会发生消息丢失; -
acks=-1/all
:需要等待多个节点写入日志(在min.insync.replicas
中进行设置,默认值为1,推荐大于等于 2),才能发送下一条。只要有一个备份存活就不会丢失消息。金融级别的常用配置,性能最差,安全性最高; -
其他配置:
-
重试次数配置
-
重试间隔配置
-
-
配置相关代码:
// ACK:消息持久化配置(-1、0、1) prop.put(ProducerConfig.ACKS_CONFIG, "1"); // 重试次数配置 prop.put(ProducerConfig.RETRIES_CONFIG, "3"); // 重试间隔配置,单位:ms prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
-
-
消息缓冲区配置:
-
Kafka会创建一个消息缓冲区,存放要发送的消息,缓冲区大小为 32Mb;
// 缓冲区大小,32Mb prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32);
-
Kafka本地线程会去缓冲区拉取数据,发送至 Broker,一次拉取 16Kb;
// 发送消息大小 16Kb prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 16);
-
如果线程拉取不到 16Kb 的数据,间隔 10ms 后,也会将拉取的数据发送至Broker;
// 未能拉取到16Kb数据,间隔 10ms 后,立即发送 prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);
-
七、kafka-clients之消费者
-
引入依赖(版本号与Kafka版本一致):
<!--kafka-clients--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
简单测试:
public class ConsumerTest { private static final String TOPIC_NAME = "my-test-topic"; private static final String BOOTSTRAP_SERVER = "localhost:9092,localhost:9093,localhost:9094"; private static final String CONSUMER_GROUP_NAME = "test-group-0"; private static final Properties prop = new Properties(); static { // Kafka集群IP prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER); // 消费组的名称 prop.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); // key的反序列化方式 prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // value的反序列化方式 prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); } @Test public void test1() { // 1.创建消费者对象 Consumer<String, String> consumer = new KafkaConsumer<>(prop); // 2.订阅Topic主题的消息列表 consumer.subscribe(Collections.singletonList(TOPIC_NAME)); // 3.长轮询进行监听消息 while (true) { // 4.拉取消息列表 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 5.循环处理每一条消息 for (ConsumerRecord<String, String> record : records) { System.out.println("主题Topic名称:" + record.topic()); System.out.println("分区partition的位置:" + record.partition()); System.out.println("消息offset偏移量:" + record.offset()); System.out.println("消息的键key:" + record.key()); System.out.println("消息的值value:" + record.value()); } } } }
-
消费者提交 offset:
-
提交的内容:所属的消费组 + 消费的 Topic + 消费的 partition + 偏移量 offset;
-
自动提交:消息 poll 之后,消费消息之前进行 offset 提交。当消费者宕机时,会发生消息丢失问题;
// 是否开启自动提交 offset,默认为 true prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交 间隔时间 prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-
手动提交:消息进行消费后,手动提交 offset;
// 是否开启自动提交 offset,默认为 true prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
同步提交(更推荐使用):
// 拉取消息列表 Duration delay = Duration.ofMillis(1000); ConsumerRecords<String, String> records = consumer.poll(delay); if (records.count() > 0) { // 6.手动同步提交,等待Broker返回ACK才会执行后续业务,否则阻塞等待 try { consumer.commitAsync(); } catch (Exception e) { // 提交失败执行的逻辑 e.printStackTrace(); } }
-
异步提交:
// 拉取消息列表 Duration delay = Duration.ofMillis(1000); ConsumerRecords<String, String> records = consumer.poll(delay); if (records.count() > 0) { // 6.手动异步提交,无需返回ACK,提交成功后异步调用回调方法 consumer.commitAsync((offset, exception) -> { // 提交失败执行的逻辑 if (exception != null) { System.out.println("手动提交异常:" + exception.getMessage()); } // 提交成功执行的逻辑 if (CollUtil.isNotEmpty(offset)) { offset.entrySet().forEach(System.out::println); } }); }
-
-
-
消费者长轮询 poll 的配置:
-
单次拉取条数:默认500条;
// 单次拉取的消息条数 prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
-
单次轮询的时间:poll方法的入参,此处为 1000 毫秒;
Duration delay = Duration.ofMillis(1000); ConsumerRecords<String, String> records = consumer.poll(delay);
-
poll的机制:在轮询内时,重复拉取消息,直至拉取 500 条消息,或超过轮询时长停止;
-
当首次拉取消息,够了 500 条,则停止拉取,执行后续业务逻辑;
-
首次没有拉取到 500 条,进行重复拉取,直至 拉取够,或超过轮询时长停止;
-
若多次拉取都不够 500 条,且超过了轮询时长,则停止拉取,执行后续业务逻辑;
-
-
当前后两次轮询的间隔时间超过了 30 秒,集群将判定此消费者能力弱,并踢出消费组,并处罚
rebalance
机制,rebalance
机制会造成性能开销。可以修改如下配置,提升消费者的速度;// 单次拉取的消息条数,默认500 prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200); // 单次长轮询的间隔时间,默认 1000 ms prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5 * 1000);
-
-
-
消费者的健康状态检测配置:
// 消费者发送心跳间隔时长,心跳频率 prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); // 心跳超时将触发 rebalance机制,并踢出消费组 的超时时长 prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
-
消费组指定partition分区进行消费:
// 参数一,Topic名称。参数二,分区位置 TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0); consumer.assign(Collections.singletonList(topicPartition));
-
消费者的消息回溯:指定分区位置,并从头开始消费;
// 参数一,Topic名称。参数二,分区位置 TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0); consumer.assign(Collections.singletonList(topicPartition)); consumer.seekToBeginning(Collections.singletonList(topicPartition));
-
指定 offset 位置消费:
// 指定offset位置进行消费 consumer.seek(topicPartition, new OffsetAndMetadata(10));
-
指定时间点,获取 offset 进行消费:
// 获取此Topic的全部分区 List<PartitionInfo> infos = consumer.partitionsFor(TOPIC_NAME); // 消费当前时间前一天的消息 long time = new Date().getTime() - 24 * 60 * 60 * 1000; // 封装分区和消费时间的参数 HashMap<TopicPartition, Long> map = new HashMap<>(); for (PartitionInfo info : infos) { map.put(new TopicPartition(TOPIC_NAME, info.partition()), time); } // 根据时间节点,找到消息偏移量 offset Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); // 进行消息订阅,指定消费的偏移量 offset consumer.assign(Collections.singletonList(key)); consumer.seek(key, new OffsetAndMetadata(value.offset())); }
-
新消费组的消费 offset 规则:
当创建新的消费组启动消费者时,默认只会消费最新的消息。通过修改以下配置,使消费者首次启动时,从头开始消费,后续启动从最新消息开始消费。
// 创建新的消费组时的消费规则,默认latest。latest:从最新开始 / earliest:从头开始 prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
latest:从最新开始,只消费最新的消息;
-
earliest:首次启动从头开始,后续启动从最新开始;
-
区别:
seekToBeginning()
方法是每次启动都从头开始消费,earliest
只有首次启动从头开始,后续启动从最新开始;
-
八、SpringBoot使用Kafka
-
引入依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
配置文件:
spring: kafka: bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 producer: acks: 1 # 0:无需ack确认;1:broker写入 leader日志后返回ack;-1:broker写入多个副本日志后返回ack retries: 3 # 重试次数 batch-size: 16384 # 单次发送消息大小 16Kb buffer-memory: 33554432 # 缓存区大小 32Mb key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test-group-0 max-poll-records: 500 # 单次拉取消息条数 enable-auto-commit: false # 消费后是否自动提交 auto-offset-reset: earliest # 新消费组消费策略,earliest:首次从头消费,后续从最新消息消费 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: ack-mode: manual # 手动提交,监听器处理一次轮询的消息后(默认500条)调用ack.acknowledge()进行提交 # ack-mode: manual_immediate # 手动提交,监听器处理单条消息后调用ack.acknowledge()进行提交
-
消息生产者:
@RestController @RequestMapping("/kafka") public class KafkaController { private static final String TOPIC_NAME = "my-test-topic"; private static final String KEY = "test:"; private static final String VALUE = "This is a test for Kafka-producer!"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @GetMapping("/send") public AjaxResult sendMessage() { ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE); // 返回值是Future kafkaTemplate.send(record); return AjaxResult.success(); } }
-
消费者监听:
@Component public class MyConsumer { private static final String TOPIC_NAME = "my-test-topic"; private static final String CONSUMER_GROUP_NAME_0 = "test-group-0"; /** * 自动监听是否有消息 * * @param record 可以是 ConsumerRecords,也可以是 ConsumerRecord,前者一次处理多条消息,后者一次处理一条消息 * @param ack 当关闭自动提交时,需要使用此参数进行 ack 确认提交 */ @KafkaListener(topics = TOPIC_NAME, groupId = GROUP_NAME) public void listenMessage(ConsumerRecord<String, String> record, Acknowledgment ack) { System.out.println(record); ack.acknowledge(); } }
-
消费者配置主题Topic、分区partition、偏移量offset:
/** * 创建一个消费组,包含 3 个消费者(concurrency),持续监听两个Topic的消息。 * 第一个Topic,监听分区(0、1),分区 0的偏移量初始为 5;分区 1的偏移量初始为 10。 * 第二个Topic,监听分区(1、2),分区 1的偏移量初始为 15;分区 2的偏移量初始为 20。 * * @param record 可以是 ConsumerRecords,也可以是 ConsumerRecord,前者一次处理多条消息,后者一次处理一条消息 * @param ack 当关闭自动提交时,需要使用此参数进行 ack 确认提交 */ @KafkaListener(groupId = GROUP_NAME, concurrency = "3", topicPartitions = { @TopicPartition(topic = TOPIC_NAME_1, partitions = {"0", "1"}, partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "5"), @PartitionOffset(partition = "1", initialOffset = "10") }), @TopicPartition(topic = TOPIC_NAME_2, partitions = {"1", "2"}, partitionOffsets = { @PartitionOffset(partition = "1", initialOffset = "15"), @PartitionOffset(partition = "2", initialOffset = "20") }) }) public void listenMessage2(ConsumerRecord<String, String> record, Acknowledgment ack) { System.out.println(record); ack.acknowledge(); }
九、Kafka集群中的controller、rebalance、HW
(1)controller
-
选举机制:使用zk的机制,当 broker 创建时,会在 zookeeper 中创建一个临时序号节点,序号最小的节点代表的 broker 将作为集群中的 controller。
-
作用:
-
当集群中某一个副本的 leader 宕机,需要在集群中选出一个新的 leader,选举的规则是 ISR 集合中最左边的元素(ISR 集合会按照性能排序,性能越好越靠前);
-
当集群中有 broker 的增加或减少,controller 会同步信息给其他的 broker;
-
当集群中有 partition 分区的增加或减少,controller 会同步信息给其他的 broker;
-
(2)rebalance机制
-
rebalance 的前提:当消费组中的消费者没有指定分区进行消费,由 Kafka 决定消息分区的分配。
-
触发 rebalance 的条件:当消费组中的消费者和分区的关系发生变化时;
-
分区分配策略(rebalance之前,分区分配有三种策略):
-
range
:根据公式计算消费者消费的分区。 -
轮询:分区逐一分配到消费者上。
-
sticky:粘合策略。如果需要rebalance,会在已分配的基础上进行调整,而不会影响之前的分配情况。建议开启此策略,因为 rebalance 机制会重新分配会造成资源浪费;
-
(3)HW和LEO
-
HW:Height Water,称为高水位。它是 Broker 已完成同步的分界线,当消息写入 Broker,并同步到所有副本之后,HW才会变化。HW变化之前,消费者无法消费到未同步完成的消息,这么做的目的是为了防止消息丢失。
-
LEO:Log End Offset,指的是某个副本的最后一条消息的位置。
-
关系图如下:
十、Kafka问题优化
-
如何防止消息丢失?
-
生产者:
-
发送消息时,使用同步发送的方式;
-
设置 ACK 的级别,设为1 或 -1即可,-1时可以做到99.99%防丢失率,需要修改
min.insync.replicas
分区备份数,推荐大于等于 2;
-
-
消费者:拉取消息后的自动提交,修改为手动提交;
-
-
如何防止重复消费?
-
生产者:
-
关闭 retry 重试:不推荐,这样做可能导致消息丢失;
-
-
消费者:
-
开启自动提交:不推荐,这样做可能导致消息丢失;
-
保证消费的幂等性(指多次访问结果一致):
-
方式一:使用 redis / zk 分布式锁,主流方案,推荐使用;
-
方式二:创建联合主键,保证消息的唯一性,防止插入多条记录;
-
-
-
-
如何做到顺序消费?
-
生产者:
-
使用同步发送,且 ACK 不为 0(0会导致消息丢失),确保消息发送顺序是正确的;
-
-
消费者:
-
主题 Topic 只能设置一个分区 Partition,且消费组只能有一个消费者;
-
-
Kafka的顺序消费会牺牲性能,可以考虑使用 RocketMQ 代替 ;
-
-
如何处理消息积压?
-
消息积压的原因:由于消费的速度远赶不上生产的速度,导致
-
解决方案:
-
消费者使用多线程,充分利用机器的性能;
-
优化业务架构,提升业务层的消费速度;
-
创建多个消费组,多个消费者,提升消费者的速度;
-
消息分发:创建一个消费者,进行转发消息,接收方为新的 Topic ,且配置了多个分区及多个消费者进行消费(不常用)。
-
-
-
如何实现延时队列?
-
场景:创建订单后,如果 30 分钟未支付,则取消订单。
-
解决方案:
-
单独创建相应的主题;
-
消费者消费该主题的消息(轮询);
-
消费前进行判断,当前时间是否与消息的创建时间相差 30 分钟,且未支付;
-
如果是:修改数据库状态为取消订单;
-
如果否:记录当前 offset,且不再继续消费之后的消息。等待一分钟后,根据记录的 offset 拉取消息,继续进行判断。
-
-
-
十一、Kafka Eagle监控平台(新版本待测试)
未完待续...
标签:消费,Java,--,分区,kafka,消息,Kafka,安装 From: https://blog.csdn.net/fyx_demo/article/details/139371324