首页 > 编程语言 >扒开源安卓性能测试工具moblieperf源码——开发属于你自己的性能稳定性测试工具

扒开源安卓性能测试工具moblieperf源码——开发属于你自己的性能稳定性测试工具

时间:2024-02-07 17:44:21浏览次数:35  
标签:cpuinfo 性能 源码 button time 测试工具 cold self result

moblieperf下载和使用

moblieperf由阿里巴巴开源的Android性能测试工具

下载:官方源码地址mobileperf github

使用:

  1. 使用pycharm打开下载的项目
  2. 使用只需要修改配置文件config.conf即可
  3. 运行采集:a.mac、linux 在mobileperf工具根目录下执行sh run.sh ; b.windows 双击run.bat

配置图:(简单使用只需要修改包名和设备序列号即可)

 源码阅读

原来我们主要阅读我们想实现功能是如何实现的(cpu,内存)

我们先从启动类StartUp中run方法看起:

for i in range(0,5):
    if self.device.adb.is_connected(self.serialnum):
        is_device_connect = True
        break
    else:
        logger.error("device not found:"+self.serialnum)
        time.sleep(2)

 self.serialnum就是配置表中的设备序列号,这一段是用过ADB类中的静态方法is_connected实现的,我们来看一下他们是怎么检查配置中的手机是否链接

@staticmethod
def is_connected(device_id):
    '''
                检查设备是否连接上
    '''
    if device_id in ADB.list_device():
        return True
    else:
        return False
    @staticmethod
    def list_device():
        '''获取设备列表

        :return: 返回设备列表
        :rtype: list
        '''
        proc = subprocess.Popen("adb devices", stdout=subprocess.PIPE, shell=True)
        result = proc.stdout.read()
        if not isinstance(result, str):
            result = result.decode('utf-8')
        result = result.replace('\r', '').splitlines()
        logger.debug("adb devices:")
        logger.debug(result)
        device_list = []
        for device in result[1:]:
            if len(device) <= 1 or not '\t' in device: continue
            if device.split('\t')[1] == 'device':
                # 只获取连接正常的
                device_list.append(device.split('\t')[0])
        return device_list

通过这一段代码我们可以发现,mobileperf是基于adb命令去读取相应的信息,而他的使用是通过subprocess类中的Popen去操作adb命令,通过adb devices命令判断是否已经链接成功

注:subprocess是python标准库兼容性较好,且是非阻塞性执行,在输入输出的拓展性、错误处理都比较健全完善,后面会在提到这个方法使用。

  # 对是否安装被测app的检查 只在最开始检查一次
        if not self.device.adb.is_app_installed(self.packages[0]):
            logger.error("test app not installed:" + self.packages[0])
            return
        try:
            #初始化数据处理的类,将没有消息队列传递过去,以便获取数据,并处理
            # datahandle = DataWorker(self.get_queue_dic())
            # 将queue传进去,与datahandle那个线程交互
            self.add_monitor(CpuMonitor(self.serialnum, self.packages, self.frequency, self.timeout))
            self.add_monitor(MemMonitor(self.serialnum, self.packages, self.frequency, self.timeout))

这里我们节选了一部分代码,当我们知道他基本实现逻辑其实我们大致也可以猜到is_app_installed判断app是否安装他是如何实现的!应该也是通过adb命令去寻找已安装的包是否存在

    def is_app_installed(self, package):
        '''
        判断app是否安装
        '''
        if package in self.list_installed_app():
            return True
        else:
            return False
    def list_installed_app(self):
        '''
                        获取已安装app列表
        :return: 返回app列表
        :rtype: list
        '''
        result = self.run_shell_cmd('pm list packages')
        result = result.replace('\r', '').splitlines()
        logger.debug(result)
        installed_app_list = []
        for app in result:
            if not 'package' in app: continue
            if app.split(':')[0] == 'package':
                # 只获取连接正常的
                installed_app_list.append(app.split(':')[1])
        logger.debug(installed_app_list)
        return installed_app_list

正如我们所料,他是用过命令adb shell pm list packages 返回的接口进行切割拿到包名,去判断需要测试包名是否在其中

