首页 > 编程语言 >【Java 并发】【队列应用】【一】ArrayBlockingQueue 的使用-Logback异步日志打印

【Java 并发】【队列应用】【一】ArrayBlockingQueue 的使用-Logback异步日志打印

时间:2024-02-11 15:33:05浏览次数:34  
标签:异步 Java 队列 打印 线程 put ArrayBlockingQueue 日志 Logback

1  前言

看了那么多 Java 提供的队列工具,那么我们这节开始看看哪些地方用到了这些队列哈。

这一节我们讲解logback异步日志打印中ArrayBlockingQueue的使用。

2  异步日志打印模型概述

在高并发、高流量并且响应时间要求比较小的系统中同步打印日志已经满足不了需求 了,这是因为打印日志本身是需要写磁盘的,写磁盘的操作会暂时阻塞调用打印日志的业 务线程,这会造成调用线程的rt增加。如图11-1所示为同步日志打印模型。

同步日志打印模型的缺点是将日志写入磁盘的操作是业务线程同步调用完成的,那么 是否可以让业务线程把要打印的日志任务放入一个队列后直接返回,然后使用一个线程专门负责从队列中获取日志任务并将其写入磁盘呢?这样的话,业务线程打印日志的耗时就 仅仅是把日志任务放入队列的耗时了,其实这就是logback提供的异步日志打印模型要做 的事,具体如图11-2所示。

由图11-2可知,其实logback的异步日志模型是一个多生产者-单消费者模型,其通 过使用队列把同步日志打印转换为了异步,业务线程只需要通过调用异步appender把日志 任务放入日志队列,而日志线程则负责使用同步的appender进行具体的日志打印。日志打 印线程只需要负责生产日志并将其放入队列,而不需要关心消费线程何时把日志具体写入 磁盘。

3  异步日志与具体实现

3.1  异步日志

一般配置同步日志打印时会在logback的xml文件里面配置如下内容。

  //(1)配置同步日志打印appender 
    <appender name="PROJECT" class="ch.qos.logback.core.FileAppender"> 
        <file>project.log</file> 
        <encoding>UTF-8</encoding> 
        <append>true</append> 
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> 
            <!-- daily rollover -->
     <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern> 
            <!-- keep 7 days' worth of history --> 
            <maxHistory>7</maxHistory> 
        </rollingPolicy> 
        <layout class="ch.qos.logback.classic.PatternLayout"> 
            <pattern><![CDATA[ 
%n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} 
%X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer}, 
ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n  %-5level %logger{35} - %m%n 
            ]]></pattern> 
        </layout> 
    </appender> 
    //(2) 设置logger 
    <logger name="PROJECT_LOGGER" additivity="false"> 
        <level value="WARN" /> 
        <appender-ref ref="PROJECT" /> 
    </logger>

然后以如下方式使用。

要把同步日志打印改为异步则需要修改logback的xml配置文件为如下所示。

<appender name="PROJECT" class="ch.qos.logback.core.FileAppender">
   <file>project.log</file>
   <encoding>UTF-8</encoding>
   <append>true</append>
   <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
       <!-- daily rollover -->
       <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern>
       <!-- keep 7 days' worth of history -->
       <maxHistory>7</maxHistory>
   </rollingPolicy>
   <layout class="ch.qos.logback.classic.PatternLayout">
       <pattern>
               <![CDATA[ 
           %n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} 
           %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer}, 
           ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n  %-5level %logger{35} - %m%n 
                ]]>
       </pattern> 
   </layout> 
</appender> 
<appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender"> 
   <discardingThreshold>0</discardingThreshold> 
   <queueSize>1024</queueSize> 
   <neverBlock>true</neverBlock> 
   <appender-ref ref="PROJECT" /> 
</appender> 
<logger name="PROJECT_LOGGER" additivity="false"> 
   <level value="WARN" /> 
   <appender-ref ref="asyncProject" /> 
</logger>

由以上代码可以看出,AsyncAppender是实现异步日志的关键,下一节主要讲它的内 部实现。

3.2  异步日志实现原理

本文使用的logback-classic 的版本为1.0.13。 我们首先从AsyncAppender的类图结构 来认识下AsyncAppender的组件构成,如图11-3所示。

