首页 > 其他分享 >SpringBoot系列---【线程池优雅停机,避免消费数据丢数的问题】

SpringBoot系列---【线程池优雅停机,避免消费数据丢数的问题】

时间:2024-05-28 15:12:02浏览次数:13  
标签:isRunning SpringBoot esThreadPool 停机 --- 线程 Override public

1.问题

项目中通过kafka来对接上游,在项目中写一个listener监听topTopic队列,for循环消费records,在for循环中处理成存储到es的对象,一次拉50条,使用自定义线程池esThreadPool异步推送到es中,但是每次停机就会丢数据,例:kafka消费了1000条,但是往es中存储比较慢,优雅停机的时候,esThreadPool还没完成存储的这部分数据就会丢失。
@KafkaListener(topics={"kafka.topics.publicFlow.topicName",groupId="TOP_TOPIC_READER",containerFactory="cluster1KafkaListenerFactory"})
public void consumer(List<ConsumerRecord<String,String>> records,Acknowledgment acknowledgment){
    List<PublicFlowVo> esList = new ArrayList<>(records.size);
    try{
        for(ConsumerRecord<String,String> record : records){
            //解析成PublicFlowVo
            esList.add(build(record));
        }
    }catch (Exception e){
      //打印错误日志
    }finally{
      //提交offset
      acknowledgment.acknowledge();
      //推送到es
      esThreadPool.submit(()->{
        //写入es
        sendEs(esList);
      });
    }
}

2.分析原因

之所以这样,是因为项目集成了graceful优雅停机,springboot从2.3.0版本开始支持,想当然的认为优雅停机就能实现线程任务执行完之后再停机,这样就不会丢数。实际上,验证过后,才发现,graceful只能监听到主线程,假如是用了自定义线程池,要手动实现优雅停机。如果想不丢数,就必须先关闭消费kafka的线程,其次是等待esThreadPool任务执行完成之后,再关闭esThreadPool线程池,最后再停机。

3.解决方案

esThreadPool实现SmartLifecycle,可以自定义停机顺序,以及停机逻辑,下面是两种实现方案,一种可以设置最大等待时间,一种一直等到任务全部完成。

方案一:支持设置最大等待时间

@Component
public class EsThreadPoolManager implements SmartLifecycle {
  private boolean isRunning = false;

  @Resource(name="esThreadPool")
  private ThreadPoolExecutor threadPoolExcutor;

  @Override
  public void start(){ isRunning = true;}

  @Override
  public void stop(){ stop(()->{});}

  @Override
  public void stop(@NotNull Runnable callback){ 
    log.info("esThreadPool线程池开始停止!")
    long start = System.currentTimeMillis();
        // 关闭ThreadPoolExecutor,等待已提交的任务完成
        threadPoolExcutor.shutdown();
        try {
            // 等待线程池终止,设置最长等待时间,这里示例为30秒,这个值和graceful优雅停机时间无关,这个值先生效,这个时间到了才会去按graceful的时间判断是否需要强制关闭
            if (!threadPoolExcutor.awaitTermination(30, TimeUnit.SECONDS)) {
                // 超时后强制关闭
                threadPoolExcutor.shutdownNow();
            }
        } catch (InterruptedException ex) {
            // 如果等待被中断,则直接关闭
            Thread.currentThread().interrupt();
            threadPoolExcutor.shutdownNow();
        } finally {
            long end = System.currentTimeMillis();
            // 完成停机处理
            log.info("esThreadPool线程池停止!耗时:{}s",(end-start)/1000);
            callback.run();
            isRunning = false;
        }
  }

    @Override
    public boolean isRunning() {
        // 返回当前运行状态
        return isRunning;
    }

    @Override
    public int getPhase() {
        // 控制停机顺序,数值越大越先停止
        // 可以根据需要调整,如果先停止kafka,建议设置成小于2147483547的值,这个值可以在KafkaListenerEndpointRegistry这个类的getPhase()方法中找到。
        return Integer.MAX_VALUE;
    }
}

方案二:直到所有任务全部执行完成

@Component
public class EsThreadPoolManager implements SmartLifecycle {
  private boolean isRunning = false;

  @Resource(name="esThreadPool")
  private ThreadPoolExecutor threadPoolExcutor;

  @Override
  public void start(){ isRunning = true;}

  @Override
  public void stop(){ stop(()->{});}

  @Override
  public void stop(@NotNull Runnable callback){ 
    log.info("esThreadPool线程池开始停止!")
    long start = System.currentTimeMillis();
        // 关闭ThreadPoolExecutor,等待已提交的任务完成
        threadPoolExcutor.shutdown();
        try {
            // 每隔一段时间检查线程池状态
            while (true) {
                // 线程池中活动线程为0,且队列中没有待执行任务时,可以安全关闭
                if (gracefulThreadPool1.getActiveCount() == 0 && gracefulThreadPool1.getQueue().isEmpty()) {
                    break;
                }
                // 等待一段时间后再次检查
                Thread.sleep(2000); // 2秒钟检查一次
            }
        } catch (InterruptedException ex) {
            // 如果等待被中断,则直接关闭
            Thread.currentThread().interrupt();
            threadPoolExcutor.shutdownNow();
        } finally {
            long end = System.currentTimeMillis();
            // 完成停机处理
            log.info("esThreadPool线程池停止!耗时:{}s",(end-start)/1000);
            callback.run();
            isRunning = false;
        }
  }