那么他是采集和收集内存/cpu等信息是不是也是这样实现的呢,答案是肯定的

        end_time = time.time() + self._timeout
        cpu_title = ["datetime", "device_cpu_rate%", "user%", "system%","idle%"]
        cpu_file = os.path.join(RuntimeData.package_save_path, 'cpuinfo.csv')
        for i in range(0, len(self.packages)):
            cpu_title.extend(["package", "pid", "pid_cpu%"])
        if len(self.packages) > 1:
            cpu_title.append("total_pid_cpu%")
        try:
            with open(cpu_file, 'a+') as df:
                csv.writer(df, lineterminator='\n').writerow(cpu_title)
        except RuntimeError as e:
            logger.error(e)
        while not self._stop_event.is_set() and time.time() < end_time:
            try:
                logger.debug("---------------cpuinfos, into _collect_package_cpu_thread loop thread is : " + str(threading.current_thread().name))
                before = time.time()
                #为了cpu值的准确性,将采集的时间间隔放在top命令中了
                cpu_info = self._top_cpuinfo()
                after = time.time()
                time_consume = after - before
                logger.debug("  ============== time consume for cpu info : "+str(time_consume))

这里我们截取了一段收集cpu的代码,可以发现他是通过一个while循环去调用_top_cpuinfo方法

    def _top_cpuinfo(self):
        self._top_pipe = self.device.adb.run_shell_cmd(self.top_cmd, sync=False)
        out = self._top_pipe.stdout.read()
        error = self._top_pipe.stderr.read()

我们接着往下看

    def run_shell_cmd(self, cmd, **kwds):
        '''执行 adb shell 命令
        '''
        # 如果失去连接后,adb又正常连接了
        if not self.before_connect and self.after_connect:
            cpu_uptime_file = os.path.join(RuntimeData.package_save_path, "uptime.txt")
            with open(cpu_uptime_file, "a+",encoding = "utf-8") as writer:
                writer.write(TimeUtils.getCurrentTimeUnderline() + " /proc/uptime:" + self.run_adb_cmd("shell cat /proc/uptime") + "\n")
            self.before_connect = True
        ret = self.run_adb_cmd('shell', '%s' % cmd, **kwds)
        # 当 adb 命令传入 sync=False时,ret是Poen对象
        if ret == None:
            logger.error(u'adb cmd failed:%s ' % cmd)
        return ret

不难发现,最终依旧是调用的run_adb_cmd方法去执行的adb命令,最后把收集到数据写入对应文件中,看到这里我们大概可以自己实现一下了

开发自己的安卓性能工具

注:这里工具页面开发我们使用gui开发工具pyside6

前面我们提到了标准库subprocess这里介绍一下我们会用到的两个方法communicate从子进程的stdoutstderr读取数据。这个方法会阻塞主程序,直到子进程完成,poll检查子进程是否结束,如果子进程正常退出,输出应该为 0;否则为非0值

页面简单拖拽

 ui文件转换py文件

pyside6-uic <ui文件名>.ui -o <py文件名>.py
# -*- coding: utf-8 -*-

################################################################################
## Form generated from reading UI file 'performanceTools.ui'
##
## Created by: Qt User Interface Compiler version 6.6.1
##
## WARNING! All changes made in this file will be lost when recompiling UI file!
################################################################################

from PySide6.QtCore import (QCoreApplication, QDate, QDateTime, QLocale,
    QMetaObject, QObject, QPoint, QRect,
    QSize, QTime, QUrl, Qt)
from PySide6.QtGui import (QBrush, QColor, QConicalGradient, QCursor,
    QFont, QFontDatabase, QGradient, QIcon,
    QImage, QKeySequence, QLinearGradient, QPainter,
    QPalette, QPixmap, QRadialGradient, QTransform)
from PySide6.QtWidgets import (QApplication, QHBoxLayout, QLabel, QLineEdit,
    QPushButton, QSizePolicy, QVBoxLayout, QWidget)

