首页 > 其他分享 >springboot集成kafka解决集群模式下分组ID不同问题

springboot集成kafka解决集群模式下分组ID不同问题

时间:2024-05-24 14:59:08浏览次数:21  
标签:springboot springframework kafka 分组 org import ID

背景:

在集群模式下,每个实例需要分组ID不同,共同消费某个topic,集群下的实例是动态扩展的,无法确认实例的个数,每次项目启动的时候,需要动态的给定kakfa的分组ID,但是分组ID整体是一样的,不能改变。

方式1:

CURRENT_INSTANCE_GROUP_ID = KafkaConstant.SSE_GROUP.concat(String.valueOf(System.identityHashCode(sendSyncTaskFactory)))

使用:System.identityHashCode(sendSyncTaskFactory)方法,获取某个class的实例code,这样不管集群有几个项目实例,都可以保证每个实例的分组ID不同

注意:这中模式下,每次启动项目都相当于重新给kafka赋值新的groupId

# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录

# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录

auto-offset-reset: latest

这种模式下,该配置一定要配置为latest,不然每次启动都要把之前的topic全部重新消费一遍

 

方式二:

将kafka的分组预先存储到表里,分组数大于实例数即可,然后配置:auto-offset-reset: latest 这样可以保证每次重新启动都是从最新的offset进行消费

项目在启动的时候进行分组抢占:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * kafka的消费者配置
 *
 * @author G008186
 */

@Slf4j
@Component
public class KafkaConsumerConfig {

    @Autowired
    private SendSyncTaskFactory sendSyncTaskFactory;

    @Autowired
    private MessageKafkaGroupManage kafkaGroupManage;

    @Autowired
    private RedisLock redisLock;

    @Value("${spring.kafka.bootstrap-servers}")
    private String BROKERS;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean ENABLE_AUTO_COMMIT;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String AUTO_COMMIT_INTERVAL_MS;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String AUTO_OFFSET_RESET;

    @Value("${spring.kafka.listener.concurrency}")
    private Integer CONCURRENCY;

    @Value("${spring.kafka.listener.missing-topics-fatal}")
    private Boolean TOPICS_FATAL;

    private String CURRENT_INSTANCE_GROUP_ID;

    @PostConstruct
    public void init(){
//查询kafka分组列表 List<MessageKafkaGroup> groupManageList = kafkaGroupManage.list(); for (MessageKafkaGroup kafkaGroup : groupManageList){
//通过redis进行抢占,抢占到就把这个分组id赋值给该实例 if (redisLock.lock(kafkaGroup.getGroupKey(),kafkaGroup.getGroupKey(),60)){
//赋值分组ID CURRENT_INSTANCE_GROUP_ID = kafkaGroup.getGroupKey(); break; } }
//若实例不够,未抢占到kafka分组,则启动失败 if (!StringUtils.hasText(CURRENT_INSTANCE_GROUP_ID)){ throw new BizMessageException(ExceptionMessage.BizSendCommon.KAFKA_GROUP_IS_NULL); } } /**构建kafka监听工厂*/ @Bean("kafkaListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.setConcurrency(CONCURRENCY); factory.setMissingTopicsFatal(TOPICS_FATAL); factory.setConsumerFactory(consumerFactory()); return factory; } /**初始化消费工厂配置 其中会动态指定消费分组*/ private ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); /**多实例部署每个实例设置不同groupId 实现发布订阅*/ //CURRENT_INSTANCE_GROUP_ID = KafkaConstant.SSE_GROUP.concat(String.valueOf(System.identityHashCode(sendSyncTaskFactory))); log.info("当前实例WsMsgConsumer group_id:{}",CURRENT_INSTANCE_GROUP_ID); //设置分组ID
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CURRENT_INSTANCE_GROUP_ID); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET); return new DefaultKafkaConsumerFactory<String, String>(properties); } }

 

标签:springboot,springframework,kafka,分组,org,import,ID
From: https://www.cnblogs.com/xzlnuli/p/18210982

相关文章

  • BOSHIDA AC/DC电源模块:适用于各种功率需求的电子设备
    BOSHIDAAC/DC电源模块:适用于各种功率需求的电子设备AC/DC电源模块是一种广泛应用于不同电子设备中的电源转换模块。它具有输出稳定、高效率、可靠性强等特点,适用于各种功率需求的电子设备。在本文中,我们将探讨AC/DC电源模块的工作原理、优点以及在不同应用领域的应用。 首先......
  • 京东面试:SpringBoot同时可以处理多少请求?
    SpringBoot作为Java开发中必备的框架,它为开发者提供了高效且易用的开发工具,所以和它相关的面试题自然也很重要,咱们今天就来看这道经典的面试题:SpringBoot同时可以处理多少个请求?准确的来说,SpringBoot同时可以处理多少个请求,并不取决于SpringBoot框架本身,而是取决于其内......
  • 麒麟系统下springboot程序开机自启动
    1、编写脚本放置到/etc/systemed/system目录下例如display.service[Unit]Description=display#Documentation=http://www.baidu.com#Requires=network.targetAfter=network.targetelasticsearch.serviceredis.servicemysql.server.service[Service]Type=forkingEn......
  • Java基于saas模式云MES制造执行系统源码Spring Boot + Hibernate Validation什么是MES
    Java基于saas模式云MES制造执行系统源码SpringBoot+HibernateValidation什么是MES系统?MES制造执行系统,通过互联网技术实现从订单下达到产品完成的整个生产过程进行优化管理。能有效地对生产现场的流程进行智能控制,防错防呆防漏,自动化集成各种制造信息,使管理者准确掌控工......
  • JAVA计算机毕业设计基于SpringBoot的疫苗接种管理系统(附源码+springboot+开题+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着全球范围内新冠疫情的爆发和持续,疫苗接种成为了防控疫情的重要手段。然而,疫苗接种的管理涉及到众多的环节和人员,如何有效地管理和跟踪接种者的接......
  • JAVA计算机毕业设计基于SpringBoot的窈窕之求食单平台的设计与实现(附源码+springboot+
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着生活节奏的加快和人们健康饮食意识的提升,越来越多的消费者开始关注菜谱的选择和制作。然而,市场上缺乏一个统一的、便捷的在线平台,用于提供丰富的......
  • GeminiDB PITR,让游戏回档“进退自如”!
    本文分享自华为云社区《GeminiDBPITR,让游戏回档“进退自如”!》,作者:GaussDB数据库。在实际业务场景中,客户数据库难免会出现数据损毁、数据丢失、数据误删除等故障场景。为保障业务的正常运行,通常需要将数据库恢复到故障发生前的某一个正常时刻。传统数据库采取周期性备份策略,即......
  • 基于SpringBoot+Vue的在线教育平台
    !!!有需要的小伙伴可以通过文章末尾名片咨询我哦!!! ......
  • 基于SpringBoot+Vue的在线拍卖系统
    !!!有需要的小伙伴可以通过文章末尾名片咨询我哦!!! ......
  • 基于SpringBoot+Vue的学科竞赛管理系统
    初衷在后台收到很多私信是咨询毕业设计怎么做的?有没有好的毕业设计参考?能感觉到现在的毕业生和当时的我有着同样的问题,但是当时的我没有被骗,因为现在很多人是被骗的,还没有出学校还是社会经验少,容易相信别人。所以为了大家少踩坑,我推荐一批可以运行的毕业设计和相关资料......