"""Telnet helper script that checks interface and OSPF state on two devices.

For each host the script logs in over telnet, pings the peer address,
re-enables interfaces reported as administratively down, reads the IP address
and mask of the configured interface and, when the OSPF neighbour state is not
``FULL``, dumps the OSPF running configuration.  Finally it checks that the
two interface addresses belong to overlapping networks.
"""

import datetime
import io
import ipaddress
import os
import random
import re
import subprocess
import telnetlib
import telnetlib as tn
import threading
import time

import numpy as np
import pandas as pd
import paramiko
import psycopg2
import schedule
from psycopg2 import sql
from sqlalchemy import MetaData, Table, create_engine, inspect, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import sessionmaker

TELNET_PORT = 23
CONNECT_TIMEOUT = 5
# Upper bound for collecting the output of one command; the original polling
# loops had no bound and would spin forever if the prompt never showed up.
COMMAND_TIMEOUT = 30.0
MORE_MARKER = "--More--"
PROMPT_MARKER = "#"
ADDRESS_COLUMNS = ["IP-Address", "Mask"]

_OSPF_STATE_RE = re.compile(r'State\s+(\w+)')
_CONFIG_LINE_RE = re.compile(r'(\S+)\s+(.*)')
# NOTE(review): ``\w+`` requires at least one character before "gei", so
# names such as "xgei-0/1/1/49" are selected while a bare "gei-0/1/1/33" is
# not.  This mirrors the original filter; confirm whether plain "gei" ports
# should also be handled before widening it to ``\w*gei``.
_GEI_ROW_RE = re.compile(r'\w+gei')

# (result key, pattern) pairs used by xieyi_modify.  Explicit keys avoid the
# collisions produced by deriving the key from the regular expression text
# (both "port ..." patterns used to end up under the same key).
_INTERFACE_CONFIG_PATTERNS = (
    ('description', re.compile(r'description\s+(.+)')),
    ('ip address',
     re.compile(r'ip address\s+(\d+\.\d+\.\d+\.\d+)\s+(\d+\.\d+\.\d+\.\d+)')),
    ('duplex', re.compile(r'duplex\s+(auto|half|full)')),
    ('speed', re.compile(r'speed\s+(auto|\d+)')),
    ('port link-type', re.compile(r'port link-type\s+(access|trunk)')),
    ('port default vlan', re.compile(r'port default vlan\s+(\d+)')),
)


def _login(host_ip, username, password):
    """Open a telnet session and log in.

    Returns the connected ``telnetlib.Telnet`` object, or ``None`` when the
    connection cannot be established or no ``#`` prompt appears after the
    credentials were sent.  The socket is closed on every failure path.
    """
    conn = telnetlib.Telnet()
    try:
        conn.open(host_ip, port=TELNET_PORT, timeout=CONNECT_TIMEOUT)
        print('%s connected ssuccess !' % host_ip)
        conn.read_until(b'Username:', timeout=CONNECT_TIMEOUT)
        conn.write(username.encode('ascii') + b'\n')
        conn.read_until(b'Password:', timeout=CONNECT_TIMEOUT)
        conn.write(password.encode('ascii') + b'\n')
        time.sleep(1)
        banner = conn.read_until(b'#', timeout=CONNECT_TIMEOUT)
    except (OSError, EOFError):
        # socket.timeout is an OSError; EOFError means the peer hung up.
        print('%s网络连接失败' % host_ip)
        conn.close()
        return None
    if b'#' not in banner:
        print('%s登录失败' % host_ip)
        conn.close()
        return None
    print('%s登录成功' % host_ip)
    return conn


