import glob
import telnetlib
import re
from datetime import datetime
from time import sleep
import pandas as pd
import os
import time
from matplotlib import pyplot as plt
# Telnet 连接函数
def connect_telnet(hostname, username, password):
try:
tn = telnetlib.Telnet()
tn.open(hostname, port=23, timeout=5) # 尝试连接 Telnet
print("connected......", hostname)
# 输入用户名
tn.read_until(b'Username:', timeout=5)
tn.write(username.encode('ascii') + b'\n')
# 输入密码
tn.read_until(b'Password:', timeout=5)
tn.write(password.encode('ascii') + b'\n')
# 检查是否成功登录
login_result = tn.read_until(b'#', timeout=5)
if b'#' not in login_result:
print('登录失败!', hostname)
tn.close() # 登录失败,关闭连接
return None
else:
print('登录成功!', hostname)
return tn # 登录成功,返回 Telnet 对象
except Exception as e:
print(f"连接失败: {e}")
return None # 连接失败,返回 None
# 执行命令的函数:执行 Telnet 下的相关命令并返回结果
def execute_ip_ospf_neighbor_detail(tn, hostname):
# 执行第一个命令:显示当前系统时间
command1 = bytes("show clock", encoding='utf-8')
tn.write(command1 + b'\r\n') # 发送命令并执行
command1_result = tn.read_until(b'#') # 读取直到命令提示符结束的结果
command1_result = re.findall(r"\d+:\d+:\d+\s+\w+\s+\w+\s+\w+\s+\d+\s+2004", command1_result.decode('GB18030'))[0]
# 执行第二个命令:显示 OSPF 邻居详情
command2 = bytes("show ip ospf neighbor detail", encoding='utf-8')
tn.write(command2 + b'\r\n') # 发送命令
result_list = []
# 持续读取 OSPF 命令结果,直到读取完成
while True:
command2_result = tn.read_very_eager().decode('ascii')
result_list.append(command2_result)
if re.findall(r"--More--", command2_result.strip()): # 如果命令输出有 "--More--" 字样,按空格获取下一页
tn.write(b" ")
elif re.findall(r"#", command2_result.strip()): # 如果命令输出结束,退出循环
break
else:
time.sleep(1)
continue
result_str = "\n".join(result_list) # 将结果列表拼接成一个字符串
# 将命令执行的结果存储为字典
dict_output = {}
dict_output["host_ip"] = hostname
dict_output["time"] = command1_result # 存储命令 1(系统时间)结果
# 使用正则表达式提取 OSPF 邻居详细信息
dict_output["OSPF Router with ID"] = re.search(r'OSPF Router with ID (.+)', result_str).group(1)
dict_output["Neighbor"] = re.search(r'Neighbor\s+(\d+.\d+.\d+.\d+)', result_str).group(1)
dict_output["area"] = re.search(r'In the area\s+(.+)', result_str).group(1)
dict_output["State"] = re.search(r'State\s+(\w+), ', result_str).group(1)
# 将字典数据转换为 DataFrame 并保存为 CSV 文件
pd_output = pd.DataFrame.from_dict([dict_output])
pd_output['time'] = pd.to_datetime(pd_output['time'])
pd_output['time'] = pd_output['time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
ttt = str(datetime.now())
# pd_output.to_csv('./' + os.sep + r'ospf_neighbor_detail' + '-' + 'hostname' + '.csv', mode='a', index=None, encoding='gb18030')
pd_output.to_csv('./' + os.sep + r'ospf_neighbor_detail' + '-' + 'hostname' + ttt +'.csv', index=None, encoding='gb18030')
# 示例调用:执行 "show interface brief" 命令
def execute_interface_brief(tn, hostname):
command = bytes("show interface brief", encoding='utf-8')
tn.write(command + b'\r\n')
result_list = []
# 持续读取命令结果,处理 "--More--" 分页输出
while True:
command_result = tn.read_very_eager().decode('ascii')
result_list.append(command_result)
if re.findall(r"--More--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(1)
continue
result_str = "\n".join(result_list)
list_str = result_str.split('\n') # 将结果按行分割成列表
list_temperature_vec = []
# 遍历结果行,查找符合正则表达式的内容
for j in list_str:
regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
if len(re.findall(r"Interface", j)) > 0:
new_columns = list_find_str = re.split(r'\s+', j) # 提取列名
new_columns = new_columns[0:8]
if len(regex.findall(j)) > 0:
list_find_str = regex.findall(j)[0]
list_find_str = re.split(r'\s+', list_find_str) # 提取数据行
list_temperature_vec.append(list_find_str)
pd_result = pd.DataFrame(list_temperature_vec)
pd_result.columns = new_columns # 设置 DataFrame 列名
pd_result.to_csv('./' + os.sep + r'interface_brief' + '-' + str(hostname)+ str(datetime.now()) + '.csv', encoding='gb18030') # 保存结果
return pd_result # 返回结果 DataFrame
# 示例调用:光功率1:执行 "show opticalinfo brief" 命令
def execute_opticalinfo_brief(tn, hostname):
command = bytes("show opticalinfo brief", encoding='utf-8')
tn.write(command + b'\r\n')
result_list = []
# 持续读取命令结果,处理 "--More--" 分页输出
while True:
command_result = tn.read_very_eager().decode('ascii')
result_list.append(command_result)
if re.findall(r"--More--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(1)
continue
result_str = "\n".join(result_list)
list_str = result_str.split('\n') # 将结果按行分割成列表
list_temperature_vec = []
# 遍历结果行,查找符合正则表达式的内容
for j in list_str:
regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
if len(re.findall(r"Interface", j)) > 0:
new_columns = list_find_str = re.split(r'\s+', j) # 提取列名
new_columns = new_columns[0:6]
if len(regex.findall(j)) > 0:
list_find_str = regex.findall(j)[0]
list_find_str = re.split(r'\s+', list_find_str) # 提取数据行
list_temperature_vec.append(list_find_str)
pd_result = pd.DataFrame(list_temperature_vec)
pd_result.columns = new_columns # 设置 DataFrame 列名
pd_result.to_csv('./' + os.sep + r'interface_brief' + '-' + str(hostname) + str(datetime.now()) + '.csv',
encoding='gb18030') # 保存结果
return pd_result # 返回结果 DataFrame
# 示例调用:光功率2:执行 "show opticalinfo xgei-0/3/0/2" 命令
# 这个命令还得现场看下,可以参考show interface xgei-0/2/0/13
# def execute_opticalinfo_xgei(tn, hostname):
# 示例调用:执行 "show ip interface brief" 命令,类似上面的函数
def execute_ip_interface_brief(tn, hostname):
command = bytes("show ip interface brief", encoding='utf-8')
tn.write(command + b'\r\n')
result_list = []
# 持续读取结果
while True:
command_result = tn.read_very_eager().decode('ascii')
result_list.append(command_result)
if re.findall(r"--More--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(1)
continue
result_str = "\n".join(result_list)
list_str = result_str.split('\n')
list_temperature_vec = []
for j in list_str:
if len(re.findall(r"Interface", j)) > 0:
new_columns = re.split(r'\s+', j) # 提取列名
print("new_columns111111111:",new_columns)
new_columns = new_columns[0:6]
regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)
print(regex)
if len(regex.findall(j)) > 0:
list_find_str = regex.findall(j)[0]
list_find_str = re.split(r'\s+', list_find_str)
list_temperature_vec.append(list_find_str)
pd_result = pd.DataFrame(list_temperature_vec)
pd_result.columns = new_columns # 设置列名
pd_result.to_csv('./' + os.sep + r'ip_interface_brief' + '-' + str(hostname) + '.csv', encoding='gb18030') # 保存结果
return pd_result
# 示例调用:执行 "show interface xgei-0/2/0/13" 命令,获取接口的详细信息
def execute_show_interface_xgei(tn, hostname):
command = bytes("show interface xgei-0/2/0/2", encoding='utf-8')
tn.write(command + b'\r\n')
result_list = []
# 持续读取结果
while True:
command_result = tn.read_very_eager().decode('ascii')
result_list.append(command_result)
if re.findall(r"--More--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(1)
continue
result_str = "\n".join(result_list)
# 将结果存储为字典
dict_output = {}
dict_output["time"] = datetime.now()
dict_output["host_ip"] = hostname
dict_output['interface'] = re.search(r'\w+gei-\d\/\d\/\d\/\d+', result_str).group(0)
dict_output['In_Bytes'] = re.search(r'In_Bytes\s+(\d+)', result_str).group(1)
dict_output['E_Bytes'] = re.search(r'E_Bytes\s+(\d+)', result_str).group(1)
# 保存结果为 CSV 文件
pd_output = pd.DataFrame.from_dict([dict_output])
# pd_output.to_csv('./' + os.sep + r'show_interface_xgei' + '.csv', mode='a', index=None, encoding='gb18030')
pd_output.to_csv('./' + os.sep + r'show_interface_xgei' + '.csv', index=None, encoding='gb18030')
def excute_ping(tn, hostname):
command = bytes("ping 127.0.0.1", encoding='utf-8')
tn.write(command + b'\r\n')
time.sleep(5)
result_list = []
while True:
command_result = tn.read_very_eager().decode('ascii')
result_list.append(command_result)
if re.findall(r"--More--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(1)
continue
# time.sleep(20)
result_str = "\n".join(result_list)
# print(result_str)
# 解析结果并存储
dict_output = {}
dict_output["host_ip"] = hostname
dict_output["info"] = result_str
pd_output = pd.DataFrame.from_dict([dict_output])
pd_output.to_csv('./' + os.sep + r'ping' + '-' + 'hostname' + str(datetime.now()) + '.csv', mode='a', index=None, encoding='gb18030')
def files_to_one():
file_list = glob.glob(os.path.join(os.path.abspath('./'), r'ospf*.csv'))
pd_data_o = pd.DataFrame()
for file in file_list:
try:
pd_sheet = pd.read_csv(file, encoding='gb18030', doublequote=False,
converters={u'code': str}, engine="python")
except:
print('读取异常')
pd_data_o = pd.concat([pd_data_o, pd_sheet], axis=0)
pd_data_o.to_csv('./' + os.sep + r'ospf' + '.csv', index=None, encoding='gb18030')
if __name__ == '__main__':
'''
正则表达式:https://regex101.com/
'''
# hostname 列表,包含多个主机 IP 地址
# hostnames = ['127.0.0.1', '127.0.0.1', '127.0.0.1'] # 这是一个示例列表
hostnames = ['127.0.0.1'] # 这是一个示例列表
# hostnames = ['127.0.0.1', '127.0.0.1'] # 这是一个示例列表
username = 'huawei'
password = 'huawei'
# 遍历每个 hostname,依次进行 Telnet 连接和命令执行
for hostname in hostnames:
print(f"正在连接 {hostname} ...")
# 连接 Telnet
tn = connect_telnet(hostname, username, password)
# 如果连接成功,执行命令
if tn:
print(f"连接成功 {hostname}, 执行命令中...")
# 执行 show ip ospf neighbor detail 命令 ,信息为excel形式,每个主机只有一行,做一个表格合并就行files_to_one(),不需要做可视化了
# execute_ip_ospf_neighbor_detail(tn, hostname)
# 可以根据需要取消注释执行其他命令
# 返回的数据为二层接口的状态
# execute_interface_brief(tn, hostname)
# execute_ip_interface_brief(tn, hostname)
execute_show_interface_xgei(tn, hostname)
# excute_ping(tn,hostname)
# 这三个可参考excute_ping()方法进行处理
# execute_show_opentical(tn, hostname)
# excute_show_runningconfig(tn,hostname)
# excute_show_alarm_current(tn,hostname)
# 关闭 Telnet 连接
tn.close()
print(f"{hostname} 的操作已完成,连接关闭。\n")
else:
print(f"无法连接到 {hostname},请检查连接或主机状态。\n")
# 合并文件
# files_to_one()
####################
import re
import time
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats
from datetime import datetime
import telnetlib
import os
# 采集接口流量数据并保存到CSV文件
def execute_show_interface_xgei(tn, hostname, interval=5, num_samples=10):
all_data = [] # 用于存储每次采集的数据
for i in range(num_samples):
command = bytes("show interface xgei-0/2/0/2", encoding='utf-8')
tn.write(command + b'\r\n')
result_list = []
# 持续读取结果
while True:
command_result = tn.read_very_eager().decode('ascii')
result_list.append(command_result)
if re.findall(r"--More--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(1)
continue
result_str = "\n".join(result_list)
# 将结果存储为字典
dict_output = {}
dict_output["time"] = datetime.now()
dict_output["host_ip"] = hostname
dict_output['interface'] = re.search(r'\w+gei-\d\/\d\/\d\/\d+', result_str).group(0)
dict_output['In_Bytes'] = re.search(r'In_Bytes\s+(\d+)', result_str).group(1)
dict_output['E_Bytes'] = re.search(r'E_Bytes\s+(\d+)', result_str).group(1)
# 将数据添加到总列表中
all_data.append(dict_output)
# 每隔 interval 秒采集一次
time.sleep(interval)
# 保存结果为 CSV 文件
pd_output = pd.DataFrame.from_dict(all_data)
# 确保文件夹存在
# if not os.path.exists('./data'):
# os.makedirs('./data')
# pd_output.to_csv('./show_interface_xgei.csv', mode='a', index=None, encoding='gb18030')
pd_output.to_csv('./show_interface_xgei.csv', mode='a', index=None, encoding='gb18030')
# 从CSV文件中读取数据并进行图形化展示
def plot_traffic_data(csv_file='./show_interface_xgei.csv'):
try:
# 从CSV文件中读取数据
data = pd.read_csv(csv_file)
# 转换时间列为datetime类型
# data['time'] = pd.to_datetime(data['time'], format='%Y-%m-%d %H:%M:%S')
data['time'] = pd.to_datetime(data['time'], format='mixed')
data.sort_values('time', inplace=True) # 按时间排序
data.reset_index(level=None, drop=True, inplace=True)
# 计算字节差分值
data['In_Bytes'] = data['In_Bytes'].diff(1)
data['E_Bytes'] = data['E_Bytes'].diff(1)
# 删除含有缺失值的行
data.dropna(axis=0, how='any', inplace=True)
except Exception as e:
print(e)
return
# 绘制流量变化趋势图
plt.figure(figsize=(10, 6))
plt.plot(data['time'], data['In_Bytes'], label='In_Bytes', marker='o')
plt.plot(data['time'], data['E_Bytes'], label='E_Bytes', marker='o')
plt.title('Interface Traffic Over Time')
plt.xlabel('Time')
plt.ylabel('Bytes')
plt.legend()
plt.grid(True)
plt.show()
# 正态分布图和统计分析
for i in ['In_Bytes', 'E_Bytes']:
X = data[i].astype(float)
# 绘制直方图
plt.figure(figsize=(10, 6))
plt.title(i + ' Histogram')
plt.hist(X, bins=50)
plt.show()
# 绘制概率密度图
plt.figure(figsize=(10, 6))
plt.title(i + ' Probability Density')
sns.kdeplot(X, kernel='gau', color="g", alpha=.7)
plt.show()
# 偏度和峰度计算
print(f"{i} Skewness:", stats.skew(X))
print(f"{i} Kurtosis:", stats.kurtosis(X))
# 计算协方差和相关系数
covariance = data['In_Bytes'].cov(data['E_Bytes'])
correlation = data['In_Bytes'].corr(data['E_Bytes'])
print("Covariance:", covariance)
print("Correlation:", correlation)
if __name__ == '__main__':
# 示例Telnet连接信息
hostname = '127.0.0.1'
username = 'huawei'
password = 'huawei'
try:
# 连接Telnet
tn = telnetlib.Telnet(hostname)
tn.read_until(b'Username:')
tn.write(username.encode('ascii') + b'\n')
tn.read_until(b'Password:')
tn.write(password.encode('ascii') + b'\n')
# 执行采集命令,每隔5秒采集一次,连续采集10次
# while True: 如果注释打开,记得缩进两个空格
execute_show_interface_xgei(tn, hostname, interval=5, num_samples=10)
tn.close()
# 绘制流量图形化展示
# plot_traffic_data()
except Exception as e:
print(f"Error: {e}")