首页 > 其他分享 >【知识整理】基于Springboot的Kafka消费者动态操作

【知识整理】基于Springboot的Kafka消费者动态操作

时间:2023-08-24 17:03:09浏览次数:44  
标签:container Springboot MessageListenerContainer Kafka private ID 动态 Consumer publi

基于Springboot的Kafka消费者动态操作

1. 问题

​ 在基于Springboot开发Kafka相关业务时,遇到如下业务场景:

  • 执行部分操作时,如停止服务替换镜像、执行特殊业务处理等,需要先停止Consumer接收Kafka消息,待处理完成后再开启Consumer接续接收Kafka消息
  • 为并发消费Kafka消息,可通过配置spring.kakfa.listener.concurency来设置Consumer的并发数;但spring.kakfa.listener.concurency是一个全局配置,当一个服务需要同时监听多个Topic,并且不同的Topic的Consumer需要设置不同的并发数时,这种方法就不适用

2. 解决思路

2.1 源码分析

​ 在Springboot项目中,一般通过方法上的@KafkaListener注解来注册Consumer,在Springboot服务启动过程中,通过实现了Springboot的扩展点的KafkaListenerAnnotationBeanPostProcessor类,在postProcessAfterInitialization方法中识别含有@KafkaListener注解的方法,并注册至KafkaListenerEndpointRegistry中(详细的源码在此不展开描述,有兴趣的可以自行翻阅源码或查询资料)。因此,后续的操作也将围绕着Listener容器MessageListenerContainer和注册表KafkaListenerEndpointRegistry展开。

2.2 动态启停Consumer

​ Listener容器MessageListenerContainer接口扩展了SmartLifecycle接口,在Lifecycle接口的start()方法基础上,扩展了pause()方法和resume()方法。通过注释可以知道,这三个方法分别对应了Listener的启动、暂停和恢复。

​ 在KafkaListenerEndpointRegistry类中,提供了根据ID获取MessageListenerContainer的方法。

image-20230824153735822

​ 因此,只要通过ID在KafkaListenerEndpointRegistry中获取了Listener容器MessageListenerContainer后,即可进行对应的开始、暂停和恢复Consumer的操作。

2.3 动态修改参数

​ 要想为不同的Listener配置不同的concurrency参数,首先得知道concurrency参数是在哪里被设置至Listener中的。通过Debug分析源码可知,在实现了MessageListenerContainer接口的ConcurrentMessageListenerContainer类中有一个setConcurrency(int)方法,可以设置容器的并发数。同时,Listener的注册表KafkaListenerEndpointRegistry类同样实现了SmartLifecycle接口,并在start()方法中实际启动Listener容器,因此想要动态修改参数,必须在容器启动前,即KafkaListenerEndpointRegistry执行start()方法前进行处理。

image-20230824160352685

3. 动态启停Consumer

​ 首先,定义一个公共的抽象类AbstractScheduledConsumer

public abstract class AbstractScheduledConsumer<T> {

    @Resource
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public abstract void onMessage(T data);

    protected abstract String getId();

    protected MessageListenerContainer getListenerContainer() {
        String containerId = this.getId();
        MessageListenerContainer container = this.kafkaListenerEndpointRegistry.getListenerContainer(containerId);
        Assert.notNull(container, String.format("MessageListenerContainer [%s] 获取失败", containerId));
        return container;
    }

    /**
     * 启动
     */
    public void start() {
        MessageListenerContainer container = getListenerContainer();
        if (!container.isRunning()) {
            container.start();
        } else {
            container.resume();
        }
    }

    /**
     * 暂停
     */
    public void pause() {
        getListenerContainer().pause();
    }

    /**
     * 恢复
     */
    public void resume() {
        getListenerContainer().resume();
    }
}

​ 业务处理的Consumer类只需要继承AbstractScheduledConsumer类即可实现Consumer的动态启停。变量ID即为Listener的ID,需要为每个Consumer定义不同的ID。

@Component
public class BusinessConsumer extends AbstractScheduledConsumer<ConsumerRecord<String, byte[]>> {

    /**
     * 自定义ID
     */
    private static final String ID = "business-consumer-id";