def _run_command(tn, command, timeout=COMMAND_TIMEOUT):
    """Send *command* and return its complete output as one string.

    Output is polled until a ``#`` is seen.  Whenever ``--More--`` shows up a
    space is sent to request the next page.  Chunks are concatenated without
    a separator so a line split across two reads is not broken in half, and
    the pager markers / backspaces are removed from the returned text.
    Polling stops after *timeout* seconds and whatever was collected so far
    is returned.
    """
    tn.write(command.encode('utf-8') + b'\n')
    time.sleep(1)
    output = ""
    scanned = 0  # everything before this offset has already been acted on
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        output += tn.read_very_eager().decode('ascii', errors='ignore')
        pending = output[scanned:]
        if MORE_MARKER in pending:
            tn.write(b" ")
            scanned = len(output)
        elif PROMPT_MARKER in pending:
            break
        else:
            time.sleep(0.05)
    return output.replace(MORE_MARKER, "").replace("\x08", "")


def _parse_table(output, row_pattern=None):
    """Turn a whitespace-separated CLI table into a DataFrame of strings.

    The header is the first line whose first field is ``Interface``; only
    lines after it are treated as rows.  Lines with fewer than two fields
    (for example the trailing prompt) are skipped.  When *row_pattern* is
    given, only rows whose first field matches it are kept.  Rows shorter
    than the header are padded with empty strings and surplus fields are
    merged into the last column, so the frame always matches the header
    width.  An empty DataFrame is returned when no header line is found.
    """
    columns = None
    rows = []
    for line in output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if columns is None:
            if fields[0] == "Interface":
                columns = fields
            continue
        if len(fields) < 2:
            continue
        if row_pattern is not None and not row_pattern.match(fields[0]):
            continue
        width = len(columns)
        if len(fields) > width:
            fields = fields[:width - 1] + [" ".join(fields[width - 1:])]
        else:
            fields = fields + [""] * (width - len(fields))
        rows.append(fields)
    if columns is None:
        return pd.DataFrame()
    return pd.DataFrame(rows, columns=columns)


def xieyi_modify(command1, command2, tn=None):
    """Run *command1* and extract interface settings from its output.

    Args:
        command1: CLI command whose output is parsed.
        command2: Unused; kept so existing call signatures stay valid.
        tn: Open telnet connection.  It has to be supplied explicitly: the
            module-level name ``tn`` is the ``telnetlib`` module itself, which
            is what the original code tried to write to.

    Returns:
        dict mapping a setting name (``description``, ``ip address``,
        ``duplex``, ``speed``, ``port link-type``, ``port default vlan``) to
        the captured value.  Patterns with one group yield a string, the
        ``ip address`` pattern yields an ``(address, mask)`` tuple.

    Raises:
        ValueError: if no connection is supplied.
    """
    if tn is None:
        raise ValueError("xieyi_modify requires an open telnet connection (tn=...)")
    command_result = _run_command(tn, command1)
    config_dict = {}
    for key, pattern in _INTERFACE_CONFIG_PATTERNS:
        match = pattern.search(command_result)
        if not match:
            continue
        groups = match.groups()
        # ``.+`` also captures a trailing carriage return, hence the strip.
        config_dict[key] = groups[0].strip() if len(groups) == 1 else groups
    return config_dict


def get_ospf(tn):
    """Return the OSPF running configuration unless the neighbour is FULL.

    The first ``State <word>`` occurrence in ``show ip ospf neighbor detail``
    is taken as the neighbour state.  If it is missing or not ``FULL``, the
    output of ``show running-config ospfv2`` is split into
    ``first-word -> [rest of line, ...]`` entries, which are printed and
    returned.  An empty dict is returned when the state is ``FULL``.
    """
    neighbor_output = _run_command(tn, "show ip ospf neighbor detail")
    match = _OSPF_STATE_RE.search(neighbor_output)
    state = match.group(1) if match else None

    config_dict = {}
    if state != 'FULL':
        config_output = _run_command(tn, "show running-config ospfv2")
        for line in config_output.splitlines():
            line_match = _CONFIG_LINE_RE.match(line.strip())
            if line_match:
                key, value = line_match.groups()
                # The same keyword may appear on several lines; keep them all.
                config_dict.setdefault(key, []).append(value)
        for key, values in config_dict.items():
            print(f"{key}: {values}")
    print('这个是opsf协议值\n')
    print(config_dict.items())
    return config_dict


