首页 > 编程语言 >使用python对交换机进行排障自动化运维(锐捷)

使用python对交换机进行排障自动化运维(锐捷)

时间:2024-10-09 15:22:06浏览次数:19  
标签:pd 锐捷 运维 python list tn re result str

import glob
import telnetlib
import re
from datetime import datetime
from time import sleep

import pandas as pd
import os
import time

from matplotlib import pyplot as plt


# Telnet 连接函数
def connect_telnet(hostname, username, password):
    try:
        tn = telnetlib.Telnet()
        tn.open(hostname, port=23, timeout=5)  # 尝试连接 Telnet
        print("connected......", hostname)

        # 输入用户名
        tn.read_until(b'Username:', timeout=5)
        tn.write(username.encode('ascii') + b'\n')

        # 输入密码
        tn.read_until(b'Password:', timeout=5)
        tn.write(password.encode('ascii') + b'\n')

        # 检查是否成功登录
        login_result = tn.read_until(b'#', timeout=5)
        if b'#' not in login_result:
            print('登录失败!', hostname)
            tn.close()  # 登录失败,关闭连接
            return None
        else:
            print('登录成功!', hostname)
            return tn  # 登录成功,返回 Telnet 对象
    except Exception as e:
        print(f"连接失败: {e}")
        return None  # 连接失败,返回 None


