首页 > 其他分享 >Kafka 如何保证消息不丢失?【消息手动 ACK】

Kafka 如何保证消息不丢失?【消息手动 ACK】

时间:2024-11-07 14:44:57浏览次数:7  
标签:ACK kafka 发送 丢失 import Kafka 消息

前言:

Kafka 作为一个 MQ 它肯定会有消息丢失的场景,那我们如何做到让 Kafka 的消息不丢失呢?本篇我们来剖析一下 Kafka 如何做到消息不丢失。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

Kafka 之顺序消息

Kafka 之事务消息

Kafka 如何保证消息不丢失?

Kafka 有生产者、Broker、Consumer,这三个环节都可能有消息丢失的情况发生,下面我们就从这三个方面来分析 Kafka 是如何保证消息不丢失的。

生产者:

生产者发送消息到 Kafka 集群的时候,可能会因为网络等其他原因导致发送失败,因此我们可以需要一个机制告诉我们消息是否发送成功,如果没有发送成功就一直发送,直到消息发送成功为止,我们常用的 send(msg) 方法其实是异步发送,发送完消息后会立即返回,我们并不知道消息是否发送成功,为了保证消息一定能够发送成功,建议使用同步发送 send(msg).get() 方法或者带有回调的 send(msg,callback) 方法。

同时我们可以对生产者增加一些配置来保证消息不丢失,配置如下:

#0:表示消息发送后立即返回 无需等待 Leader 的任何确认 1:表示消息写入了 Leader 副本 -1: 表示需要等到消息写入到所有 ISR 同步副本中
spring.kafka.producer.acks = 1
#生产消息发送的重试次数
spring.kafka.producer.retries = 3

spring.kafka.producer.acks 各个值的含义如下:

  • acks =0:表示生产者不需要等待任何 Broker 确认收到消息的回复,就可以继续发送下一条消息,性能最高,但是最容易丢消息,可以用在对性能要求很高,但对数据丢失不敏感的情况可以用这种。
  • acks =1:需要保证 Leader 已经成功将消息写入本地 文件,但是不需要等待所有 ISR副本(同步副本)是否成功写入,就可以继续发送下一条消息,这种情况下,如果 ISR副本(同步副本)没有成功备份数据,而此时 Leader又挂掉,则消息会丢失。
  • acks =-1:需要 Leader 及其所有的 ISR副本(同步副本)都成功写入日志,才可以继续发送下一条消息,这种策略会保证只要有一个副本存活就不会丢失数据,最大程度的保证了消息不会丢失。

Broker:

Broker 合理的使用持久化机制,ISR 副本同步机制可以最大程度的保证消息不丢失。

  • 持久化存储:Kafka 使用持久化来存储消息,让消息在写入 Kafka 的时候被写入磁盘,这种方式可以防止消息因为节点宕机而丢失。
  • ISR 副本复制机制:Kafka 使用 ISR 副本同步机制来保证消息不丢失,ISR 副本同步机制可以让一个分区有多个副本,且副本可以分布在不同的节点上,当某个节点宕机后,其他节点可以继续提供服务,保证消息不丢失。

消费者:

做为消费者只需要保证能够正确的消费消息,并正确的提交消息 offset 即可,Kafka 会记录每个消费者的偏移量,消费者每次消费消息的时候,都会将偏移量向后移动,当消费者挂掉或者 Kafka 宕机的时候,Kafka 会将该消费者的所消费的分区偏移量保存下来,当故障恢复后,消费者可以继续从上一次的偏移量开始消费,为了保证消息不丢失,我们使用手动提交偏移量即可,避免拉取了消息后,业务逻辑没有处理完的时候消费者挂掉了,但是提交了偏移量,导致消息丢失。

Consumer 需要关闭自动提交并开启手动提交,具体配置如下:

#消息 ACK 模式 有7种
spring.kafka.listener.ack-mode = manual
#是否开启手动提交 默认自动提交
spring.kafka.consumer.enable-auto-commit = false

Kafka 消息手动 ACK 案例演示

