首页 > 编程语言 >Kafka安装配置及Java中的使用

Kafka安装配置及Java中的使用

时间:2024-06-01 12:29:40浏览次数:30  
标签:消费 Java -- 分区 kafka 消息 Kafka 安装

目录

一、消息队列

二、流派分类:

三、Kafka基本介绍

四、主题和分区的概念

五、Kafka集群

六、kafka-clients之生产者

七、kafka-clients之消费者

八、SpringBoot使用Kafka

九、Kafka集群中的controller、rebalance、HW

(1)controller

(2)rebalance机制

(3)HW和LEO

十、Kafka问题优化

十一、Kafka Eagle监控平台(新版本待测试)


一、消息队列

消息队列(Message Queue,简称MQ),具体地说,是为了解决通信问题

  1. 同步:顺序执行,存在性能和稳定性的问题;

    • 问题一:系统开销大,响应时间长;

    • 问题二:执行过程中需要保证每个服务的顺利执行,用户体验较差;

  2. 异步:通过消息队列,进行异步处理;

    • 优势一:极大提高系统的吞吐量;

    • 优势二:即使执行失败,也可以使用分布式事务来保证最终是成功的(最终一致性);

二、流派分类:

主流MQ分为以下几个:

  • KafKa:目前性能最好、速度最快;

  • RocketMQ:阿里根据Kafka封装,功能性更强;

  • RabbitMQ:功能性强,模式多;

  • ZeroMQ:看重的是MQ的通信能力,基于Socket封装。

功能性区分:

  1. 有Broker:通常有一台服务器作为Broker,所有消息都通过它中转。生产者将消息发送给Broker,Broker则把消息主动推送给消费者。

    • 重Topic:Kafka、JMS(ActiveMQ),在消息队列中,Topic必须存在。

    • 轻Topic:RabbitMQ(AMQP),Topic只是其中的一种中转模式。

  2. 无Broker:ZeroMQ,认为MQ是一种更高级的Socket,是为了解决通信问题。故ZeroMQ被设计为了一个库,而不是中间件,ZeroMQ做的事情就是封装出了一套类似Socket的API去完成发送、读取消息。

三、Kafka基本介绍

  1. 官网地址:Apache Kafka

  2. 依赖准备:

    • 安装JDK:jdk1.8 +;

    • 安装Zookeeper:自己安装,或者使用Kafka安装包内自带的也可以;

  3. Kafka安装:

    • 官网下载:当前版本为:kafka_2.13-2.8.0.tgz;

    • 解压缩:tar -xzvf kafka_2.13-2.8.0.tgz

    • 目录说明:

      • bin:二进制启动文件;

      • config:相关配置文件;

      • libs:依赖的jar包;

      • licenses:许可证文件;

      • logs:某些日志文件;

      • site-docs:压缩文档参考;

    • 修改配置文件(config目录内):

      • zookeeper.properties:修改日志目录,改为自定义路径;

      • server.properties

        # 单节点使用默认值,集群修改为唯一id
        broker.id=0
        # 监听的kafka服务ip地址
        listeners=PLAINTEXT://localhost:9092
        # 消息存储日志文件
        log.dirs=D://IT/Kafka/tmp/kafka-logs
        # 连接的zk服务器ip地址
        zookeeper.connect=localhost:2181

    • 启动测试(win10启动):

      • 启动zk服务器:创建脚本启动,命名为 zookeeper.bat,并运行;

        start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties"

      • 启动Kafka服务器:创建脚本启动,命名为 kafka.bat,并运行;

        start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server.properties"

      • 校验是否启动成功:进入zk-Cli,查看是否有kafka的 broker.id 节点;

        ls /brokers/ids

  4. Kafka基本概念:

    名称解释
    Broker消息中间件处理节点,一个Kafka节点就是一个Broker,一个或多个组成一个Kafka集群
    TopicKafka根据Topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个Topic
    Producer消息生产者,向Broker发送消息的客户端
    Consumer消息消费者,从Broker读取消息的客户端
  5. 创建Topic:

    • 创建Topic:

      ./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo1

    • 修改Topic:

      # 修改分区数
      ./kafka-topics.bat --zookeeper localhost:2181 -alter --partitions 3 --topic demo1

    • 查询所有的Topic:

      ./kafka-topics.bat --list --zookeeper localhost:2181

    • 查询具体某个Topic详细信息:

      ./kafka-topics.bat --zookeeper localhost:2181 --topic demo1 --describe

  6. 发送消息:

    使用Kafka自带的生产者命令客户端,既可以从本地文件读取内容,也可以在命令行直接输入消息。默认情况下,每一行都是一条独立的消息。发送消息时,需要指定发送到具体哪个Kafka服务器和Topic名称

    ./kafka-console-producer.bat --broker-list localhost:9092 --topic demo1

  7. 消费消息:

    • 从头开始消费:消费全部消息。

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic demo1

    • 从最新的消息开始消费(默认):从最后一条消息的偏移量 + 1开始消费。

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo1

    • 注意:

      • 消息可被存储,且是顺序结构,通过偏移量offset来描述消息的有序性;

      • 消息消费时可以指定偏移量进行描述消费的消息位置;

  8. 消费组:多个消费者可以组成一个消费组。

    • 查询所有的消费组:

      ./kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list

    • 查询某个消费组中具体信息:

      ./kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group demoGroup1
      • current-offset:当前消费组已消费偏移量;

      • log-end-offset:消息总量(最后一条消息的偏移量);

      • lag:积压消息总量;

  9. 单播消息:一个生产者,一个消费组(Group)。同一消费组中,只有一个消费者能收到Topic中的消息

    • 消费组:

      • 消费者一:

        ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup --topic demo1

      • 消费者二:

        ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup --topic demo1

  10. 多播消息:一个生产者,多个消费组(Group)。每个消费组中,只有一个消费者能收到Topic中的消息

    • 消费组一 demoGroup1

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup1 --topic demo1

    • 消费组二 demoGroup2

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup2 --topic demo1

