首页 > 其他分享 >交换机相关最终_续

交换机相关最终_续

时间:2024-10-09 17:59:58浏览次数:8  
标签:ip 最终 tn interface 交换机 command result print 相关

1.包含比较检查和修改步骤

import paramiko
import os
import time
import random
import datetime
import pandas as pd
import re
import numpy as np
from sqlalchemy import text, create_engine
import psycopg2
from psycopg2 import sql
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy import create_engine, MetaData, Table, inspect
import schedule
import subprocess
import telnetlib
import telnetlib as tn
import threading
import time
import ipaddress
import io




def xieyi_modify(command1,command2):
    command = bytes(command1, encoding='utf-8')
    tn.write(command + b'\n')
    command_result = tn.read_very_eager().decode('ascii')
    time.sleep(1)
    # 使用正则表达式匹配并提取配置块
    # 使用正则表达式匹配并提取配置信息
    config_patterns = [
        r'description\s+(.+)',
        r'ip address\s+(\d+\.\d+\.\d+\.\d+)\s+(\d+\.\d+\.\d+\.\d+)',
        r'duplex\s+(auto|half|full)',
        r'speed\s+(auto|\d+)',
        r'port link-type\s+(access|trunk)',
        r'port default vlan\s+(\d+)',
    ]

    # 初始化一个字典来存储配置信息
    config_dict = {}

    # 遍历所有的配置模式
    for pattern in config_patterns:
        match = re.search(pattern, command_result)
        if match:
            if len(match.groups()) == 1:
                key = pattern.split()[0]  # 获取模式的第一个单词作为键
                config_dict[key] = match.group(1)
            else:
                key = pattern.split()[0]  # 获取模式的第一个单词作为键
                config_dict[key] = match.groups()

    return config_dict
    time.sleep(1)

def get_ospf(tn):

    command = "show ip ospf neighbor detail"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\n')
    time.sleep(1)
    result_list = []
    while (True):
        command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")

        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(0.05)
            continue
    result_str = "\n".join(result_list)
    dict_ouput = {}
    strtext = []
    dict_ouput["host_ip"] = host_ip
    startpattern = re.compile(r'State\s+(\w+)')
    # print(startpattern)
    print(re.search(startpattern, result_str))
    dict_ouput["State"]= 0
    if re.search(startpattern, result_str) :
        strtext = re.search(startpattern, result_str).group(1)
        dict_ouput["State"] = strtext
        # print(dict_ouput)
        # 直接使用 result_str
    if dict_ouput == 0 or dict_ouput["State"] != 'FULL':
        cmd = "show running-config ospfv2"
        tn.write(cmd.encode('ascii') + b'\n')
        time.sleep(1)
        command_result = tn.read_very_eager().decode('ascii')
        #print(command_result)

        # 初始化空字典
        config_dict = {}

        # 定义一个正则表达式模式来匹配配置项及其参数
        pattern = r'(\S+)\s+(.*)'

        # 遍历每一行配置
        for line in command_result.split('\n'):
            # 使用正则表达式匹配配置项
            match = re.match(pattern, line.strip())
            if match:
                key, value = match.groups()
                # 如果键已经存在,则将其值添加到列表中;否则创建一个新的列表
                if key in config_dict:
                    config_dict[key].append(value)
                else:
                    config_dict[key] = [value]
        for key, values in config_dict.items():
            print(f"{key}: {values}")

        return config_dict
        print('这个是opsf协议值\n')
        print(config_dict.items())

def get_ip_interface(tn,interface):

    command = "show ip interface brief"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\n')
    time.sleep(1)
    result_list = []
    while (True):
        command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")

        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(0.05)
            continue
    result_str = "\n".join(result_list)
    # 使用 io.StringIO 将字符串转换为文件对象
    data_file = io.StringIO(result_str)
    # 使用 read_csv 加载数据,指定分隔符为一个或多个空格,并跳过第一行
    df = pd.read_csv(data_file, sep='\s+', skiprows=1, names=None, header=None)
    # 提取第二行作为列名
    column_names = df.iloc[0].tolist()
    # 删除前两行(第一行为表头,第二行为实际数据的第一行)
    df = df.drop(df.index[[0, 1]])
    # 重新设置列名
    df.columns = column_names

    # 显示 DataFrame
    interface_name = interface[0]
    # 根据接口名称筛选数据
    filtered_df = df[df["Interface"] == interface_name]
    # 获取 IP 地址和掩码

    mask_value = filtered_df[["IP-Address", "Mask"]]
    return mask_value