在演示 Kafka 手动 ACK 之前我们先了解一下 Kafka 的几种 ACK 的含义,也就是 AckMode 的枚举值的含义。

public static enum AckMode {
	RECORD,
	BATCH,
	TIME,
	COUNT,
	COUNT_TIME,
	MANUAL,
	MANUAL_IMMEDIATE;

	private AckMode() {
	}
}
  • RECORD:每一条记录被消费者消费之后提交。
  • BATCH:当每一批 poll() 的消息被消费者处理之后提交,频率取决于 poll 的调用频率,是 Kafka 的默认提交方式,BATCH 模式适用于需要提高处理效率的场景,例如批量处理大量消息以减少网络传输和系统调用的开销。
  • TIME:当每一批 poll()的数据被消费者处理之后,距离上次提交时间大于TIME时提交,如果当前时间有消息正在处理,则等当前消息处理完成在提交。
  • COUNT:当每一批 poll()的数据被消费者监处理之后,被处理消息数量大于等于 COUNT 时提交,如果当前时间有消息正在处理,则等当前消息处理完成在提交。
  • COUNT_TIME:TIME 或 COUNT 满足其中一个就提交。
  • MANUAL:当每一批 poll()的数据被消费者监处理之后,手动调用 Acknowledgment.acknowledge() 先将 offset 存放到 map 本地缓存,在下一次 poll 之前从缓存拿出来批量提交。
  • MANUAL_IMMEDIATE:当每一批 poll()的数据被消费者监处理之后,手动调用 Acknowledgment.acknowledge()后立即提交。

Kafka Producer

Kafka Producer 的代码同样很简单,这里我们使用了前面分享的同步、异步发送的的代码,具体如下:

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName: SyncKafkaProducer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description: 同步发送消息
 */