由图11-3可知,AsyncAppender 继承自AsyncAppenderBase,其中后者具体实现了异 步日志模型的主要功能,前者只是重写了其中的一些方法。由该图可知,logback中的异 步日志队列是一个阻塞队列,其实就是有界阻塞队列ArrayBlockingQueue,其中queueSize 表示有界队列的元素个数,默认为256个。

worker 是个线程,也就是异步日志打印模型中的单消费者线程。aai是一个appender 的装饰器,里面存放同步日志的appender,其中appenderCount记录aai里面附加的同 步appender 的个数。neverBlock 用来指示当日志队列满时是否阻塞打印日志的线程。 discardingThreshold 是一个阈值,当日志队列里面的空闲元素个数小于该值时,新来的某 些级别的日志会被直接丢弃,下面会具体讲。

首先我们来看何时创建日志队列,以及何时启动消费线程,这需要看 AsyncAppenderBase 的 start 方法。该方法在解析完配置AsyncAppenderBase 的xml的节点 元素后被调用。

public void start() { 
    ...
    //(1)日志队列为有界阻塞队列 
    blockingQueue = new ArrayBlockingQueue<E>(queueSize);
    //(2)如果没设置discardingThreshold则设置为队列大小的1/5 
    if (discardingThreshold == UNDEFINED)
        discardingThreshold = queueSize / 5;
    //(3)设置消费线程为守护线程,并设置日志名称 
    worker.setDaemon(true);
    worker.setName("AsyncAppender-Worker-" + worker.getName());
    //(4)设置启动消费线程 
    super.start();
    worker.start();
}

由以上代码可知,logback使用的是有界队列ArrayBlockingQueue,之所以使用有界 队列是考虑内存溢出问题。在高并发下写日志的QPS会很高,如果设置为无界队列,队 列本身会占用很大的内存,很可能会造成OOM。

这里消费日志队列的worker线程被设置为守护线程,这意味着当主线程运行结束并 且当前没有用户线程时,该worker线程会随着JVM的退出而终止,而不管日志队列里面 是否还有日志任务未被处理。另外,这里设置了线程的名称,这是个很好的习惯,因为在 查找问题时会很有帮助,根据线程名字就可以定位线程。

既然是有界队列,那么肯定需要考虑队列满的问题,是丢弃老的日志任务,还是阻塞 日志打印线程直到队列有空余元素呢?要回答这个问题,我们需要看看具体进行日志打印 的AsyncAppenderBase 的 append 方法。

protected void append(E eventObject) {
    //(5)调用AsyncAppender重写的isDiscardable方法
    if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
        return;
    }
    ...
    //(6)将日志任务放入队列
    put(eventObject);
}
private boolean isQueueBelowDiscardingThreshold() {
    return (blockingQueue.remainingCapacity() < discardingThreshold);
}

其中代码(5)调用了AsyncAppender重写的isDiscardable 方法,该方法的具体内容为

//(7) 
protected boolean isDiscardable(ILoggingEvent event) {
    Level level = event.getLevel();
    return level.toInt() <= Level.INFO_INT;
}

结合代码(5)和代码(7)可知,如果当前日志的级别小于等于INFO_INT并且当前队 列的剩余容量小于discardingThreshold 则会直接丢弃这些日志任务。

下面看具体代码(6)中的put方法。

private void put(E eventObject) {
    //(8)
    if (neverBlock) {
        blockingQueue.offer(eventObject);
    } else {
        try {//(9)
            blockingQueue.put(eventObject);
        } catch (InterruptedException e) {
            // Interruption of current thread when in doAppend method should not
            be consumed
            // by AsyncAppender
            Thread.currentThread().interrupt();
        }
    }
}

如果neverBlock 被设置为false(默认为false)则会调用阻塞队列的put方法,而put 是阻塞的,也就是说如果当前队列满,则在调用put方法向队列放入一个元素时调用线程 会被阻塞直到队列有空余空间。这里可以看下put方法的实现。

