import netaddr from netaddr import * import pandas as pd import streamlit as st from concurrent.futures import ThreadPoolExecutor import subprocess import shlex from threading import Lock import netmiko class compute(object): def __init__(self): st.sidebar.header(":red[IP计算器]") #self.cmd = st.text_input("请输入要执行的命令") self.net_iput = st.sidebar.text_input("请输入ip地址", help="示例:1.1.1.1/24") # 输入要划分的网段 self.subnet_list = [] # subnet列表 def subnet(self): try: net = IPNetwork(self.net_iput) # 输入要划分的网段 choose = st.sidebar.checkbox("是否进行子网划分",key="key123") if choose == True: cidr = st.sidebar.number_input("请选择划分的子网", min_value=1, max_value=128) if int(cidr) - int(net.prefixlen) <20: self.subnet_list = [str(snet) for snet in net.subnet(cidr)] # 获取子网list #st.write(":red[子网详情如下:]") with st.sidebar.expander(":red[查看子网详情]"): df_subnet = pd.DataFrame(data=self.subnet_list, columns=["子网详情"]) st.dataframe(df_subnet,width=1000) else: st.sidebar.warning("子网数量过多,请重新输入") except netaddr.core.AddrFormatError: pass def ip_info(self): tab1,tab2,tab3 = st.sidebar.tabs([":red[可用地址]",":red[反掩码]",":red[掩码]"]) try: ip = IPNetwork(self.net_iput) with tab1: st.write(str(ip[1]) + str("---") + str(ip[-2])) with tab2: st.write(str(ip.hostmask)) with tab3: st.write(str(ip.netmask)) except netaddr.core.AddrFormatError: pass class mult_ping(object): def __init__(self): self.log_list = [] self.log_list_name = [] self.success_list = [] self.success_list_name = [] self.failed_list = [] self.failed_list_name = [] self.failed_list_ip = [] self.log_list_1 = [] self.log_list_name_1 = [] self.success_list_1 = [] self.success_list_name_1 = [] self.failed_list_1 = [] self.failed_list_name_1 = [] self.failed_list_ip_1 = [] def ping(self,name,ip): cmd_tmp = shlex.split("ping -c 2 %s" % str(ip)) cmd = subprocess.run(cmd_tmp, stdout=subprocess.PIPE) result = (cmd.stdout.decode("gbk")) self.log_list.append(result) self.log_list_name.append(str(name)) if cmd.returncode == 0: result = "正常" self.success_list.append(result) self.success_list_name.append(str(name)) else: result = "不通" self.failed_list.append(result) self.failed_list_name.append(str(name)) self.failed_list_ip.append((str(ip))) def snmp(self,name, ip): lock = Lock() cmd_snmp = shlex.split("snmpwalk -a sha -A muS7kKrZTAFbzPa4Uu56 -v 3 -l authPriv -x aes -X muS7kKrZTAFbzPa4Uu56 -u yunqiao %s ifindex" % str(ip)) cmd = subprocess.run(cmd_snmp, stdout=subprocess.PIPE) if cmd.returncode == 0: lock.acquire() result = "snmpv3正常" self.success_list_1.append(result) self.success_list_name_1.append(str(name)) cmd_result = (cmd.stdout.decode("gbk")) self.log_list_1.append(cmd_result) self.log_list_name_1.append(str(name)) lock.release() elif cmd.returncode == 1: cmd_snmp = shlex.split("snmpwalk -v 2c -c AXQfPuJs %s ifindex" % str(ip)) tmp_cmd = subprocess.run(cmd_snmp,stdout=subprocess.PIPE) if tmp_cmd.returncode == 0: lock.acquire() result = "snmpv2正常" self.success_list_1.append(result) self.success_list_name_1.append(str(name)) cmd_result = (tmp_cmd.stdout.decode("gbk")) self.log_list_1.append(cmd_result) self.log_list_name_1.append(str(name)) lock.release() else: lock.acquire() result = "认证失败或超时" self.failed_list_1.append(result) self.failed_list_name_1.append(str(name)) self.failed_list_ip_1.append((str(ip))) cmd_result = (cmd.stdout.decode("gbk")) self.log_list_1.append(cmd_result) self.log_list_name_1.append(str(name)) lock.release() elif cmd.returncode == 2: lock.acquire() result = "终端禁止接入" self.failed_list_1.append(result) self.failed_list_name_1.append(str(name)) self.failed_list_ip_1.append((str(ip))) cmd_result = (cmd.stdout.decode("gbk")) self.log_list_1.append(cmd_result) self.log_list_name_1.append(str(name)) lock.release() else: lock.acquire() result = "其他问题" self.failed_list_1.append(result) self.failed_list_name_1.append(str(name)) self.failed_list_ip_1.append((str(ip))) cmd_result = (cmd.stdout.decode("gbk")) self.log_list_1.append(cmd_result) self.log_list_name_1.append(str(name)) lock.acquire() def all_function(self): upload_file = st.sidebar.file_uploader(label="请上传要进行批量ping的文件", type="xlsx", accept_multiple_files=False) if upload_file == None: st.stop() else: df = pd.read_excel(upload_file,None) sheet = df.keys() # 获取所有sheet页 tabs = st.tabs(sheet) for tab ,sheet_name,num in zip (tabs,sheet,list(range(1,10000))): with tab: df_sheet = df[sheet_name] st.write(df_sheet) col1, col2 = st.columns(2) with col1: result_name = st.selectbox("请选择设备名称列",df_sheet.keys(),key=10+num) # 选择最后保存的名字 if result_name == None: st.stop() df_result_name = df_sheet[result_name] # 获取设备名称的Dataframe st.write(df_result_name) dev_name_list = df_result_name.to_list() #st.write(self.dev_name_list) with col2: ip_info = st.selectbox("请选择ip地址列",df_sheet.keys(),key=2+num) # 选择ping的地址 if ip_info == None: st.stop() df_ip_info = df_sheet[ip_info] # 获取IP的dataframe st.write(df_ip_info) ip_host_list = df_ip_info.to_list() zip_list = zip(dev_name_list,ip_host_list) # 将选择的数据进行zip打包成元组 # 功能选项 ping_tab1, snmp_tab2 = st.tabs(["ping检查", "snmp检查"]) with ping_tab1: choose_ping = st.checkbox("是否执行ping",key=20+num) # 是否执行CMD命令 if choose_ping ==True: pool = ThreadPoolExecutor(7) with st.spinner("wait for it"): for i in zip_list: name = i[0] ip = i[1] #st.write(i) pool.submit(yc.ping, name, ip) pool.shutdown() #st.balloons() st.snow() tab1, tab2, tab3 = st.tabs([":red[succeed]", ":red[failed]", ":red[log]"]) with tab1: dic = {"name": self.success_list_name, "result": self.success_list} df_result = pd.DataFrame(data=dic) st.write(df_result) with tab2: dic = {"name": self.failed_list_name, "ip":self.failed_list_ip,"result": self.failed_list} df_result = pd.DataFrame(data=dic) st.write(df_result) with tab3: dic = {"name": self.log_list_name, "result": self.log_list} df_result = pd.DataFrame(data=dic) st.write(df_result) else: pass with snmp_tab2: choose_snmp = st.checkbox("是否执行snmp_walk",key=30+num) #是否执行snmp命令 if choose_snmp ==True: pool = ThreadPoolExecutor(7) with st.spinner("wait for it"): for i in zip_list: name = i[0] ip = i[1] #st.write(i) pool.submit(yc.snmp, name, ip) pool.shutdown() #st.balloons() st.snow() tab1, tab2, tab3 = st.tabs([":red[succeed]", ":red[failed]", ":red[log]"]) with tab1: dic = {"name": self.success_list_name_1, "result": self.success_list_1} df_result = pd.DataFrame(data=dic) st.write(df_result) with tab2: dic = {"name": self.failed_list_name_1,"ip":self.failed_list_ip_1, "result": self.failed_list_1} df_result = pd.DataFrame(data=dic) st.write(df_result) with tab3: dic = {"name": self.log_list_name_1, "result": self.log_list_1} df_result = pd.DataFrame(data=dic) st.write(df_result) else: pass class netmiko_use(object): def __init__(self): self.result_list = [] # 存放命令结果 self.dev_name_list = [] # 存放设备名字 self.result_ip_list = [] self.failed_list = [] self.failed_name_list =[] self.failed_ip_list = [] def deal_cmd(self,ip,user,dev_type,passwd,dev_name,cmd): lock = Lock() try: devices = { 'device_type': dev_type, # 锐捷os:ruijie_os, 华三:hp_comware 中兴:zte_zxros huawei:huawei 'ip': ip, 'username': user, 'password': passwd, } connect_dev = netmiko.ConnectHandler(**devices) cmd_out = connect_dev.send_config_set(cmd, enter_config_mode=False, bypass_commands="$$") lock.acquire() self.result_list.append(cmd_out) self.dev_name_list.append(dev_name) self.result_ip_list.append(ip) lock.release() except netmiko.exceptions.NetmikoAuthenticationException: lock.acquire() result = "认证失败" self.failed_list.append(result) self.failed_name_list.append((dev_name)) self.failed_ip_list.append(ip) lock.release() except netmiko.exceptions.NetmikoTimeoutException: lock.acquire() result = "超时" self.failed_list.append(result) self.failed_name_list.append((dev_name)) self.failed_ip_list.append(ip) lock.release() def net_function(self): upload_file = st.sidebar.file_uploader(label="请上传iplist", type="xlsx", accept_multiple_files=False) if upload_file == None: st.stop() df = pd.read_excel(upload_file,None) sheet = df.keys() # 获取所有sheet页 tabs = st.tabs(sheet) for tab ,sheet_name,num in zip (tabs,sheet,list(range(1,10000))): with tab: pool = ThreadPoolExecutor(7) self.list = [] # 存放对应列表的series failed_info = "请补充设备信息,需包含(ip,user,dev_type,password,dev_name,cmd)" list_header = ["ip", "user", "dev_type", "password", "dev_name", "cmd"] df_sheet = df[sheet_name] st.write(df_sheet) colums_header = df_sheet.keys() # 获取行头部 for colum in list_header: if colum in colums_header: #st.write("正常") self.list.append(df_sheet[colum]) # series数据保存在列表中 else: st.warning(failed_info) break try: zip_result = zip(self.list[0],self.list[1],self.list[2], self.list[3],self.list[4],self.list[5]) #st.write(zip_result) choose = st.checkbox("是否执行命令",key=20+num) if choose ==True: with st.spinner("wait for it"): for dev_info in zip_result: ip = dev_info[0] user = dev_info[1] dev_type = dev_info[2] password = dev_info[3] dev_name = dev_info[4] cmd_tmp = dev_info[5] cmd = cmd_tmp.split("\n") pool.submit(yc.deal_cmd,ip,user,dev_type,password,dev_name,cmd) pool.shutdown() #st.write(self.result_list) st.snow() tab1,tab2 = st.tabs([":red[failed]", ":red[log]"]) with tab1: dic = {"name": self.failed_name_list,"ip":self.failed_ip_list,"result": self.failed_list} df_result = pd.DataFrame(data=dic) st.write(df_result) pass with tab2: dic = {"name": self.dev_name_list, "ip":self.result_ip_list,"result": self.result_list} df_result = pd.DataFrame(data=dic) st.write(df_result) except IndexError: pass if __name__ == '__main__': st.title(':red[Welcome] :sunglasses:') # 创建title,可以链接 st.sidebar.header(":red[网络大杂烩] :chicken:") func_select = st.sidebar.radio(":tm:",["子网划分功能","ping、snmp功能","网络设备配置功能"]) if func_select == "ping、snmp功能": st.snow() yc = mult_ping() yc.all_function() elif func_select == "网络设备配置功能": st.snow() yc = netmiko_use() yc.net_function() elif func_select =="子网划分功能": st.snow() yc= compute() yc.ip_info() yc.subnet()
标签:subnet,功能,self,网络,st,import,net,sidebar,合集 From: https://www.cnblogs.com/yc-tec/p/18054082