首页 > 其他分享 >kakfa发版丢消息事件分析

kakfa发版丢消息事件分析

时间:2024-06-21 17:32:16浏览次数:7  
标签:System kakfa 线程 事件 shutdown new 发版 executorService out

背景

其他部门同事反馈在项目发版/重启(kill -15)的那段时间,经常会出现导致 C 端业务出现问题,从而产生资损

一听资损,赶紧应答下来,了解了下具体情况,然后立马去排查了

问题分析

结合同事的描述以及对业务的了解,很快就定位到是 kafka 消息丢失导致 C 端业务出现问题

业务当前消费架构图


从上图可以了解到几个点会导致目前这个场景消息丢失

  1. kafka 一秒一次的位移提交
  2. Queue 队列没消费完任务
  3. work 线程池从 Queue 中拉取的任务没消费完(每次拉取一个)

问题所在:因C端业务特性,非准实时的消息是没有意义的(分钟级),所以kafka的自动提交位移实际上是符合业务需求,三点结合起来看问题应该是出在:在发版时 消费单线程 依旧在拉取消息写入 Queue,并且后续的 线程池也没有将 Queue中的任务给处理完

消费架构改造

  1. 改造消费流程
  2. 启动时增加JVM关闭钩子,在关闭前将 isRunning 修改为fale,从而停止 消费单线程 继续拉取kafka消息
  3. 优雅关闭 work线程池

// shutdown() 与 shutdownNow()这里也给到一段shutdown测试代码
ThreadPoolExecutor executorService =
    new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
