首页 > 其他分享 >记录一下 ArrayBlockingQueue 消息堆积的问题

记录一下 ArrayBlockingQueue 消息堆积的问题

时间:2024-01-20 15:33:30浏览次数:26  
标签:记录 线程 take logInfo ArrayBlockingQueue 日志 public 堆积


前言

由于之前这个系统的日志记录是被领导要求写表的,在不影响系统性能的前提下,日志的入库操作肯定是要改成异步进行的,当时利用 ArrayBlockingQueue+线程+AOP 简单的去实现了一下,但是初版代码测试下来发现了一个很严重的问题,就是日志丢失的问题,本文由此而来。

初步构思

代码实现逻辑实现很简单,利用 AOP 切面去记录用户的行为,最终调用对应的 DAO 去入库日志,切面如下。

/**
 * zzh:日志切面优先级第一
 */
@Slf4j
@Aspect
@Order(-999)
@Component
public class AspectLog {
    @Autowired
    private LogInfoService logInfoService;

    @Pointcut("execution(public * com.example.oraceldemo.controller..*(..)))")
    public void webPointCut() {
    }

    @Around("webPointCut()")
    public Object arround(ProceedingJoinPoint pjp) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        LogInfo logInfo = new LogInfo();
        logInfo.setOpTime(sdf.format(System.currentTimeMillis()))
                .setIp(request.getRemoteAddr())
                .setMethod(request.getRequestURL().toString());
        try {
            long startTime = System.currentTimeMillis();
            Object o = pjp.proceed();
            long endTime = System.currentTimeMillis();
            StringBuilder sb = new StringBuilder();
            sb.append("   ###请求URL: " + request.getRequestURL().toString());
            sb.append("   ###IP: " + request.getRemoteAddr());
            sb.append("   ###Params: " + Arrays.toString(pjp.getArgs()));
            sb.append("   ###CLASS_METHOD: " + pjp.getSignature().getDeclaringTypeName() + "." + pjp.getSignature().getName());
            sb.append("   ###耗时: " + (endTime - startTime) + "毫秒");
            log.info(sb.toString());
            logInfo.setStartTime(sdf.format(startTime))
                    .setEndTime(sdf.format(endTime))
                    .setReturnValue(JSONObject.toJSONString(o.toString()));
            logInfoService.put(logInfo);
            return o;
        } catch (Throwable e) {
            e.printStackTrace();
            logInfo.setErrors(JSONObject.toJSONString(e));
            log.error(e.toString());
            logInfoService.put(logInfo);
            return null;
        }
    }
}

DAO层代码

由于之前看过 NACOS 心跳健康监测的源码,感觉代码架构玩的比较 6 ,于是按照他那个设计思路,异步的将日志记录操作抽离出来,在项目启动的时候创建一个日志任务(可用线程池去优化),每当有日志需要记录的时候,就把日志对象堆积到 Notifier 里面,里面由一个阻塞队列去维护。去真正的执行入库日志的逻辑。

/**
 * <p>
 * 服务实现类
 * </p>
 *
 * @author 张子行
 * @since 2022-10-26
 */
@Service
public class LogInfoServiceImpl extends ServiceImpl<LogInfoMapper, LogInfo> implements LogInfoService {
    private notifier notifier = new Notifier();

    @Override
    public void put(LogInfo logInfo) {
        notifier.addTask(logInfo);
    }

    @PostConstruct
    private void init() {
        new Thread(notifier).start();
    }

    @Override
    public boolean save(LogInfo entity) {
        return super.save(entity);
    }
}

ArrayBlockingQueue发生堆积的地方

Notifier 中的 addTask 方法源源不断的提供需要写入的日志,当阻塞队列不为空的时候就会被唤醒,取出队列中的日志进行 db 插入操作。

@Slf4j
public class Notifier implements Runnable {
    private BlockingQueue<LogInfo> tasks = new ArrayBlockingQueue<>(1024 * 1024);