四、主题和分区的概念

  1. 主题Topic

    • topic是一个逻辑的概念,Kafka通过Topic将消息进行分类,不同的topic将被订阅的消费组进行消费。

    • 当topic中的消息非常多,由于消息会保存在日志中,导致内存占用过大,由此提出分区Partition的概念。

  2. 分区Partition

    通过partition将一个topic中的消息进行分区存储。好处如下:

    • 分区存储,可以解决存储问题过大的问题;

    • 提升了消息日志读写的吞吐量,读和写可以在多个分区中同时进行;

  3. 消息日志文件分析(kafka-logs):

    • 0000000000.log:文件中保存的就是消息内容;

    • __consumer_offsets-x 文件夹:Kafka内部默认会创建50个 __consumer_offsets-x 分区(0-49),目的是为了存放消费者消费某个主题Topic的偏移量。当消费者消费完成后就会把偏移量上报给对应的主题Topic进行保存。目的是为了提升当前的Topic的并发性。

      • 消费完成提交至对应分区的计算方式:hash(消费组ID) % __consumer_offsets分区总数

      • 提交到主题内容的 Key 的方式:消费组ID + topic名称 + __consumer_offsets分区号

      • 提交到主题内容的 Value的方式:当前offset的值

    • 0000000000.index:日志文件的索引;

    • 0000000000.timeindex:日志文件按时间点的索引;

    • 日志文件,默认保存周期为七天,到期后自动删除。