class Ui_Form(object):
    def setupUi(self, Form):
        if not Form.objectName():
            Form.setObjectName(u"Form")
        Form.resize(400, 300)
        self.horizontalLayoutWidget = QWidget(Form)
        self.horizontalLayoutWidget.setObjectName(u"horizontalLayoutWidget")
        self.horizontalLayoutWidget.setGeometry(QRect(20, 90, 201, 80))
        self.horizontalLayout = QHBoxLayout(self.horizontalLayoutWidget)
        self.horizontalLayout.setObjectName(u"horizontalLayout")
        self.horizontalLayout.setContentsMargins(0, 0, 0, 0)
        self.label = QLabel(self.horizontalLayoutWidget)
        self.label.setObjectName(u"label")

        self.horizontalLayout.addWidget(self.label)

        self.packagename = QLineEdit(self.horizontalLayoutWidget)
        self.packagename.setObjectName(u"packagename")

        self.horizontalLayout.addWidget(self.packagename)

        self.verticalLayoutWidget = QWidget(Form)
        self.verticalLayoutWidget.setObjectName(u"verticalLayoutWidget")
        self.verticalLayoutWidget.setGeometry(QRect(230, 30, 160, 231))
        self.verticalLayout = QVBoxLayout(self.verticalLayoutWidget)
        self.verticalLayout.setObjectName(u"verticalLayout")
        self.verticalLayout.setContentsMargins(0, 0, 0, 0)
        self.cold_time_cal_button = QPushButton(self.verticalLayoutWidget)
        self.cold_time_cal_button.setObjectName(u"cold_time_cal_button")

        self.verticalLayout.addWidget(self.cold_time_cal_button)

        self.cold_time_cal_result_button = QPushButton(self.verticalLayoutWidget)
        self.cold_time_cal_result_button.setObjectName(u"cold_time_cal_result_button")

        self.verticalLayout.addWidget(self.cold_time_cal_result_button)

        self.cpuinfo_button = QPushButton(self.verticalLayoutWidget)
        self.cpuinfo_button.setObjectName(u"cpuinfo_button")

        self.verticalLayout.addWidget(self.cpuinfo_button)

        self.cpuinfo_result_button = QPushButton(self.verticalLayoutWidget)
        self.cpuinfo_result_button.setObjectName(u"cpuinfo_result_button")

        self.verticalLayout.addWidget(self.cpuinfo_result_button)


        self.retranslateUi(Form)

        QMetaObject.connectSlotsByName(Form)
    # setupUi

    def retranslateUi(self, Form):
        Form.setWindowTitle(QCoreApplication.translate("Form", u"Form", None))
        self.label.setText(QCoreApplication.translate("Form", u"\u8bf7\u8f93\u5165\u5305\u540d", None))
        self.cold_time_cal_button.setText(QCoreApplication.translate("Form", u"\u51b7\u542f\u52a8\u5e73\u5747\u65f6\u95f4\u7edf\u8ba1", None))
        self.cold_time_cal_result_button.setText(QCoreApplication.translate("Form", u"\u51b7\u542f\u52a8\u65f6\u95f4\u7ed3\u679c\u67e5\u8be2", None))
        self.cpuinfo_button.setText(QCoreApplication.translate("Form", u"CPU\u4fe1\u606f\u6570\u636e\u7edf\u8ba1", None))
        self.cpuinfo_result_button.setText(QCoreApplication.translate("Form", u"CPU\u4fe1\u606f\u6570\u636e\u7ed3\u679c\u67e5\u8be2", None))
    # retranslateUi

功能实现逻辑代码

import time

import pyecharts.charts
from pyecharts import options as opts
from util import ADB
import subprocess
import os
import sys
from PySide6.QtWidgets import QApplication, QWidget, QLabel, QPushButton, QMessageBox
from PySide6.QtCore import Slot, Signal
import performanceTools
import threading