    public void addTask(LogInfo logInfo) {
        log.info("Notifier tasks.add() begin,{}", logInfo.getIp());
        tasks.add(logInfo);
        log.info("Notifier tasks.add() end,{}", logInfo.getIp());
    }

    /**
     * Runnable不能抛出异常,抛出异常线程无法执行!因此需要自己内部消化
     */
    @Override
    public void run() {
        for (; ; ) {
            try {
                log.info("BlockingQueue tasks take begin...");
                LogInfo logInfo = tasks.take();
                log.info("BlockingQueue tasks take end...{}"+logInfo.toString());
                LogInfoServiceImpl logInfoService = WebUtil.getBean(LogInfoServiceImpl.class);
                logInfoService.save(logInfo);
            } catch (InterruptedException e) {
                log.error("Notifier BlockingQueue tasks take InterruptedException, Thread interrupt status{}" + Thread.currentThread().isInterrupted());
            } catch (Exception e) {
                log.error("Notifier logInfoService save exceptionType :{},msg: {}", e.getClass(), e.getMessage());
            }
        }
    }
}

初版的代码如下

只对31行代码做了异常catch,而 33-34 行可能会出现一些 SQL 异常,但是之前我并未做异常catch 处理,然后 当 33-34 行出现异常的时候直接抛出到了 Runable 中去了,导致此线程运行终结,(这也是 Runable的一大特性了 )。导致消息没被正常消费,于是乎 ArrayBlockingQueue 中的消息越来越多,就有问题了。解决办法加一层 catch 捕获所有类型的异常,也就是在 Runable 中吞掉异常,或者手动捕获 33-34 行的异常,设置线程的中断状态,让其他业务感知到此线程被中断执行了,自定义处理逻辑。等等

记录一下 ArrayBlockingQueue 消息堆积的问题_队列

问题排查思路

起初我还以为是 InterruptedException 导致的线程中断,于是乎去看了一下 ArrayBlockingQueue 中的 put 与 take 方法的源码,也算有点收获,接下来也分析一下吧。记录于下文的面试专栏中。

面试专栏

为什么当 BlockingQueue 中没有消息的时候 take 方法会阻塞当前线程?

利用了 ReentrantLock 中的条件锁(Condition)实现的, ArrayBlockingQueue 里面维护了俩个 Condition 分别是 notEmpty 与 notFull ,当 ArrayBlockingQueue 阻塞队列进行 take 的时候,且队列中没有元素的时候会进行调用 notEmpty.await(); 方法阻塞当前线程,当 ArrayBlockingQueue 阻塞队列进行 add 的时候,会去调用 notEmpty.signal(); 去唤醒当前线程。加元素的时候去唤醒线程,取元素且元素数量为0的时候会去阻塞当前线程。这也是为什么 ArrayBlockingQueue 被称之为阻塞队列的原因。对应回答的源码剖析如下。

记录一下 ArrayBlockingQueue 消息堆积的问题_开发语言_02


ArrayBlockingQueue take核心源码,注意里面用到的是 ReentrantLock 中的 lockInterruptibly 方法(加锁可中断),当线程状态为中断状态时候,lock.lockInterruptibly() 这行代码就会抛出 InterruptedException 异常,具体的 ReentrantLock 为可中断锁的源码可以去看 浅聊一下,可中断锁(ReentrantLock),这也是为什么调用 take 方法的时候必须要去捕获一下 InterruptedException 异常的原因了。

记录一下 ArrayBlockingQueue 消息堆积的问题_ide_03


ArrayBlockingQueue add里面的核心代码,里面调用了 notEmpty.signal(); 这行代码,去唤醒当前线程。

记录一下 ArrayBlockingQueue 消息堆积的问题_java_04


标签:记录,线程,take,logInfo,ArrayBlockingQueue,日志,public,堆积
From: https://blog.51cto.com/u_16414043/9345740