五、Kafka集群

  1. 集群搭建(三个Broker):

    • 创建三个 server.properties 文件:

      • 第一个 server0.properties

        broker.id=0
        listeners=PLAINTEXT://localhost:9092
        log.dirs=D://IT/Kafka/tmp/kafka-logs-0
        zookeeper.connect=localhost:2181

      • 第二个 server1.properties

        broker.id=1
        listeners=PLAINTEXT://localhost:9093
        log.dirs=D://IT/Kafka/tmp/kafka-logs-1
        zookeeper.connect=localhost:2181

      • 第三个 server2.properties

        broker.id=2
        listeners=PLAINTEXT://localhost:9094
        log.dirs=D://IT/Kafka/tmp/kafka-logs-2
        zookeeper.connect=localhost:2181

    • 启动三个Broker:

      start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server0.properties"
      
      start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server1.properties"
      
      start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server2.properties"

    • 使用zkCli客户端检测是否启动成功:

      ls /brokers/ids		# 启动成功显示三个broker的id

  2. 副本(replication-factor)的概念:

    • 副本:是为主题中的分区创建的备份,一般来说,有几个Broker就应该创建几个副本。其中,会选举出一个副本作为 leader,其他的则是 follower

      • leader:Kafka的读写操作,都发生在leader上。leader负责把数据同步至其他的follower,当leader宕机时,将会进行主从选举,选出一个新的leader。

      • follower:介绍leader的同步数据;

      • isr:可以同步和已同步的节点会存入 ISR 集合中,主从选举从 ISR 集合内选出leader。当节点的性能较差时,ISR 集合将会踢出该节点。

  3. 集群消费问题:

    • 向集群发送消息:

      ./kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic demo1

    • 从集群中消费消息:

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --consumer-property group.id=demoGroup --topic demo1

    • 分区消费的细节:

      • 一个分区partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性。多个分区partition的多个消费者的顺序性无法保证(后续有方法可以保证)。

      • 分区partition的数量决定了消费组中的消费者数量,建议消费组中的消费者数量不要超过分区partition的数量,防止多出的消费者消费不到消息造成资源的浪费。

      • 消费者宕机时,将会触发 rebalance 机制,选出其他消费者进行消费该分区的消息。

