首页 > 其他分享 >kafka-生产者事务-数据传递语义&事务介绍&事务消息发送

kafka-生产者事务-数据传递语义&事务介绍&事务消息发送

时间:2024-06-05 16:33:09浏览次数:29  
标签:事务 语义 kafka 发送 消息 import org

文章目录

1、kafka数据传递语义

kafka发送消息时是否需要重试

  1. 仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高
  2. 至少一次:生产者发送消息后重试,可能重试多次 效率差
  3. 精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现

kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)

kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息

2、kafka生产者事务

一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费

  1. 配置ack为-1 分区所有副本均落盘成功
  2. 配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)
  3. 需要给事务分配事务id(区分一个事务中的多条消息)

3、事务消息发送

3.1、application.yml配置

server:
  port: 8110

# v1
spring:
  kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    producer: # producer 生产者
      retries: 1 # 重试次数 0表示不重试
      acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)
      transaction-id-prefix: tx_  # 事务id前缀:配置后producer自动开启事务
      batch-size: 16384 # 批次大小 单位byte
      buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

3.2、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {

    //生产者 ack 配置为 0 只要发送即成功
    //ack为 1  leader落盘  broker ack之后 才成功
    //ack为 -1 分区所有副本全部落盘  broker ack之后 才成功
    @Override
    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
        //ProducerListener.super.onSuccess(producerRecord, recordMetadata);
        System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()
        +",partition = "+producerRecord.partition()
        +",key = "+producerRecord.key()
        +",value = "+producerRecord.value()
        +",offset = "+recordMetadata.offset());
    }

    //消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息
    @Override
    public void one rror(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()
                +",partition = "+producerRecord.partition()
                +",key = "+producerRecord.key()
                +",value = "+producerRecord.value()
                +",offset = "+recordMetadata.offset());
        System.out.println("异常信息:" + exception.getMessage());
    }
}

3.3、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
    //kafka生产者发送消息前执行:拦截发送的消息预处理
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
        +",partition:"+producerRecord.partition()
        +",key = "+producerRecord.key()
        +",value = "+producerRecord.value());
        return null;
    }

    //kafka broker 给出应答后执行
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        //exception为空表示消息发送成功
        if(e == null){
            System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
                    +",partition:"+recordMetadata.partition()
                    +",offset="+recordMetadata.offset()
            +",timestamp="+recordMetadata.timestamp());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

3.4、发送消息测试

package com.atguigu.kafka.producer;

import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;

@SpringBootTest
class KafkaProducerApplicationTests {

    //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
    @Resource
    KafkaTemplate kafkaTemplate;

    @Resource
    MyKafkaInterceptor myKafkaInterceptor;

    @PostConstruct
    public void init() {
        kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
    }
    @Test
    void contextLoads() throws IOException {
        kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");
        //回调是等kafka,ack以后才执行,需要阻塞
        System.in.read();
    }

    //kafka事务支持spring-tx的事务注解
    //单元测试中的事务会自动回滚

    @Test
    void testTransaction() throws  IOException {

       //多个消息的发送在一个事务中执行
        kafkaTemplate.executeInTransaction((var1) -> {
            //通过一个事务中的operations对象来发送消息,执行事务操作
            var1.send("my_topic1",0,"", "spring-kafka-事务1");
            var1.send("my_topic1",0,"", "spring-kafka-事务2");
            int i = 1/0;
            var1.send("my_topic1",0,"", "spring-kafka-事务3");
            return "发送消息失败";
        });
        System.in.read();
    }
}

3.5、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {
    @Bean
    public NewTopic myTopic1() {
        //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
        return TopicBuilder.name("my_topic1")//主题名称
                .partitions(3)//主题分区
                .replicas(3)//主题分区副本数
                .build();//创建
    }
}

3.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

3.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>kafka-producer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.8、控制台日志

生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
异常信息:Failing batch since transaction was aborted
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
异常信息:Failing batch since transaction was aborted

java.lang.ArithmeticException: / by zero

在这里插入图片描述

标签:事务,语义,kafka,发送,消息,import,org
From: https://blog.csdn.net/m0_65152767/article/details/139452792

