import paramiko
import os
import time
import random
import datetime
import pandas as pd
import re
import numpy as np
# from sqlalchemy import text, create_engine
# import psycopg2
# from psycopg2 import sql
# from sqlalchemy.orm import sessionmaker
# from sqlalchemy.dialects.postgresql import insert as pg_insert
# from sqlalchemy import create_engine, MetaData, Table, inspect
import schedule
import telnetlib
import threading
import time
def traffic_info_get(hostname="10.89.164.70", username="root", password="PON@jkfx"):
    """Collect per-interface RX/TX byte counters from a host over SSH.

    Logs in with password authentication on port 22, runs ``ifconfig -a``
    and extracts, for every ``Link encap:Ethernet`` stanza, the interface
    name together with its ``RX bytes`` and ``TX bytes`` counters.

    Args:
        hostname: Address of the host to query.
        username: SSH login name.
        password: SSH password.

    Returns:
        A DataFrame with the columns ``port_name``, ``rx_bytes``,
        ``tx_bytes``, ``time`` and ``id`` (one row per matched interface,
        possibly zero rows), or ``None`` when the connection or the remote
        command fails. Failures are reported with ``print``.
    """
    ssh = paramiko.SSHClient()
    # Unknown host keys are accepted automatically; this is convenient but
    # gives no protection against a spoofed host.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname, port=22, username=username, password=password)
    except Exception as e:
        print(f"无法连接到服务器: {e}")
        ssh.close()
        # Without a session there is nothing to query, so stop here instead
        # of failing again on the unconnected client.
        return None

    try:
        stdin, stdout, stderr = ssh.exec_command("ifconfig -a")
        ifconfig_output = stdout.read().decode()
    except Exception as e:
        print(f"获取网口信息时出错: {e}")
        return None
    finally:
        # Release the connection on success and on failure alike.
        ssh.close()

    # Capture the interface name plus its RX/TX byte counters.
    pattern = re.compile(
        r'(\S+)\s+Link encap:Ethernet.*?\n.*?RX bytes:(\d+) .*?TX bytes:(\d+)',
        re.S,
    )
    interfaces = pattern.findall(ifconfig_output)

    # Passing the column names to the constructor keeps this valid even when
    # no interface matched (assigning ``.columns`` afterwards would raise).
    pd_data = pd.DataFrame(interfaces, columns=['port_name', 'rx_bytes', 'tx_bytes'])
    # The regex yields strings; store the counters as integers.
    pd_data[['rx_bytes', 'tx_bytes']] = pd_data[['rx_bytes', 'tx_bytes']].astype('int64')
    pd_data["time"] = datetime.datetime.now().strftime('%Y-%m-%d %H%M%S')
    pd_data["id"] = pd_data["port_name"] + '_' + pd_data["time"]
    return pd_data
def fake_data():
    """Build a one-row DataFrame of made-up traffic counters for port ``em1``.

    The row carries the current local time (``%Y-%m-%d %H%M%S``), random
    ``rx_bytes`` / ``tx_bytes`` values between 10000000 and 20000000
    inclusive, and an ``id`` made of the port name and the timestamp joined
    by an underscore.
    """
    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H%M%S')
    record = {
        "time": stamp,
        "port_name": 'em1',
        "rx_bytes": random.randint(10000000, 20000000),
        "tx_bytes": random.randint(10000000, 20000000),
    }
    frame = pd.DataFrame([record])
    # Row identifier: "<port_name>_<time>".
    frame["id"] = frame["port_name"].str.cat(frame["time"], sep='_')
    return frame
