首页 > 编程语言 >【优分享】JMeter源码解析之结果收集器

【优分享】JMeter源码解析之结果收集器

时间:2023-02-27 16:32:39浏览次数:68  
标签:sample JMeter 收集器 queue 源码 result null


本文作者 优测性能测试专家高源。

简介:本文以最新的JMeter 5.5版本源代码为例详细介绍了单机模式和分布式模式下结果收集器的工作原理。通篇干货,还不快来了解一下!

 

一、JMeter结果收集器概述

JMeter是在压力领域中最常见的性能测试工具,由于其开源的特点,受到广大测试和开发同学的青睐。但是,在实际应用过程中,JMeter存在的一些性能瓶颈也凸显出来,经常会遇到大并发下压不上去的情况。笔者通过深入分析其源码实现,找到JMeter存在的瓶颈问题及根本原因,为以后更好地使用工具提供一些思路。

结果收集器:在JMeter中担任报告数据收集的重任,无论是单机模式还是master-slave模式,每一个请求的结果都是通过相应的结果收集器进行数据采集的。在单机模式下用Result Collector这个监听器去采集,在分布式(master-slave)场景下通过配RemoteSampleListenerWrapper下的指定sender进行收集,具体配置jmeter.property文件的mode属性和队列长度实现。下面我们以当前最新的JMeter 5.5版本的源代码为例详细介绍下单机模式和分布式模式下结果收集器的工作原理。

二、单机模式

1、初始化

在命令行模式下,JMeter会根据用户的logfile配置选择是否添加Result Collector,一般在实际测试的时候,我们都是需要有详细统计报告生成的,所以都会添加Result Collector,收集器放在了整个hashtree的第一个节点,代码如下:

 void runNonGui(String testFile, String logFile, boolean remoteStart, String remoteHostsString, boolean generateReportDashboard){

 ....

 ResultCollector resultCollector = null;

   if (logFile != null) {

     resultCollector = new ResultCollector(summariser);

     resultCollector.setFilename(logFile);

     clonedTree.add(clonedTree.getArray()[0], resultCollector);

     }

   else {

     // only add Summariser if it can not be shared with the ResultCollector

   if (summariser != null) {

      clonedTree.add(clonedTree.getArray()[0], summariser);

      }

      }

 ....

 

 }

2、加载流程

添加完结果收集器后,执行脚本过程中,JMeter会根据jmx的编排,按照如下的执行顺序进行调用:

 

【优分享】JMeter源码解析之结果收集器_jmeter

 

每一个线程都是按照以上的顺序循环反复执行,直到压测停止。具体代码如下(相应的关键点已增加注释):