AtomicInteger integer = new AtomicInteger();
for (int i = 0; i < 100; i++) {
    executorService.execute(() -> {
        try {
            System.out.println(new Date() + "=====>" + integer.incrementAndGet());
            Thread.sleep(1000L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}

Thread.sleep(5000L);
executorService.shutdown();
// executorService.shutdownNow();
System.out.println("线程池已触发shutdown");

随之而来的另一个问题,若在JVM关闭钩子中对 work线程池 操作shutdown,在任务中是有使用到Spring容器中的bean,若bean销毁了,那么work线程池中的任务都无法再执行成功(具体销毁优先级细则可自行百度,这里不做延伸)。
基于这个问题,回想到之前常用的一个注解 @PostConstruct 的一个孪生兄弟 @PreDestroy,这是在Java规范JSR-250引入的注解,定义了对象的创建和销毁工作,那么Spring必然对它有做支持,测试代码如下

ThreadPoolExecutor executorService =
        new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

@PostConstruct
public void postConstruct(){
    AtomicInteger integer = new AtomicInteger();
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            try {
                System.out.println(new Date() + "=====>" + integer.incrementAndGet());
                Thread.sleep(1000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

@PreDestroy
public void preDestroy(){
    executorService.shutdown();
}

// 增加一个测试关闭的接口
@GetMapping("/shutdown")
public void shutdown() {
    System.exit(0);
}

测试结果依旧失败,看日志打印是正在处理线程池中已被接收的任务时挂掉的(这不科学,上面shutdown()测试案例结果明明会等待所有任务结束以后再结束),心里一群 草姓的马 飘过-_-

转念一想:其实这样也对,若一个池任务过多导致一直无法kill掉进程,这种行为也不对…那有没有什么补偿机制可以用,emm,山重水复疑无路,柳暗花明又一村哇,Doug Lea大神名不虚传,早就为我们考虑好了

// 贴出改动方法
@PreDestroy
public void preDestroy(){
    executorService.shutdown();
    try {
        if(executorService.awaitTermination(5, TimeUnit.SECONDS)){
            System.out.println("任务执行完毕结束");
        } else {
            System.out.println("time out 结束");
        }
    } catch (InterruptedException e) {
        System.out.println("Interrupted while waiting for executor");
        Thread.currentThread().interrupt();
        executorService.shutdownNow();
    }
}

嘿嘿,这么一改顺眼多了,线程池在shutdown后再至多等待N秒(若无任务则直接返回true),业务可以根据特性去决定此值配置


但是这么写多麻烦,那么多重要的线程池各个都要在这里写,那Spring如何实现线程池的优雅停的呢?想到Spring的生命周期中的 销毁回调,实现 DisposableBean 即可,那看看ThreadPoolTaskExecutor,其父类ExecutorConfigurationSupport在处理销毁时,会判定其 waitForTasksToCompleteOnShutdown 参数是否为true来决定是否要调用shutdown(),并且根据其 awaitTerminationSeconds 参数来决定是否需要调用 ExecutorService.awaitTermination 去等待线程池处理一定时间

那让我们来改造改造现在的work线程池,指定业务指定配置以后,交给spring去帮我们去做这些重复的销毁动作

写到最后

若使用Spring提供线程池,并指定以下两个参数即可实现线程池优雅停

  1. waitForTasksToCompleteOnShutdown 参数,在销毁时会帮我们调用一次线程池shutdown()
  2. awaitTerminationSeconds 参数,在调用shutdown以后可以等等一段时间,从而尽可能的将线程池中任务给执行完毕

ExecutorService.awaitTermination 虽好,可不要贪杯(滥用)哦,多个线程池都指定此参数并在销毁时都存在大量的任务,可能会导致 kill -15 的时间增加,从而出现一种 “kill不掉” 的现象

标签:System,kakfa,线程,事件,shutdown,new,发版,executorService,out
From: https://blog.csdn.net/weixin_44700876/article/details/139866011

相关文章

  • java微信公众平台----带参数二维码生成和扫描事件
    功能是在详情页面点击按钮,生成二维码。打开微信扫码,扫码之后手机跳转到公众号并发送一条模板消息。点击模板消息,跳转到H5的详情页面。参考推荐:https://blog.csdn.net/weixin_42720002/category_8977300.html官方文档:https://developers.weixin.qq.com/doc/offiaccount/Account_Ma......
  • 使用EventBus在Activity和fragment之间传递数据,出现post一次,却接收到多次对应事件
    背景项目中有一个activity,其中通过viewpager管理着多个页面,在activity操作某些数据时,通过eventbus将消息传递给fragment。该fragment中,分别在onViewCreated注册了eventbusif(!EventBus.getDefault().isRegistered(this)){EventBus.getDefault().register(this);}在onDes......
  • 一封伪造电子发票的邮件攻击事件分析
    一、事件简述4月6日,收到一份邮件,来自名为:云发票<[email protected]>的邮件,主题为:***账户支取发票,邮件内容称寄来开具的充值电子发票,具体内容如下:尊敬的用户***你好您于2023年4月6日开具的充值电子发票,票据信息如下:開票日期:2023年4月6日發票金额:150.00¥发票代碼:1440......
  • 6.2 事件的创建,修改和删除
    6.2.1事件的概述事件(Event)是在指定时刻才被执行的过程式数据库对象。事件通过MySQL中一个很有特色的功能模块——事件调度器(EventScheduler)进行监视,并确定其是否需要被调用。 MySQL的事件调度器可以精确到每秒钟执行一个任务,比操作系统的计划任务更具实时优势。对于......
  • 玄机——第六章 流量特征分析-常见攻击事件 tomcat wp
    文章目录一、前言二、概览简介三、参考文章步骤(分析)步骤#1.1在web服务器上发现的可疑活动,流量分析会显示很多请求,这表明存在恶意的扫描行为,通过分析扫描的行为后提交攻击者IPflag格式:flag{ip},如:flag{127.0.0.1}步骤#1.2找到攻击者IP后请通过技术手段确定其所在地......
  • 探索 v-on 之奥秘一:事件及各类修饰符的深度剖析与案例展示
    目录v-on介绍v-on事件修饰符e.stopPropagation()、.stop、.self​​​​​​​capture​​​​​​​e.preventDefault()、.prevent​​​​​​​.once​​​​​​​.passivev-on介绍        在Vue中,v-on乃是其所提供的用于事件绑定的一种机制,比如要......
  • JQuery高级29_事件绑定2
    一、jquery标准的绑定方式jq对象.事件方法(回调函数);注:如果调用事件方法,不传递回调函数,则会触发浏览器默认行为。表单对象.submit();//让表单提交<!DOCTYPEhtml><html><head><metacharset="UTF-8"><title>jquery标准的绑定方式</title......
  • 对 websocket 进行封装 (心跳检测 断开重连 发送事件等等 支持断开重连后发送上次发
    代码封装:  //websocketService.jsimport{ref}from"vue";const{DEV,VITE_APP_PUSH_WS}=import.meta.env;const{push_ws}=window.WEB_CONFIG;constbaseWsUrl=DEV?VITE_APP_PUSH_WS:push_ws;classWebSocketService{constructor(ur......
  • spring 使用 事件机制
    概述在编写代码的时候,比如我删除一篇文章,这个时候,如果我想做些额外的逻辑,这是就需要修改删除部分的代码。spring提供了事件机制更优雅的实现这个,用户只需要实现事件监听即可。代码实现注入发布者publicclassKnowledgeBaseServiceimplementsApplicationEventPublisherAwar......
  • react 自定义鼠标右键点击事件
    功能:鼠标右键点击节点时,出现“复制”功能,点击其他部位,隐藏“复制”;鼠标右键事件的文案,始终在鼠标点击位置的右下方;点击复制,提示复制成功效果图:代码:const[showRight,setShowRight]=useState(false);constcontextMenu=useRef(null);const[clickX,setClickX]=us......