六、kafka-clients之生产者

  1. 引入依赖(版本号与Kafka版本一致):

    <!--kafka-clients-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>

  2. 简单测试:

    • 创建一个Topic:

      ./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-test-topic

    • 启动三台Broker作为集群

    • 代码测试:

      public class ProducerTest {
      
          private static final String TOPIC_NAME = "my-test-topic";
          private static final String BOOTSTRAP_SERVER = "localhost:9092,localhost:9093,localhost:9094";
          private static final String KEY = "test:";
          private static final String VALUE = "This is a test for Kafka-producer!";
      
          private static final Properties prop = new Properties();
      
          static {
              // Kafka集群IP
              prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
              // ACK:消息持久化配置(-1、0、1)
              prop.put(ProducerConfig.ACKS_CONFIG, "1");
              // 重试次数配置
              prop.put(ProducerConfig.RETRIES_CONFIG, "3");
              // 重试间隔配置,单位:ms
              prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
      
              // 缓冲区大小,32Mb
              prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32);
              // 发送消息大小,16Kb
              prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 16);
              // 未能拉取到16Kb数据,间隔 10ms 后,立即发送
              prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);
      
              // key的序列化方式
              prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              // value的序列化方式
              prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          }
      
          @Test
          public void test1() throws Exception {
              // 1.创建消息生产者对象
              Producer<String, String> producer = new KafkaProducer<>(prop);
      
              for (int i = 0; i < 6; i++) {
                  // 2.封装消息载体对象
                  // 分区计算方式:hash(key) % partitionNum,即:key的哈希值 取余 分区的总数
                  ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE);
                  // 自定义发送的分区位置
                  // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2, KEY + UUID.randomUUID(), VALUE);
      
                  // 3.同步发送消息
                  Future<RecordMetadata> future = producer.send(record);
      
                  // 4.获取消息发送成功后的元数据
                  RecordMetadata metadata = future.get();
                  System.out.println("主题Topic名称:" + metadata.topic());
                  System.out.println("保存partition分区的位置:" + metadata.partition());
                  System.out.println("总消息数(offset偏移量):" + metadata.offset());
              }
              producer.close();
          }
      
          @Test
          public void test2() {
              // 1.创建消息生产者对象
              Producer<String, String> producer = new KafkaProducer<>(prop);
      
              // 2.封装消息载体对象
              // 分区计算方式:hash(key) % partitionNum,即:key的哈希值 取余 分区的总数
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE);
              // 自定义发送的分区位置
              // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2, KEY + UUID.randomUUID(), VALUE);
      
              // 3.异步发送消息
              producer.send(record, (data, exception) -> {
                  // 4.消息发送异常,要做的事
                  if (exception != null) {
                      System.out.println("发送失败,异常原因:" + exception.getMessage());
                  }
      
                  // 5.消息发送成功要做的事
                  if (data != null) {
                      System.out.println("主题Topic名称:" + data.topic());
                      System.out.println("保存partition分区的位置:" + data.partition());
                      System.out.println("总消息数(offset偏移量):" + data.offset());
                  }
              });
      
              // 延时等待,异步消息进程执行结束
              ThreadUtils.sleep(1000);
              producer.close();
          }
      
      }
      • 同步发送消息:如果发送消息后没有收到 ack,生产者将阻塞等待 3 秒,之后会进行重试,重试三次后仍然失败,则抛出异常。执行慢,但是不会丢失消息;

      • 异步发送消息:生产者发送完成后就可以执行后续业务,broker收到消息后,进行异步调用生产者提供的 callback 回调方法。当网络异常时可能出现回调方法未执行的问题,即消息丢失;

  3. 消息持久化机制参数(ACK配置,同步发送时用到 ACK 配置)

    • acks=0:发送消息后无需等待broker进行确认,就能发送下一条。性能最高,消息最容易丢失;

    • acks=1(默认值):至少需要等待 leader 成功写入本地日志,才能发送下一条。如果此时 leader 宕机,则会发生消息丢失;

    • acks=-1/all:需要等待多个节点写入日志(在 min.insync.replicas 中进行设置,默认值为1,推荐大于等于 2),才能发送下一条。只要有一个备份存活就不会丢失消息。金融级别的常用配置,性能最差,安全性最高;

    • 其他配置:

      • 重试次数配置

      • 重试间隔配置

    • 配置相关代码:

      // ACK:消息持久化配置(-1、0、1)
      prop.put(ProducerConfig.ACKS_CONFIG, "1");
      // 重试次数配置
      prop.put(ProducerConfig.RETRIES_CONFIG, "3");
      // 重试间隔配置,单位:ms
      prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");

  4. 消息缓冲区配置:

    • Kafka会创建一个消息缓冲区,存放要发送的消息,缓冲区大小为 32Mb;

      // 缓冲区大小,32Mb
      prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32);

    • Kafka本地线程会去缓冲区拉取数据,发送至 Broker,一次拉取 16Kb;

      // 发送消息大小 16Kb
      prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 16);

    • 如果线程拉取不到 16Kb 的数据,间隔 10ms 后,也会将拉取的数据发送至Broker;

      // 未能拉取到16Kb数据,间隔 10ms 后,立即发送
      prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);