    @Override
    @KafkaListener(id = ID, topics = "")
    public void onMessage(ConsumerRecord<String, byte[]> data) {
        // 业务处理
    }

    @Override
    protected String getId() {
        return ID;
    }
}

​ 至此,BusinessConsumer类已具备动态启停的Kafka Consumer的功能,只需要在Service和Controller增加代码即可通过接口实时启动、暂停和恢复Consumer。

4. 动态修改Consumer参数

​ 首先,定义一个配置项,用于配置需要修改的Consumer参数,此处的ID和上文的Listener的ID一致。

public class CustomizedKafkaConfig {

    /**
     * 是否启用Consumer拦截器
     */
    private boolean consumerInterceptorAutoStart = true;

    /**
     * 全局Consumer配置
     */
    private ConsumerInfo globalConsumerInfo;

    /**
     * 独立Consumer配置
     */
    private Map<String, ConsumerInfo> customizedConsumerInfos = new HashMap<>();

    // 省略 get/set 方法

    /**
     * 根据ID获取配置
     */
    public ConsumerInfo getConsumerInfo(String id) {
        return customizedConsumerInfos.get(id);
    }

    public static class ConsumerInfo {

        private Boolean autoStart;

        private Integer concurrency;

        public Boolean getAutoStart() {
            return autoStart;
        }

        public void setAutoStart(Boolean autoStart) {
            this.autoStart = autoStart;
        }

        public Integer getConcurrency() {
            return concurrency;
        }

        public void setConcurrency(Integer concurrency) {
            this.concurrency = concurrency;
        }
    }
}

​ 接着定义Consumer拦截器,同样实现SmartLifecycle接口,通过getPhase()返回值保证优先于KafkaListenerEndpointRegistry执行。

@Slf4j
public class KafkaListenerContainerInterceptor implements SmartLifecycle {

    private final CustomizedKafkaConfig config;

    private final KafkaListenerEndpointRegistry registry;

    private volatile boolean running = false;

    public KafkaListenerContainerInterceptor(CustomizedKafkaConfig customizedKafkaConfig, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
        this.config = customizedKafkaConfig;
        this.registry = kafkaListenerEndpointRegistry;
    }

    @Override
    public void start() {
        Collection<MessageListenerContainer> listenerContainers = registry.getAllListenerContainers();

        ConsumerInfo globalConsumerInfo = config.getGlobalConsumerInfo();

        // 存在全局设置
        if (globalConsumerInfo != null) {
            log.info("已设置全局ConsumerInfo [autoStartup = {}, concurrency = {}]", globalConsumerInfo.getAutoStart(), globalConsumerInfo.getConcurrency());

            listenerContainers.forEach(c -> resetMessageListenerContainer(c, globalConsumerInfo));
        }

        // 自定义消费者设置
        for (MessageListenerContainer container : listenerContainers) {
            String id = container.getListenerId();

            ConsumerInfo consumerInfo;
            // 未自定义消费者设置,跳过拦截
            if ((consumerInfo = config.getConsumerInfo(id)) == null) {
                continue;
            }

            // 拦截设置
            resetMessageListenerContainer(container, consumerInfo);
        }
    }

    @Override
    public void stop() {
        this.running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public boolean isAutoStartup() {
        return config.isConsumerInterceptorAutoStart();
    }

    @Override
    public int getPhase() {
        return 0;
    }

    private void resetMessageListenerContainer(MessageListenerContainer container, ConsumerInfo consumerInfo) {
        String id = container.getListenerId();

        // 设置AutoStartup属性
        Optional.ofNullable(consumerInfo.getAutoStart()).ifPresent(v -> {
            container.setAutoStartup(v);
            log.info("MessageListenerContainer [{}] [autoStartup] 属性设置为 [{}]", id, v);
        });

        // 设置concurrency属性
        if (container instanceof ConcurrentMessageListenerContainer<?,?>) {
            Optional.ofNullable(consumerInfo.getConcurrency()).ifPresent(v -> {
                ((ConcurrentMessageListenerContainer<?,?>) container).setConcurrency(v);
                log.info("MessageListenerContainer [{}] [concurrency] 属性设置为 [{}]", id, v);
            });
        } else {
            log.warn("MessageListenerContainer [{}] 不是 [ConcurrentMessageListenerContainer],无法修改 [concurrency] 属性", id);
        }
    }
}

​ 最后定义Configuration类,用户注册KafkaListenerContainerInterceptor类。

@Configuration
public class CustomizedKafkaConfiguration {

