Kafka 学习专题
Kafka 学习目标 - 搞懂它解决的问题,学习其架构,理解其思想,并掌握工程实践中如何使用它。
文章目录
写在前面
学习 Kafka 引入的概念与设计思想,学习如何使用 Docker 部署 Kafka,并介绍在 DDD 工程中如何使用 Kafka 发送 MQ 消息,最后以面试题为驱动检验学习成果。
一、Kafka 是什么?
Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 开发,用于实时处理大规模的流数据。Kafka 的核心目标是提供一种可扩展、高吞吐量、低延迟的消息分发机制。
Kafka 的用途
- 日志收集:集中化收集和存储应用系统的日志数据。
- 流处理:与流式处理框架(如 Spark Streaming、Flink)结合,处理实时数据。
- 数据集成:作为数据的桥梁,将不同系统的数据整合在一起。
- 削峰:作为数据的桥梁,消息队列天然起到一个缓冲的作用,例如在有任务量庞大灌入系统或者系统与系统间吞吐量不一致时,数据可以先暂存入队列等待。
- 事件驱动架构:支持基于事件的微服务通信。
核心概念
- Topic:消息的分类,类似于数据库中的表。
- Producer:生产者,用于发布消息。
- Consumer:消费者,用于订阅并消费消息。
- Partition:Topic 的分片,用于支持并发处理和分布式存储。
- Broker:Kafka 服务器实例,负责消息的存储和分发。
二、Kafka 设计机制详解
2.1 分布式架构
Kafka 采用分布式架构,由多个 Broker 节点组成集群。每个 Broker 是 Kafka 的一个实例,负责处理集群的一部分数据和流量。Kafka 的分布式设计具有以下特点:
2.1.1 水平扩展
Kafka 的扩展能力体现在其水平扩展的设计中。通过增加 Broker 节点,可以线性提升集群的存储容量和处理能力。具体而言:
- 分区划分:每个 Topic 可以分为多个 Partition,每个 Partition 可以分布在不同的 Broker 上。
- Leader-Follower 机制:每个 Partition 有一个 Leader 和多个 Follower,Producer 和 Consumer 只与 Leader 交互,从而减少并发冲突。
2.1.2 高可用性
Kafka 通过副本机制(Replication)实现高可用性:
- 副本分配策略:Kafka 使用 Rack Awareness 策略,将副本分布在不同的机架上,以提高容灾能力。
- 自动故障转移:当某个 Broker 下线时,Kafka 会选举新的 Leader 确保服务不中断。
2.1.3 高吞吐量
Kafka 的分布式设计结合批量处理和零拷贝(Zero Copy)技术,使其在处理大规模数据时依然保持高性能:
- 批量写入与读取:Kafka 支持 Producer 和 Consumer 批量操作,从而减少网络开销。
- 页缓存优化:Kafka 利用操作系统的页缓存,避免频繁的磁盘 I/O 操作。
2.2 存储机制
Kafka 的存储机制是其高性能和可靠性的关键。Kafka 使用分段日志文件的存储策略,为消息的存储和管理提供了强大的支持。
2.2.1 顺序写入与读取
Kafka 的消息存储是以日志文件的形式进行的,支持高效的顺序写入和读取:
- 写入性能优化:Kafka 将消息追加到日志文件末尾,避免了随机写入的开销。
- 读取性能优化:Consumer 按偏移量读取消息,无需查找,提高了读取效率。
2.2.2 分段存储
Kafka 的每个 Partition 的日志文件被分为多个 Segment:
- 固定大小:每个 Segment 文件大小固定,当文件达到指定大小时会滚动创建新的文件。
- 元数据管理:Kafka 使用索引文件记录每条消息的偏移量和物理位置,便于快速定位。
2.2.3 数据清理策略
Kafka 提供了两种数据清理策略:
- 基于时间的清理:清理超过保留时间的 Segment 文件。
- 基于日志压缩的清理:保留每个 Key 的最新消息,删除旧版本。
2.3 消息的可靠性
Kafka 通过多种机制保证消息的可靠性,确保消息从 Producer 到 Consumer 不丢失。
2.3.1 副本机制
每个 Partition 都有多个副本:
- Leader-Follower 模式:Leader 处理所有读写请求,Follower 同步 Leader 数据。
- 同步机制:Follower 定期向 Leader 发送心跳和同步请求,确保数据一致性。
2.3.2 ACK 确认机制
Producer 可以通过设置 acks
参数控制消息的确认级别:
acks=0
:Producer 不等待任何确认。acks=1
:Producer 等待 Leader 的确认。acks=all
:Producer 等待所有副本的确认,确保最高可靠性。
2.3.3 事务支持
Kafka 提供事务支持,以确保消息的原子性:
- 幂等性 Producer:防止消息重复发送。
- 事务 Consumer:通过偏移量提交确保消费的事务性。
2.4 消息的顺序性
Kafka 的设计确保了消息的顺序性,但其实现机制依赖于分区和 Producer 的配置。
2.4.1 单分区顺序性
在单个分区内,消息的写入和读取是严格有序的。
2.4.2 多分区顺序性
对于多个分区,Kafka 的顺序性实现取决于:
- Key 分区策略:通过设置消息的 Key,使同一 Key 的消息始终写入同一分区。
- 全局顺序性:需要额外的排序逻辑,例如在 Consumer 端聚合排序。
三、基于 Docker 快速部署 Kafka
环境要求
- Docker
- Docker Compose
Docker Compose 配置示例
版本一:Kafka + Zookeeper 的配置脚本
version: '3.0'
# docker-compose -f docker-compose.yml up -d
services:
zookeeper:
image: zookeeper:3.9.0
container_name: zookeeper
restart: always
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
ZOOKEEPER_CLIENT_PORT: 2181
ALLOW_ANONYMOUS_LOGIN: yes
TZ: Asia/Shanghai
networks:
- my-network
kafka:
image: bitnami/kafka:3.7.0
container_name: kafka
volumes:
- /etc/localtime:/etc/localtime
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_MESSAGE_MAX_BYTES: "2000000"
KAFKA_ENABLE_KRAFT: no
JMX_PORT: 9999
TZ: Asia/Shanghai
depends_on:
- zookeeper
networks:
- my-network
kafka-eagle:
image: echo21bash/kafka-eagle:3.0.2
container_name: kafka-eagle
environment:
KAFKA_EAGLE_ZK_LIST: zookeeper:2181
volumes:
- ./kafka-eagle/system-config.properties:/opt/kafka-eagle/conf/system-config.properties
ports:
- "8048:8048"
depends_on:
- kafka
networks:
- my-network
networks:
my-network:
driver: bridge
Kafka 不依赖 Zookeeper 独立运行的配置脚本
version: '3.0'
# docker-compose -f docker-compose.yml up -d
services:
kafka:
container_name: kafka
image: bitnami/kafka:3.9.0
ports:
- :9092:9092
restart: always
environment:
ALLOW_PLAINTEXT_LISTENER: "yes"
# 注意,你可以在电脑上增加 kafka 的 host 映射ip,或者直接写上服务器ip地址
# KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://真实ip地址,云服务器公网ip/本地电脑分配ip:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@127.0.0.1:9093
KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES: 524288000
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_LOG_RETENTION_MS: 60000
KAFKA_CFG_MAX_REQUEST_SIZE: 524288000
KAFKA_CFG_MESSAGE_MAX_BYTES: 524288000
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PARTITION_FETCH_BYTES: 524288000
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_REPLICA_FETCH_MAX_BYTES: 524288000
KAFKA_HEAP_OPTS: -Xmx512m -Xms256m
networks:
- my-network
extra_hosts:
- "kafka:host-gateway"
# http://127.0.0.1:8088
redpanda-console:
container_name: kafka-console
image: redpandadata/console:v2.7.2
ports:
- 8088:8080
restart: always
networks:
- my-network
environment:
KAFKA_BROKERS: kafka:9092
networks:
my-network:
driver: bridge
功能组件的对比
功能组件 | 版本一 | 版本二 |
---|---|---|
Zookeeper | 使用了 Zookeeper 管理 Kafka 集群 | 没有使用 Zookeeper,Kafka 启用了 KRaft(Kafka 自身的元数据管理) |
监控工具 | 使用 Kafka-Eagle | 使用 Redpanda Console |
启动服务
docker-compose -f docker-compose.yml up -d
验证服务
使用 Kafka 自带的命令行工具验证生产和消费消息功能。
# 1. 进入 Kafka 容器
docker exec -it kafka bash
# 2. 创建一个 Topic
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
# 输出: Created topic test-topic.
# 3. 生产消息
kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
# 输入以下消息后回车发送:
# Hello, Kafka!
# Test message 1
# Test message 2
# 4. 消费消息
# 打开另一个终端窗口,再次进入 Kafka 容器:
docker exec -it kafka bash
# 消费消息:
kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
# 输出:
# Hello, Kafka!
# Test message 1
# Test message 2
四、应用实例
4.1 DDD 项目中引入 Kafka 实例
4.1.1 环境配置实例
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
# 发生错误后,消息重发的次数。
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
# 配置主题
kafka:
topic:
group: test-group
user: test-topic
4.1.2 项目分包推荐&代码演示
事件发送组件模版——基础层 cn.test.infrastructure.event.EventPublisher
@Slf4j
@Component
public class EventPublisher {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
try {
String messageJson = JSON.toJSONString(eventMessage);
kafkaTemplate.send(topic, messageJson);
log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);
} catch (Exception e) {
log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);
throw e;
}
}
}
消息事件定义——领域层 cn.test.domain.auth.event.UserMessageEvent
public class UserMessageEvent extends BaseEvent<UserMessageEvent.UserMessage> {
@Value("${kafka.topic.user}")
private String topic;
@Override
public EventMessage<UserMessage> buildEventMessage(UserMessage data) {
return EventMessage.<UserMessage>builder()
.id(RandomStringUtils.randomNumeric(11))
.timestamp(new Date())
.data(data)
.build();
}
@Override
public String topic() {
return topic;
}
/**
* 要推送的事件消息,聚合到当前类下。
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class UserMessage {
private String userId;
private String userName;
private String userType;
}
}
消息事件发送——仓储层 cn.test.infrastructure.repository.UserRepository
@Service
public class UserRepository extends UserMessageEvent implements IUserRepository {
@Resource
private EventPublisher publisher;
@Override
public void doSaveUser(UserEntity userEntity) {
// 推送消息
publisher.publish(this.topic(), this.buildEventMessage(UserMessageEvent.UserMessage.builder()
.userId(userEntity.getUserId())
.userName(userEntity.getUserName())
.userType(userEntity.getUserTypeVO().getDesc())
.build()));
}
}
消息事件监听——触发层 cn.test.trigger.listener.KafkaMessageListener
@Slf4j
@Component
public class KafkaMessageListener {
@KafkaListener(topics = "${kafka.topic.user}", groupId = "${kafka.topic.group}", concurrency = "1")
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
try {
// 逻辑处理
// 确认消息消费完成,如果抛异常消息会进入重试
ack.acknowledge();
log.info("Kafka消费成功! Topic:" + topic + ",Message:" + msg);
} catch (Exception e) {
e.printStackTrace();
log.error("Kafka消费失败!Topic:" + topic + ",Message:" + msg, e);
}
}
}
4.2 Kafka 在项目中的应用案例介绍
4.2.1 日志收集系统
通过 Kafka 实现分布式日志收集,将不同服务的日志发送到 Kafka,集中处理。
实现步骤
- 启动 Kafka 集群。
- 使用 Logstash 作为日志收集工具,将日志推送到 Kafka。
- 消费日志并存储到 Elasticsearch 中,提供可视化展示。
4.2.1 实时数据流处理
结合 Kafka 和 Flink 构建实时流处理平台。
示例场景
- 实时订单监控
- 用户行为分析
实现步骤
- 创建 Kafka Topic。
- 编写 Flink 程序消费 Kafka 数据并实时处理。
- 将处理后的结果写入数据库或消息队列。
五、面试常问
深入从 Kafka 的机制原理和优化角度,基于问题的特点进行的优化分析。
1. Kafka 的消息消费模式有哪些?如何选择适合的模式?
Kafka 消息消费模式:
- 单消费者模式:一个消费者订阅并消费一个或多个分区。所有消息都由该消费者处理,适用于小规模的消息处理场景。
- 消费者组模式(Consumer Group):多个消费者组成一个消费者组,组内消费者共同消费分区。Kafka 会根据消费者的数量和分区数进行负载均衡,每个消费者最多消费一个分区,确保每个分区内的消息只被一个消费者处理。
优化选择与设计:
-
单消费者模式优化:
- 若消息量较小,单个消费者即可满足需求,保持简化设计。
- 如果系统对顺序性有较强要求(例如订单处理),可以使用单个消费者模式,避免跨分区消费导致顺序打乱。
-
消费者组模式优化:
- 若消息量较大且需要提高消费并发,选择消费者组模式是更好的选择。消费者组内的消费者数目与分区数目应匹配,以避免消费者空闲。
- 优化思路:调整分区数和消费者数,确保消费者不会因过多的空闲而降低吞吐量。合理的分区数能最大化消费者组的并行性。
代码优化示例:
// 创建消费者组
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic-1", "topic-2"));
// 通过动态调整消费者数和分区数来优化负载
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 处理逻辑
processMessage(record);
}
consumer.commitSync(); // 手动提交偏移量,减少消费错误
}
2. Kafka 如何保证消息的顺序性?
Kafka 顺序性的保证机制:
- 分区内顺序:Kafka 保证同一分区内的消息顺序。每条消息都有一个
offset
,消费者通过该偏移量来确保按顺序消费。 - 跨分区顺序性:Kafka 无法保证跨分区的顺序,因为消息会根据分区规则分配到不同的分区。分区的划分和分配是决定消息顺序性的关键。
优化思路:
- 基于业务逻辑设计分区键:通过选择合适的
key
来确保业务中相关的消息(例如同一用户的多个事件)发送到同一个分区,从而保证消息顺序。 - 避免过多的分区:过多的分区可能导致跨分区的顺序问题。如果顺序性非常关键,控制分区数,不要让消息分散到过多的分区。
代码优化示例:
// 发送消息时选择合适的key来保证顺序性
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "user-123", "order_created");
producer.send(record);
3. Kafka 如何处理消息消费失败?
Kafka 消费失败的处理机制:
- 自动提交与手动提交:Kafka 默认自动提交消费的偏移量,但自动提交存在风险,可能会在消息处理失败时丢失消息。手动提交偏移量则能在消息处理成功后再提交,避免错误消费。
- 消息重试:如果消息处理失败,消费者可以通过重试机制进行重新消费,确保消息不会丢失。
优化思路:
- 手动提交偏移量:将
enable.auto.commit
设置为false
,确保在成功处理完消息后再提交偏移量。这样即使消费失败,未提交的消息会被重新消费。 - 死信队列(DLQ)设计:将处理失败的消息存储到死信队列中,便于后续人工干预或特殊处理,防止消息丢失。
代码优化示例:
// 手动提交偏移量,确保消费成功后提交
consumer.commitSync();
@RetryableTopic
是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,首先推荐使用这个注解来完成重试。
// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(
attempts = "5",
backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {"test_topic"}, groupId = "test_group")
private void customer(String message) {
...
}
如果处理失败,可以将消息推送到死信队列。对于死信队列的处理,既可以用 @DltHandler
处理,也可以使用 @KafkaListener
重新消费。
4. Kafka 如何保证消息的持久化和可靠性?
Kafka 消息持久化与可靠性机制:
- 消息持久化:Kafka 将消息持久化到磁盘,并且使用日志文件和分区副本机制来保证消息的可靠性。每个分区有多个副本,副本分布在不同的 Kafka 节点上,确保即使某些节点宕机,数据不会丢失。
- 生产者的确认机制:生产者通过
acks
参数来设置消息确认机制,acks=1
表示只有领导副本确认,acks=all
表示所有副本确认后消息才算成功。
优化思路:
- 提高副本数:通过设置合理的
replication.factor
来保证副本数,通常至少设置为 2 或 3,以提高容错性。 - 生产者确认策略:通过设置
acks=all
来确保所有副本确认,增加消息的可靠性。
代码优化示例:
acks=all // 确保所有副本都确认
retries=3 // 设置消息重试次数
5. Kafka 如何应对高吞吐量场景?
Kafka 高吞吐量优化机制:
- 分区与并行消费:Kafka 使用分区机制将消息分散到多个节点上,通过增加分区数和消费者数来实现高并发消费。
- 批量处理:Kafka 支持批量发送消息和批量消费,可以显著提高吞吐量。生产者通过批量发送减少了网络传输的开销,消费者通过
max.poll.records
控制一次性拉取的消息量,提高处理效率。
优化思路:
- 增加分区数与消费者数:增加分区数可以提高并行消费的能力,进一步提升吞吐量。合理配置消费者的并行度,避免消费者空闲。
- 生产者端批量优化:通过调整
batch.size
和linger.ms
来批量发送消息,减少每条消息的网络开销。
代码优化示例:
batch.size=16384 // 每批次发送消息的字节数
linger.ms=1 // 消息在发送前的等待时间
compression.type=gzip // 启用压缩减少带宽使用
总结
由 Kafka 等高可用中间件引出来的相关技术点有许多,值得再深入关注的一些技术(填坑中):
- 零拷贝技术
- 分布式事务
- 持久化刷盘机制
- 死信队列(DLQ)
- …