def ping_test(ip_address,tn):
    command = f"ping {ip_address}"
    command = bytes(command, encoding='utf-8')
    tn.write(command+ b'\n')
    time.sleep(10)
    ping_result = tn.read_very_eager().decode('ascii')
    # 检查ping测试是否成功
    print(ping_result)
    return "Success rate is 100 percent" in ping_result
    #command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)




def serch_modify(pd_result,tn):
    down_interface = pd_result[pd_result['Admin'] == 'down']

    # 遍历每一个down的接口
    for index, row in down_interface.iterrows():
        interface_name = row['Interface']

        commands = [
            "enable",
           f"configure terminal",
           f"interface {interface_name}",
           "no shutdown",
           "exit",
           "exit"
        ]


        for cmd in commands:
            command = bytes(cmd, encoding='utf-8')
            tn.write(command + b'\n')
            time.sleep(1)  # 等待命令执行
        # cmd = f'enable'
        # command = bytes(cmd, encoding='utf-8')
        # tn.write(command + b'\n')
        # cmd = f'configure terminal'
        # command = bytes(cmd, encoding='utf-8')
        # tn.write(command + b'\n')
        #
        # cmd = f'interface {interface_name}'
        # command = bytes(cmd, encoding='utf-8')
        # tn.write(command + b'\n')
        # command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        #
        # cmd = 'no shutdown'
        # command = bytes(cmd, encoding='utf-8')
        # tn.write(command + b'\n')
        # command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        #
        # command = "show ip interface brief"
        # command = bytes(command, encoding='utf-8')
        # tn.write(command + b'\n')
        # time.sleep(1)
        # command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        #
        #
        #
        #
        # cmd = 'no shutdown'
        # command = bytes(cmd, encoding='utf-8')
        # tn.write(command + b'\n')
#主函数
def get_info_telnet(host_ip, username, password,mudi_ip,interface):
    tn = telnetlib.Telnet()
    # 定义需要检查的接口列表
    # interfaces = ['xgei0/3/0/1', 'xgei0/3/0/2']
    try:

        tn.open(host_ip, port=23, timeout=5)
        print('%s connected ssuccess !' % host_ip)

        tn.read_until(b'Username:', timeout=5)
        tn.write(username.encode('ascii') + b'\n')

        tn.read_until(b'Password:', timeout=5)
        tn.write(password.encode('ascii') + b'\n')
        time.sleep(1)

        command_result = tn.read_until(b'#', timeout=5)
        if b'#' not in command_result:
            print('%s登录失败' % host_ip)
        else:
            print('%s登录成功' % host_ip)

    except:

        print('%s网络连接失败' % host_ip)

    #command = "show clock"
    #command = bytes(command, encoding='utf-8')
    #tn.write(command + b'\r\n')
    #run_time = tn.read_until(b'#')
    #run_time = re.findall(r"\d+:\d+:\d+\s+\w+\s+\w+\s+\w+\s+\d+\s+2024", run_time.decode('GB18030'))[0]
    ping_test(mudi_ip,tn)

    command = "show interface brief"
    command = bytes(command, encoding='utf-8')
    tn.write(command + b'\n')
    time.sleep(1)

    result_list = []
    while (True):
        command_result = tn.read_very_eager().decode('ascii')
        # print(command_result)
        result_list.append(command_result)
        if re.findall(r"--More--", command_result.strip()):
            tn.write(b" ")

        elif re.findall(r"#", command_result.strip()):
            break
        else:
            time.sleep(0.05)
            continue

    result_str = "\n".join(result_list)
    list_str = result_str.split('\n')


    pd_result = pd.DataFrame()
    list_temperature_vec = []
    for j in list_str:
        regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
        # print(regex.findall(j))
        # print(len(regex.findall(j)))
        if len(re.findall(r"Interface", j)) > 0:
            new_columns = list_find_str = re.split(r'\s+', j)
            new_columns = new_columns[0:8]

        if len(regex.findall(j)) > 0:
            list_find_str = regex.findall(j)[0]
            list_find_str = re.split(r'\s+', list_find_str)
            list_temperature_vec.append(list_find_str)

    pd_result = pd.DataFrame(list_temperature_vec)
    pd_result.columns = new_columns
    serch_modify(pd_result,tn)
    print("接口状态起来后的ping测结果:\n")
    #print(ping_test(mudi_ip, tn))
    #获取对应接口ip地址和掩码
    mask_value = get_ip_interface(tn,interface)
    #获取ospf的配置
    ospf_value = get_ospf(tn)
    #print("269",ospf_value)
    #print(ospf_value)


    tn.close()
    return ospf_value, mask_value