private void executeSamplePackage(Sampler current,

      TransactionSampler transactionSampler,

      SamplePackage transactionPack,

      JMeterContext threadContext) {

  threadContext.setCurrentSampler(current);

  // Get the sampler ready to sample

  SamplePackage pack = compiler.configureSampler(current);

  runPreProcessors(pack.getPreProcessors());//运行前置处理器

  // Hack: save the package for any transaction controllers

  threadVars.putObject(PACKAGE_OBJECT, pack);

  delay(pack.getTimers());//定时器timer

  SampleResult result = null;

  if (running) {

       Sampler sampler = pack.getSampler();

       result = doSampling(threadContext, sampler);

   }

   // If we got any results, then perform processing on the result

   if (result != null) {

   if (!result.isIgnore()) {

          ...               

   runPostProcessors(pack.getPostProcessors());//运行后置处理器

   checkAssertions(pack.getAssertions(), result, threadContext);//运行断言处理器

            // PostProcessors can call setIgnore, so reevaluate here

            if (!result.isIgnore()) {

            // Do not send subsamples to listeners which receive the transaction sample

            List<SampleListener> sampleListeners = getSampleListeners(pack, transactionPack, transactionSampler);

            notifyListeners(sampleListeners, result);//执行监听器,此处为执行报告收集器的sampleOccurred方法

            }

            compiler.done(pack);

            ...

    }

 

 

收集器Result Collector执行的具体代码:

@Override

public void sampleOccurred(SampleEvent event) {

    SampleResult result = event.getResult();

    if (isSampleWanted(result.isSuccessful())) {

        sendToVisualizer(result);

        if (out != null && !isResultMarked(result) && !this.isStats) {

        SampleSaveConfiguration config = getSaveConfig();

        result.setSaveConfig(config);

        try {

               if (config.saveAsXml()) {

                   SaveService.saveSampleResult(event, out);

               } else { // !saveAsXml

                   CSVSaveService.saveSampleResult(event, out);

               }

          } catch (Exception err) {

              log.error("Error trying to record a sample", err); // should throw exception back to caller

           }

      }

  }

   if(summariser != null) {

       summariser.sampleOccurred(event);

   }

}

 

以上主要实现了将每个请求的结果数据存储到日志文件中(CSV /XML),为后续的报告生成提供数据文件。

3、性能瓶颈分析

从以上的流程不难看出,由于每个线程的每个请求后都会频繁调用Result Collector的sample Occurred方法,即会频繁读写文件,有可能导致IO瓶颈。一旦存储的速度下降,必然导致线程循环发包的速度下降,从而导致压不上去的情况出现。所以单机模式下不建议设置超过200以上的并发,若非必须,尽量关闭日志采集和html报告生成,以免报告置信度存在问题。

 

 

三、分布式模式

为了应对单机的各种瓶颈问题,JMeter采用了分布式(master-slave)模式。加载执行流程与单机基本一致,不再赘述,区别在于监听器换成了Remote Sample ListenerImpl收集器。

1、发送模式指定方法

下面我们重点看下Remote Sample ListenerImpl监听器的代码:

@Override

public void processBatch(List<SampleEvent> samples) {

    if (samples != null && sampleListener != null) {

        for (SampleEvent e : samples) {

            sampleListener.sampleOccurred(e);

        }

    }

}

@Override

public void sampleOccurred(SampleEvent e) {

    if (sampleListener != null) {

        sampleListener.sampleOccurred(e);

    }

}

 

从以上代码可以看出,这个监听器里又调用了sample Listener的sample Occurred方法,而sample Listener是通过用户在jmeter.property文件中指定的。

 

 

【优分享】JMeter源码解析之结果收集器_jmeter_02

 

2、AsynchSampleSender源码解析

下面我们以Asynch Sample Sender为例进行源码详细介绍:

public class AsynchSampleSender extends AbstractSampleSender implements Serializable {

       protected Object readResolve() throws ObjectStreamException{

        int capacity = getCapacity();

        log.info("Using batch queue size (asynch.batch.queue.size): {}", capacity); // server log file

        queue = new ArrayBlockingQueue<>(capacity);

        Worker worker = new Worker(queue, listener);

        worker.setDaemon(true);

        worker.start();

        return this;

    }

@Override

public void testEnded(String host)

    log.debug("Test Ended on {}", host);

    try {

        listener.testEnded(host);

        queue.put(FINAL_EVENT);

    } catch (Exception ex) {

        log.warn("testEnded(host)", ex);

    }

    if (queueWaits > 0) {

        log.info("QueueWaits: {}; QueueWaitTime: {} (nanoseconds)", queueWaits, queueWaitTime);

        }

    }

 @Override

public void sampleOccurred(SampleEvent e)

    try {

        if (!queue.offer(e)){ // we failed to add the element first time

            queueWaits++;

            long t1 = System.nanoTime();

            queue.put(e);

            long t2 = System.nanoTime();

            queueWaitTime += t2-t1;

        }

    } catch (Exception err) {

        log.error("sampleOccurred; failed to queue the sample", err);

    }

}

private static class Worker extends Thread {

    @Override

    public void run()

        try {

            boolean eof = false;

            while (!eof) {

                List<SampleEvent> l = new ArrayList<>();

                SampleEvent e = queue.take();

                // try to process as many as possible

                // The == comparison is not an error

                while (!(eof = e == FINAL_EVENT) && e != null) {

                     l.add(e);

                     e = queue.poll(); // returns null if nothing on queue currently

                 }

                int size = l.size();

                if (size > 0) {

                    try {

                       listener.processBatch(l);

                    } catch (RemoteException err) {

                        if (err.getCause() instanceof java.net.ConnectException){

                            throw new JMeterError("Could not return sample",err);

                        }

                        log.error("Failed to return sample", err);

                    }

                 }

            }

        } catch (InterruptedException e) {

            Thread.currentThread().interrupt();

            }

        log.debug("Worker ended");

        }

    }

}

 

 

从以上代码可以看出,Asynch SampleSender的sample Occurred方法里只进行入列的操作,而采集上报工作是启动了一个work线程实现的,相当于异步处理所有请求数据。这样设计不会阻塞发包的流程,性能上要优于单机模式。但是,在一定情况下,也是会出现性能瓶颈的。