七、kafka-clients之消费者

  1. 引入依赖(版本号与Kafka版本一致):

    <!--kafka-clients-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>

  2. 简单测试:

    public class ConsumerTest {
    
        private static final String TOPIC_NAME = "my-test-topic";
        private static final String BOOTSTRAP_SERVER = "localhost:9092,localhost:9093,localhost:9094";
        private static final String CONSUMER_GROUP_NAME = "test-group-0";
    
        private static final Properties prop = new Properties();
    
        static {
            // Kafka集群IP
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
            // 消费组的名称
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
            // key的反序列化方式
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // value的反序列化方式
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        }
    
        @Test
        public void test1() {
            // 1.创建消费者对象
            Consumer<String, String> consumer = new KafkaConsumer<>(prop);
    
            // 2.订阅Topic主题的消息列表
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    
            // 3.长轮询进行监听消息
            while (true) {
                // 4.拉取消息列表
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    
                // 5.循环处理每一条消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("主题Topic名称:" + record.topic());
                    System.out.println("分区partition的位置:" + record.partition());
                    System.out.println("消息offset偏移量:" + record.offset());
                    System.out.println("消息的键key:" + record.key());
                    System.out.println("消息的值value:" + record.value());
                }
            }
    
        }
    
    }

  3. 消费者提交 offset:

    • 提交的内容:所属的消费组 + 消费的 Topic + 消费的 partition + 偏移量 offset;

    • 自动提交:消息 poll 之后,消费消息之前进行 offset 提交。当消费者宕机时,会发生消息丢失问题;

      // 是否开启自动提交 offset,默认为 true
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      // 自动提交 间隔时间
      prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

    • 手动提交:消息进行消费后,手动提交 offset;

      // 是否开启自动提交 offset,默认为 true
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
      • 同步提交(更推荐使用):

        // 拉取消息列表
        Duration delay = Duration.ofMillis(1000);
        ConsumerRecords<String, String> records = consumer.poll(delay);
        if (records.count() > 0) {
            // 6.手动同步提交,等待Broker返回ACK才会执行后续业务,否则阻塞等待
            try {
                consumer.commitAsync();
            } catch (Exception e) {
                // 提交失败执行的逻辑
                e.printStackTrace();
            }
        }

      • 异步提交:

        // 拉取消息列表
        Duration delay = Duration.ofMillis(1000);
        ConsumerRecords<String, String> records = consumer.poll(delay);
        if (records.count() > 0) {
            // 6.手动异步提交,无需返回ACK,提交成功后异步调用回调方法
            consumer.commitAsync((offset, exception) -> {
                // 提交失败执行的逻辑
                if (exception != null) {
                    System.out.println("手动提交异常:" + exception.getMessage());
                }
        
                // 提交成功执行的逻辑
                if (CollUtil.isNotEmpty(offset)) {
                    offset.entrySet().forEach(System.out::println);
                }
            });
        }

  4. 消费者长轮询 poll 的配置:

    • 单次拉取条数:默认500条;

      // 单次拉取的消息条数
      prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

    • 单次轮询的时间:poll方法的入参,此处为 1000 毫秒;

      Duration delay = Duration.ofMillis(1000);
      ConsumerRecords<String, String> records = consumer.poll(delay);
      • poll的机制:在轮询内时,重复拉取消息,直至拉取 500 条消息,或超过轮询时长停止;

        • 当首次拉取消息,够了 500 条,则停止拉取,执行后续业务逻辑;

        • 首次没有拉取到 500 条,进行重复拉取,直至 拉取够,或超过轮询时长停止;

        • 若多次拉取都不够 500 条,且超过了轮询时长,则停止拉取,执行后续业务逻辑;

      • 当前后两次轮询的间隔时间超过了 30 秒,集群将判定此消费者能力弱,并踢出消费组,并处罚 rebalance 机制, rebalance 机制会造成性能开销。可以修改如下配置,提升消费者的速度;

        // 单次拉取的消息条数,默认500
        prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
        // 单次长轮询的间隔时间,默认 1000 ms
        prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5 * 1000);

  5. 消费者的健康状态检测配置:

    // 消费者发送心跳间隔时长,心跳频率
    prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
    // 心跳超时将触发 rebalance机制,并踢出消费组 的超时时长
    prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

  6. 消费组指定partition分区进行消费:

    // 参数一,Topic名称。参数二,分区位置
    TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
    consumer.assign(Collections.singletonList(topicPartition));

  7. 消费者的消息回溯:指定分区位置,并从头开始消费;

    // 参数一,Topic名称。参数二,分区位置
    TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
    consumer.assign(Collections.singletonList(topicPartition));
    consumer.seekToBeginning(Collections.singletonList(topicPartition));

  8. 指定 offset 位置消费:

    // 指定offset位置进行消费
    consumer.seek(topicPartition, new OffsetAndMetadata(10));

  9. 指定时间点,获取 offset 进行消费:

    // 获取此Topic的全部分区
    List<PartitionInfo> infos = consumer.partitionsFor(TOPIC_NAME);
    
    // 消费当前时间前一天的消息
    long time = new Date().getTime() - 24 * 60 * 60 * 1000;
    
    // 封装分区和消费时间的参数
    HashMap<TopicPartition, Long> map = new HashMap<>();
    for (PartitionInfo info : infos) {
        map.put(new TopicPartition(TOPIC_NAME, info.partition()), time);
    }
    
    // 根据时间节点,找到消息偏移量 offset
    Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
        TopicPartition key = entry.getKey();
        OffsetAndTimestamp value = entry.getValue();
    
        // 进行消息订阅,指定消费的偏移量 offset
        consumer.assign(Collections.singletonList(key));
        consumer.seek(key, new OffsetAndMetadata(value.offset()));
    }

  10. 新消费组的消费 offset 规则:

    当创建新的消费组启动消费者时,默认只会消费最新的消息。通过修改以下配置,使消费者首次启动时,从头开始消费,后续启动从最新消息开始消费。

    // 创建新的消费组时的消费规则,默认latest。latest:从最新开始 / earliest:从头开始  
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    • latest:从最新开始,只消费最新的消息;

    • earliest:首次启动从头开始,后续启动从最新开始;

    • 区别: seekToBeginning() 方法是每次启动都从头开始消费,earliest 只有首次启动从头开始,后续启动从最新开始;