class QmyWeidge(QWidget):
    #建立信号
    packagenameSignal = Signal(str)
    def __init__(self):
        super().__init__()
        self.ui = performanceTools.Ui_Form()
        self.ui.setupUi(self)
        self.cold_time_cal_htmlname = ""
        self.cpuinfo_htmlname = ""
        self.packagenameSignal.connect(self.isPackagenameExciting)#判断包名是否存在信号槽链接
    def open_html_by_firefox(self, local_html_name):
        # 获取当前工作目录
        current_dir = os.getcwd()
        # 指定本地HTML文件的路径
        local_file = os.path.join(current_dir, local_html_name)
        # 指定火狐浏览器的可执行文件路径
        firefox_path = "C:/Program Files/Mozilla Firefox/firefox.exe"  # 替换为你的火狐浏览器可执行文件的路径
        # 构建火狐浏览器的命令行参数
        url = 'file://' + local_file + '.html'
        cmd = [firefox_path, url]
        # 使用subprocess启动火狐浏览器并打开本地HTML文件
        subprocess.Popen(cmd)

    def result_to_html(self, xaxis, yaxis, y_name, title):
        pyecharts.charts.Line().add_xaxis(xaxis).add_yaxis(y_name, yaxis).set_global_opts(
            title_opts=opts.TitleOpts(title=title)).render(path=f'./{title}.html')

    @Slot()
    def on_cold_time_cal_button_clicked(self):
        """
        运行10次取平均值
        :param current_activity: 当前运行的app页面,或者是待测的页面
        :return: 范围运行第x次的数据y
        """
        #按钮置灰
        self.ui.cold_time_cal_button.setEnabled(False)
        self.ui.cold_time_cal_button.repaint()
        self.ui.cold_time_cal_result_button.setEnabled(False)
        self.ui.cold_time_cal_result_button.repaint()

        current_activity = ADB().get_current_activity()
        x = []
        y = []
        for i in range(1, 11):
            x.append(f"第{i}次")
            cold_start_time = ADB().get_launchState_cold_totalTime(current_activity)
            y.append(cold_start_time)
            ADB().stop_app(current_activity)
        #输出结果
        self.cold_time_cal_htmlname = f"冷启动时间趋势图_平均耗时{sum(y) / 10}"
        self.result_to_html(x, y ,"启动时间", self.cold_time_cal_htmlname)
        #恢复按钮
        self.ui.cold_time_cal_button.setEnabled(True)
        self.ui.cold_time_cal_result_button.setEnabled(True)

    @Slot()
    def on_cold_time_cal_result_button_clicked(self):
        self.open_html_by_firefox(self.cold_time_cal_htmlname)

    @Slot()
    def on_cpuinfo_button_clicked(self):
        """
        运行10次取平均值
        :return: 范围运行第x次的数据y
        """
        #按钮置灰
        self.ui.cpuinfo_button.setEnabled(False)
        self.ui.cpuinfo_button.repaint()
        self.ui.cpuinfo_result_button.setEnabled(False)
        self.ui.cpuinfo_result_button.repaint()

        x = []
        y = []
        if self.ui.packagename.text():
            for i in range(1,11):
                x.append(f"第{i}次")
                try:
                    cpu, _, _ = ADB().get_cpuinfo(self.ui.packagename.text())
                    y.append(round(float(cpu.replace('%', '')), 4))
                    time.sleep(2)
                    # 输出结果
                    self.cpuinfo_htmlname = f"cpu占比趋势图_平均占比{sum(y) / 10} %"
                    self.result_to_html(x, y, "cpu占比(%)", self.cpuinfo_htmlname)
                except:
                    msgBox = QMessageBox()
                    msgBox.setWindowTitle("提醒")
                    msgBox.setText(f"{self.ui.packagename.text()}确认是否运行!未获取到对应cpu信息!")
                    msgBox.exec()
                    break
        else:
            self.packagenameSignal.emit(self.ui.packagename.text())

        #恢复按钮
        self.ui.cpuinfo_result_button.setEnabled(True)
        self.ui.cpuinfo_button.setEnabled(True)

    @Slot()
    def on_cpuinfo_result_button_clicked(self):
        self.open_html_by_firefox(self.cpuinfo_htmlname)

    def isPackagenameExciting(self, value):
        if not value :
            msgBox  = QMessageBox()
            msgBox.setWindowTitle("提醒")
            msgBox.setText("请输入包名!")
            msgBox.exec()


if __name__ == '__main__':
    app = QApplication(sys.argv)
    myWindow = QmyWeidge()
    myWindow.show()
    sys.exit(app.exec())

 

标签:cpuinfo,性能,源码,button,time,测试工具,cold,self,result
From: https://www.cnblogs.com/yetangjian/p/18009512