@Slf4j
@Component
public class SyncKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    //同步发送消息 
    public void sendSyncMessage(String message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(new Date());
        //同步发送消息
        try {
            kafkaTemplate.send("sync-topic", message).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("完成消息发送,当前时间:{}", dateStr);
    }

    //异步发送消息
    public void sendAsyncMessage(String message) {
        try {
            //同步发送消息
            ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("sync-topic", message);
            listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.error("消费发送失败");
                }

                @Override
                public void onSuccess(SendResult<String, String> stringStringSendResult) {
                    log.info("消息发送成功");
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

Consumer 代码演示

Consumer 我们还是使用 @KafkaListener 来完成消息监听,在 Consumer 代码中,我们刻意模拟了除0异常。

package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * @ClassName: ManualAckKafkaConsumer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description: 手动 ACK 消息消费
 */
@Slf4j
@Component
public class ManualAckKafkaConsumer {

    @KafkaListener(id = "my-kafka-manual-consumer",
            groupId = "my-kafka-consumer-manual-groupId-01",
            topics = "sync-topic",
            containerFactory = "myContainerFactory")
    public void listen(String message, Acknowledgment acknowledgment) {
        log.info("Manual ACK 消息消费成功,消息内容:{}", message);
        int a = 1 / 0;
        //手动提交 ACK
        acknowledgment.acknowledge();
    }

}

结果验证

2024-10-28 17:41:01.568  INFO 17764 --- [-consumer-0-C-1] c.o.s.k.consumer.ManualAckKafkaConsumer  : Manual ACK 消息消费成功,消息内容:我是一条同步消息

结果符合预期,测试的是如果没有关闭除0 异常,客户端会不停的消费这条消息,因此我们在消息消费失败的时候也要注意做出合理处理,例如加入死信队列,避免消息一直在被消费而占用系统资源。

总结:本篇简单分享了 Kafka 如何保证消息不丢失,并分享了对应的手动 ACK 的代码案例,需要注意的是 Kafka 无法做到消息 100% 不丢失,至于 Kafka 为什么没办法做到消息 100% 不丢失,后面会做分享,欢迎持续关注。

如有不正确的地方欢迎各位指出纠正。

标签:ACK,kafka,发送,丢失,import,Kafka,消息
From: https://blog.csdn.net/weixin_42118323/article/details/143301010

相关文章

  • SSL 固定(SSL Pinning)是一种提高应用程序安全性的技术,用于防止中间人攻击(MITM,Man-in-th
    SSL固定(SSLPinning)是一种提高应用程序安全性的技术,用于防止中间人攻击(MITM,Man-in-the-Middleattacks)和证书伪造攻击。它通过将服务器的SSL/TLS证书或其公钥“固定”到客户端应用程序中,确保客户端在与服务器通信时只信任特定的证书或公钥,从而降低了遭遇伪造证书或中间人攻击的......
  • Kafka面试题总结
    1、kafka消息发送的流程?2、Kafka的设计架构你知道吗?3、Kafka分区的目的?4、你知道Kafka是如何做到消息的有序性?5、ISR、OSR、AR是什么?6、Kafka在什么情况下会出现消息丢失7、怎么尽可能保证Kafka的可靠性8、Kafka中如何做到数据唯一,即数据去重?9、生产者如何提高......
  • kafka 相关操作命令
    /home/kafka/config/kafka_client_producer_jaas.conf文件为对应集群的鉴权配置文件,例如sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule\requiredusername="dev-user"password="devuser@123";security.protocol=SASL_PLAINTEXTsasl.mec......
  • Kafka在后端开发中的应用场景是什么?
    Kafka在后端开发中的应用场景非常广泛,主要体现在以下几个方面:异步处理:Kafka可以用于异步处理消息,使得各个模块之间的处理流程可以独立进行,不需要等待前一个流程完成即可开始下一个流程。消息系统(Messaging) :Kafka可以替代传统的消息代理,用于解耦生产者和消费者之间的关系,缓......
  • 深入理解 Kafka 的消息持久化机制
    在分布式系统中,消息队列扮演着至关重要的角色。Kafka作为一种高性能、高可靠的分布式消息队列系统,其强大的消息持久化机制是保证数据可靠性的关键。那么,什么是Kafka的消息持久化机制呢?一、Kafka简介Kafka是一个开源的分布式事件流平台,最初由LinkedIn公司开发,后来成为Apac......
  • RabbitMQ如何保证发送的消息可靠(RabbitMQ的Confirm模式和2.Return模式)
    RabbitMQ如何保证发送的消息可靠(RabbitMQ的Confirm模式和2.Return模式)1、RabbitMQ消息Confirm模式(保证从生产者到交换机的消息可靠)1.1、Confirm模式简介1.2、具体代码实现1.2.1、application.yml开启确认模式1.2.2、生产者方式1:实现RabbitTemplate.ConfirmCallback生产......
  • kafka分片与副本消息同步的详细策略[持久化]
    kafka分片与副本消息同步的详细策略[持久化]参考文章Kafka学习之路(三)Kafka的高可用DataReplication(副本策略)1.消息传递同步策略Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的ReplicationFactor为多少,Producer只将......
  • 第三章:组织页面完善、引入消息帖子与页面独立状态
    第三章:组织页面完善、引入消息帖子与页面独立状态在这一章里,我们来完善组织页面,打算将组织根据实际情况分为三种,工作室、社团、部门。我的想法是,将三种情况使用uni-ui中的卡片来进行介绍,点击卡片后跳转到相应页面,相应页面介绍所有的组织。在这里有一个让我为难的点,就是我不......
  • yolov8目标跟踪与行人车辆计数+YOLOv8 Object Detection with DeepSORT Tracking(ID +
    YOLOv8目标检测与DeepSORT跟踪技术简介在计算机视觉领域,目标检测和跟踪是两个至关重要的任务。目标检测旨在识别图像或视频中的特定对象,并确定它们的位置;而目标跟踪则是在连续的帧之间保持对这些对象的身份和位置的一致性跟踪。本文将详细介绍YOLOv8作为先进的目标检测算法......
  • Docker搭建kafka集群
    Docker搭建kafka集群kafka中的基本概念broker:消息中间件处理节点,一个broker就是一个kafka节点,一个或者多个broker就组成了一个kafka集群topic:kafka根据topic对消息进行归类,发布到kafka集群的每个消息,都要指定一个topicproducer:消息生产者,向broker发送消息的客户端consumer:消......