八、SpringBoot使用Kafka

  1. 引入依赖:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

  2. 配置文件:

    spring:
      kafka:
        bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
        producer:
          acks: 1    # 0:无需ack确认;1:broker写入 leader日志后返回ack;-1:broker写入多个副本日志后返回ack
          retries: 3    # 重试次数
          batch-size: 16384   # 单次发送消息大小 16Kb
          buffer-memory: 33554432   # 缓存区大小 32Mb
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: test-group-0
          max-poll-records: 500   # 单次拉取消息条数
          enable-auto-commit: false   # 消费后是否自动提交
          auto-offset-reset: earliest   # 新消费组消费策略,earliest:首次从头消费,后续从最新消息消费
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          ack-mode: manual  # 手动提交,监听器处理一次轮询的消息后(默认500条)调用ack.acknowledge()进行提交
    #      ack-mode: manual_immediate  # 手动提交,监听器处理单条消息后调用ack.acknowledge()进行提交

  3. 消息生产者:

    @RestController
    @RequestMapping("/kafka")
    public class KafkaController {
    
        private static final String TOPIC_NAME = "my-test-topic";
        private static final String KEY = "test:";
        private static final String VALUE = "This is a test for Kafka-producer!";
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @GetMapping("/send")
        public AjaxResult sendMessage() {
            ProducerRecord<String, String> record = 
                new ProducerRecord<>(TOPIC_NAME, 
                                     KEY + UUID.randomUUID(), 
                                     VALUE);
            // 返回值是Future
            kafkaTemplate.send(record);
            return AjaxResult.success();
        }
        
    }

  4. 消费者监听:

    @Component
    public class MyConsumer {
    
        private static final String TOPIC_NAME = "my-test-topic";
        private static final String CONSUMER_GROUP_NAME_0 = "test-group-0";
    
        /**
         * 自动监听是否有消息
         *
         * @param record 可以是 ConsumerRecords,也可以是 ConsumerRecord,前者一次处理多条消息,后者一次处理一条消息
         * @param ack    当关闭自动提交时,需要使用此参数进行 ack 确认提交
         */
        @KafkaListener(topics = TOPIC_NAME, groupId = GROUP_NAME)
        public void listenMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
            System.out.println(record);
            ack.acknowledge();
        }
    
    }

  5. 消费者配置主题Topic、分区partition、偏移量offset:

    /**
     * 创建一个消费组,包含 3 个消费者(concurrency),持续监听两个Topic的消息。
     * 第一个Topic,监听分区(0、1),分区 0的偏移量初始为 5;分区 1的偏移量初始为 10。
     * 第二个Topic,监听分区(1、2),分区 1的偏移量初始为 15;分区 2的偏移量初始为 20。
     *
     * @param record 可以是 ConsumerRecords,也可以是 ConsumerRecord,前者一次处理多条消息,后者一次处理一条消息
     * @param ack    当关闭自动提交时,需要使用此参数进行 ack 确认提交
     */
    @KafkaListener(groupId = GROUP_NAME, concurrency = "3", topicPartitions = {
            @TopicPartition(topic = TOPIC_NAME_1, partitions = {"0", "1"}, partitionOffsets = {
                    @PartitionOffset(partition = "0", initialOffset = "5"),
                    @PartitionOffset(partition = "1", initialOffset = "10")
            }),
            @TopicPartition(topic = TOPIC_NAME_2, partitions = {"1", "2"}, partitionOffsets = {
                    @PartitionOffset(partition = "1", initialOffset = "15"),
                    @PartitionOffset(partition = "2", initialOffset = "20")
            })
    })
    public void listenMessage2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println(record);
        ack.acknowledge();
    }