def get_ip_interface(tn, interface):
    """Return the ``IP-Address``/``Mask`` rows of one interface.

    Args:
        tn: Open telnet connection.
        interface: Interface name, or a sequence whose first element is the
            interface name (the form used by the script entry point).

    Returns:
        DataFrame with the columns ``IP-Address`` and ``Mask`` holding the
        rows of ``show ip interface brief`` whose ``Interface`` field equals
        the requested name.  The frame is empty when the table or the
        interface cannot be found.
    """
    interface_name = interface if isinstance(interface, str) else interface[0]
    table = _parse_table(_run_command(tn, "show ip interface brief"))
    required = ["Interface"] + ADDRESS_COLUMNS
    if table.empty or any(column not in table.columns for column in required):
        return pd.DataFrame(columns=ADDRESS_COLUMNS)
    return table.loc[table["Interface"] == interface_name, ADDRESS_COLUMNS]


def ping_test(ip_address, tn, wait=10):
    """Ping *ip_address* from the device and report whether it fully succeeded.

    The reply is read once after *wait* seconds, printed, and considered
    successful only if it contains ``Success rate is 100 percent``.
    """
    tn.write(f"ping {ip_address}".encode('utf-8') + b'\n')
    time.sleep(wait)
    ping_result = tn.read_very_eager().decode('ascii', errors='ignore')
    print(ping_result)
    return "Success rate is 100 percent" in ping_result


def serch_modify(pd_result, tn):
    """Issue ``no shutdown`` for every interface whose ``Admin`` is ``down``.

    *pd_result* is the parsed ``show interface brief`` table.  Nothing is
    done when it is empty or lacks the ``Admin``/``Interface`` columns.
    """
    if pd_result.empty or not {'Admin', 'Interface'} <= set(pd_result.columns):
        return
    down_interfaces = pd_result.loc[pd_result['Admin'] == 'down', 'Interface']
    for interface_name in down_interfaces:
        commands = (
            "enable",
            "configure terminal",
            f"interface {interface_name}",
            "no shutdown",
            "exit",
            "exit",
        )
        for cmd in commands:
            tn.write(cmd.encode('utf-8') + b'\n')
            time.sleep(1)  # give the device time to execute the command
            # Discard the reply so it does not leak into the output of the
            # next command that gets parsed.
            tn.read_very_eager()


def get_info_telnet(host_ip, username, password, mudi_ip, interface):
    """Collect OSPF configuration and interface address from one device.

    Steps: log in, ping *mudi_ip*, parse ``show interface brief``, re-enable
    administratively-down interfaces, then read the interface address and the
    OSPF information.  The connection is always closed before returning.

    Returns:
        ``(ospf_value, mask_value)`` as produced by ``get_ospf`` and
        ``get_ip_interface``.  If the login fails or the connection drops,
        an empty dict and an empty ``IP-Address``/``Mask`` DataFrame are
        returned instead.
    """
    empty_result = ({}, pd.DataFrame(columns=ADDRESS_COLUMNS))
    conn = _login(host_ip, username, password)
    if conn is None:
        return empty_result
    try:
        ping_test(mudi_ip, conn)
        brief_output = _run_command(conn, "show interface brief")
        pd_result = _parse_table(brief_output, row_pattern=_GEI_ROW_RE)
        serch_modify(pd_result, conn)
        print("接口状态起来后的ping测结果:\n")
        mask_value = get_ip_interface(conn, interface)
        ospf_value = get_ospf(conn)
    except (OSError, EOFError):
        print('%s网络连接失败' % host_ip)
        return empty_result
    finally:
        conn.close()
    return ospf_value, mask_value


