首页 > 其他分享 >深入解析Kafka消息传递的可靠性保证机制

深入解析Kafka消息传递的可靠性保证机制

时间:2024-06-08 20:33:16浏览次数:23  
标签:kafka producer 偏移量 Kafka props put 消息传递 解析 new

深入解析Kafka消息传递的可靠性保证机制

Kafka在设计上提供了不同层次的消息传递保证,包括at most once(至多一次)、at least once(至少一次)和exactly once(精确一次)。每种保证通过不同的机制实现,下面详细介绍Kafka如何实现这些消息传递保证。

1. At Most Once(至多一次)

在这种模式下,消息可能会丢失,但不会被重复传递。这通常发生在消费者在处理消息之前提交了偏移量,导致即使消息处理失败,也认为已经处理完成。

实现机制:

  • 消费者配置enable.auto.commit=true,并且默认提交偏移量的时间间隔较短(auto.commit.interval.ms)。
  • 消费者在处理消息之前提交偏移量,处理过程中如果发生故障,消息不会被重新处理。

2. At Least Once(至少一次)

在这种模式下,消息不会丢失,但可能会被重复传递。消费者确保在处理消息后才提交偏移量,故障恢复后会重新处理未提交偏移量的消息。

实现机制:

  • 消费者配置enable.auto.commit=false,手动提交偏移量。
  • 消费者在处理完每条消息后调用consumer.commitSync()或consumer.commitAsync()提交偏移量。

示例代码

Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_broker:9092");
props.put("group.id", "test_group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your_topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息
        }
        consumer.commitSync(); // 确保消息处理后提交偏移量
    }
} finally {
    consumer.close();
}

3. Exactly Once(精确一次)

在这种模式下,消息既不会丢失也不会重复传递。Kafka通过引入幂等性生产者和事务性API来实现这种保证。

实现机制

  • 幂等性生产者:确保生产者在重试发送时不会产生重复消息。通过配置enable.idempotence=true启用幂等性。
  • 事务性生产者和消费者:确保在生产和消费过程中可以使用事务,使消息的生产和消费操作要么全部成功要么全部失败。

配置幂等性生产者

Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_broker:9092");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", "true");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

使用事务性生产者和消费者

// 配置事务性生产者
Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_broker:9092");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-transactional-id"); // 唯一的事务ID
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // 初始化事务

try {
    producer.beginTransaction(); // 开始事务
    producer.send(new ProducerRecord<>("your_topic", "key", "value"));
    // 其他的发送操作
    producer.commitTransaction(); // 提交事务
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction(); // 中止事务
    producer.close();
}

在消费者端,可以使用Kafka Streams API或者事务性消费模式确保精确一次语义。

总结

Kafka提供了不同层次的消息传递保证,通过合适的配置和使用模式,用户可以根据应用需求选择合适的保证模式:

  • At Most Once:适用于对数据丢失不敏感的应用。
  • At Least Once:适用于不能接受数据丢失但可以接受重复数据的应用。
  • Exactly Once:适用于对数据一致性要求非常高的应用。

通过合理配置生产者、消费者和broker,可以在不同场景下实现合适的消息传递保证。

标签:kafka,producer,偏移量,Kafka,props,put,消息传递,解析,new
From: https://blog.csdn.net/qq_38411796/article/details/139550706

相关文章

  • CCF-GESP 等级考试 2023年9月认证C++四级真题解析
    一、单选题(每题2分,共30分)第1题⼈们所使⽤的⼿机上安装的App通常指的是()。A.⼀款操作系统B.⼀款应⽤软件C.⼀种通话设备D.以上都不对正确答案:B.⼀款应⽤软件解析:App是"Application"的缩写,中文意思是"应用",特指安装在智能手机上的第三方应用软件。这些软件通常......
  • GPT-4o多模态处理能力解析:AI技术的新高度
     GPT-4o模型在2024年5月14日被宣布推出,具有多项引人注目的特点与功能。能够支持文本、音频和图像的任意组合输入,并生成相应的文本、音频和图像输出。它在视觉和音频理解方面尤其出色,可以实时对音频、视觉和文本进行推理。相比之前的模型,GPT-4o在速度上有了显著的提升,例如,它可......
  • Asp .Net Core 系列:详解鉴权(身份验证)以及实现 Cookie、JWT、自定义三种鉴权 (含源码解
    什么是鉴权(身份验证)?https://learn.microsoft.com/zh-cn/aspnet/core/security/authentication/?view=aspnetcore-8.0定义鉴权,又称身份验证,是确定用户身份的过程。它验证用户提供的凭据(如用户名和密码)是否有效,并据此确认用户是否具备访问系统的权利。过程用户向系统提供......
  • 【源码】Spring Data JPA原理解析之事务注册原理
     SpringDataJPA系列1、SpringBoot集成JPA及基本使用2、SpringDataJPACriteria查询、部分字段查询3、SpringDataJPA数据批量插入、批量更新真的用对了吗4、SpringDataJPA的一对一、LazyInitializationException异常、一对多、多对多操作5、SpringDataJPA自定义......
  • 使用Apache Kafka构建可扩展的消息系统——Java的高吞吐数据处理
    引言:在处理大数据和实时事件驱动架构时,ApacheKafka展示了其强大的能力。作为一个高性能的消息队列,Kafka支持数据的发布和订阅,以及对数据流的存储,使其成为构建复杂的实时应用程序的核心组件。什么是ApacheKafka?ApacheKafka是一个开源的流处理平台,由LinkedIn开发并贡献给Ap......
  • 探索Java 17:新特性解析与实战指南
    引言Java作为企业级应用开发的首选语言之一,每次更新都带来了许多期待与讨论。Java17,作为最新的长期支持版本,不仅稳定了过去的实验特性,还引入了多项改进和新功能,本文将深入探讨这些新特性,并提供实用的代码示例来展示如何在实际项目中应用这些新特性。Java17的核心新特性Jav......
  • Curl 命令参数解析
    Curl参数:详细解析与示例curl是一个功能强大的命令行工具,用于传输数据。它支持多种协议,如HTTP、HTTPS、FTP、SFTP等。curl提供了丰富的参数,以满足各种传输需求。本文将详细解析curl参数,并通过代码示例说明其用法。1.参数概述curl参数分为两大类:通用参数和协议相关......
  • 免费,C++蓝桥杯比赛历年真题--第14届蓝桥杯省赛真题(含答案解析和代码)
    C++蓝桥杯比赛历年真题–第14届蓝桥杯省赛真题一、选择题答案:A解析:C++中bool类型与char类型一样,都需要1byte。一些其他类型的占用字节数:short:2byte,int:4byte,longlong:8byte,double:8byte,故答案为A。答案:C解析:A中结构体中可以定义成员变量,也可以定义只有该结......
  • kafka知识整理——部署
    一、部署(1)zk配置修改zk配置文件config/zookeeper.properties,修改dataDir或端口dataDir=/home/kafka/kafka3.7/data/zookeeperclientPort=2181(2)zk启动./zookeeper-server-start.sh-daemon../config/zookeeper.properties(3)kafka配置以部署三台kafka集群为例修改各自kafka......
  • 【Webpack4打包机制原理解析】
    webpack是一个打包模块化JavaScript的工具,在webpack里一切文件皆模块,通过Loader转换文件,通过Plugin注入钩子,最后输出由多个模块组合成的文件。webpack专注于构建模块化项目。#简单版打包模型步骤我们先从简单的入手看,当webpack的配置只有一个出口时,不考虑分......