    @Override
    public boolean isRunning() {
        // 返回当前运行状态
        return isRunning;
    }

    @Override
    public int getPhase() {
        // 控制停机顺序,数值越大越先停止
        // 可以根据需要调整,如果先停止kafka,建议设置成小于2147483547的值,这个值可以在KafkaListenerEndpointRegistry这个类的getPhase()方法中找到。
        return Integer.MAX_VALUE;
    }
}

标签:isRunning,SpringBoot,esThreadPool,停机,---,线程,Override,public
From: https://www.cnblogs.com/hujunwei/p/18218069

相关文章

  • CSP历年复赛题-P1179 [NOIP2010 普及组] 数字统计
    原题链接:https://www.luogu.com.cn/problem/P1179题意解读:统计l~r之间的整数包括多少个数字2。解题思路:枚举每一个数,对每一个数的每一位数字进行判断。100分代码:#include<bits/stdc++.h>usingnamespacestd;intl,r,ans;intmain(){cin>>l>>r;f......
  • create react app - cra系列比较
    省流版:craco目前还算在更新,其他的已经几年未更新了。虽然react官网已经不推荐cra了,但如果非要用这个系列还是推荐craco吧。GitHub-dilanx/craco:CreateReactAppConfigurationOverride,aneasyandcomprehensibleconfigurationlayerforCreateReactApp.3个月前Gi......
  • 基于java中的springboot框架实现医药管理系统项目演示【内附项目源码+论文说明】
    基于java中的springboot框架实现医药管理系统项目演示【内附项目源码+LW说明】摘要计算机网络发展到现在已经好几十年了,在理论上面已经有了很丰富的基础,并且在现实生活中也到处都在使用,可以说,经过几十年的发展,互联网技术已经把地域信息的隔阂给消除了,让整个世界都可以即......
  • 基于java中的springboot框架实现秒杀系统项目演示【内附项目源码+论文说明】
    基于java中的springboot框架实现秒杀系统项目演示【内附项目源码+LW说明】摘要社会发展日新月异,用计算机应用实现数据管理功能已经算是很完善的了,但是随着移动互联网的到来,处理信息不再受制于地理位置的限制,处理信息及时高效,备受人们的喜爱。本次开发一套基于SpringBoo......
  • 持续性学习-Day16(前端基础JavaScript)
    LearnJavaScript参考教学视频:秦疆参考知识UI框架Ant-design:阿里巴巴出品,基于React的UI框架ElementUI、iview、ice:饿了吗出品,基于VUE的UI框架BootStrap:Twitter推出的一个用于前端开发的开源工具包AmazeUI:一款HTML5跨屏前端框架JavaScript构建工具Babel:JS编译......
  • 【chisel】chisel中for (i <- 0 until N-2) {} 的用法,表示什么?
    在Chisel中,for(i<-0untilN-2){}是一个for循环的语法,它表示从0开始到N-2的整数(不包括N-2),并对每个整数i执行大括号{}内的代码块。这里的until是Scala语言的一个关键字,用于生成一个从起始值到结束值之前的所有整数的序列。在Chisel中,Scala的这个特性......
  • 【wiki知识库】02.wiki知识库SpringBoot后端的准备
      ......
  • 2024-Linux
    单选题 一.单选题(共64题,100分)1. (单选题)如果umask设置为022,新创建的文件的缺省权限是什么?A.\----w--w-B.\-w--w----C.\r-xr-x---D.\rw-r--r--我的答案: D:\rw-r--r--;正确答案: D:\rw-r--r--; 1.5分2. (单选题)如果要列出一个目录下的所有......
  • SD8002D单声道功率放大器输入1KHZ5V电压驱动功率SOP8封装2.0V-5.5V
    SD8002D是一款AB类,单声道带关断模式,桥式音频功率放大器。在输入1KHz,5V工作电压时,最大驱动功率为:3W,(4Ω负载,总谐波失真<10%),2W,(4Ω负载,总谐波失真<1%);音频范围内总谐波失真噪音小于1%(20赫兹·20KHz);SD8002D应用电路简单,只需要极少数外围器件,就能提供高品质的输出功率。......
  • 【GPT应用】Python-GEE遥感大数据分析
    随着航空、航天、近地空间遥感平台的持续发展,遥感技术近年来取得显著进步。遥感数据的空间、时间、光谱分辨率及数据量均大幅提升,呈现出大数据特征。这为相关研究带来了新机遇,但同时也带来巨大挑战。传统的工作站和服务器已无法满足大区域、多尺度海量遥感数据处理需求。为解......