Background:
In cluster mode, every instance needs a different consumer group ID so that all instances consume the same topic (publish-subscribe: every instance receives every message). Instances in the cluster are scaled dynamically, so the number of instances cannot be known in advance; the Kafka group ID therefore has to be assigned dynamically on every project startup, while the group ID scheme as a whole stays the same and must not change.
Approach 1:
CURRENT_INSTANCE_GROUP_ID = KafkaConstant.SSE_GROUP.concat(String.valueOf(System.identityHashCode(sendSyncTaskFactory)))
Using System.identityHashCode(sendSyncTaskFactory) to obtain the identity hash of a bean instance guarantees that, no matter how many instances of the project run in the cluster, every instance ends up with a different group ID.
Note: in this mode, every project restart effectively assigns Kafka a brand-new groupId.
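A minimal sketch of approach 1 in isolation (the helper name buildInstanceGroupId is hypothetical; KafkaConstant.SSE_GROUP is the project's shared group prefix and sendSyncTaskFactory can be any Spring singleton bean):

// identityHashCode is stable within one JVM run but differs across instances and restarts,
// so every cluster instance (and every restart) ends up in its own consumer group.
private String buildInstanceGroupId() {
    return KafkaConstant.SSE_GROUP.concat(String.valueOf(System.identityHashCode(sendSyncTaskFactory)));
}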
# latest (default): when there is no valid offset, the consumer starts from the newest records (records produced after the consumer started)
# earliest: when there is no valid offset, the consumer reads the partition from the beginning
auto-offset-reset: latest
In this mode the property must be set to latest; otherwise every restart would re-consume all previous records of the topic from the beginning.
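For context, a sketch of the consumer-related keys in application.yml that the config class below reads via @Value; the broker addresses and numeric values are illustrative, and enable-auto-commit stays false because the listener factory below uses the MANUAL_IMMEDIATE ack mode:

spring:
  kafka:
    bootstrap-servers: kafka-1:9092,kafka-2:9092   # illustrative addresses
    consumer:
      enable-auto-commit: false      # manual acknowledgment is used by the listener factory
      auto-commit-interval: 100      # only relevant when auto commit is enabled
      auto-offset-reset: latest      # a group with no committed offset starts from the newest records
    listener:
      concurrency: 3                 # illustrative
      missing-topics-fatal: false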
Approach 2:
Pre-store the Kafka group IDs in a database table, keeping the number of groups larger than the number of instances, and configure auto-offset-reset: latest. A group that has no committed offset then starts from the newest records, so every fresh start consumes from the latest offset instead of replaying the whole topic.
At startup, each project instance grabs one of the groups:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Kafka consumer configuration.
 *
 * @author G008186
 */
@Slf4j
@Component
public class KafkaConsumerConfig {

    @Autowired
    private SendSyncTaskFactory sendSyncTaskFactory;
    @Autowired
    private MessageKafkaGroupManage kafkaGroupManage;
    @Autowired
    private RedisLock redisLock;

    @Value("${spring.kafka.bootstrap-servers}")
    private String BROKERS;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean ENABLE_AUTO_COMMIT;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String AUTO_COMMIT_INTERVAL_MS;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String AUTO_OFFSET_RESET;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer CONCURRENCY;
    @Value("${spring.kafka.listener.missing-topics-fatal}")
    private Boolean TOPICS_FATAL;

    private String CURRENT_INSTANCE_GROUP_ID;

    @PostConstruct
    public void init() {
        // Query the list of pre-stored Kafka groups
        List<MessageKafkaGroup> groupManageList = kafkaGroupManage.list();
        for (MessageKafkaGroup kafkaGroup : groupManageList) {
            // Grab a group via a Redis lock; the first group locked is assigned to this instance
            if (redisLock.lock(kafkaGroup.getGroupKey(), kafkaGroup.getGroupKey(), 60)) {
                // Assign the group ID
                CURRENT_INSTANCE_GROUP_ID = kafkaGroup.getGroupKey();
                break;
            }
        }
        // If there are fewer groups than instances and no group could be grabbed, fail the startup
        if (!StringUtils.hasText(CURRENT_INSTANCE_GROUP_ID)) {
            throw new BizMessageException(ExceptionMessage.BizSendCommon.KAFKA_GROUP_IS_NULL);
        }
    }

    /** Build the Kafka listener container factory. */
    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(CONCURRENCY);
        factory.setMissingTopicsFatal(TOPICS_FATAL);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /** Initialize the consumer factory configuration; the consumer group is assigned dynamically here. */
    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        /** Multi-instance deployment: each instance uses a different groupId, giving publish-subscribe semantics. */
        //CURRENT_INSTANCE_GROUP_ID = KafkaConstant.SSE_GROUP.concat(String.valueOf(System.identityHashCode(sendSyncTaskFactory)));
        log.info("Current instance WsMsgConsumer group_id:{}", CURRENT_INSTANCE_GROUP_ID);
        // Set the group ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CURRENT_INSTANCE_GROUP_ID);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
}
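For completeness, a minimal sketch of a listener wired to the factory above; the topic name sse-msg-topic is hypothetical and the class name is borrowed from the log statement. Because the container uses AckMode.MANUAL_IMMEDIATE, each record must be acknowledged explicitly:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WsMsgConsumer {

    // No groupId is set on the annotation, so the dynamically assigned group.id
    // from the consumer factory above is used.
    @KafkaListener(topics = "sse-msg-topic", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("received key={} value={}", record.key(), record.value());
        // MANUAL_IMMEDIATE ack mode: commit the offset right after processing
        ack.acknowledge();
    }
}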