def database_write(pd_data):
    """Upsert the rows of *pd_data* into the PostgreSQL table ``traffic_table``.

    The table definition is reflected from the database. Every row is
    inserted with ``INSERT ... ON CONFLICT DO UPDATE`` keyed on the table's
    primary-key columns, so a row whose key already exists is overwritten
    with the new values. Missing values are replaced by 0 before writing.

    Args:
        pd_data: DataFrame whose column names match the table's columns.
            ``None`` or an empty frame is accepted and results in no
            database access at all.

    Returns:
        None. Database errors are rolled back and reported with ``print``.
    """
    # Nothing to store: skip connecting altogether.
    if pd_data is None or pd_data.empty:
        return

    # Imported locally so the module itself can be imported on machines
    # where SQLAlchemy is not installed.
    from sqlalchemy import MetaData, Table, create_engine
    from sqlalchemy.dialects.postgresql import insert as pg_insert
    from sqlalchemy.orm import sessionmaker

    database_name = 'postgres'
    table_name = 'traffic_table'
    DATABASE_URI = 'postgresql://5ga-cmcc:5ga-cmcc@127.0.0.1:5432/' + database_name

    dict_rows = pd_data.fillna(value=0).to_dict('records')

    engine = create_engine(DATABASE_URI)
    session = sessionmaker(bind=engine)()
    try:
        # Reflect the table so its columns and primary key are known.
        table = Table(table_name, MetaData(), autoload_with=engine)
        primary_keys = [column.name for column in table.primary_key.columns]

        # Build the upsert: on a primary-key conflict, overwrite every
        # column with the value from the row that failed to insert.
        stmt = pg_insert(table)
        stmt = stmt.on_conflict_do_update(
            index_elements=primary_keys,
            set_=dict(stmt.excluded.items()),
        )

        session.execute(stmt, dict_rows)
        session.commit()
    except Exception as e:
        session.rollback()  # undo the partial write
        print("Error saving data to database:", e)
    finally:
        session.close()
        # A new engine is created on every call; dispose of its pool so
        # connections do not pile up across scheduled runs.
        engine.dispose()
# Columns of the frame returned by get_info_telnet.
_TELNET_COLUMNS = ["port_name", "time", "rx_bytes", "tx_bytes", "id"]

# Device clock line; the year is matched generically instead of a fixed value.
_CLOCK_PATTERN = re.compile(r"\d+:\d+:\d+\s+\w+\s+\w+\s+\w+\s+\d+\s+\d{4}")

# Start of an interface row: "gei" preceded by at least one word character.
_INTERFACE_ROW = re.compile(r'\w+gei')


def _telnet_run_paged(tn, command, idle_timeout=10):
    """Send *command* and return its output, stepping through ``--More--`` pages.

    Reading stops at the first chunk containing ``#``. A ``TimeoutError`` is
    raised when the device stays silent for *idle_timeout* seconds.
    """
    tn.write(bytes(command, encoding='utf-8') + b'\n')
    time.sleep(1)  # give the device time to start answering
    chunks = []
    last_activity = time.monotonic()
    while time.monotonic() - last_activity < idle_timeout:
        chunk = tn.read_very_eager().decode('ascii', errors='ignore')
        if chunk:
            chunks.append(chunk)
            last_activity = time.monotonic()
        if "--More--" in chunk:
            tn.write(b" ")  # request the next page
        elif "#" in chunk:
            return "\n".join(chunks)
        else:
            time.sleep(0.05)
    raise TimeoutError('no prompt received after %r' % command)


def _parse_up_interfaces(brief_output):
    """Return the interface names whose ``Phy`` column reads ``up``."""
    header = None
    rows = []
    for line in brief_output.split('\n'):
        if "Interface" in line:
            # Keep at most the first seven header fields, starting at the
            # "Interface" title so leading noise does not shift the columns.
            header = line[line.index("Interface"):].split()[:7]
        found = _INTERFACE_ROW.search(line)
        if found:
            rows.append(line[found.start():].split())
    if not header or 'Phy' not in header:
        return []
    phy_at = header.index('Phy')
    return [fields[0] for fields in rows if len(fields) > phy_at and fields[phy_at] == 'up']


def _extract_counter(text, label):
    """Return the integer that follows *label* in *text*, or None if absent."""
    found = re.search(re.escape(label) + r'\s+(\d+)\s+', text)
    return int(found.group(1)) if found else None