相关文章

  • 赶紧收藏!2024 年最常见 20道 Kafka面试题(八)
    上一篇地址:赶紧收藏!2024年最常见20道Kafka面试题(七)-CSDN博客十五、Kafka中生产者运行流程是怎样的?Kafka生产者的运行流程涉及多个步骤,这些步骤确保了消息能够高效、可靠地从生产者发送到Kafka集群。以下是生产者运行流程的详细步骤:初始化:首先,生产者需要初始化,这包括设......
  • (大全集)大规模数据处理入门与实战(套装全10册 Kafka权威指南 Flink基础教程 数据科学
    书:pan.baidu.com/s/1YNu61Jk91VeISAX2F7-64g提取码:14pd是一本涉及大规模数据处理的入门级别的书籍,它通常旨在向读者介绍大规模数据处理的基本概念、技术、工具和实际应用。一些笔记:大数据概述: 介绍大数据的定义、特征和发展趋势。分布式系统: 讨论大规模数据处理的基础,包括......
  • MySQL InnoDB Cluster如何定位或找出超过事务大小的SQL?
    在MySQLInnoDBCluster中,有一个系统变量/参数group_replication_transaction_size_limit控制着事务的大小,如下所示mysql> select @@global.group_replication_transaction_size_limit;+---------------------------------------------------+| @@global.group_replication_tr......
  • 2024年公共事务管理与社会服务国际会议(ICPAMSS2024)
    2024年公共事务管理与社会服务国际会议(ICPAMSS2024)会议简介2024国际公共事务管理与社会服务会议(ICPAMSS2024)将在广州隆重举行。本次盛会诚挚邀请来自世界各地的公共事务管理和社会服务领域的专家、学者和从业者齐聚一堂,探索行业发展前沿,分享实践经验,推动理论创新。会议将......
  • MySQL——事务补充
    十一、RR和RC的本质区别select*from表名(lockinsharemode)#当不加共享锁时,说明此时进行的是快照读,加了共享锁则进行的是当前读;​当进行快照读的时候才会形成readview结构;​readview形成的时机不同,会影响事务的可见性,会造成RR和RC级别下事务可见性的不同;当RR......
  • MySQL从入门到高级 --- 12.事务 && 13.锁机制 && 14.日志
    文章目录第十二章&&第十三章&&第十四章:12.事务12.1特性12.2隔离级别13.锁机制13.1各存储引擎对锁的支持状况:13.2锁特性13.3MyISAM表锁13.3.1加表锁13.4InnoDB行锁13.4.1行锁特点13.4.2行锁模式14.日志14.1错误日志14.2二进制日志14.2.1日志格式14.3......
  • SQLServer事务的妙用
    日常处理数据,难免会操作数据库。update,delete或者insert操作,例如没有带条件或者带的条件不对,将痛苦万分。踩坑王在此分享经验,希望能帮到到家。 操作技巧:就一句话,只要是update,delete或者insert,提前开启一个事务,再去执行相应的sql语句。如果发现错了直接rollback即可。确认无......
  • 持续总结中!2024年面试必问 20 道 Kafka面试题(三)
    上一篇地址:持续总结中!2024年面试必问20道Kafka面试题(二)-CSDN博客五、Kafka的ISR(In-SyncReplica)是什么?Kafka的ISR(In-SyncReplicas)是Kafka中用于维护数据一致性和高可用性的关键概念之一。ISR列表包含了一个领导者副本(Leader)和与之保持完全同步的追随者副本(Follower......
  • 赶紧收藏!2024 年最常见 20道 Kafka面试题(二)
    上一篇地址:赶紧收藏!2024年最常见20道Kafka面试题(一)-CSDN博客三、Kafka的设计架构是什么?Kafka的设计架构是分布式和可扩展的,旨在处理高吞吐量的数据流。以下是Kafka设计架构的关键组成部分及其功能:Producer(生产者):生产者是向Kafka集群发送消息的客户端。它们负责创建消息......
  • 实验 5 语义分析和中间代码生成器 (Price 600)
    WX:help-assignmentcodeprice600实验5语义分析和中间代码生成器语义分析器分两部分,第一部分为赋值表达式,第二部分为数组、布尔表达式和控制语句。要求参考课本6.4.2、6.4.3和7.3、7.4、7.5,实现递归下降翻译器。注意数据结构:四元式:结构体四元式序列:结构体数......