相关文章

  • 2024-01-20:用go语言,小扣在探索丛林的过程中,无意间发现了传说中“落寞的黄金之都“, 而
    2024-01-20:用go语言,小扣在探索丛林的过程中,无意间发现了传说中"落寞的黄金之都",而在这片建筑废墟的地带中,小扣使用探测仪监测到了存在某种带有「祝福」效果的力场,经过不断的勘测记录,小扣将所有力场的分布都记录了下来,forceField[i]=[x,y,side],表示第i片力场将覆盖以坐标......
  • DC-4靶机做题记录
    靶机下载地址:链接:https://pan.baidu.com/s/1YbPuSw_xLdkta10O9e2zGw?pwd=n6nx提取码:n6nx参考:【【基础向】超详解vulnhub靶场DC-4-爆破+反弹shell+信息收集】https://www.bilibili.com/video/BV1Le4y1o7Sx/?share_source=copy_web&vd_source=12088c39299ad03109d9a21304b3......
  • 记录--前端实现翻转图像
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助翻转图像是在视觉上比较两个不同图像的常用方法。单击其中一个将翻转它,并显示另一个图像。布局布局结构如下:<divclass="flipping-images"><divclass="flipping-images__inner"><divclass="flip......
  • 假期学习记录04
    学习了Scala语言的面向对象方面的知识类的定义classCounter{privatevarvalue=0defincrement():Unit={value+=1}defcurrent():Int={value}}若只有一行语句,可以写成classCounter{privatevarvalue=0defincrem......
  • 假期学习记录03
    继续学习了scala语言数据结构:容器列表LIst不可变对象序列,一旦进行初始化,后不可以在被修改进行初始化在已有列表前端添加元素,通过::进行实现需要注意的是。这不会进行修改操作,而是直接生成了另一个List集合不重复元素的集合,包括可变集合和不可变集合若进行导包,导入muta......
  • 【测试自动化覆盖率】记录统计自动化的工具testrail 如何实现自动统计覆盖率
        点击编辑来到这个页面 点击自己想要统计的testplan里面的用例选择selectcases   先选择右边的过滤所有Automated 为yes的tag,然后在底下点击确定 在左边呈现的就是出现的  取消不要的用例  ......
  • DC-3靶机做题记录
    靶机下载地址:链接:https://pan.baidu.com/s/1-P5ezyt5hUbmmGMP4EI7kw?pwd=rt2c提取码:rt2c参考:http://t.csdnimg.cn/hhPi8https://www.vulnhub.com/entry/dc-32,312/官网http://t.csdnimg.cn/5mVZ7DC-3(1).pdfhttps://c3ting.com/archives/vulnhnbshua-ti---dc-3......
  • CTF学习记录pwn篇
    作为一个CTF初学者,在这里记录自己学习刷题的过程,不定期更新。此为pwn篇,有关pwn方向的题目会放在这里,目前来说,这篇会主要更新,其他方向也许会有,敬请期待。目录一、formatstring1.[HUBUCTF2022新生赛]fmt-NSSCTF2.test_format-PolarD&N二、ROP1.03ret2syscall_32-PolarD&N......
  • 寒训1.11记录
    AtCoderBeginnerContest272-AtCoderA-IntegerSum(赛题目概述求和B-EveryoneisFriends(赛题目概述二维数组记录相互关系即可,数据量少可用\(O(n^3)\)C-MaxEven(赛题目概述从非负整数数列中找出和为偶数,且和最大的两个数解题思路遍历数列,把偶数和奇数放入两个向......
  • BeanUtils 的 copyProperties 踩坑记录
    代码示例importorg.apache.commons.beanutils.BeanUtils;publicclassTestBeanUtils{publicstaticvoidmain(String[]args)throwsException{testApacheBeanUtils();testSpringBeanUtils();}privatestaticvoidtestSpringBeanUtil......