首页 > 其他分享 >华为用例自动化

华为用例自动化

时间:2024-05-17 17:08:54浏览次数:22  
标签:count sonic execute item 用例 华为 command 自动化 re

# -*-encoding:utf-8 -*-
"""
@Time : 2024/4/16 19:47
@Auth : ruqing
@File :华为自动化测试.py
@IDE :PyCharm
@Motto:ABC(Always Be Coding)
"""

import time
import re
from config import InitParam

# 初始化sonic参数
sonic = InitParam().sonic_init()
# 初始化apc电源个数,apc电源地址及端口
apc_all = InitParam().apc_init()
sonic.console_login()
if not sonic.check_login('uname', 'Linux'):
    sonic.login()


def apc():
    # 下电所有apc
    for apc in apc_all:
        apc.apc_off()

    time.sleep(10)
    print('============================================================')

    # 上电所有apc
    for apc in apc_all:
        apc.apc_on()

    sonic.login()
    time.sleep(120)


class Dx510H(object):

    def equipment_basic_information_query(self, count, version):
        """
        item1:设备基本信息查询
        """
        for i in range(count):
            # 检查版本信息
            sonic.execute_command(
                '***************************item1 执行次数{}***************************'.format(i + 1))
            check_version = sonic.execute_command('show version')
            if version not in check_version:
                sonic.execute_command('****************************')
                sonic.execute_command('版本检查有误,请查看日志')
                sonic.execute_command('****************************')
                return

            # 检查风扇状态
            pattern = r'intake\s+Present\s+OK'
            text = sonic.execute_command('show platform fan')
            matches = re.findall(pattern, text)
            if len(matches) != 16:
                sonic.execute_command('****************************')
                sonic.execute_command('请检查风扇状态')
                sonic.execute_command('****************************')
                return

            # 检查温度
            tem = sonic.execute_command('show platform  temperature')
            if 'True' in tem:
                sonic.execute_command('****************************')
                sonic.execute_command('请检查设备器件温度,是否异常')
                sonic.execute_command('****************************')
                return

            # 检查硬盘健康度
            output = sonic.execute_command('show platform  ssdhealth --verbose')
            health_pattern = r'Health\s+:\s+(\d+\.?\d*)%'
            match = re.search(health_pattern, output)
            if match:
                health = match.group(1)
                if health == 'NA' and health == '' and health == 'na':
                    sonic.execute_command('****************************')
                    sonic.execute_command('请检查硬盘健康度是否异常')
                    sonic.execute_command('****************************')
                    return
            else:
                sonic.execute_command('****************************')
                sonic.execute_command('未找到硬盘健康度')
                sonic.execute_command('****************************')
                return

            # 检查syseeprom信息
            syseeprom = sonic.execute_command('show platform  syseeprom')
            info_list = [
                "Product Name",
                "Part Number",
                "Serial Number",
                "Base MAC Address",
                "Manufacture Date",
                "Label Revision",
                "Platform Name",
                "ONIE Version",
                "MAC Addresses",
                "Manufacturer",
                "Manufacture Country",
                "Vendor Name",
                "CRC-32"]
            for info in info_list:
                if info not in syseeprom:
                    sonic.execute_command('****************************')
                    sonic.execute_command('syseeprom的信息{}不全,请检查日志信息'.format(info))
                    sonic.execute_command('****************************')
                    return

            # 检查summary信息
            summary = sonic.execute_command('show platform  summary')
            if bool(summary) is not True:
                return

            # 检查固件版本状态
            fir_stat = sonic.execute_command('show platform  firmware  status')
            cpld_pattern = r'CPLD\s+(\S+)'
            fpga_pattern = r'FPGA\s+(\S+)'
            bios_pattern = r'BIOS\s+(\S+)'

            cpld_version = re.search(cpld_pattern, fir_stat).group(1)
            fpga_version = re.search(fpga_pattern, fir_stat).group(1)
            bios_version = re.search(bios_pattern, fir_stat).group(1)
            if cpld_version == 'NA' or cpld_version == '' or cpld_version == 'N/A' or cpld_version == 'NONE':
                return
            if fpga_version == 'NA' or fpga_version == '' or fpga_version == 'N/A' or fpga_version == 'NONE':
                return
            if bios_version == 'NA' or bios_version == '' or bios_version == 'N/A' or bios_version == 'NONE':
                return

            # 检查固件是否需要升级
            fir_update = sonic.execute_command('show platform  firmware  updates')
            unm = fir_update.count('up-to-date')
            if unm != 3:
                sonic.execute_command('****************************')
                sonic.execute_command('版本需要升级,请先升级后再测试')
                sonic.execute_command('****************************')
                return

            # 检查fwutil信息
            fir_fwu = sonic.execute_command('show platform firmware version')
            if 'fwutil version' not in fir_fwu:
                sonic.execute_command('****************************')
                sonic.execute_command('请确认fwutil version信息')
                sonic.execute_command('****************************')
                return

            # 检查配置是否保存成功
            config_save = sonic.execute_command('sudo config save  -y')
            if 'Save the configuration successfully' not in config_save:
                sonic.execute_command('****************************')
                sonic.execute_command('请确认配置信息是否保存成功')
                sonic.execute_command('****************************')
                return

            # 检查容器状态以及每个容器的大小是否大于400MB
            docker_ps = sonic.execute_command('docker ps -a')
            pattern = r"(\w{12})\s+.*?\s+(Up|Exited)\s+.*?\s"
            matches = re.findall(pattern, docker_ps)

            for match in matches:
                container_id, status = match
                if status != 'Up':
                    sonic.execute_command('****************************')
                    sonic.execute_command('容器ID: {}, 状态: {} 异常'.format(container_id, status))
                    sonic.execute_command('****************************')
                    return

            docker_image = sonic.execute_command('docker images')
            # 使用re.findall()函数,编写正则表达式来匹配每个镜像的REPOSITORY和SIZE
            matches = re.findall(r'(\w{12})\s+.*?\s+(\d+)MB', docker_image)

            # 遍历匹配结果,将SIZE转换为MB,然后判断是否大于400MB
            for match in matches:
                repository, size = match
                size = int(size)
                if size < 400:
                    sonic.execute_command('****************************')
                    sonic.execute_command('镜像 {}: 的大小为 {}MB,小于400MB'.format(repository, size))
                    sonic.execute_command('****************************')
                    return

            # version_image = sonic.execute_command('docker images')
            # # 使用re.findall()函数,编写正则表达式来匹配每个镜像的REPOSITORY和SIZE
            # matches = re.findall(r'(\w{12})\s+.*?\s+(\d+)MB', version_image)
            #
            # # 遍历匹配结果,将SIZE转换为MB,然后判断是否大于400MB
            # for match in matches:
            #     repository, size = match
            #     size = int(size)
            #     if size < 400:
            #         sonic.execute_command('****************************')
            #         sonic.execute_command('镜像 {}: 的大小为 {}MB,小于400MB'.format(repository, size))
            #         sonic.execute_command('****************************')
            #         return

        sonic.execute_command('***************************item1 执行PASS***************************')

    def equipment_alarm_test(self, count, alarm):
        """
        item2:设备告警测试
        """
        for i in range(count):
            sonic.execute_command('***************************item2执行次数{}***************************'.format(i + 1))

            # 登录设备检查告警信息
            if not sonic.check_login('uname', 'Linux'):
                sonic.login()
            log_ala = sonic.execute_command('show alarm  info')
            if alarm not in log_ala:
                sonic.execute_command('****************************')
                sonic.execute_command('设备状态异常,请查看日志,1')
                sonic.execute_command('****************************')
                return

            # 执行warm-reboot后,检查告警
            sonic.execute_command('sudo warm-reboot -y', 0)
            time.sleep(4)
            if not sonic.check_login('uname', 'Linux'):
                sonic.login()
            time.sleep(120)
            war_ala = sonic.execute_command('show alarm  info')
            if alarm not in war_ala:
                sonic.execute_command('****************************')
                sonic.execute_command('设备状态异常,请查看日志,2')
                sonic.execute_command('****************************')
                return

            # 执行整机reboot后,检查告警
            sonic.execute_command('sudo reboot -y', 0)
            time.sleep(4)
            if not sonic.check_login('uname', 'Linux'):
                sonic.login()
            time.sleep(120)
            war_ala = sonic.execute_command('show alarm  info')
            if alarm not in war_ala:
                sonic.execute_command('****************************')
                sonic.execute_command('设备状态异常,请查看日志,3')
                sonic.execute_command('****************************')
                return
        sonic.execute_command('***************************item2 执行PASS***************************')

    def device_optical_module_information_query_and_command_line_injection_test(self, count, start_port, end_port):
        """
        item3:设备光模块信息查询和命令行注入测试
        """
        for i in range(count):
            sonic.execute_command('***************************item3执行次数{}***************************'.format(i + 1))

            def check_power_range(port):
                """
                提出功率值,并且进行判断,检查是否在阈值范围之内
                """
                output = sonic.execute_command('show interfaces  transceiver  eeprom  --dom Ethernet{}'.format(port))
                time.sleep(0.2)

                # 提取Vendor相关信息
                vendor = re.findall(r'Vendor\s+\w+:\s+(\S+)', output)

                # 提取功率值
                rx_powers = [float(x) for x in re.findall(r'RX\dPower:\s+(-?\d+\.\d+)dBm', output)]
                tx_biass = [float(x) for x in re.findall(r'TX\dBias:\s+(-?\d+\.\d+)mA', output)]
                tx_powers = [float(x) for x in re.findall(r'TX\dPower:\s+(-?\d+\.\d+)dBm', output)]
                Temperature = float(re.search(r'Temperature:\s+(-?\d+\.\d+)C', output).group(1))
                Vcc = float(re.search(r'Vcc:\s+(-?\d+\.\d+)Volts', output).group(1))

                # RxPower
                RxPowerHighAlarm = float(re.search(r'RxPowerHighAlarm\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                RxPowerHighWarning = float(re.search(r'RxPowerHighWarning\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                RxPowerLowAlarm = float(re.search(r'RxPowerLowAlarm\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                RxPowerLowWarning = float(re.search(r'RxPowerLowWarning\s*:\s*(-?\d+\.\d+)dBm', output).group(1))

                # TxBias
                TxBiasHighAlarm = float(re.search(r'TxBiasHighAlarm\s*:\s*(\d+\.\d+)mA', output).group(1))
                TxBiasHighWarning = float(re.search(r'TxBiasHighWarning\s*:\s*(\d+\.\d+)mA', output).group(1))
                TxBiasLowAlarm = float(re.search(r'TxBiasLowAlarm\s*:\s*(\d+\.\d+)mA', output).group(1))
                TxBiasLowWarning = float(re.search(r'TxBiasLowWarning\s*:\s*(\d+\.\d+)mA', output).group(1))

                # TxPower
                TxPowerHighAlarm = float(re.search(r'TxPowerHighAlarm\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                TxPowerHighWarning = float(re.search(r'TxPowerHighWarning\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                TxPowerLowAlarm = float(re.search(r'TxPowerLowAlarm\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                TxPowerLowWarning = float(re.search(r'TxPowerLowWarning\s*:\s*(-?\d+\.\d+)dBm', output).group(1))

                # Temperature
                TempHighAlarm = float(re.search(r'TempHighAlarm\s*:\s*(-?\d+\.\d+)C', output).group(1))
                TempHighWarning = float(re.search(r'TempHighWarning\s*:\s*(-?\d+\.\d+)C', output).group(1))
                TempLowAlarm = float(re.search(r'TempLowAlarm\s*:\s*(-?\d+\.\d+)C', output).group(1))
                TempLowWarning = float(re.search(r'TempLowWarning\s*:\s*(-?\d+\.\d+)C', output).group(1))

                # Vcc
                VccHighAlarm = float(re.search(r'VccHighAlarm\s*:\s*(-?\d+\.\d+)Volts', output).group(1))
                VccHighWarning = float(re.search(r'VccHighWarning\s*:\s*(-?\d+\.\d+)Volts', output).group(1))
                VccLowAlarm = float(re.search(r'VccLowAlarm\s*:\s*(-?\d+\.\d+)Volts', output).group(1))
                VccLowWarning = float(re.search(r'VccLowWarning\s*:\s*(-?\d+\.\d+)Volts', output).group(1))

                def is_power_in_range(power, low_alarm, high_alarm):
                    return low_alarm <= power <= high_alarm

                for value in vendor:
                    result = all(value)
                    if bool(result) is not True:
                        sonic.execute_command('请检查模块相关信息,在端口{}上'.format(port))
                        import sys
                        # 如果type不为true,则停止脚本的执行
                        sys.exit()
                        return

                for rx_power in rx_powers:
                    if not is_power_in_range(rx_power, RxPowerLowAlarm, RxPowerHighAlarm):
                        sonic.execute_command("Rx_power超出范围")
                        return

                for tx_bias in tx_biass:
                    if not is_power_in_range(tx_bias, TxBiasLowAlarm, TxBiasHighAlarm):
                        sonic.execute_command("Tx_bias超出范围")
                        return

                for tx_power in tx_powers:
                    if not is_power_in_range(tx_power, TxPowerLowAlarm, TxPowerHighAlarm):
                        sonic.execute_command("Tx_power超出范围")
                        return

                temperature = is_power_in_range(Temperature, TempLowAlarm, TempHighAlarm)
                try:
                    flag = False if temperature else True
                except Exception as e:
                    sonic.execute_command("temperature异常:", e)
                    return

                vcc = is_power_in_range(Vcc, VccLowAlarm, VccHighAlarm)
                try:
                    flag = False if vcc else True
                except Exception as e:
                    sonic.execute_command("vcc异常:", e)
                    return

                return rx_powers, tx_biass, tx_powers, Temperature, vcc

        for i in range(start_port, end_port + 1):
            check_power_range(i)
        # 检查在位端口是否为presence
        stat = sonic.execute_command('show interfaces  status  | grep SFP')  # 后期正则判断改为是否up
        prese = sonic.execute_command('show interfaces  transceiver  presence')
        st_up = re.findall(r'Ethernet(\d+)', stat)
        st_pre = re.findall(r'Ethernet(\d+)\s+Present', prese)
        stat_all = []
        prese_all = []
        for s in st_up:
            stat_all.append(s)
        for u in st_pre:
            prese_all.append(u)
        if stat_all != prese_all:
            sonic.execute_command('****************************')
            sonic.execute_command('端口对应在位状态异常,请查看日志')
            sonic.execute_command('****************************')
            return

        # 检查Low-power Mode是否为off
        lpmode = sonic.execute_command('show interfaces  transceiver  lpmode')
        lp = re.findall(r'Ethernet(\d+)\s+(\w+)', lpmode)
        for p in lp:
            if p[1] != 'Off':
                sonic.execute_command('****************************')
                sonic.execute_command('Low-power异常,请查看日志')
                sonic.execute_command('****************************')
                return

        # 检查是否会提示输入格式不正确
        sonic.execute_command('sudo show interfaces transceiver eeprom “Ethernet1;cpld;reboot -y;sys"', 0)
        sonic.tn.write(("\003" + "\r").encode('utf8'))
        sonic.execute_command('进入到>视图,已ctrl+c退出')
        # show interfaces transceiver eeprom "Ethernet1;cpld;reboot -y;sys"
        tran = sonic.execute_command('show interfaces transceiver eeprom "Ethernet1;cpld;reboot -y;sys"')
        if 'Error: Invalid interface name.' not in tran:
            sonic.execute_command('****************************')
            sonic.execute_command('"Ethernet1;cpld;reboot -y;sys"回显异常,请查看日志')
            sonic.execute_command('****************************')
            return
        cpld = sonic.execute_command('show interfaces transceiver eeprom cpld')
        reboot = sonic.execute_command('show interfaces transceiver eeprom reboot')
        sys = sonic.execute_command('show interfaces transceiver eeprom sys')
        if 'SFP EEPROM Not detected' not in cpld and reboot and sys:
            sonic.execute_command('****************************')
            sonic.execute_command('cpld , reboot , sys 回显异常,请查看日志')
            sonic.execute_command('****************************')
            return

        sonic.execute_command('***************************item3 执行PASS***************************')

    def dynamic_information_refresh(self, count, start_port, end_port):
        """
        item4:光模块动态信息刷新正常
        """
        for i in range(count):
            sonic.execute_command('***************************item4执行次数{}***************************'.format(i + 1))

            def check_power_range(port):
                """
                提出功率值,并且进行判断,检查是否在阈值范围之内
                """
                output = sonic.execute_command('show interfaces  transceiver  eeprom  --dom Ethernet{}'.format(port))
                time.sleep(0.2)
                # 提取功率值
                rx_powers = [float(x) for x in re.findall(r'RX\dPower:\s+(-?\d+\.\d+)dBm', output)]
                tx_biass = [float(x) for x in re.findall(r'TX\dBias:\s+(-?\d+\.\d+)mA', output)]
                tx_powers = [float(x) for x in re.findall(r'TX\dPower:\s+(-?\d+\.\d+)dBm', output)]
                Temperature = float(re.search(r'Temperature:\s+(-?\d+\.\d+)C', output).group(1))
                Vcc = float(re.search(r'Vcc:\s+(-?\d+\.\d+)Volts', output).group(1))

                # RxPower
                RxPowerHighAlarm = float(re.search(r'RxPowerHighAlarm\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                RxPowerHighWarning = float(re.search(r'RxPowerHighWarning\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                RxPowerLowAlarm = float(re.search(r'RxPowerLowAlarm\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                RxPowerLowWarning = float(re.search(r'RxPowerLowWarning\s*:\s*(-?\d+\.\d+)dBm', output).group(1))

                # TxBias
                TxBiasHighAlarm = float(re.search(r'TxBiasHighAlarm\s*:\s*(\d+\.\d+)mA', output).group(1))
                TxBiasHighWarning = float(re.search(r'TxBiasHighWarning\s*:\s*(\d+\.\d+)mA', output).group(1))
                TxBiasLowAlarm = float(re.search(r'TxBiasLowAlarm\s*:\s*(\d+\.\d+)mA', output).group(1))
                TxBiasLowWarning = float(re.search(r'TxBiasLowWarning\s*:\s*(\d+\.\d+)mA', output).group(1))

                # TxPower
                TxPowerHighAlarm = float(re.search(r'TxPowerHighAlarm\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                TxPowerHighWarning = float(re.search(r'TxPowerHighWarning\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                TxPowerLowAlarm = float(re.search(r'TxPowerLowAlarm\s*:\s*(-?\d+\.\d+)dBm', output).group(1))
                TxPowerLowWarning = float(re.search(r'TxPowerLowWarning\s*:\s*(-?\d+\.\d+)dBm', output).group(1))

                # Temperature
                TempHighAlarm = float(re.search(r'TempHighAlarm\s*:\s*(-?\d+\.\d+)C', output).group(1))
                TempHighWarning = float(re.search(r'TempHighWarning\s*:\s*(-?\d+\.\d+)C', output).group(1))
                TempLowAlarm = float(re.search(r'TempLowAlarm\s*:\s*(-?\d+\.\d+)C', output).group(1))
                TempLowWarning = float(re.search(r'TempLowWarning\s*:\s*(-?\d+\.\d+)C', output).group(1))

                # Vcc
                VccHighAlarm = float(re.search(r'VccHighAlarm\s*:\s*(-?\d+\.\d+)Volts', output).group(1))
                VccHighWarning = float(re.search(r'VccHighWarning\s*:\s*(-?\d+\.\d+)Volts', output).group(1))
                VccLowAlarm = float(re.search(r'VccLowAlarm\s*:\s*(-?\d+\.\d+)Volts', output).group(1))
                VccLowWarning = float(re.search(r'VccLowWarning\s*:\s*(-?\d+\.\d+)Volts', output).group(1))

                def is_power_in_range(power, low_alarm, high_alarm):
                    return low_alarm <= power <= high_alarm

                for rx_power in rx_powers:
                    if not is_power_in_range(rx_power, RxPowerLowAlarm, RxPowerHighAlarm):
                        sonic.execute_command("Rx_power超出范围")
                        break

                for tx_bias in tx_biass:
                    if not is_power_in_range(tx_bias, TxBiasLowAlarm, TxBiasHighAlarm):
                        sonic.execute_command("Tx_bias超出范围")
                        break

                for tx_power in tx_powers:
                    if not is_power_in_range(tx_power, TxPowerLowAlarm, TxPowerHighAlarm):
                        sonic.execute_command("Tx_power超出范围")
                        break

                temperature = is_power_in_range(Temperature, TempLowAlarm, TempHighAlarm)
                try:
                    flag = False if temperature else True
                except Exception as e:
                    sonic.execute_command("temperature异常:", e)

                vcc = is_power_in_range(Vcc, VccLowAlarm, VccHighAlarm)
                try:
                    flag = False if vcc else True
                except Exception as e:
                    sonic.execute_command("vcc异常:", e)

                return rx_powers, tx_biass, tx_powers, Temperature, vcc

            rx_powers_list = []
            tx_biass_list = []
            tx_powers_list = []
            Temperature_list = []
            Vcc_list = []
            for i in range(start_port, end_port + 1):
                for z in range(4):
                    rx_powers, tx_biass, tx_powers, Temperature, Vcc = check_power_range(port=i)
                    rx_powers_list.append(rx_powers)
                    tx_biass_list.append(tx_biass)
                    tx_powers_list.append(tx_powers)
                    Temperature_list.append(Temperature)
                    Vcc_list.append(Vcc)
                    time.sleep(60)

                # 判断每次读出的功率值是否与前一次一致,一致说明没有更新,此处有问题
                # sonic.execute_command(rx_powers_list)
                for v in range(1, len(rx_powers_list)):
                    if rx_powers_list[v] != rx_powers_list[v - 1]:
                        print("第{}次和第{}次读取的Rx_power值不同".format(v, v + 1))
                    else:
                        print("第{}次和第{}次读取的Rx_power值相同,请检查日志".format(v, v + 1))
                        sonic.execute_command(
                            "请检查端口 {} 的Rx_power信息:第{}次和第{}次读取的Rx_power值相同,请检查日志".format(i, v,
                                                                                                                v + 1))
                        # sonic.execute_command('请检查端口 {} 的Rx_power信息'.format(i))
                        return sonic.execute_command('请检查端口 {} 的Rx_power信息'.format(i))
                time.sleep(0.2)

                # sonic.execute_command(tx_biass_list)
                for v in range(1, len(tx_biass_list)):
                    if tx_biass_list[v] != tx_biass_list[v - 1]:
                        print("第{}次和第{}次读取的Tx_bias值不同".format(v, v + 1))
                    else:
                        print("第{}次和第{}次读取的Tx_bias值相同,请检查日志".format(v, v + 1))
                        sonic.execute_command(
                            "请检查端口 {} 的Tx_bias信息:第{}次和第{}次读取的Tx_bias值相同,请检查日志".format(i, v,
                                                                                                              v + 1))
                        sonic.execute_command('请检查端口 {} 的Tx_bias信息'.format(i))

                        # return sonic.execute_command('请检查端口 {} 的Tx_bias信息'.format(i))
                time.sleep(0.2)

                # sonic.execute_command(tx_powers_list)
                for v in range(1, len(tx_powers_list)):
                    if tx_powers_list[v] != tx_powers_list[v - 1]:
                        print("第{}次和第{}次读取的Tx_power值不同".format(v, v + 1))
                    else:
                        print("第{}次和第{}次读取的Tx_power值相同,请检查日志".format(v, v + 1))
                        sonic.execute_command(
                            "请检查端口 {} 的Tx_power信息:第{}次和第{}次读取的Tx_power值相同,请检查日志".format(i, v,
                                                                                                                v + 1))
                        sonic.execute_command('请检查端口 {} 的Tx_power信息'.format(i))

                        # return sonic.execute_command('请检查端口 {} 的Tx_power信息'.format(i))
                time.sleep(0.2)

                # sonic.execute_command(Temperature_list)
                for v in range(1, len(Temperature_list)):
                    if Temperature_list[v] != Temperature_list[v - 1]:
                        print("第{}次和第{}次读取的Temperature值不同".format(v, v + 1))
                    else:
                        sonic.execute_command(
                            "请检查端口 {} 的Temperature信息:第{}次和第{}次读取的Temperature值相同,请检查日志".format(i,
                                                                                                                      v,
                                                                                                                      v + 1))
                        sonic.execute_command('请检查端口 {} 的Temperature信息'.format(i))

                        # return sonic.execute_command('请检查端口 {} 的Temperature信息'.format(i))
                time.sleep(0.2)

                # sonic.execute_command(Vcc_list)
                for v in range(1, len(Vcc_list)):
                    if Vcc_list[v] != Vcc_list[v - 1]:
                        print("第{}次和第{}次读取的Vcc值不同".format(v, v + 1))
                    else:
                        print("第{}次和第{}次读取的Vcc值相同,请检查日志".format(v, v + 1))
                        sonic.execute_command(
                            "请检查端口 {} 的Vcc信息:第{}次和第{}次读取的Vcc值相同,请检查日志".format(i, v, v + 1))
                        sonic.execute_command('请检查端口 {} 的Vcc信息'.format(i))

                        # return sonic.execute_command('请检查端口 {} 的Vcc信息'.format(i))
                time.sleep(0.2)

                # 清空列表
                rx_powers_list.clear()
                tx_biass_list.clear()
                tx_powers_list.clear()
                Temperature_list.clear()
                Vcc_list.clear()
        xcvrd_log = sonic.execute_command("show logging syslog | grep -iaE 'xcvrd ValueError'")
        if xcvrd_log is not None:
            sonic.execute_command('请检查端口日志信息')
            return
        else:
            sonic.execute_command('日志信息没有异常')

        sonic.execute_command('***************************item4 执行PASS***************************')

    def xcvrd_process_runs_normally(self, count):
        """
        item5:xcvrd进程正常运行
        """
        for i in range(count):
            sonic.execute_command('***************************item5执行次数{}***************************'.format(i + 1))
            cmd = sonic.execute_command('ps aux |grep xcvrd --color=never')
            time.sleep(2)
            pattern = r'python3\s+/usr/local/bin/xcvrd'
            # print(cmd.encode())
            matches = re.findall(pattern, cmd)
            # print(matches)
            count = len(matches)
            if count != 2:
                sonic.execute_command('请检查进程')
                return
        sonic.execute_command('***************************item5 执行PASS***************************')

    def query_bmc_firmware_information(self, count):
        """
        item6:查询BMC固件信息
        """
        for i in range(count):
            sonic.execute_command('***************************item6执行次数{}***************************'.format(i + 1))
            info = sonic.execute_command('ipmitool mc info')
            print(info)
            if 'Could not open device at /dev/ipmi0 or /dev/ipmi/0 or /dev/ipmidev/0: No such file or directory' not in info:
                sonic.execute_command('请检查提示信息')
                return
        sonic.execute_command('***************************item6 执行PASS***************************')

    def bios_and_cpld_status_check(self, count, cpld, fpga, bios, status):
        """
        item7:bios和cpld状态检查
        """
        for i in range(count):
            sonic.execute_command('***************************item7执行次数{}***************************'.format(i + 1))
            all = sonic.execute_command('show platform firmware  updates')
            if cpld and fpga and bios and status not in all:
                return
        sonic.execute_command('***************************item7 执行PASS***************************')

    def aftedevice_exists_in_the_after_install_bios_filer(self, count, image):
        """
        item8:设备存在after_install_bios.sh文件
        例如:2.23.RC5-venus-B096
        """
        for i in range(count):
            sonic.execute_command(
                '***************************item:8执行次数{}***************************'.format(i + 1))
            after = sonic.execute_command('ls -lh /host/image-{}/platform/after_install_bios.sh'.format(image))
            if 'after_install_bios.sh' not in after:
                return
        sonic.execute_command('***************************item8 执行PASS***************************')

    def read_device_real_time_power_and_rated_power(self, count):
        """
        item9:读取设备实时功率和额定功率
        """
        for i in range(count):
            sonic.execute_command('clear')
            sonic.execute_command(
                '***************************item:9执行次数{}***************************'.format(i + 1))
            sonic.execute_command('redis-cli -n 6', 0)
            time.sleep(4)
            sonic.execute_command('', 0)
            sonic.execute_command('', 0)
            sonic.execute_command('', 0)
            actual = sonic.execute_command('HGET "CHASSIS_INFO|chassis 1" actual_power', 0)
            rated = sonic.execute_command('HGET "CHASSIS_INFO|chassis 1" rated_power', 0)
            sonic.tn.write(("\003" + "\r").encode('utf8'))
            sonic.execute_command('')
            pattern = r'"\d+\.\d+"'
            # 实时功率
            act = re.findall(pattern, actual)
            # 额定功率
            rat = re.findall(pattern, rated)
            # print(act)
            # print(rat)
            if rat <= act:
                return
        sonic.execute_command('***************************item9 执行PASS***************************')

    def device_supports_health_query(self, count, types='DX510'):
        """
        item10:设备支持健康度查询-----后续添加type类型,
        DX510-H目前只使用一款SSD(ME619HXELDF6TE),该SSD获取健康度方法为smartctl并非iSmart(iSmart方法只适用于InnoDisk厂商硬盘,即M.2型号,目前该型号为DX510/6865/8850使用)
        smartctl获取健康度需查看第167、168行最后一个数值(167为已经磨损数值,168为可磨损总数值)健康度计算方法(168-167)/168*100%,即(3000-147)/3000*100%=95.1%
        暂未完成,后续补充
        """

        # 检查硬盘健康度
        def health(types=types):
            output = sonic.execute_command('show platform  ssdhealth --verbose')

            # 查看健康度和硬盘名字
            health_pattern = r'Health\s+:\s+(\d+\.?\d*)%'
            match = re.search(health_pattern, output)
            health_name = r'Device Model :+\s+(.*)'
            model_name = re.search(health_name, output)

            if match:
                health = match.group(1)
                if health == 'NA' and health == '' and health == 'na':
                    sonic.execute_command('****************************')
                    sonic.execute_command('请检查硬盘健康度是否异常')
                    sonic.execute_command('****************************')
                    return
            else:
                sonic.execute_command('****************************')
                sonic.execute_command('未找到硬盘健康度')
                sonic.execute_command('****************************')
                return

            if model_name:
                name = model_name.group(1).strip('\r')
                if name == 'NA' and name == '' and name == 'na':
                    sonic.execute_command('****************************')
                    sonic.execute_command('请检查硬盘name是否异常')
                    sonic.execute_command('****************************')
                    return
            else:
                sonic.execute_command('****************************')
                sonic.execute_command('未找到硬盘name')
                sonic.execute_command('****************************')
                return

            if types == 'DX510':
                dx510_output = sonic.execute_command('iSmart -d /dev/sda')
                dx510_health_pattern = r'Health:\s+(\d+\.?\d*)%'
                # dx510_health_name = r'(M\.2 \(S80\) 3ME4)'
                dx510_health_name = r'Model Name:+\s+(.*)'
                # dx510_health_name = r'Model Name:+\s+(\S+)'
                dx510_match = re.search(dx510_health_pattern, dx510_output)
                dx510_model_name = re.search(dx510_health_name, dx510_output)
                if dx510_match:
                    health = dx510_match.group(1)
                    if health == 'NA' and health == '' and health == 'na':
                        sonic.execute_command('****************************')
                        sonic.execute_command('请检查硬盘健康度是否异常')
                        sonic.execute_command('****************************')
                        return
                    else:
                        if match.group(1) != dx510_match.group(1):
                            sonic.execute_command('****************************')
                            sonic.execute_command('硬盘健康度异常,请检查日志')
                            sonic.execute_command('****************************')
                            return

                else:
                    sonic.execute_command('****************************')
                    sonic.execute_command('未找到硬盘健康度')
                    sonic.execute_command('****************************')
                    return

                if dx510_model_name:
                    name = dx510_model_name.group(1)
                    if name == 'NA' and name == '' and name == 'na':
                        sonic.execute_command('****************************')
                        sonic.execute_command('请检查硬盘name是否异常')
                        sonic.execute_command('****************************')
                        return
                    else:
                        # if model_name.group(1) != dx510_model_name.group(1):
                        if model_name.group(1).strip('\r') != dx510_model_name.group(1).strip('\r').strip():
                            print(model_name.group(1).encode())
                            print(dx510_model_name.group(1).encode())
                            sonic.execute_command('****************************')
                            sonic.execute_command('硬盘name异常')
                            sonic.execute_command('****************************')
                            return
                else:
                    sonic.execute_command('****************************')
                    sonic.execute_command('未找到硬盘name')
                    sonic.execute_command('****************************')
                    return

            if types == 'DX510-H':
                dx510_h_output = sonic.execute_command('sudo smartctl -a /dev/sda')
                dx510_h_device_name = r'Device Model:+\s+(.*)'
                dx510_h_model_name = re.search(dx510_h_device_name, dx510_h_output)
                dx510_h_health_pattern1 = r'167\s+Unknown_Attribute\s+0x0000\s+\d+\s+\d+\s+\d+\s+\w+\s+\w+\s+-\s+(\d+)'
                dx510_h_health_pattern2 = r'168\s+Unknown_Attribute\s+0x0000\s+\d+\s+\d+\s+\d+\s+\w+\s+\w+\s+-\s+(\d+)'
                dx510_h_match1 = re.search(dx510_h_health_pattern1, dx510_h_output)
                dx510_h_match2 = re.search(dx510_h_health_pattern2, dx510_h_output)
                dx510_h_health = (float(dx510_h_match2.group(1)) - float(dx510_h_match1.group(1))) / float(
                    dx510_h_match2.group(1))
                dx510_h_health_result = '{:.2%}'.format(dx510_h_health)
                # print(dx510_h_health_result)

                if dx510_h_health_result:
                    dx510_health = dx510_h_health_result
                    if health == 'NA' and health == '' and health == 'na':
                        sonic.execute_command('****************************')
                        sonic.execute_command('请检查硬盘健康度是否异常')
                        sonic.execute_command('****************************')
                        return
                    else:
                        if dx510_health.strip('%') != match.group(1):
                            print(dx510_health.strip('%'))
                            print(match.group(1))
                            sonic.execute_command('****************************')
                            sonic.execute_command('硬盘健康度异常,请检查日志')
                            sonic.execute_command('****************************')
                            return
                else:
                    sonic.execute_command('****************************')
                    sonic.execute_command('未找到硬盘健康度')
                    sonic.execute_command('****************************')
                    return

                if dx510_h_model_name:
                    name = dx510_h_model_name.group(1)
                    if name == 'NA' and name == '' and name == 'na':
                        sonic.execute_command('****************************')
                        sonic.execute_command('请检查硬盘name是否异常')
                        sonic.execute_command('****************************')
                        return
                    else:
                        if model_name.group(1).strip('\r') != dx510_h_model_name.group(1).strip('\r'):
                            print(model_name.group(1).strip('\r').encode())
                            print(dx510_h_model_name.group(1).strip('\r').encode())
                            sonic.execute_command('****************************')
                            sonic.execute_command('硬盘name异常')
                            sonic.execute_command('****************************')
                            return
                else:
                    sonic.execute_command('****************************')
                    sonic.execute_command('未找到硬盘name')
                    sonic.execute_command('****************************')
                    return

        for i in range(count):
            sonic.execute_command(
                '***************************item:10 执行次数{}***************************'.format(i + 1))
            health(types=types)
            sonic.execute_command('docker ps -a')
            sonic.execute_command('docker restart pmon')
            time.sleep(60)
            health(types=types)
        sonic.execute_command('***************************item10 执行PASS***************************')

    def optical_module_information_supports_diagnostic_information_viewing(self, count, type):
        """
        item11:光模块信息支持诊断信息查看
        """

        def normal(command, num):
            check = sonic.execute_command(command)
            pattern_check = re.findall(r'lane\d+:\s+Normal', check)
            if len(pattern_check) != num:
                sonic.execute_command('****************************')
                sonic.execute_command('normal查询异常,请查看日志')
                sonic.execute_command('****************************')
                return

        for i in range(count):
            sonic.execute_command(
                '***************************item:11 执行次数{}***************************'.format(i + 1))
            if type == '400G':
                # 检查400G信息
                # 检查在位端口是否为presence
                stat = sonic.execute_command('show interfaces  status  | grep 400G | grep SFP')  # 后期正则判断改为是否up
                prese = sonic.execute_command('show interfaces  transceiver  presence')
                st_up = re.findall(r'Ethernet(\d+)', stat)
                st_pre = re.findall(r'Ethernet(\d+)\s+Present', prese)
                stat_all = []
                prese_all = []
                for s in st_up:
                    stat_all.append(s)
                for u in st_pre:
                    prese_all.append(u)
                if stat_all != prese_all:
                    sonic.execute_command('****************************')
                    sonic.execute_command('端口对应在位状态异常,请查看日志')
                    sonic.execute_command('****************************')
                    return

                for diag in prese_all:
                    diag_nos = sonic.execute_command('cat /sys/ott_sff/{}/diagnostic'.format(diag))
                    if 'NA' in diag_nos:
                        sonic.execute_command('****************************')
                        sonic.execute_command('diagnostic节点读取异常,请查看日志')
                        sonic.execute_command('****************************')
                        return

                    lp_mode = sonic.execute_command('cat /sys/ott_sff/{}/lpmode'.format(diag))
                    pattern_lp = r'lpmode\s*(\d+)\s*admin'
                    a = re.search(pattern_lp, lp_mode)
                    if int(a.group(1)) != 0:
                        sonic.execute_command('****************************')
                        sonic.execute_command('lpmode节点读取异常,请查看日志')
                        sonic.execute_command('****************************')
                        return

                    modu = sonic.execute_command('cat /sys/ott_sff/{}/module_status'.format(diag))
                    pattern_mo = r'(Ready)'
                    b = re.search(pattern_mo, modu)
                    if b.group(1) != 'Ready':
                        sonic.execute_command('****************************')
                        sonic.execute_command('module节点读取异常,请查看日志')
                        sonic.execute_command('****************************')
                        return

                    data = sonic.execute_command('cat /sys/ott_sff/{}/datapath_status'.format(diag))
                    data_stat = re.findall(r'lane\d+:\s+Active', data)
                    if len(data_stat) != 8:
                        sonic.execute_command('****************************')
                        sonic.execute_command('active查询异常,请查看日志')
                        sonic.execute_command('****************************')
                        return
                    normal(command='cat /sys/ott_sff/{}/tx_los'.format(diag), num=8)
                    normal(command='cat /sys/ott_sff/{}/tx_disable'.format(diag), num=8)
                    normal(command='cat /sys/ott_sff/{}/tx_cdr_lol'.format(diag), num=8)
                    normal(command='cat /sys/ott_sff/{}/rx_cdr_lol'.format(diag), num=8)
                    normal(command='cat /sys/ott_sff/{}/tx_fault'.format(diag), num=8)

            # 检查100G信息
            if type == '100G':
                # 检查100G信息
                # 检查在位端口是否为presence
                stat = sonic.execute_command('show interfaces  status  | grep 100G | grep SFP')  # 后期正则判断改为是否up
                prese = sonic.execute_command('show interfaces  transceiver  presence')
                st_up = re.findall(r'Ethernet(\d+)', stat)
                st_pre = re.findall(r'Ethernet(\d+)\s+Present', prese)
                stat_all = []
                prese_all = []
                for s in st_up:
                    stat_all.append(s)
                for u in st_pre:
                    prese_all.append(u)
                if stat_all != prese_all:
                    sonic.execute_command('****************************')
                    sonic.execute_command('端口对应在位状态异常,请查看日志')
                    sonic.execute_command('****************************')
                    return

                for diag in prese_all:
                    diag_nos = sonic.execute_command('cat /sys/ott_sff/{}/diagnostic'.format(diag))
                    if 'NA' in diag_nos:
                        sonic.execute_command('****************************')
                        sonic.execute_command('diagnostic节点读取异常,请查看日志')
                        sonic.execute_command('****************************')
                        return

                    lp_mode = sonic.execute_command('cat /sys/ott_sff/{}/lpmode'.format(diag))
                    pattern_lp = r'lpmode\s*(\d+)\s*admin'
                    a = re.search(pattern_lp, lp_mode)
                    if int(a.group(1)) != 0:
                        sonic.execute_command('****************************')
                        sonic.execute_command('lpmode节点读取异常,请查看日志')
                        sonic.execute_command('****************************')
                        return

                    modu = sonic.execute_command('cat /sys/ott_sff/{}/module_status'.format(diag))
                    pattern_mo = r'(N/A)'
                    b = re.search(pattern_mo, modu)
                    if b.group(1) != 'N/A':
                        sonic.execute_command('****************************')
                        sonic.execute_command('module节点读取异常,请查看日志')
                        sonic.execute_command('****************************')
                        return

                    data = sonic.execute_command('cat /sys/ott_sff/{}/datapath_status'.format(diag))
                    pattern_mo = r'(N/A)'
                    b = re.search(pattern_mo, data)
                    if b.group(1) != 'N/A':
                        sonic.execute_command('****************************')
                        sonic.execute_command('data节点读取异常,请查看日志')
                        sonic.execute_command('****************************')
                        return

                    normal(command='cat /sys/ott_sff/{}/tx_los'.format(diag), num=4)
                    normal(command='cat /sys/ott_sff/{}/tx_disable'.format(diag), num=4)
                    normal(command='cat /sys/ott_sff/{}/tx_cdr_lol'.format(diag), num=4)
                    normal(command='cat /sys/ott_sff/{}/rx_cdr_lol'.format(diag), num=4)
                    normal(command='cat /sys/ott_sff/{}/tx_fault'.format(diag), num=4)

            # 检查Low-power Mode是否为off
            lpmode = sonic.execute_command('show interfaces  transceiver  lpmode')
            lp = re.findall(r'Ethernet(\d+)\s+(\w+)', lpmode)
            for p in lp:
                if p[1] != 'Off':
                    sonic.execute_command('****************************')
                    sonic.execute_command('Low-power异常,请查看日志')
                    sonic.execute_command('****************************')
                    return
        sonic.execute_command('***************************item11 执行PASS***************************')

    def support_to_set_the_interface_optical_power_mode(self, count):
        """
        item12:设备支持设置接口光功率模式
        """
        for i in range(count):
            sonic.execute_command(
                '***************************item:12 执行次数{}***************************'.format(i + 1))
            # 检查在位端口是否为presence
            stat = sonic.execute_command('show interfaces  status  | grep SFP')  # 后期正则判断改为是否up
            st_up = re.findall(r'Ethernet(\d+)', stat)
            stat_all = []

            for s in st_up:
                stat_all.append(s)
            pattern_lpmode = r'lpmode\s*(\d+)\s*admin'

            for i in stat_all:
                sonic.execute_command('sudo config interface transceiver  lpmode Ethernet{} enable'.format(i))
                enable = sonic.execute_command('cat /sys/ott_sff/{}/lpmode'.format(i))
                a = re.search(pattern_lpmode, enable)
                if int(a.group(1)) != 1:
                    sonic.execute_command('****************************')
                    sonic.execute_command('lpmode节点读取异常,请查看日志')
                    sonic.execute_command('****************************')
                    return
                sonic.execute_command('sudo config interface transceiver  lpmode Ethernet{} disable'.format(i))
                disable = sonic.execute_command('cat /sys/ott_sff/{}/lpmode'.format(i))
                b = re.search(pattern_lpmode, disable)
                if int(b.group(1)) != 0:
                    sonic.execute_command('****************************')
                    sonic.execute_command('lpmode节点读取异常,请查看日志')
                    sonic.execute_command('****************************')
                    return
        sonic.execute_command('***************************item12 执行PASS***************************')

    def device_supports_no_abnormal_restart_after_power_failure(self, count):
        """
        item13:设备支持掉电重启后无异常
        """
        for i in range(count):
            sonic.execute_command(
                '***************************item:13 执行次数{}***************************'.format(i + 1))
            # 执行掉电操作
            apc()
            # 检查容器状态
            docker_ps = sonic.execute_command('docker ps -a')
            pattern = r"(\w{12})\s+.*?\s+(Up|Exited)\s+.*?\s"
            matches = re.findall(pattern, docker_ps)

            for match in matches:
                container_id, status = match
                if status != 'Up':
                    sonic.execute_command('****************************')
                    sonic.execute_command('容器ID: {}, 状态: {} 异常'.format(container_id, status))
                    sonic.execute_command('****************************')
                    return

            # 检查风扇状态
            pattern = r'intake\s+Present\s+OK'
            text = sonic.execute_command('show platform fan')
            matches = re.findall(pattern, text)
            if len(matches) != 16:
                sonic.execute_command('****************************')
                sonic.execute_command('请检查风扇状态')
                sonic.execute_command('****************************')
                return

            # 检查模块状态,以及复位光模块后,再次检查状态
            stat = sonic.execute_command('show interfaces  status  | grep SFP')  # 后期正则判断改为是否up
            prese = sonic.execute_command('show interfaces  transceiver  presence')
            st_up = re.findall(r'Ethernet(\d+)', stat)
            st_pre = re.findall(r'Ethernet(\d+)\s+Present', prese)
            stat_all = []
            prese_all = []
            for s in st_up:
                stat_all.append(s)
            for u in st_pre:
                prese_all.append(u)
            if stat_all != prese_all:
                sonic.execute_command('****************************')
                sonic.execute_command('端口对应在位状态异常,请查看日志')
                sonic.execute_command('****************************')
                return

            time.sleep(120)
            for i in stat_all:
                sonic.execute_command('sudo config interface transceiver reset Ethernet{}'.format(i))
                time.sleep(5)

            stat_all.clear()
            prese_all.clear()

            stat = sonic.execute_command('show interfaces  status  | grep SFP')  # 后期正则判断改为是否up
            prese = sonic.execute_command('show interfaces  transceiver  presence')
            st_up = re.findall(r'Ethernet(\d+)', stat)
            st_pre = re.findall(r'Ethernet(\d+)\s+Present', prese)
            stat_all = []
            prese_all = []
            for s in st_up:
                stat_all.append(s)
            for u in st_pre:
                prese_all.append(u)
            if stat_all != prese_all:
                sonic.execute_command('****************************')
                sonic.execute_command('端口对应在位状态异常,请查看日志')
                sonic.execute_command('****************************')
                return

            stat_all.clear()
            prese_all.clear()

            stat = sonic.execute_command('show interfaces  status  | grep SFP')  # 后期正则判断改为是否up
            st_up = re.findall(r'Ethernet(\d+)', stat)
            stat_all = []

            for s in st_up:
                stat_all.append(s)
            pattern_lpmode = r'lpmode\s*(\d+)\s*admin'

            for i in stat_all:
                sonic.execute_command('sudo config interface transceiver  lpmode Ethernet{} enable'.format(i))
                enable = sonic.execute_command('cat /sys/ott_sff/{}/lpmode'.format(i))
                a = re.search(pattern_lpmode, enable)
                if int(a.group(1)) != 1:
                    sonic.execute_command('****************************')
                    sonic.execute_command('lpmode节点读取异常,请查看日志')
                    sonic.execute_command('****************************')
                    return
                sonic.execute_command('sudo config interface transceiver  lpmode Ethernet{} disable'.format(i))
                disable = sonic.execute_command('cat /sys/ott_sff/{}/lpmode'.format(i))
                b = re.search(pattern_lpmode, disable)
                if int(b.group(1)) != 0:
                    sonic.execute_command('****************************')
                    sonic.execute_command('lpmode节点读取异常,请查看日志')
                    sonic.execute_command('****************************')
                    return

            stat_all.clear()
            prese_all.clear()

            stat = sonic.execute_command('show interfaces  status  | grep SFP')  # 后期正则判断改为是否up
            prese = sonic.execute_command('show interfaces  transceiver  presence')
            st_up = re.findall(r'Ethernet(\d+)', stat)
            st_pre = re.findall(r'Ethernet(\d+)\s+Present', prese)
            stat_all = []
            prese_all = []
            for s in st_up:
                stat_all.append(s)
            for u in st_pre:
                prese_all.append(u)
            if stat_all != prese_all:
                sonic.execute_command('****************************')
                sonic.execute_command('端口对应在位状态异常,请查看日志')
                sonic.execute_command('****************************')
                return

            stat_all.clear()
            prese_all.clear()

        sonic.execute_command('***************************item13 执行PASS***************************')

    def there_is_no_abnormal_memory_of_the_whole_machine_and_each_container_in_the_steady_state_of_the_device(self,
                                                                                                              count):
        """
        item14:设备稳态下整机和各容器内存无异常----需要确认
        """
        for i in range(count):
            sonic.execute_command(
                '***************************item:14 执行次数{}***************************'.format(i + 1))
            sonic.execute_command('show dockers stats')
            sonic.execute_command('show processes cpu')
            sonic.execute_command('show processes memory')
        sonic.execute_command('***************************item14 执行PASS***************************')

    def warm_reboot_reboot_after_the_upgrade_the_configuration_returned_to_normal(self, count, test_version):
        """
        item15:warm-reboot升级后重启,配置恢复正常
        """
        for i in range(count):
            sonic.execute_command(
                '***************************item:15 执行次数{}***************************'.format(i + 1))
            # 进行上传文件的操作
            pattern_current_version = r'Current:\s+(.*)'
            curretn_version = sonic.execute_command('sudo sonic_installer list')
            result_current_version = re.search(pattern_current_version, curretn_version)

            sonic.execute_command('curl -O http://172.21.120.100:8001/{}'.format(test_version))
            sonic.execute_command('sudo sonic_installer install {} -y'.format(test_version))
            time.sleep(20)

            check_number1 = sonic.execute_command('show warm_restart state ')
            number1 = re.findall(r'\b\d+\b', check_number1)

            sonic.execute_command('sudo warm-reboot -y', 0)
            time.sleep(30)
            sonic.login()
            time.sleep(150)

            pattern_next_version = r'Current:\s+(.*)'
            next_version = sonic.execute_command('sudo sonic_installer list')
            result_next_version = re.search(pattern_next_version, next_version)

            version_image = sonic.execute_command('docker images')
            # 使用re.findall()函数,编写正则表达式来匹配每个镜像的REPOSITORY和SIZE
            matches = re.findall(r'(\w{12})\s+.*?\s+(\d+)MB', version_image)

            check_number2 = sonic.execute_command('show warm_restart state ')
            number2 = re.findall(r'\b\d+\b', check_number2)

            for x, y in zip(number1[:15], number2[:15]):
                if int(x) + 1 == int(y):
                    sonic.execute_command('warm_reboot 1')
                if int(x) + 1 > int(y):
                    sonic.execute_command('warm_reboot 2')
                if int(x) + 1 < int(y):
                    sonic.execute_command('warm_reboot 3')

            # 遍历匹配结果,将SIZE转换为MB,然后判断是否大于400MB
            for match in matches:
                repository, size = match
                size = int(size)
                if size < 400:
                    sonic.execute_command('****************************')
                    sonic.execute_command('镜像 {}: 的大小为 {}MB,小于400MB'.format(repository, size))
                    sonic.execute_command('****************************')
                    return

            sonic.execute_command('sudo sonic_installer set_default {}'.format(result_current_version.group(1)))
            time.sleep(10)
            sonic.execute_command('sudo reboot -y', 0)
            time.sleep(30)
            sonic.login()
            time.sleep(150)

            version_image = sonic.execute_command('docker images')
            # 使用re.findall()函数,编写正则表达式来匹配每个镜像的REPOSITORY和SIZE
            matches = re.findall(r'(\w{12})\s+.*?\s+(\d+)MB', version_image)

            # 遍历匹配结果,将SIZE转换为MB,然后判断是否大于400MB
            for match in matches:
                repository, size = match
                size = int(size)
                if size < 400:
                    sonic.execute_command('****************************')
                    sonic.execute_command('镜像 {}: 的大小为 {}MB,小于400MB'.format(repository, size))
                    sonic.execute_command('****************************')
                    return
            sonic.execute_command('sudo sonic_installer remove {} -y'.format(result_next_version.group(1)))

        sonic.execute_command('***************************item15 执行PASS***************************')

    def dog_starvation_scene_enhanced_logging_information(self, count):
        """
        item16:狗饿死场景增强日志记录信息
        """
        for i in range(count):
            sonic.execute_command(
                '***************************item:16 执行次数{}***************************'.format(i + 1))
            sonic.execute_command('sudo systemctl stop watchdog.service', 0)
            time.sleep(20)
            sonic.login()
            time.sleep(120)
            watch1 = sonic.execute_command('show logging syslog | grep "dying_gasp_handle_showstate end"')
            watch2 = sonic.execute_command('show logging syslog | grep "debug information collected, cost"')
            check1 = 'dying_gasp_handle_showstate end'
            check2 = 'debug information collected, cost'
            sonic.execute_command('timedatectl')
            if check1 not in watch1 and check2 not in watch2:
                sonic.execute_command('请检查日志信息,当前日志有误')
                return

            cause = sonic.execute_command('show reboot-cause')
            if 'Watchdog reboot' not in cause:
                return

            cause_history = sonic.execute_command('show reboot-cause  history')
            if 'Watchdog reboot' not in cause and '1        Watchdog reboot' not in cause_history:
                return
        sonic.execute_command('***************************item16 执行PASS***************************')

    def firmware_upgrade_records_the_reason_for_the_reset(self, count, test_version):
        """
        item17:固件升级记录复位原因
        """
        for i in range(count):
            sonic.execute_command(
                '***************************item:17 执行次数{}***************************'.format(i + 1))
            # 进行上传文件的操作
            pattern_current_version = r'Current:\s+(.*)'
            curretn_version = sonic.execute_command('sudo sonic_installer list')
            result_current_version = re.search(pattern_current_version, curretn_version)

            sonic.execute_command('curl -O http://172.21.120.100:8001/{}'.format(test_version))
            sonic.execute_command('sudo sonic_installer install {} -y'.format(test_version))
            time.sleep(20)

            sonic.execute_command('sudo reboot -y', 0)
            time.sleep(30)
            sonic.login()
            time.sleep(150)

            sonic.execute_command('show version')
            sonic.execute_command('show platform  firmware  updates')

            pattern_next_version = r'Current:\s+(.*)'
            next_version = sonic.execute_command('sudo sonic_installer list')
            result_next_version = re.search(pattern_next_version, next_version)

            sonic.execute_command('sudo sonic_installer set_default {}'.format(result_current_version.group(1)))
            time.sleep(10)
            sonic.execute_command('sudo reboot -y', 0)
            time.sleep(30)
            sonic.login()
            time.sleep(150)

            sonic.execute_command('show version')
            sonic.execute_command('show platform  firmware  updates')

            cause = sonic.execute_command('show reboot-cause')
            if 'Firmware upgrade: device boot with FPGA CPLD upgrade' not in cause:
                return

            cause_history = sonic.execute_command('show reboot-cause  history')
            if 'Software reboot' not in cause and 'Firmware upgrade: device boot with FPGA CPLD upgrade' not in cause_history:
                return

            sonic.execute_command('sudo sonic_installer remove {} -y'.format(result_next_version.group(1)))

        sonic.execute_command('***************************item17 执行PASS***************************')

    def power_down_and_restart_to_record_the_reset_reason(self, count):
        """
        item18:掉电重启记录复位原因
        """
        for i in range(count):
            sonic.execute_command(
                '***************************item:18 执行次数{}***************************'.format(i + 1))
            sonic.execute_command('sudo systemctl stop watchdog.service', 0)
            time.sleep(20)
            sonic.login()
            time.sleep(120)

            sonic.execute_command('timedatectl')

            cause = sonic.execute_command('show reboot-cause')
            if 'Watchdog reboot' not in cause:
                return

            cause_history = sonic.execute_command('show reboot-cause  history')
            if 'Watchdog reboot' not in cause and '1        Watchdog reboot' not in cause_history:
                return

            apc()
            sonic.execute_command('timedatectl')
            cause = sonic.execute_command('show reboot-cause')
            if 'PowerOff reboot' not in cause:
                return

            cause_history = sonic.execute_command('show reboot-cause  history')
            if 'PowerOff reboot' not in cause and 'DeviceColdReboot: device boot with EC power cycle, boot status [0x00]' not in cause_history:
                return

        sonic.execute_command('***************************item18 执行PASS***************************')


if __name__ == '__main__':
    a = Dx510H()
    # # item1:设备基本信息查询---配置信息
    # item_1_count = 1
    # item_1_version = '2.23.RC5-venus-B096'
    # a.equipment_basic_information_query(count=item_1_count, version=item_1_version)
    #
    # # item2:设备告警测试---配置信息
    # item_2_count = 1
    # item_2_alarm = 'No alarm data available'
    # a.equipment_alarm_test(count=item_2_count, alarm=item_2_alarm)
    #
    # # item3:设备光模块信息查询和命令行注入测试---配置信息
    # item_3_count = 1
    # item_3_start_port = 11
    # item_3_end_port = 12
    # a.device_optical_module_information_query_and_command_line_injection_test(count=item_3_count,
    #                                                                           start_port=item_3_start_port,
    #                                                                           end_port=item_3_end_port)
    # # item4: 光模块动态信息刷新正常---配置信息
    # item_4_count = 1
    # item_4_start_port = 11
    # item_4_end_port = 12
    # a.dynamic_information_refresh(count=item_4_count, start_port=item_4_start_port, end_port=item_4_end_port)
    #
    # # item5: xcvrd进程正常运行---配置信息
    # item_5_count = 1
    # a.xcvrd_process_runs_normally(count=item_5_count)
    #
    # # item6:查询BMC固件信息---配置信息
    # item_6_count = 1
    # a.query_bmc_firmware_information(count=item_6_count)
    #
    # # item7:bios和cpld状态检查---配置信息
    # item_7_count = 1
    # item_7_cpld = '2.20 / 2.20'
    # item_7_fpga = '1.5 / 1.5'
    # item_7_bios = 'v09.0B.00.18 / v09.0B.00.18'
    # item_7_status = 'up-to-date'
    # a.bios_and_cpld_status_check(count=item_7_count, cpld=item_7_cpld, fpga=item_7_fpga, bios=item_7_bios,
    #                              status=item_7_status)
    #
    # # item8:设备存在after_install_bios.sh文件---配置信息
    # item_8_count = 1
    # item_8_image = '2.23.RC5-venus-B096'
    # a.aftedevice_exists_in_the_after_install_bios_filer(count=item_8_count, image=item_8_image)
    #
    # # item9:通过yang读取设备实时功率和额定功率---配置信息
    # item_9_count = 2
    # a.read_device_real_time_power_and_rated_power(count=item_9_count)

    # item10:设备支持健康度查询---配置信息
    item_10_count = 1
    item_10_type = 'DX510-H'
    a.device_supports_health_query(count=item_10_count, types=item_10_type)
    #
    # # item11:光模块信息支持诊断信息查看---配置信息
    # item_11_count = 1
    # item_11_type = '400G'
    # a.optical_module_information_supports_diagnostic_information_viewing(count=item_11_count, type=item_11_type)
    #
    # # item12:设备支持设置接口光功率模式---配置信息
    # item_12_count = 1
    # a.support_to_set_the_interface_optical_power_mode(count=item_12_count)
    #
    # # item13:设备支持掉电重启后无异常---配置信息
    # item_13_count = 1
    # a.device_supports_no_abnormal_restart_after_power_failure(count=item_13_count)
    #
    # # item14:设备稳态下整机和各容器内存无异常--配置信息
    # item_14_count = 1
    # a.there_is_no_abnormal_memory_of_the_whole_machine_and_each_container_in_the_steady_state_of_the_device(
    #     count=item_14_count)
    #
    # # item15:warm-reboot升级后重启,配置恢复正常---配置信息
    # item_15_count = 1
    # item_15_test_version = 'DX510-H_2.24.RC1.B059.bin'
    # a.warm_reboot_reboot_after_the_upgrade_the_configuration_returned_to_normal(count=item_15_count,
    #                                                                             test_version=item_15_test_version)
    # # item16:狗饿死场景增强日志记录信息---配置信息
    # item_16_count = 1
    # a.dog_starvation_scene_enhanced_logging_information(count=item_16_count)
    #
    # # item17:固件升级记录复位原因---配置信息
    # item_17_count = 1
    # item_17_test_version = 'DX510-H_2.24.RC1.B059.bin'
    # a.firmware_upgrade_records_the_reason_for_the_reset(count=item_17_count, test_version=item_17_test_version)
    #
    # # item18:掉电重启记录复位原因---配置信息
    # item_18_count = 1
    # a.power_down_and_restart_to_record_the_reset_reason(count=item_18_count)

 

标签:count,sonic,execute,item,用例,华为,command,自动化,re
From: https://www.cnblogs.com/path602/p/18198115

相关文章

  • 关于华为eNSP模拟器的端口占用问题
    一、关闭虚拟化:按下WIN+R输入cmd,按ctrl+shift+enter以管理员身份运行命令提示符输入以下代码:bcdedit/sethypervisorlaunchtypeoff回车执行二、关闭Hyper-V和虚拟机平台:打开控制面板->程序->启用或关闭windows功能把Hyper-V和虚拟机平台关闭重启电脑......
  • Mybatis-plus多租户用例
    我们可以通过注解+配置实现项目多租户数据隔离注解定义:importjava.lang.annotation.ElementType;importjava.lang.annotation.Retention;importjava.lang.annotation.RetentionPolicy;importjava.lang.annotation.Target;@Retention(RetentionPolicy.RUNTIME)@Target(......
  • 电子设计自动化实验报告
    本文内容基于电子设计自动化老师发的实验报告模板,内容仅供参考。实验一:LED流水灯设计一、实验目的熟悉QuartusII开发环境掌握FPGA开发流程二、实验内容及原理1.实验内容通过设计计数器,实现LED流水灯的效果2.实验原理LED硬件电路从上面的LED部分原理图可......
  • 电子设计自动化-实验报告
    实验一:LED流水灯设计一、实验目的熟悉QuartusII开发环境掌握FPGA开发流程二、实验内容及原理实验内容通过设计计数器,实现LED流水灯的效果实验原理LED硬件电路Fig.2.1AX301开发板LED部分原理图从上面的LED部分原理图可以看出,LED电路有两个方式,AX301......
  • 一键自动化博客发布工具,用过的人都说好(掘金篇)
    终于要讲解我们亲爱的掘金了。掘金是一个非常不错的平台。所以很多朋友会把博客发布到掘金上。发布到掘金要填写的内容也比较多。今天给大家介绍一下如何用blog-auto-publishing-tools这个工具自动把博客发布到掘金平台上去。前提条件前提条件当然是先下载blog-auto-publishin......
  • 自动化建设度量
    一、核心能力及建设要求1、资产维度 资产维度更多是在Case构建维度,如何构建出高质量自动化,主要从稳定性和覆盖率维度做引导;稳定性:希望能引导工程师关注测试用例执行结果,不断提升测试用例的可执行性和代码逻辑的正确性,用例执行通过率越高越好;覆盖率:希望能引导工程师通过用例......
  • BOSHIDA AC/DC电源模块在工业自动化领域的应用探析
    BOSHIDAAC/DC电源模块在工业自动化领域的应用探析AC/DC电源模块是一种将交流电转换为直流电的电力转换设备,在工业自动化领域具有广泛的应用。本文将从稳定性、效率和可靠性三个方面对AC/DC电源模块在工业自动化领域的应用进行探析。 首先,AC/DC电源模块在工业自动化领域的应......
  • 如何汇报自动化测试的成果
    星球里有同学问了这样一个问题:自动化测试开展了一段时间,现在需要给领导汇报成果,该怎么汇报?表面看起来这是一个技术问题,实际上这是一个向上管理问题。那么该如何向领导汇报自动化测试创造的成果呢?我们不妨从它的源头出发,思考这几个问题:为什么做自动化测试?预期的目标和结果是什......
  • 共创数字经济新生态,华为云生态领航者·AI先遣队圆满落幕
    5月9-11日,华为云生态“领航者·AI先遣队”课程班在杭州成功举办,本次课程聚焦于AI前沿探索与实践的高端研修,汇聚了来自华为云生态伙伴企业的高层决策者,共同开启了一场关于智慧升级与生态共建的深度对话。华为云生态领航者·AI先遣队大合影AI与大模型,数字经济下的时代命题随着......
  • 接口自动化测试框架【python+requests+pytest+excel+allure+jenkins】
    一.在整个框架中需要用到哪些东西?1.python环境安装https://www.python.org/downloads/windows/下载解压后直接安装,选择Addpythontopath2.JAVA环境配置安装包下载地址:https://www.oracle.com/java/technologies/downloads/环境变量设置参数如下:●变量名:JAVA_HOME●变......