# 执行命令的函数:执行 Telnet 下的相关命令并返回结果
def execute_ip_ospf_neighbor_detail(tn, hostname):
    # 执行第一个命令:显示当前系统时间
    command1 = bytes("show clock", encoding='utf-8')
    tn.write(command1 + b'\r\n')  # 发送命令并执行
    command1_result = tn.read_until(b'#')  # 读取直到命令提示符结束的结果
    command1_result = re.findall(r"\d+:\d+:\d+\s+\w+\s+\w+\s+\w+\s+\d+\s+2004", command1_result.decode('GB18030'))[0]

    # 执行第二个命令:显示 OSPF 邻居详情
    command2 = bytes("show ip ospf neighbor detail", encoding='utf-8')
    tn.write(command2 + b'\r\n')  # 发送命令

    result_list = []
    # 持续读取 OSPF 命令结果,直到读取完成
    while True:
        command2_result = tn.read_very_eager().decode('ascii')
        result_list.append(command2_result)
        if re.findall(r"--More--", command2_result.strip()):  # 如果命令输出有 "--More--" 字样,按空格获取下一页
            tn.write(b" ")
        elif re.findall(r"#", command2_result.strip()):  # 如果命令输出结束,退出循环
            break
        else:
            time.sleep(1)
            continue

    result_str = "\n".join(result_list)  # 将结果列表拼接成一个字符串

    # 将命令执行的结果存储为字典
    dict_output = {}
    dict_output["host_ip"] = hostname
    dict_output["time"] = command1_result  # 存储命令 1(系统时间)结果

    # 使用正则表达式提取 OSPF 邻居详细信息
    dict_output["OSPF Router with ID"] = re.search(r'OSPF Router with ID (.+)', result_str).group(1)
    dict_output["Neighbor"] = re.search(r'Neighbor\s+(\d+.\d+.\d+.\d+)', result_str).group(1)
    dict_output["area"] = re.search(r'In the area\s+(.+)', result_str).group(1)
    dict_output["State"] = re.search(r'State\s+(\w+), ', result_str).group(1)

    # 将字典数据转换为 DataFrame 并保存为 CSV 文件
    pd_output = pd.DataFrame.from_dict([dict_output])
    pd_output['time'] = pd.to_datetime(pd_output['time'])
    pd_output['time'] = pd_output['time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))

    ttt = str(datetime.now())
    # pd_output.to_csv('./' + os.sep + r'ospf_neighbor_detail' + '-' + 'hostname' + '.csv', mode='a', index=None, encoding='gb18030')
    pd_output.to_csv('./' + os.sep + r'ospf_neighbor_detail' + '-' + 'hostname' + ttt +'.csv', index=None, encoding='gb18030')

# 示例调用:执行 "show interface brief" 命令
def execute_interface_brief(tn, hostname):
    command = bytes("show interface brief", encoding='utf-8')
    tn.write(command + b'\r\n')

    result_list = []
    # 持续读取命令结果,处理 "--More--" 分页输出
    while True:
        command_result = tn.read_very_eager().decode('ascii')
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")
        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(1)
            continue

    result_str = "\n".join(result_list)
    list_str = result_str.split('\n')  # 将结果按行分割成列表

    list_temperature_vec = []
    # 遍历结果行,查找符合正则表达式的内容
    for j in list_str:
        regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
        if len(re.findall(r"Interface", j)) > 0:
            new_columns = list_find_str = re.split(r'\s+', j)  # 提取列名
            new_columns = new_columns[0:8]

        if len(regex.findall(j)) > 0:
            list_find_str = regex.findall(j)[0]
            list_find_str = re.split(r'\s+', list_find_str)  # 提取数据行
            list_temperature_vec.append(list_find_str)

    pd_result = pd.DataFrame(list_temperature_vec)
    pd_result.columns = new_columns  # 设置 DataFrame 列名
    pd_result.to_csv('./' + os.sep + r'interface_brief' + '-' + str(hostname)+ str(datetime.now()) + '.csv', encoding='gb18030')  # 保存结果
    return pd_result  # 返回结果 DataFrame


# 示例调用:光功率1:执行 "show opticalinfo brief" 命令
def execute_opticalinfo_brief(tn, hostname):
    command = bytes("show opticalinfo brief", encoding='utf-8')
    tn.write(command + b'\r\n')

    result_list = []
    # 持续读取命令结果,处理 "--More--" 分页输出
    while True:
        command_result = tn.read_very_eager().decode('ascii')
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")
        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(1)
            continue

    result_str = "\n".join(result_list)
    list_str = result_str.split('\n')  # 将结果按行分割成列表

    list_temperature_vec = []
    # 遍历结果行,查找符合正则表达式的内容
    for j in list_str:
        regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
        if len(re.findall(r"Interface", j)) > 0:
            new_columns = list_find_str = re.split(r'\s+', j)  # 提取列名
            new_columns = new_columns[0:6]

        if len(regex.findall(j)) > 0:
            list_find_str = regex.findall(j)[0]
            list_find_str = re.split(r'\s+', list_find_str)  # 提取数据行
            list_temperature_vec.append(list_find_str)

    pd_result = pd.DataFrame(list_temperature_vec)
    pd_result.columns = new_columns  # 设置 DataFrame 列名
    pd_result.to_csv('./' + os.sep + r'interface_brief' + '-' + str(hostname) + str(datetime.now()) + '.csv',
                     encoding='gb18030')  # 保存结果
    return pd_result  # 返回结果 DataFrame


# 示例调用:光功率2:执行 "show opticalinfo xgei-0/3/0/2" 命令
# 这个命令还得现场看下,可以参考show interface xgei-0/2/0/13
# def execute_opticalinfo_xgei(tn, hostname):

# 示例调用:执行 "show ip interface brief" 命令,类似上面的函数
def execute_ip_interface_brief(tn, hostname):
    command = bytes("show ip interface brief", encoding='utf-8')
    tn.write(command + b'\r\n')

    result_list = []
    # 持续读取结果
    while True:
        command_result = tn.read_very_eager().decode('ascii')
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")
        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(1)
            continue

    result_str = "\n".join(result_list)
    list_str = result_str.split('\n')

    list_temperature_vec = []
    for j in list_str:
        if len(re.findall(r"Interface", j)) > 0:
            new_columns = re.split(r'\s+', j)  # 提取列名
            print("new_columns111111111:",new_columns)
            new_columns = new_columns[0:6]

        regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
        print(regex)
        if len(regex.findall(j)) > 0:
            list_find_str = regex.findall(j)[0]
            list_find_str = re.split(r'\s+', list_find_str)
            list_temperature_vec.append(list_find_str)

    pd_result = pd.DataFrame(list_temperature_vec)
    pd_result.columns = new_columns  # 设置列名
    pd_result.to_csv('./' + os.sep + r'ip_interface_brief' + '-' + str(hostname) + '.csv', encoding='gb18030')  # 保存结果
    return pd_result


# 示例调用:执行 "show interface xgei-0/2/0/13" 命令,获取接口的详细信息
def execute_show_interface_xgei(tn, hostname):
    command = bytes("show interface xgei-0/2/0/2", encoding='utf-8')
    tn.write(command + b'\r\n')

    result_list = []
    # 持续读取结果
    while True:
        command_result = tn.read_very_eager().decode('ascii')
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")
        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(1)
            continue

    result_str = "\n".join(result_list)

    # 将结果存储为字典
    dict_output = {}
    dict_output["time"] = datetime.now()
    dict_output["host_ip"] = hostname
    dict_output['interface'] = re.search(r'\w+gei-\d\/\d\/\d\/\d+', result_str).group(0)
    dict_output['In_Bytes'] = re.search(r'In_Bytes\s+(\d+)', result_str).group(1)
    dict_output['E_Bytes'] = re.search(r'E_Bytes\s+(\d+)', result_str).group(1)

    # 保存结果为 CSV 文件
    pd_output = pd.DataFrame.from_dict([dict_output])
    # pd_output.to_csv('./' + os.sep + r'show_interface_xgei' + '.csv', mode='a', index=None, encoding='gb18030')
    pd_output.to_csv('./' + os.sep + r'show_interface_xgei' + '.csv', index=None, encoding='gb18030')

def excute_ping(tn, hostname):
    command = bytes("ping 127.0.0.1", encoding='utf-8')
    tn.write(command + b'\r\n')
    time.sleep(5)
    result_list = []
    while True:
        command_result = tn.read_very_eager().decode('ascii')
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")
        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(1)
            continue
    # time.sleep(20)
    result_str = "\n".join(result_list)
    # print(result_str)

    # 解析结果并存储
    dict_output = {}
    dict_output["host_ip"] = hostname
    dict_output["info"] = result_str
    pd_output = pd.DataFrame.from_dict([dict_output])

    pd_output.to_csv('./' + os.sep + r'ping' + '-' + 'hostname' + str(datetime.now()) + '.csv', mode='a', index=None, encoding='gb18030')


def files_to_one():


    file_list = glob.glob(os.path.join(os.path.abspath('./'), r'ospf*.csv'))
    pd_data_o = pd.DataFrame()
    for file in file_list:
        try:
            pd_sheet = pd.read_csv(file, encoding='gb18030', doublequote=False,
                                   converters={u'code': str}, engine="python")
        except:
            print('读取异常')
        pd_data_o = pd.concat([pd_data_o, pd_sheet], axis=0)
        pd_data_o.to_csv('./' + os.sep + r'ospf' + '.csv', index=None, encoding='gb18030')


if __name__ == '__main__':

    '''
    正则表达式:https://regex101.com/
    '''
    # hostname 列表,包含多个主机 IP 地址
    # hostnames = ['127.0.0.1', '127.0.0.1', '127.0.0.1']  # 这是一个示例列表
    hostnames = ['127.0.0.1']  # 这是一个示例列表
    # hostnames = ['127.0.0.1', '127.0.0.1']  # 这是一个示例列表
    username = 'huawei'
    password = 'huawei'

    # 遍历每个 hostname,依次进行 Telnet 连接和命令执行
    for hostname in hostnames:
        print(f"正在连接 {hostname} ...")

        # 连接 Telnet
        tn = connect_telnet(hostname, username, password)

        # 如果连接成功,执行命令
        if tn:
            print(f"连接成功 {hostname}, 执行命令中...")

            # 执行 show ip ospf neighbor detail 命令 ,信息为excel形式,每个主机只有一行,做一个表格合并就行files_to_one(),不需要做可视化了
            # execute_ip_ospf_neighbor_detail(tn, hostname)

            # 可以根据需要取消注释执行其他命令
            # 返回的数据为二层接口的状态
            # execute_interface_brief(tn, hostname)


            # execute_ip_interface_brief(tn, hostname)
            execute_show_interface_xgei(tn, hostname)
            # excute_ping(tn,hostname)

            # 这三个可参考excute_ping()方法进行处理
            # execute_show_opentical(tn, hostname)
            # excute_show_runningconfig(tn,hostname)
            # excute_show_alarm_current(tn,hostname)

            # 关闭 Telnet 连接
            tn.close()
            print(f"{hostname} 的操作已完成,连接关闭。\n")
        else:
            print(f"无法连接到 {hostname},请检查连接或主机状态。\n")

    # 合并文件
    # files_to_one()


####################
import re
import time
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats
from datetime import datetime
import telnetlib
import os

# 采集接口流量数据并保存到CSV文件
def execute_show_interface_xgei(tn, hostname, interval=5, num_samples=10):
    all_data = []  # 用于存储每次采集的数据

    for i in range(num_samples):
        command = bytes("show interface xgei-0/2/0/2", encoding='utf-8')
        tn.write(command + b'\r\n')

        result_list = []
        # 持续读取结果
        while True:
            command_result = tn.read_very_eager().decode('ascii')
            result_list.append(command_result)
            if re.findall(r"--More--", command_result.strip()):
                tn.write(b" ")
            elif re.findall(r"#", command_result.strip()):
                break
            else:
                time.sleep(1)
                continue

        result_str = "\n".join(result_list)

        # 将结果存储为字典
        dict_output = {}
        dict_output["time"] = datetime.now()
        dict_output["host_ip"] = hostname
        dict_output['interface'] = re.search(r'\w+gei-\d\/\d\/\d\/\d+', result_str).group(0)
        dict_output['In_Bytes'] = re.search(r'In_Bytes\s+(\d+)', result_str).group(1)
        dict_output['E_Bytes'] = re.search(r'E_Bytes\s+(\d+)', result_str).group(1)

        # 将数据添加到总列表中
        all_data.append(dict_output)

        # 每隔 interval 秒采集一次
        time.sleep(interval)

    # 保存结果为 CSV 文件
    pd_output = pd.DataFrame.from_dict(all_data)
    # 确保文件夹存在
    # if not os.path.exists('./data'):
    #     os.makedirs('./data')
    # pd_output.to_csv('./show_interface_xgei.csv', mode='a', index=None, encoding='gb18030')
    pd_output.to_csv('./show_interface_xgei.csv', mode='a', index=None, encoding='gb18030')


# 从CSV文件中读取数据并进行图形化展示
def plot_traffic_data(csv_file='./show_interface_xgei.csv'):
    try:
        # 从CSV文件中读取数据
        data = pd.read_csv(csv_file)

        # 转换时间列为datetime类型
        # data['time'] = pd.to_datetime(data['time'], format='%Y-%m-%d %H:%M:%S')
        data['time'] = pd.to_datetime(data['time'], format='mixed')
        data.sort_values('time', inplace=True)  # 按时间排序
        data.reset_index(level=None, drop=True, inplace=True)

        # 计算字节差分值
        data['In_Bytes'] = data['In_Bytes'].diff(1)
        data['E_Bytes'] = data['E_Bytes'].diff(1)

        # 删除含有缺失值的行
        data.dropna(axis=0, how='any', inplace=True)

    except Exception as e:
        print(e)
        return

    # 绘制流量变化趋势图
    plt.figure(figsize=(10, 6))
    plt.plot(data['time'], data['In_Bytes'], label='In_Bytes', marker='o')
    plt.plot(data['time'], data['E_Bytes'], label='E_Bytes', marker='o')
    plt.title('Interface Traffic Over Time')
    plt.xlabel('Time')
    plt.ylabel('Bytes')
    plt.legend()
    plt.grid(True)
    plt.show()

    # 正态分布图和统计分析
    for i in ['In_Bytes', 'E_Bytes']:
        X = data[i].astype(float)

        # 绘制直方图
        plt.figure(figsize=(10, 6))
        plt.title(i + ' Histogram')
        plt.hist(X, bins=50)
        plt.show()

        # 绘制概率密度图
        plt.figure(figsize=(10, 6))
        plt.title(i + ' Probability Density')
        sns.kdeplot(X, kernel='gau', color="g", alpha=.7)
        plt.show()

        # 偏度和峰度计算
        print(f"{i} Skewness:", stats.skew(X))
        print(f"{i} Kurtosis:", stats.kurtosis(X))

    # 计算协方差和相关系数
    covariance = data['In_Bytes'].cov(data['E_Bytes'])
    correlation = data['In_Bytes'].corr(data['E_Bytes'])

    print("Covariance:", covariance)
    print("Correlation:", correlation)


if __name__ == '__main__':
    # 示例Telnet连接信息
    hostname = '127.0.0.1'
    username = 'huawei'
    password = 'huawei'

    try:
        # 连接Telnet
        tn = telnetlib.Telnet(hostname)
        tn.read_until(b'Username:')
        tn.write(username.encode('ascii') + b'\n')
        tn.read_until(b'Password:')
        tn.write(password.encode('ascii') + b'\n')

        # 执行采集命令,每隔5秒采集一次,连续采集10次
        # while True: 如果注释打开,记得缩进两个空格
        execute_show_interface_xgei(tn, hostname, interval=5, num_samples=10)
        tn.close()

        # 绘制流量图形化展示
        # plot_traffic_data()

    except Exception as e:
        print(f"Error: {e}")

标签:pd,锐捷,运维,python,list,tn,re,result,str
From: https://blog.csdn.net/u012820312/article/details/142787920

相关文章

  • 基于yolov10的花卉识别检测,支持图像、视频和摄像实时检测【pytorch框架、python】
    更多目标检测和图像分类识别项目可看我主页其他文章功能演示:基于yolov10的花卉识别检测系统,支持图像、视频和摄像实时检测【pytorch框架、python】_哔哩哔哩_bilibili(一)简介基于yolov10的花卉识别检测系统是在pytorch框架下实现的,这是一个完整的项目,包括代码,数据集,训练好的......
  • (2024最新毕设合集)基于SpringBoot的乡村书屋小程序-31881|可做计算机毕业设计JAVA、PHP
    摘要随着信息技术的快速发展和互联网的广泛普及,数字化服务的需求不断增长,乡村书屋作为传统的文化服务机构也需要适应这一变革。本研究将使用Java开发技术,通过springboot作为框架,结合微信小程序,和MySQL作为数据存储的技术,开发一套功能齐备可移动的乡村书屋小程序,旨在提升乡......
  • python/NumPy库的使用
    1.NumPy的主要特点:高性能的多维数组对象:NumPy的核心是ndarray,它是一个高性能的多维数组对象。广播功能:NumPy提供了广播(broadcasting)功能,允许不同形状的数组进行数学运算。集成C/C++代码:NumPy可以无缝集成C/C++代码,提高性能。广泛的数学函数库:提供了大量的数学函数,包括线性代数......
  • 基于python+flask框架的研招信息管理和预测系统(开题+程序+论文) 计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着高等教育普及率的提升和就业竞争的加剧,越来越多的学生选择继续深造,报考研究生的人数逐年攀升。然而,研究生招生信息的管理和获取却面临......
  • 基于python+flask框架的中医古方名方信息管理系统(开题+程序+论文) 计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景中医作为中华民族的传统医学,承载着千年的智慧与经验。在浩瀚的中医典籍中,古方名方犹如璀璨的星辰,闪烁着独特的光芒。这些古方名方不仅蕴含......
  • 基于python+flask框架的医院门诊预约挂号系统(开题+程序+论文) 计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着医疗技术的不断进步和人们健康意识的日益增强,医院门诊的就诊需求呈现出快速增长的趋势。传统的挂号方式往往存在排队时间长、挂号效率......
  • 深入理解Python的生成器与迭代器:编写高效的代码
    深入理解Python的生成器与迭代器:编写高效的代码在Python编程中,生成器(Generators)和迭代器(Iterators)是编写高效代码的重要工具。它们帮助我们节省内存、优化性能,尤其在处理大数据时表现尤为出色。这篇博客将深入探讨生成器与迭代器的工作原理、如何使用它们编写高效代码,并通......