九、Kafka集群中的controller、rebalance、HW

(1)controller
  1. 选举机制:使用zk的机制,当 broker 创建时,会在 zookeeper 中创建一个临时序号节点,序号最小的节点代表的 broker 将作为集群中的 controller。

  2. 作用

    • 当集群中某一个副本的 leader 宕机,需要在集群中选出一个新的 leader,选举的规则是 ISR 集合中最左边的元素(ISR 集合会按照性能排序,性能越好越靠前);

    • 当集群中有 broker 的增加或减少,controller 会同步信息给其他的 broker;

    • 当集群中有 partition 分区的增加或减少,controller 会同步信息给其他的 broker;

(2)rebalance机制
  1. rebalance 的前提:当消费组中的消费者没有指定分区进行消费,由 Kafka 决定消息分区的分配。

  2. 触发 rebalance 的条件:当消费组中的消费者和分区的关系发生变化时;

  3. 分区分配策略(rebalance之前,分区分配有三种策略):

    • range:根据公式计算消费者消费的分区。

    • 轮询:分区逐一分配到消费者上。

    • sticky:粘合策略。如果需要rebalance,会在已分配的基础上进行调整,而不会影响之前的分配情况。建议开启此策略,因为 rebalance 机制会重新分配会造成资源浪费;

(3)HW和LEO
  1. HW:Height Water,称为高水位。它是 Broker 已完成同步的分界线,当消息写入 Broker,并同步到所有副本之后,HW才会变化。HW变化之前,消费者无法消费到未同步完成的消息,这么做的目的是为了防止消息丢失

  2. LEO:Log End Offset,指的是某个副本的最后一条消息的位置。

  3. 关系图如下:

十、Kafka问题优化

  1. 如何防止消息丢失?

    • 生产者:

      • 发送消息时,使用同步发送的方式;

      • 设置 ACK 的级别,设为1 或 -1即可,-1时可以做到99.99%防丢失率,需要修改 min.insync.replicas 分区备份数,推荐大于等于 2;

    • 消费者:拉取消息后的自动提交,修改为手动提交;

  2. 如何防止重复消费?

    • 生产者:

      • 关闭 retry 重试:不推荐,这样做可能导致消息丢失;

    • 消费者:

      • 开启自动提交:不推荐,这样做可能导致消息丢失;

      • 保证消费的幂等性(指多次访问结果一致):

        • 方式一:使用 redis / zk 分布式锁,主流方案,推荐使用;

        • 方式二:创建联合主键,保证消息的唯一性,防止插入多条记录;

  3. 如何做到顺序消费?

    • 生产者:

      • 使用同步发送,且 ACK 不为 0(0会导致消息丢失),确保消息发送顺序是正确的;

    • 消费者:

      • 主题 Topic 只能设置一个分区 Partition,且消费组只能有一个消费者;

    • Kafka的顺序消费会牺牲性能,可以考虑使用 RocketMQ 代替 ;

  4. 如何处理消息积压?

    • 消息积压的原因:由于消费的速度远赶不上生产的速度,导致

    • 解决方案:

      • 消费者使用多线程,充分利用机器的性能;

      • 优化业务架构,提升业务层的消费速度;

      • 创建多个消费组,多个消费者,提升消费者的速度;

      • 消息分发:创建一个消费者,进行转发消息,接收方为新的 Topic ,且配置了多个分区及多个消费者进行消费(不常用)。

  5. 如何实现延时队列?

    • 场景:创建订单后,如果 30 分钟未支付,则取消订单。

    • 解决方案:

      • 单独创建相应的主题;

      • 消费者消费该主题的消息(轮询);

      • 消费前进行判断,当前时间是否与消息的创建时间相差 30 分钟,且未支付;

        • 如果是:修改数据库状态为取消订单;

        • 如果否:记录当前 offset,且不再继续消费之后的消息。等待一分钟后,根据记录的 offset 拉取消息,继续进行判断。