相关文章

  • 再聊阴影裁剪与高性能视锥剔除
    【USparkle专栏】如果你深怀绝技,爱“搞点研究”,乐于分享也博采众长,我们期待你的加入,让智慧的火花碰撞交织,让知识的传递生生不息!一、实际需求因为项目的树与草都采用ComputeShader剔除的GPUInstance绘制,所以需要自己实现阴影投递物的裁剪方法。也就是每一帧具体让哪些物体绘......
  • 源码搭建教学:直播带货商城小程序开发
    结合小程序开发的直播带货商城,不仅可以提供更便捷的购物体验,还可以实现更高效的销售。因此,学习如何搭建一个直播带货商城小程序将成为您拓展商业领域的利器。步骤一:准备工作在开始开发之前,您需要进行一些准备工作。首先,确保您已经安装了微信开发者工具,并且注册了微信小程序的开发者......
  • 基于 GPU 渲染的高性能空间包围计算
    空间包围检测在计算机图形学、虚拟仿真、工业生产等有着广泛的应用。现代煤矿开采过程中,安全一直是最大的挑战之一。地质空间中存在诸多如瓦斯积聚、地质构造异常、水文条件不利等隐蔽致灾因素,一旦被触发,可能引发灾难性的后果。因此在安全生产过程中有效的管理和规避各隐蔽致灾因素......
  • 二刷 K8s 源码 - workqueue 的所有细节
    1.概述-何来此文2.Queue的实现2.1Queue.Add(iteminterface{})方法2.2Queue.Get()方法2.3Queue.Done(iteminterface{})方法3.DelayingQueue的实现4.RateLimitingQueue的实现5.rateLimiter限速器的实现6.控制器里用的默认限速器7.总结1.概述-......
  • [Ngbatis源码学习] Ngbatis 源码学习之资源加载器 DaoResourceLoader
    Ngbatis源码阅读之资源加载器DaoResourceLoaderDaoResourceLoader是Ngbatis的资源文件加载器,扩展自MapperResourceLoader。本篇文章主要分析这两个类。1.相关类MapperResourceLoaderDaoResourceLoader2.MapperResourceLoader在介绍DaoResourceLoader之前有必要......
  • 软件测试学习笔记丨性能分析系统级别指标 io cpu mem net
    io指标监控命令iostat命令描述:监控系统设备的IO负载情况命令演示:iostatio指标监控命令df命令描述:列出⽂件系统的整体磁盘空间使⽤情况命令演示:df-hcpu指标监控命令uptime命令描述:用于显示系统总共运行了多长时间和系统的平均负载命令演示:uptimecpu指标监控命令cat/......
  • 通达信跟庄乾坤源码副图
    {股票指标}DLYZ1:=SUM((WINNER(C)*100),30)/30*0.1;DLYZ2:=SUM((WINNER(C)*100),20)/20*0.4;DLYZ3:=SUM((WINNER(C)*100),10)/10*0.3;DLYZ4:=SUM((WINNER(C)*100),5)/5*0.1;DLYZ5:=SUM((WINNER(C)*100),3)/3*0.1;DLYZZ:=DLYZ1+DLYZ2+DLYZ3+DLYZ4+DLYZ5;赚钱效应:IF(DLYZ......
  • 通达信暗流涌动指标公式源码副图
    {股票指标}TYP:=(IF(HIGH<=0,CLOSE,HIGH)+IF(LOW<=0,CLOSE,LOW)+CLOSE)/3;CL:=(TYP-ma(TYP,30))/(0.015*AVEDEV(TYP,30));C2:=MA(CL,4);C3:=MA(CL,10);DRAWBAND(CL,RGB(0,255,0),C2,RGB(0,0,0));DRAWBAND(CL,RGB(255,0,0),C3,RGB(0,0,0));动力:CL,COLORYELLOW,LINET......
  • 通达信MACD买卖副图指标公式源码
    {股票指标}VAR3:=(CLOSE-MA(CLOSE,6))/MA(CLOSE,6)*100; VAR4:=(CLOSE-MA(CLOSE,24))/MA(CLOSE,24)*100;VAR5:=(CLOSE-MA(CLOSE,32))/MA(CLOSE,32)*100;VAR6:=(VAR3+VAR4+VAR5)/3;VAR7:=EMA(VAR6,5);指标:=EMA(EMA(VAR3,5),5)*3,COLORSTICK;VAR8:=IF(VAR6<=-20,10,0......
  • 通达信【龙头战狼】珍藏版套装指标 止盈止损 胜者为王 终极盈利模式 源码文件分享
    {股票指标}【战狼波段主图】本套指标设计两个主图,一个看盘简单明了,还有一个波段主图,操作波段上涨非常好,买卖点明确!【战狼主力资金】副图有拉升资金和主力资金两个信号,两个信号都大于0,而且处于上升趋势,易产生大妖股,可以辅助战狼信号操盘.1、指标原理:本指标主力洗盘+拉升特征......