public void put(E e) throws InterruptedException { 
    ...
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //如果队列满,则调用await方法阻塞当前调用线程 
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

这里有必要解释下代码(9),当日志队列满时put方法会调用await()方法阻塞当前线 程,而如果其他线程中断了该线程,那么该线程会抛出InterruptedException异常,并且当 前的日志任务就会被丢弃。在logback-classic的1.2.3版本中,则添加了不对中断进行响应 的方法。

private void put(E eventObject) {
    if (neverBlock) {
        blockingQueue.offer(eventObject);
    } else {
        putUninterruptibly(eventObject);
    }
}
private void putUninterruptibly(E eventObject) {
    boolean interrupted = false;
    try {
        while (true) {
            try {
                blockingQueue.put(eventObject);
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

如果当前日志打印线程在调用 blockingQueue.put时被其他线程中断,则只是记录中断 标志,然后继续循环调用 blockingQueue.put,尝试把日志任务放入日志队列。新版本的这 个实现通过使用循环保证了即使当前线程被中断,日志任务最终也会被放入日志队列。

如果neverBlock 被设置为true则会调用阻塞队列的offer方法,而该方法是非阻塞的, 所以如果当前队列满,则会直接返回,也就是丢弃当前日志任务。这里回顾下offer方法 的实现。

public boolean offer(E e) { 
    ...
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列满则直接返回false。 
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

最后来看addAppender方法都做了什么。

public void addAppender(Appender<E> newAppender) {
    if (appenderCount == 0) {
        appenderCount++;
        ...
        aai.addAppender(newAppender);
    } else {
        addWarn("One and only one appender may be attached to AsyncAppender.");
        addWarn("Ignoring additional appender named [" + newAppender.getName() + "]");
    }
}

由如上代码可知,一个异步appender只能绑定一个同步appender。这个appender会 被放到AppenderAttachableImpl 的 appenderList 列表里面。

到这里我们已经分析完了日志生产线程把日志任务放入日志队列的实现,下面一起来 看消费线程是如何从队列里面消费日志任务并将其写入磁盘的。由于消费线程是一个线程, 所以就从worker的run方法开始。

class Worker extends Thread {
    public void run() {
        AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
        AppenderAttachableImpl<E> aai = parent.aai;
        //(10)一直循环直到该线程被中断
        while (parent.isStarted()) {
            try {//(11)从阻塞队列获取元素
                E e = parent.blockingQueue.take();
                aai.appendLoopOnAppenders(e);
            } catch (InterruptedException ie) {
                break;
            }
        }
        //(12)到这里说明该线程被中断,则把队列里面的剩余日志任务
        //刷新到磁盘
        for (E e : parent.blockingQueue) {
            aai.appendLoopOnAppenders(e);
            parent.blockingQueue.remove(e);
        }
        ...
    }
}

其中代码(11)使用take方法从日志队列获取一个日志任务,如果当前队列为空则当 前线程会被阻塞直到队列不为空才返回。获取到日志任务后会调用AppenderAttachableImpl 的aai.appendLoopOnAppenders方法,该方法会循环调用通过addAppender注入的同步日志,appener 具体实现把日志打印到磁盘。

3.3  延申-@Slf4j 注解

看了异步日志要在 logback-spring.xml 中配置上我们的异步 appender,以及整个的异步处理过程原理哈。我们回忆下平时我们写日志,是不是在类上加一个 @Slf4j 的注解,就可以 log.info等等进行写日志啦,那么它的原理是什么呢?我们这里小小的引申一下。

首先我们要知道这个注解是 lombok 包里的:

import lombok.extern.slf4j.Slf4j;

lombok 提供了大量的编译时注解,比如 @Dara @Getter等,都是在编译时帮我们直接生成代码的,@Slf4j 同样也是如此,大家可以看一下编译后的文件:

它的原理这里简单解释一下也是基于java 提供的一个类似钩子函数的东西,它的核心类是 AbstractProcessor,之前我有一篇博客也写过这个哈。而 lombok 利用 java spi的机制,从而引入自己的 Processor,从而进行自己的处理,比如扫描类上的注解,继而修改 .class信息达到编译后的效果。

具体的细节过程这里就不看了哈。

4  小结

本节结合logback 中异步日志的实现介绍了并发组件ArrayBlockingQueue的使 用,包括put、offer 方法的使用场景以及它们之间的区别,take方法的使用,同时也 介绍了如何使用ArrayBlockingQueue 来实现一个多生产者-单消费者模型。另外使用 ArrayBlockingQueue 时需要注意合理设置队列的大小以免造成OOM,队列满或者剩余元 素比较少时,要根据具体场景制定一些抛弃策略以避免队列满时业务线程被阻塞。

标签:异步,Java,队列,打印,线程,put,ArrayBlockingQueue,日志,Logback
From: https://www.cnblogs.com/kukuxjx/p/18013361

相关文章

  • 【Java 并发】【十】【JUC数据结构】【十】PriorityBlockingQueue 原理
    1 前言这节我们继续看看另一个队列 PriorityBlockingQueue,优先级的哈。2 PriorityBlockingQueue介绍PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证有序。默认使......
  • 【Azure Function】Azure Function中使用 Java 8 的安全性问题
    问题描述使用AzureFunction,环境是Linux的Java8。目前OracleJavaJDK8,11,17和OpenJDK8/11/17都在存在漏洞受影响版本的范围内。OpenJDK                 CVEnumbers:    CVE‑2023‑21954CVE‑2023‑21938CVE‑2023‑21937CVE......
  • 【Java 并发】【十】【JUC数据结构】【九】ConcurrentLinkedQueue 原理
    1 前言JDK中提供了一系列场景的并发安全队列。总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,前者使用锁实现,而后者则使用CAS非阻塞算法实现。这节我们来看看 ConcurrentLinkedQueue。2 ConcurrentLinkedQueue介绍ConcurrentLinkedQueue是线程安全的无界非阻......
  • Java之泛型系列--继承父类与实现多个接口(有示例)
    原文网址:​​Java之泛型系列--继承父类与实现多个接口(有示例)_IT利刃出鞘的博客-CSDN博客​​简介本文介绍java如何用泛型表示继承父类并实现多个接口。用泛型表示某个类是某个类的子类或者实现了接口的方法为:<TextendsA&B&C> 用法1:全都是接口。对于本例来说:A、B......
  • JAVA中this和super
    thisthis表示使用的对象本身,可以调用对象的属性和对象的方法以及对象的构造方法(this.x和this.(),其中this.()只能在构造方法的第一行调用且不能和super.()共同使用)使用原因避免属性和方法变量名相同时出现就近原则的冲突使用细节supersuper代表父类的引用,用于访问父类......
  • JAVA构造方法
    构造方法介绍语法使用细节关于在继承中新增的构造方法使用细节1子类必须要调用父类的构造器,完成父类的初始化2父类构造器的调用不限于直接父类!将一直往上追溯直到Object类(顶级父类)3当创建子类对象时,不管使用的是子类的哪个构造器,默认情况下总会调用父类......
  • JAVA的4种访问修饰符
    1、基本介绍补充:1属性和方法可以用四种访问修饰符修饰;类只能用public和默认修饰符修饰,且一个.java文件中只能有一个public修饰的类作为主类,其他类用默认修饰符修饰。2访问权限起作用的情况:①在一个类中定义另一个类的对象,当访问该对象的属性或方法时,修饰符根据同类、同......
  • java中使用opencl操作GPU
    需要管理GPU资源,使用java编写,选用opencl框架,并且选择org.jocl包(<dependency><groupId>org.jocl</groupId><artifactId>jocl</artifactId><version>2.0.5</version></dependency>)。具体opencl原理此处不涉及,仅记录使用java该如何做基本操作。最少要以下几步,详细可以参看:ht......
  • JAVA零钱通面向过程和oop
    编程思想1.每一个小功能对应一个代码块,高内聚低耦合。2.建议先排除不正确情况,而不是对每一个正确情况做一些操作。编程效果源码实现面向过程点击查看代码packagelingqiantong.chuantong;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.uti......
  • Java中String、StringBuffer、StringBuilder的区别以及使用场景总结
    Java中,String、StringBuffer和StringBuilder都用于处理字符串,但在功能和性能上有显著的区别。了解这些区别有助于选择最适合特定情境的类型。在选择使用String、StringBuffer或StringBuilder时,应根据字符串操作的性能需求和线程安全要求来做出决定。1、String、StringBuffer、......