def get_info_telnet(host_ip, username, password):
    """Collect byte counters of all physically-up ports from a device via telnet.

    Logs in on port 23, reads the device clock with ``show clock``, lists the
    interfaces with ``show ip interface brief`` and, for every interface whose
    ``Phy`` column is ``up``, reads ``In_Bytes`` and ``E_Bytes`` from
    ``show interface <name>``.

    Note: ``telnetlib`` was removed from the standard library in Python 3.13.

    Args:
        host_ip: Address of the device.
        username: Login name (must be ASCII).
        password: Login password (must be ASCII).

    Returns:
        A DataFrame with the columns ``port_name``, ``time``, ``rx_bytes``,
        ``tx_bytes`` and ``id`` (``"<port_name>_<time>"``), one row per port.
        The frame is empty when the connection, the login or the collection
        fails, or when no port is up; the reason is reported with ``print``.
    """
    empty = pd.DataFrame(columns=_TELNET_COLUMNS)
    tn = telnetlib.Telnet()
    try:
        # --- login -------------------------------------------------------
        try:
            tn.open(host_ip, port=23, timeout=5)
            print('%s connected ssuccess !' % host_ip)
            tn.read_until(b'Username:', timeout=5)
            tn.write(username.encode('ascii') + b'\n')
            tn.read_until(b'Password:', timeout=5)
            tn.write(password.encode('ascii') + b'\n')
            time.sleep(1)
            login_reply = tn.read_until(b'#', timeout=5)
        except (OSError, EOFError):
            print('%s网络连接失败' % host_ip)
            return empty
        if b'#' not in login_reply:
            print('%s登录失败' % host_ip)
            return empty
        print('%s登录成功' % host_ip)

        # --- collection --------------------------------------------------
        try:
            tn.write(b'show clock\r\n')
            clock_reply = tn.read_until(b'#', timeout=5).decode('GB18030', errors='ignore')
            clock = _CLOCK_PATTERN.search(clock_reply)
            if clock is None:
                print('%s: could not read the device clock' % host_ip)
                return empty
            run_time = clock.group(0)

            brief_output = _telnet_run_paged(tn, "show ip interface brief")

            # Port traffic and status check.
            records = []
            for check_port in _parse_up_interfaces(brief_output):
                detail = _telnet_run_paged(tn, " show interface " + check_port)
                rx_bytes = _extract_counter(detail, 'In_Bytes')
                tx_bytes = _extract_counter(detail, 'E_Bytes')
                if rx_bytes is None or tx_bytes is None:
                    # Skip ports without counters instead of aborting the run.
                    print('%s: no byte counters found for %s' % (host_ip, check_port))
                    continue
                records.append({
                    "port_name": check_port,
                    "time": run_time,
                    "rx_bytes": rx_bytes,
                    "tx_bytes": tx_bytes,
                })
        except (OSError, EOFError) as e:
            print('%s: collection aborted: %s' % (host_ip, e))
            return empty
    finally:
        tn.close()

    if not records:
        return empty

    pd_output = pd.DataFrame(records)
    # Normalise the device clock string to "YYYY-MM-DD HH:MM:SS".
    pd_output['time'] = pd.to_datetime(pd_output['time'])
    pd_output['time'] = pd_output['time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
    pd_output["id"] = pd_output["port_name"] + '_' + pd_output["time"]
    return pd_output
def monitor_task():
    """Run one monitoring cycle: query the device over telnet and store the result."""
    # Connection details of the monitored device: (host, user, password).
    target = ('129.60.161.169', 'zte', 'zte')
    collected = get_info_telnet(*target)
    database_write(collected)
if __name__ == '__main__':
    # Register the monitoring cycle to run every 10 seconds.
    every_ten_seconds = schedule.every(10).seconds
    every_ten_seconds.do(monitor_task)

    # Check once per second whether a registered job is due; never exits.
    while True:
        schedule.run_pending()
        time.sleep(1)
# Tags: 运维 (operations), re, python, list, 排障 (troubleshooting), command, result, pd, time
# Source: https://blog.51cto.com/u_16223744/12025215