这个队列采用的是Array Blocking Queue(阻塞队列),这个队列有如下特点:

·Array Blocking Queue是有界的初始化必须指定大小,队列满了后,无法入列。

·Array Blocking Queue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个Reenter Lock锁。

3、性能瓶颈分析

瓶颈点一:队列大小问题

当我们实际压测过程中,如果队列大小(asynch.batch.queue.size)设置过小,入列速度大于出列速度,就会导致队列满而阻塞整个发压流程,而如果队列设置过大,一旦请求的包体比较大,很容易造成内存溢出。

瓶颈点二:单一锁问题

在压测过程中,入列出列是非常频繁的,而同一个Reenter Lock锁也可能造成入列和出列过程中,因无法获得锁而入列或者出列延迟,继而影响发压效率。

四、总结

JMeter因其完善的社区和开源特点,在日常压测中可广泛使用。JMeter适合进行小规模的压测。但是在大规模的压测过程中,受本地机器性能、带宽等限制,不宜进行单机压测,可以使用JMeter的master-slave的方式进行分布式压测。但是需提前设置好结果收集器和队列的大小,并进行预先演练评估出上限qps,防止出现压不上去的情况。此外,master-slave通信方式是远程RMI的双向通信方式,连接数过多也会造成master的瓶颈出现,需要做好量级的提前评估。

 

*版权声明:本文作者 优测性能测试专家高源。

 

 

优测压力测试是一款在线云原生全链路压测平台,百万级并发即召即用。兼容JMeter脚本,一键上传即可随时发压,免去压测工具搭建成本。欢迎大家登录优测官网0元体验!

 

想了解更多压力测试知识、产品与服务,可扫下方二维码进群交流。

 

【优分享】JMeter源码解析之结果收集器_性能测试_03

联系我们

官网:

​https://utest.21kunpeng.com/home/perftest?from=baiduseo​

企业微信:

 

【优分享】JMeter源码解析之结果收集器_jmeter_04

 

标签:sample,JMeter,收集器,queue,源码,result,null
From: https://blog.51cto.com/u_15943164/6088632

相关文章

  • app直播源码,利用原生JS实现回到顶部以及吸顶效果
    app直播源码,利用原生JS实现回到顶部以及吸顶效果  <style>    .box1{      width:1200px;      height:800px;      ......
  • vue源码分析-动态组件
    前面花了两节的内容介绍了组件,从组件的原理讲到组件的应用,包括异步组件和函数式组件的实现和使用场景。众所周知,组件是贯穿整个Vue设计理念的东西,并且也是指导我们开发的......
  • vue源码分析-响应式系统(一)
    从这一小节开始,正式进入Vue源码的核心,也是难点之一,响应式系统的构建。这一节将作为分析响应式构建过程源码的入门,主要分为两大块,第一块是针对响应式数据props,methods,da......
  • Jmeter学习:插件
    第三方插件官方下载网址:https://jmeter-plugins.org/install/Install/第三方插件官方文档网址:https://jmeter-plugins.org/wiki/Start/插件安装过程如下:1、下载plugin......
  • slate源码解析(二)- 基本框架与数据模型
    源码架构首先来看下最核心的slate包下的目录:可以看到,作为一个开源富文本库,其源码是相当之少。在第一篇文章中说过,Slate没有任何开箱即用的功能,只提供给开发者用于构建富......
  • 【转】如何阅读源码
    为何要阅读源码在聊如何去阅读源码之前,先来简单说一下为什么要去阅读源码,大致可分为以下几点原因:最直接的原因,就是面试需要,面试喜欢问源码,读完源码才可以跟面试官battle......
  • 【RocketMQ】Dledger日志复制源码分析
    消息存储在【RocketMQ】消息的存储一文中提到,Broker收到消息后会调用CommitLog的asyncPutMessage方法写入消息,在DLedger模式下使用的是DLedgerCommitLog,进入asyncPutMess......
  • 【Mybatis】【配置文件解析】【四】Mybatis源码解析-mappers的解析一
    1 前言这节我们分析一个大头,也是我们平时写的最多的,就是我们写的增删改查了,我们来看下它的解析。既然MyBatis的行为已经由上述元素配置完了,我们现在就要来定义SQL......
  • python flask就业分析可视化系统(课设、毕设、学习、源码下载)
    pythonflask就业分析可视化系统基于Pythonflask职业可视化系统基于Pythonflask工作大数据可视化系统后端:python3 flask数据库:MySQL前端:html css js主要功能......