def modify_ospf(host_ip, username, password):
    tn = telnetlib.Telnet()
    try:

        tn.open(host_ip, port=23, timeout=5)
        print('%s connected ssuccess !' % host_ip)

        tn.read_until(b'Username:', timeout=5)
        tn.write(username.encode('ascii') + b'\n')

        tn.read_until(b'Password:', timeout=5)
        tn.write(password.encode('ascii') + b'\n')
        time.sleep(1)

        command_result = tn.read_until(b'#', timeout=5)
        if b'#' not in command_result:
            print('%s登录失败' % host_ip)
        else:
            print('%s登录成功' % host_ip)

    except:

        print('%s网络连接失败' % host_ip)
    commands = [
        f"configure terminal",
        f"router ospf 1",
        f"area 0",
        f"interface xgei-0/1/1/49",
        "network point-to-multipoint",
        "exit"
        "exit"
        "exit"
        "exit"
    ]
    for cmd in commands:
        tn.write(cmd.encode('ascii') + b'\n')
        time.sleep(3)
        command_result = tn.read_very_eager().decode('ascii')
        print(command_result)
    print("modyfiOspf:ok")
    tn.close()


def compare_host_ip(pd_output1, pd_output2,interface):

    if pd_output1["network"][0] != pd_output2["network"][0]:
        modify_ospf(host_ip[0], username, password)
    # if pd_output1["area"][0] != pd_output2["area"][0]:
    #     modify_ospf(host_ip[1], username, password)
    # if pd_output1["interface"][0] and pd_output1["interface"][0]!=interface[0]:
    #     modify_ospf(host_ip[2], username, password)
    # elif pd_output2["interface"][0] and pd_output2["interface"][0] != interface[1]:
    #     modify_ospf(host_ip[3], username, password)
    # elif "interface" not in pd_output1.keys():
    #     modify_ospf(host_ip[3], username, password)
    # elif "interface" in pd_output2.keys():
    #     modify_ospf(host_ip[4], username, password)
    print("ospf配置ok")


def are_ips_in_same_subnet(ip1, mask1, ip2, mask2):
    # 检查两个IP地址是否在同一个子网内
    try:
        net1 = ipaddress.ip_network(f"{ip1}/{mask1}", strict=False)
        net2 = ipaddress.ip_network(f"{ip2}/{mask2}", strict=False)

        if net1.overlaps(net2):
            print("地址配置正确")
            return True
        else:
            print("地址错误")
            return False
    except ValueError as e:
        print(f"无效的网络字符串: {ip1}/{mask1} 或 {ip2}/{mask2}. 错误: {e}")
        return False


if __name__ == '__main__':
    host_ip = ['192.168.2.1','192.168.1.1']
    interface = ['xgei-0/1/1/49','gei-0/1/1/33']
    mudi_ip = ['10.0.0.1','10.0.0.1']
    username = 'zte'
    password = 'zte'
    pd_output1,mask_value1 = get_info_telnet(host_ip[0], username, password,mudi_ip[1],interface)
    print(mask_value1["IP-Address"].values[0])
    pd_output2,mask_value2 = get_info_telnet(host_ip[1], username, password,mudi_ip[0],interface)
    are_ips_in_same_subnet(mask_value1["IP-Address"].values[0], mask_value1["Mask"].values[0], mask_value2["IP-Address"].values[0], mask_value2["Mask"].values[0])
    #compare_host_ip(pd_output1, pd_output2,interface[0])

 