def modify_ospf(host_ip, username, password, interface="xgei-0/1/1/49"):
    """Set the OSPF network type of *interface* to point-to-multipoint.

    The commands enter ``router ospf 1`` / ``area 0`` / the interface, apply
    ``network point-to-multipoint`` and then leave the four configuration
    levels again.  Each device reply is printed.  Nothing is sent when the
    login fails.
    """
    conn = _login(host_ip, username, password)
    if conn is None:
        return
    commands = [
        "configure terminal",
        "router ospf 1",
        "area 0",
        f"interface {interface}",
        "network point-to-multipoint",
        # One "exit" per configuration level entered above.  The original
        # list lacked the commas, which made Python concatenate them into a
        # single "exitexitexitexit" command.
        "exit",
        "exit",
        "exit",
        "exit",
    ]
    try:
        for cmd in commands:
            conn.write(cmd.encode('ascii') + b'\n')
            time.sleep(3)
            print(conn.read_very_eager().decode('ascii', errors='ignore'))
        print("modyfiOspf:ok")
    except (OSError, EOFError):
        print('%s网络连接失败' % host_ip)
    finally:
        conn.close()


def compare_host_ip(pd_output1, pd_output2, interface, *, host=None,
                    user=None, pwd=None):
    """Re-apply the OSPF settings when the two ``network`` entries differ.

    Args:
        pd_output1, pd_output2: Configuration dicts as returned by
            ``get_ospf``; only the first ``network`` value is compared.  A
            missing entry is treated as ``None``.
        interface: Currently unused; kept for call compatibility.
        host, user, pwd: Target device and credentials.  When *host* is
            omitted the module-level ``host_ip[0]``, ``username`` and
            ``password`` set by the script entry point are used, which is
            what the original implementation always did.
    """
    if host is None:
        host, user, pwd = host_ip[0], username, password
    network1 = (pd_output1 or {}).get("network", [None])[0]
    network2 = (pd_output2 or {}).get("network", [None])[0]
    if network1 != network2:
        modify_ospf(host, user, pwd)
    print("ospf配置ok")


def are_ips_in_same_subnet(ip1, mask1, ip2, mask2):
    """Report whether the networks of two address/mask pairs overlap.

    Prints the verdict and returns ``True`` when the networks overlap,
    ``False`` when they do not or when either pair is not a valid network.
    """
    try:
        net1 = ipaddress.ip_network(f"{ip1}/{mask1}", strict=False)
        net2 = ipaddress.ip_network(f"{ip2}/{mask2}", strict=False)
    except ValueError as e:
        print(f"无效的网络字符串: {ip1}/{mask1} 或 {ip2}/{mask2}. 错误: {e}")
        return False
    if net1.overlaps(net2):
        print("地址配置正确")
        return True
    print("地址错误")
    return False


if __name__ == '__main__':
    # These stay module-level on purpose: compare_host_ip falls back to
    # host_ip / username / password when called without explicit targets.
    host_ip = ['', '']
    interface = ['xgei-0/1/1/49', 'gei-0/1/1/33']
    mudi_ip = ['', '']
    username = 'zte'
    password = 'zte'

    # NOTE(review): both calls receive the whole ``interface`` list, so
    # get_ip_interface looks up interface[0] on both hosts.  If interface[1]
    # is meant for the second host, pass interface[1] to the second call.
    pd_output1, mask_value1 = get_info_telnet(
        host_ip[0], username, password, mudi_ip[1], interface)
    if not mask_value1.empty:
        print(mask_value1["IP-Address"].values[0])
    pd_output2, mask_value2 = get_info_telnet(
        host_ip[1], username, password, mudi_ip[0], interface)

    if mask_value1.empty or mask_value2.empty:
        print(f"未获取到接口地址: {interface[0]}")
    else:
        are_ips_in_same_subnet(
            mask_value1["IP-Address"].values[0],
            mask_value1["Mask"].values[0],
            mask_value2["IP-Address"].values[0],
            mask_value2["Mask"].values[0],
        )
    # The OSPF comparison step was disabled in the original script:
    # compare_host_ip(pd_output1, pd_output2, interface[0])
# 标签: ip, 最终, tn, interface, 交换机, command, result, print, 相关
# From: https://www.cnblogs.com/tjrk/p/18454811