    @Resource
    private CustomizedKafkaConfig customizedKafkaConfig;

    @Resource
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Bean
    @ConditionalOnMissingBean
    public KafkaListenerContainerInterceptor kafkaListenerContainerInterceptor() {
        return new KafkaListenerContainerInterceptor(customizedKafkaConfig, kafkaListenerEndpointRegistry);
    }
}

标签:container,Springboot,MessageListenerContainer,Kafka,private,ID,动态,Consumer,publi
From: https://www.cnblogs.com/little-code-farmer/p/17654572.html

相关文章

  • springboot3 集成mybatis 和通用mapper
    xml版本查看:https://www.cnblogs.com/binz/p/6564490.htmlspringboot3.x以前的版本查看https://www.cnblogs.com/binz/p/17421063.htmlspringboot3.x查看  https://www.cnblogs.com/binz/p/17654403.html1、pom引用<parent><groupId>org.springframework.boot</gro......
  • 帆软报表--动态参数注入
    帆软官方文档https://help.fanruan.com/当报表的数据需要关联多张表才能查询,但关联查询速度又太慢时,可以使用动态参数注入的功能,可以提高报表的加载速度,又不用编写复杂的SQL语句--将当前行B列的数据作为参数获取当前列对应的值......
  • springboot中2种配置定时任务
    1、@Schedule用@Schedule注解,直接放到方法上就可以生效,代码如下:@Scheduled(cron="0*/5***?")publicvoiddealResult(){log.info("开始执行定时任务......");DefaultProfileprofile=DefaultProfile.getProfile(rdsConf......
  • 【知识整理】Springboot启动扩展点
    SpringBoot启动扩展点整理1.前言​ 在Springboot服务启动阶段,Springboot提供了许多扩展点。在实际的业务开发过程中,部分特殊的业务需求需要再Springboot服务启动过程中动态的加载配置或者执行业务处理,特此将常用的Springboot启动扩展点做一个简单的整理。2.准备阶段2.1Env......
  • 直播系统开发,springboot指定时间触发定时任务
    直播系统开发,springboot指定时间触发定时任务新建测试Demo类执行定时任务 packagecom.task.zhixingshijian;importjava.time.LocalDateTime;importjava.time.temporal.ChronoField;importjava.util.*;/** *@authorwuzhenyong *ClassName:TaskDemo.java *date:2022-......
  • Vue【原创】下划线动态效果按钮,一般按钮模式,开关切换模式。
     1.lilo-icon-button一般按钮模式:1<template>2<divclass="icon-button":style="{color:font.color}"@click="onclick">3<i:class="[icon.type]":style="{color:icon.color,font......
  • kafka设计原理详解
      Kafka核心总控制器Controller在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集......
  • Springboot-配置文件
    1.SpringBoot配置文件格式1.properties2.ymlproperties优先级高于yml。自动识别的配置文件:bootstrap.yml和application.yml,bootstrap.yml先于application.yml加载,一般用于系统级别的配置,application.yml一般用于项目级别的配置Springboot官方的配置:https://docs......
  • springBoot 整合 poi 导出带有复杂表格(合并表格)的word文件
    1.Maven依赖见上一篇文章直接贴源码如下:packagecom.mingx.pms.web.system.file;importcn.hutool.core.date.DateUtil;importcom.mingx.pms.constant.SystemInfo;importcom.mingx.pms.entities.workplan.plan.vo.WorkPlanDetailExportVO;importcom.mingx.pms.entities......
  • kafka发送超大消息
    kafka发送超大消息设置 最近开发一cdc框架,为了测试极端情况,需要kafka传递100万条数据过去,1个G左右,由于其他环节限制,不便进行拆包(注:测下来,大包走kafka不一定性能更好,甚至可能更低)。测试百万以上的变更数据时,报消息超过kafkabroker允许的最大值,因此需要修改如下参数,......