首页 > 其他分享 >【spring boot】 重启kafka客户端连接

【spring boot】 重启kafka客户端连接

时间:2023-04-23 10:48:19浏览次数:32  
标签:consume spring void boot kafka consumer public 客户端

背景

kafka 服务端重建时,kafka客户端会连不上kafka服务端,此时需要重启客户端重连

代码实现

@Service
public class KafkaConsumerService {
    private KafkaConsumer<String, String> consumer;
    
    @Autowired
    private KafkaProperties kafkaProperties;

   //在应用程序启动时初始化Kafka消费者,并启动一个单独的线程用于消费消息
    @PostConstruct
    public void init() {
        consumer = new KafkaConsumer<>(kafkaProperties.buildConsumerProperties());
        consumer.subscribe(Arrays.asList("myTopic"));
        Executors.newSingleThreadExecutor().submit(this::consume);
    }

    @PreDestroy
    public void destroy() {
        consumer.close();
    }

    private void consume() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息逻辑
            }
        }
    }

   //先关闭现有的Kafka消费者,然后创建一个新的消费者实例,并重新订阅我们的主题。最后,我们使用 ExecutorService 启动一个新的消费者线程
    public void restart() {
        consumer.close();
        consumer = new KafkaConsumer<>(kafkaProperties.buildConsumerProperties());
        consumer.subscribe(Arrays.asList("myTopic"));
        Executors.newSingleThreadExecutor().submit(this::consume);
    }
}

标签:consume,spring,void,boot,kafka,consumer,public,客户端
From: https://www.cnblogs.com/xiaoyu-jane/p/17345762.html

相关文章

  • Spring缓存注解的使用与源码分析
    SpringCache提供了一个对缓存使用的抽象,以及大量的实现方便开发者使用。SpringCache主要提供了如下注解:注解说明@Cacheable根据方法的请求参数对其结果进行缓存@CachePut根据方法的请求参数对其结果进行缓存,和@Cacheable不同的是,它每次都会触发真实方法的调用@CacheEvict根据一定......
  • SpringMVC启动流程源码分析
    SpringMVC向WEB容器中注入了两个对象:ContextLoaderListener:由AbstractContextLoaderInitializer注入。DispatcherServlet:由AbstractDispatcherServletInitializer注入。下面分别分析这两个对象在WEB容器启动时做了什么工作?ContextLoaderListenerContextLoaderListener实现了Servle......
  • 为spring boot定制启动banner
    直接打开这个网站 https://patorjk.com/software/taag/#p=testall&f=Larry%203D&t=Type%20Something%20 输入你想要的文字内容,点TestAll即可,我们这里选择的字体是:Larry3D,你也可以根据喜好,选择自己想要的字体 复制并保存到src/main/resources/banner.txt即可 参考资料:......
  • spring boot配置mybatis出现Invalid bound statement (not found)报错的解决办法
     背景:spring-boot-starter-parent2.5.6mybatis-spring-boot-starter2.2.0我遇到这个报错,是因为使用idea创建xml文件是没有后缀,举个例子,比如你创建的是AccountMapper.xml,结果使用idea创建的是AccountMapper,根本就没有后缀!解决办法也很简单,加上后缀就可以了,不需要做其他额外的......
  • mac m1运行jeecgboot指南
    后端使用m1mac运行x86的docker镜像会有问题,需要更换镜像,并且把最后一个镜像给注释掉,手动启动。同时,需要修改hosts文件,添加记录127.0.0.1jeecg-boot-redis127.0.0.1jeecg-boot-mysqlmysql启动后,把db目录下的sql文件执行一下,然后启动java项目version:'2'ser......
  • SpringDay01-入门基础知识、Bean的配置(一)
    Spring(黑马)一、基础知识1.1传统JavaWeb的缺点传统的JavaWeb在实现某个主要的业务逻辑时需要做的事情:new一个实现类对象,然后通过对象调用某个主要的方法;开启事务、提交事务、回滚事务;在日志中记录修改数据;在日志中记录异常数据等。以上传统方法带来的问题:实现类与接......
  • 记录一次艰难的云服务器部署前后端项目springBoot+mybatis和vue(两天解决的前后端跨域
    前言大家好我是歌谣今天继续给大家带来后端java的学习最近刚学习完java的一个增删改查紧接着就是部署项目了代码准备工作前端:vue后端:springboot+mybatis数据库mysql部署后端项目打包找到maven-package-runmavenbuild云服务器上面建立文件mkdir/www/springBoot创建文件......
  • kafka实践(十五): 滴滴开源Kafka管控平台 Logi-KafkaManager研究
    目录调试环境搭建前端调试环境后端调试环境功能架构工具理解应用开发人员kafka/管控开发人员kafka/管控运维人员部署验证windows环境下的部署/调试环境linux环境下生产使用后续 调试环境搭建前端调试环境github克隆比较慢gitee很快,采取前后端分离架构(springboot+reactJS+Typescrip......
  • SpringSecurity过滤器之HeaderWriterFilter
    HeaderWriterFilter用于对当前的HttpServletResponse添加某些浏览器保护的响应头。HeaderWriterFilter由HeadersConfigurer配置,在执行HeadersConfigurer#configure时调用createHeaderWriterFilter创建HeaderWriterFilter,同时添加了HeaderWriter集合:privateList<HeaderWriter>ge......
  • SpringMVC 后台从前端获取单个参数
    1.编写web.xml(模板)2.springmvc配置文件3.编写对应数据库字段的pojo实体类@Data@AllArgsConstructor@NoArgsConstructorpublicclassUser{privateintid;privateStringname;privateintage;}ViewCode4.编写Controller类首先是从前端获取单......