标签:ip,最终,tn,interface,交换机,command,result,print,相关
From: https://www.cnblogs.com/tjrk/p/18454811

相关文章

  • 使用python对交换机进行排障自动化运维(锐捷)
    importglobimporttelnetlibimportrefromdatetimeimportdatetimefromtimeimportsleepimportpandasaspdimportosimporttimefrommatplotlibimportpyplotasplt#Telnet连接函数defconnect_telnet(hostname,username,password):  try:  ......
  • 交换机相关最终
    用判断语句实现所有可能场景`importtelnetlibimporttimeimportpandasaspdimportredeftelnet_login(host_ip,username,password):tn=telnetlib.Telnet()try:tn.open(host_ip,port=23,timeout=5)tn.read_until(b'Username:',timeout=5)tn.write(username.encode......
  • 虚拟存储器的相关知识
    题目考查的是虚拟存储器的相关知识。虚拟存储器的概念虚拟存储器是一种内存管理技术,它允许计算机使用比物理内存(RAM)更多的内存。通过将部分内存内容暂时存储在硬盘上,操作系统可以为运行的程序提供比实际物理内存更大的地址空间。局部性原理局部性原理是指程序在执行过程中,对内......
  • 连通性相关
    一些概念连通:无向图中的任意两点都可以互相到达。强连通:有向图中的任意两点都可以互相到达。连通分量:无向图的极大连通子图。强连通分量:有向图的极大强连通子图。DFS生成树:对一张图进行深度优先遍历得到的生成树。树边:在DFS生成树上的边。前向边:由子树的根连向子树内的......
  • RAG系统评测实践详细版:Coze及相关产品评测对比,以及下一代RAG技术
    AIRAG系统评测实践:Coze及相关产品评测对比RAG(检索增强生成)是一种AI框架,它将传统信息检索系统(例如数据库)的优势与生成式大语言模型(LLM)的功能结合在一起,通过将这些额外的知识与自己的语言技能相结合,AI可以撰写更准确、更具时效性且更贴合您的具体需求的文字。RAG通过几个......
  • Linux下操作Nginx相关命令
    1、查看Nginx进程ps-aux|grepnginx圈出的就是Nginx的二进制文件2、测试Nginx配置文件/usr/sbin/nginx-t可以看到nginx配置文件位置3、nginx的使用(启动、重启、关闭)首先利用配置文件启动nginx。nginx-c/usr/local/nginx/conf/nginx.conf重启服务:servicenginxrestar......
  • 在Windows 10中,您可以使用以下命令来转换系统版本(例如,从家庭版升级到专业版)。主要使用
    在Windows10中,您可以使用以下命令来转换系统版本(例如,从家庭版升级到专业版)。主要使用的是slmgr和DISM工具。以下是相关命令:1. 查看当前版本和激活状态bashCopyCodeslmgr/dli2. 输入新产品密钥bashCopyCodeslmgr/ipk<新产品密钥>请将<新产品密钥>替换为您要升......
  • MATLAB两类栅格数据之间的相关性计算
    两类栅格数据之间的相关性计算(输出为tif影像)栅格数据做相关性分析前的预处理(批量定义投影、栅格投影、重采样)栅格影像行列号需要一致,行列号不一致可以在ArcGIS中批量处理:1.重采样2.裁剪右键空白环境设置处理完毕后,进行相关分析。Matlab代码-年[a,R]=geotiffread(......
  • 统计学(十三)——相关分析
    相关分析是用于研究多个变量之间相互关系的统计方法,最早由英国统计学家卡尔·皮尔逊(KarlPearson)于1896年提出。皮尔逊通过对变量间线性关系的深入研究,提出了“皮尔逊相关系数”(PearsonCorrelationCoefficient),标志着相关分析方法的诞生。随着统计学的发展,相关分析逐渐扩展,形成......
  • jvm相关命令
    jpsjps打印当前java进程jinfojinfo进程id,获取当前java进程的jvm参数jinfo-flagPrintGC进程id查看当前java进程是否开启打印GC的选项jinfo-flags进程id查看当前java进程的所有jvm配置参数jinfo-flag+PrintGC进程id为当前java进程开启打印gc日志 jmapjmap-......