十一、Kafka Eagle监控平台(新版本待测试)

未完待续...

标签:消费,Java,--,分区,kafka,消息,Kafka,安装
From: https://blog.csdn.net/fyx_demo/article/details/139371324

相关文章

  • 【JavaScript脚本宇宙】从i18next到Date-fns:国际化和本地化库
    跨越JavaScript新境界:六大库全面评测前言本文将详细介绍六种具有不同功能的JavaScript库,包括处理多语言支持、全球化和本地化、格式化日期、数字和字符串,解析、验证、操作、显示日期和时间,格式化和操作数字,以及最全面、最简单和一致的工具集用于处理JavaScript中的日期等......
  • Linux系统中,要检查CUDA是否安装成功
    在Linux系统中,要检查CUDA是否安装成功,可以通过运行一些命令来验证CUDA工具包和库是否可以被系统正确识别和链接。以下是一些可以执行的命令:检查CUDA版本:bashnvcc--version或者bashcuda--version检查CUDA安装路径:bashwhichnvcc检查CUDA目录是否存在:bashls/u......
  • HTML期末学生大作业-基于班级校园我的校园网页设计与实现html+css+javascript
    ......
  • HTML期末作业-基于HTML+CSS+JavaScript制作学生信息管理系统模板
    ......
  • mysql数据库8.4免安装方法配置
    在执行以下操作时,先把以前的data删除,移除相关服务后再进行操作。netstopmysqlmysqld--remove【简单安装,且操作不复杂】#1、根目录下新建my.ini[mysqld]basedir=C:/mysqldatadir=C:/mysql/dataport=3306#2、空密码初始化mysqld--initialize-insecure--user=mysql--conso......
  • 1940java swing零售库存管理系统myeclipse开发Mysql数据库CS结构java编程
    一、源码特点   javaswing零售库存管理系统是一套完善的窗体设计系统,对理解SWINGjava编程开发语言有帮助,系统具有完整的源代码和数据库,,系统主要采用C/S模式开发。应用技术:java+mysql开发工具:Myeclipse8.5、jdk。java零售商品库存管理系统二、功能介绍零售库......
  • 1882java密室逃脱管理系统 Myeclipse开发mysql数据库web结构java编程计算机网页项目
    一、源码特点java密室逃脱管理系统是一套完善的web设计系统,对理解JSPjava编程开发语言有帮助采用了java设计,系统具有完整的源代码和数据库,系统采用web模式,系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发,数据库为Mysql,使用java语言开发。二、功能介绍(1)......
  • 零基础学Java第二十七天之前端-HTML5详解
    前端-HTML5详解一、概述HTML5是HTML的第五个版本,它对HTML进行了许多改进和扩展,使得网页开发更加丰富和便利。HTML5是Web标准的重要组成部分,旨在提高浏览器兼容性,统一网页开发标准。HTML5不仅包括了HTML的基本元素和标签,还新增了许多功能和API,为网页开发提供了更多的可能......
  • 零基础学Java第二十七天之前端HTML5新特性
    HTML5新特性定义文档类型在文件的开头总是会有一个标签语言HTML4,文档声明<!DOCTYPEHTMLPUBLIC"-//W3C//DTDHTML4.01//EN""http://www.w3.org/TR/html4/strict.dtd">语言HTML5,文档声明<!DOCTYPEhtml>新增语义化标签头部标签<header>导航标签<nav>内容标......
  • 零基础学Java第二十四天之注解的理解与使用
    注解1、什么是注解java.annotation包Annotation是从JDK1.5开始引入的新技术,注解即可以对程序员解释又可以对程序解释2、注解与注释的区别注释:对程序员解释代码信息注解:对程序和程序员解释代码信息3、注解的所用不是程序本身,可以对程序作出解释(与注释(comment)类......