首页 > 其他分享 >网络功能合集

网络功能合集

时间:2024-03-05 15:12:45浏览次数:21  
标签:subnet 功能 self 网络 st import net sidebar 合集

import netaddr
from netaddr import *
import  pandas as pd
import streamlit as st
from concurrent.futures import ThreadPoolExecutor
import subprocess
import shlex
from threading import Lock
import netmiko

class compute(object):

    def __init__(self):

        st.sidebar.header(":red[IP计算器]")
        #self.cmd = st.text_input("请输入要执行的命令")
        self.net_iput = st.sidebar.text_input("请输入ip地址", help="示例:1.1.1.1/24")  # 输入要划分的网段
        self.subnet_list = []  # subnet列表

    def subnet(self):
        try:
            net = IPNetwork(self.net_iput)  # 输入要划分的网段
            choose = st.sidebar.checkbox("是否进行子网划分",key="key123")
            if choose == True:
                cidr = st.sidebar.number_input("请选择划分的子网", min_value=1, max_value=128)
                if int(cidr) - int(net.prefixlen) <20:
                    self.subnet_list = [str(snet) for snet in net.subnet(cidr)]  # 获取子网list
                    #st.write(":red[子网详情如下:]")
                    with st.sidebar.expander(":red[查看子网详情]"):
                        df_subnet = pd.DataFrame(data=self.subnet_list, columns=["子网详情"])
                        st.dataframe(df_subnet,width=1000)
                else:
                    st.sidebar.warning("子网数量过多,请重新输入")
        except netaddr.core.AddrFormatError:
            pass

    def ip_info(self):
        tab1,tab2,tab3 = st.sidebar.tabs([":red[可用地址]",":red[反掩码]",":red[掩码]"])
        try:
            ip = IPNetwork(self.net_iput)
            with tab1:
                st.write(str(ip[1]) + str("---") + str(ip[-2]))
            with tab2:
                st.write(str(ip.hostmask))
            with tab3:
                st.write(str(ip.netmask))
        except netaddr.core.AddrFormatError:
            pass


class mult_ping(object):

    def __init__(self):
        self.log_list = []
        self.log_list_name = []
        self.success_list = []
        self.success_list_name = []
        self.failed_list = []
        self.failed_list_name = []
        self.failed_list_ip = []

        self.log_list_1 = []
        self.log_list_name_1 = []
        self.success_list_1 = []
        self.success_list_name_1 = []
        self.failed_list_1 = []
        self.failed_list_name_1 = []
        self.failed_list_ip_1 = []

    def ping(self,name,ip):
        cmd_tmp = shlex.split("ping -c 2 %s" % str(ip))
        cmd = subprocess.run(cmd_tmp, stdout=subprocess.PIPE)
        result = (cmd.stdout.decode("gbk"))
        self.log_list.append(result)
        self.log_list_name.append(str(name))
        if cmd.returncode == 0:
            result = "正常"
            self.success_list.append(result)
            self.success_list_name.append(str(name))
        else:
            result = "不通"
            self.failed_list.append(result)
            self.failed_list_name.append(str(name))
            self.failed_list_ip.append((str(ip)))

    def snmp(self,name, ip):
        lock = Lock()
        cmd_snmp = shlex.split("snmpwalk -a sha -A muS7kKrZTAFbzPa4Uu56 -v 3 -l authPriv -x aes -X muS7kKrZTAFbzPa4Uu56 -u yunqiao %s ifindex" % str(ip))
        cmd = subprocess.run(cmd_snmp, stdout=subprocess.PIPE)

        if cmd.returncode == 0:
            lock.acquire()
            result = "snmpv3正常"
            self.success_list_1.append(result)
            self.success_list_name_1.append(str(name))
            cmd_result = (cmd.stdout.decode("gbk"))
            self.log_list_1.append(cmd_result)
            self.log_list_name_1.append(str(name))
            lock.release()
        elif cmd.returncode == 1:
            cmd_snmp = shlex.split("snmpwalk -v 2c -c AXQfPuJs  %s ifindex" % str(ip))
            tmp_cmd = subprocess.run(cmd_snmp,stdout=subprocess.PIPE)
            if tmp_cmd.returncode == 0:
                lock.acquire()
                result = "snmpv2正常"
                self.success_list_1.append(result)
                self.success_list_name_1.append(str(name))
                cmd_result =  (tmp_cmd.stdout.decode("gbk"))
                self.log_list_1.append(cmd_result)
                self.log_list_name_1.append(str(name))
                lock.release()
            else:
                lock.acquire()
                result = "认证失败或超时"
                self.failed_list_1.append(result)
                self.failed_list_name_1.append(str(name))
                self.failed_list_ip_1.append((str(ip)))
                cmd_result = (cmd.stdout.decode("gbk"))
                self.log_list_1.append(cmd_result)
                self.log_list_name_1.append(str(name))
                lock.release()
        elif cmd.returncode == 2:
            lock.acquire()
            result = "终端禁止接入"
            self.failed_list_1.append(result)
            self.failed_list_name_1.append(str(name))
            self.failed_list_ip_1.append((str(ip)))
            cmd_result = (cmd.stdout.decode("gbk"))
            self.log_list_1.append(cmd_result)
            self.log_list_name_1.append(str(name))
            lock.release()
        else:
            lock.acquire()
            result = "其他问题"
            self.failed_list_1.append(result)
            self.failed_list_name_1.append(str(name))
            self.failed_list_ip_1.append((str(ip)))
            cmd_result = (cmd.stdout.decode("gbk"))
            self.log_list_1.append(cmd_result)
            self.log_list_name_1.append(str(name))
            lock.acquire()

    def all_function(self):
        upload_file = st.sidebar.file_uploader(label="请上传要进行批量ping的文件",
                         type="xlsx",
                         accept_multiple_files=False)
        if upload_file == None:
            st.stop()
        else:
            df = pd.read_excel(upload_file,None)
            sheet = df.keys()  # 获取所有sheet页
            tabs = st.tabs(sheet)
            for tab ,sheet_name,num in zip (tabs,sheet,list(range(1,10000))):
                with tab:
                    df_sheet = df[sheet_name]
                    st.write(df_sheet)
                    col1, col2 = st.columns(2)
                    with col1:
                        result_name = st.selectbox("请选择设备名称列",df_sheet.keys(),key=10+num)   # 选择最后保存的名字
                        if result_name == None:
                            st.stop()
                        df_result_name =  df_sheet[result_name]  # 获取设备名称的Dataframe
                        st.write(df_result_name)
                        dev_name_list = df_result_name.to_list()
                        #st.write(self.dev_name_list)
                    with col2:
                        ip_info = st.selectbox("请选择ip地址列",df_sheet.keys(),key=2+num)  # 选择ping的地址
                        if ip_info == None:
                            st.stop()
                        df_ip_info = df_sheet[ip_info]   # 获取IP的dataframe
                        st.write(df_ip_info)
                        ip_host_list = df_ip_info.to_list()
                    zip_list = zip(dev_name_list,ip_host_list)  # 将选择的数据进行zip打包成元组
                    # 功能选项
                    ping_tab1, snmp_tab2 = st.tabs(["ping检查", "snmp检查"])
                    with ping_tab1:
                        choose_ping = st.checkbox("是否执行ping",key=20+num)  # 是否执行CMD命令
                        if choose_ping ==True:
                            pool = ThreadPoolExecutor(7)
                            with st.spinner("wait for it"):
                                for i in zip_list:
                                    name = i[0]
                                    ip = i[1]
                                    #st.write(i)
                                    pool.submit(yc.ping, name, ip)
                                pool.shutdown()
                                #st.balloons()
                                st.snow()
                                tab1, tab2, tab3 = st.tabs([":red[succeed]", ":red[failed]", ":red[log]"])
                                with tab1:
                                    dic = {"name": self.success_list_name, "result": self.success_list}
                                    df_result = pd.DataFrame(data=dic)
                                    st.write(df_result)
                                with tab2:
                                    dic = {"name": self.failed_list_name, "ip":self.failed_list_ip,"result": self.failed_list}
                                    df_result = pd.DataFrame(data=dic)
                                    st.write(df_result)
                                with tab3:
                                    dic = {"name": self.log_list_name, "result": self.log_list}
                                    df_result = pd.DataFrame(data=dic)
                                    st.write(df_result)
                        else:
                            pass
                    with snmp_tab2:
                        choose_snmp = st.checkbox("是否执行snmp_walk",key=30+num)  #是否执行snmp命令
                        if choose_snmp ==True:
                            pool = ThreadPoolExecutor(7)
                            with st.spinner("wait for it"):
                                for i in zip_list:
                                    name = i[0]
                                    ip = i[1]
                                    #st.write(i)
                                    pool.submit(yc.snmp, name, ip)
                                pool.shutdown()
                                #st.balloons()
                                st.snow()
                                tab1, tab2, tab3 = st.tabs([":red[succeed]", ":red[failed]", ":red[log]"])
                                with tab1:
                                    dic = {"name": self.success_list_name_1, "result": self.success_list_1}
                                    df_result = pd.DataFrame(data=dic)
                                    st.write(df_result)
                                with tab2:
                                    dic = {"name": self.failed_list_name_1,"ip":self.failed_list_ip_1, "result": self.failed_list_1}
                                    df_result = pd.DataFrame(data=dic)
                                    st.write(df_result)
                                with tab3:
                                    dic = {"name": self.log_list_name_1, "result": self.log_list_1}
                                    df_result = pd.DataFrame(data=dic)
                                    st.write(df_result)
                        else:
                            pass


class netmiko_use(object):

    def __init__(self):

        self.result_list = []  # 存放命令结果
        self.dev_name_list = []  # 存放设备名字
        self.result_ip_list = []

        self.failed_list = []
        self.failed_name_list =[]
        self.failed_ip_list = []




    def deal_cmd(self,ip,user,dev_type,passwd,dev_name,cmd):
        lock = Lock()
        try:
            devices = {
                'device_type': dev_type,  # 锐捷os:ruijie_os, 华三:hp_comware 中兴:zte_zxros huawei:huawei
                'ip': ip,
                'username': user,
                'password': passwd,
            }
            connect_dev = netmiko.ConnectHandler(**devices)
            cmd_out = connect_dev.send_config_set(cmd, enter_config_mode=False, bypass_commands="$$")
            lock.acquire()
            self.result_list.append(cmd_out)
            self.dev_name_list.append(dev_name)
            self.result_ip_list.append(ip)
            lock.release()

        except netmiko.exceptions.NetmikoAuthenticationException:
            lock.acquire()
            result = "认证失败"
            self.failed_list.append(result)
            self.failed_name_list.append((dev_name))
            self.failed_ip_list.append(ip)
            lock.release()
        except netmiko.exceptions.NetmikoTimeoutException:
            lock.acquire()
            result = "超时"
            self.failed_list.append(result)
            self.failed_name_list.append((dev_name))
            self.failed_ip_list.append(ip)
            lock.release()

    def net_function(self):
        upload_file = st.sidebar.file_uploader(label="请上传iplist",
                                       type="xlsx",
                                       accept_multiple_files=False)
        if upload_file == None:
            st.stop()

        df = pd.read_excel(upload_file,None)
        sheet = df.keys()  # 获取所有sheet页
        tabs = st.tabs(sheet)
        for tab ,sheet_name,num in zip (tabs,sheet,list(range(1,10000))):
            with tab:
                pool =  ThreadPoolExecutor(7)
                self.list = []  # 存放对应列表的series
                failed_info = "请补充设备信息,需包含(ip,user,dev_type,password,dev_name,cmd)"
                list_header = ["ip", "user", "dev_type", "password", "dev_name", "cmd"]
                df_sheet = df[sheet_name]
                st.write(df_sheet)
                colums_header = df_sheet.keys() # 获取行头部
                for colum in list_header:
                    if colum in colums_header:
                        #st.write("正常")
                        self.list.append(df_sheet[colum]) # series数据保存在列表中
                    else:
                        st.warning(failed_info)
                        break
                try:
                    zip_result = zip(self.list[0],self.list[1],self.list[2],
                                     self.list[3],self.list[4],self.list[5])
                    #st.write(zip_result)
                    choose = st.checkbox("是否执行命令",key=20+num)
                    if choose ==True:
                        with st.spinner("wait for it"):
                            for dev_info in zip_result:
                                ip = dev_info[0]
                                user = dev_info[1]
                                dev_type = dev_info[2]
                                password = dev_info[3]
                                dev_name = dev_info[4]
                                cmd_tmp = dev_info[5]
                                cmd = cmd_tmp.split("\n")
                                pool.submit(yc.deal_cmd,ip,user,dev_type,password,dev_name,cmd)
                            pool.shutdown()
                            #st.write(self.result_list)
                            st.snow()
                            tab1,tab2 = st.tabs([":red[failed]", ":red[log]"])
                            with tab1:
                                dic = {"name": self.failed_name_list,"ip":self.failed_ip_list,"result": self.failed_list}
                                df_result = pd.DataFrame(data=dic)
                                st.write(df_result)
                                pass
                            with tab2:
                                dic = {"name": self.dev_name_list, "ip":self.result_ip_list,"result": self.result_list}
                                df_result = pd.DataFrame(data=dic)
                                st.write(df_result)
                except IndexError:
                    pass




if __name__ == '__main__':
    st.title(':red[Welcome] :sunglasses:')  # 创建title,可以链接
    st.sidebar.header(":red[网络大杂烩] :chicken:")
    func_select = st.sidebar.radio(":tm:",["子网划分功能","ping、snmp功能","网络设备配置功能"])
    if func_select == "ping、snmp功能":
        st.snow()
        yc = mult_ping()
        yc.all_function()
    elif func_select == "网络设备配置功能":
        st.snow()
        yc = netmiko_use()
        yc.net_function()
    elif func_select =="子网划分功能":
        st.snow()
        yc= compute()
        yc.ip_info()
        yc.subnet()

 

标签:subnet,功能,self,网络,st,import,net,sidebar,合集
From: https://www.cnblogs.com/yc-tec/p/18054082

相关文章

  • 计算机网络汇总
    一网络分层模型OSI七层模型 是国际标准化组织提出一个网络分层模型,其大体结构以及每一层提供的功能如下图所示:TCP/IP四层模型是目前被广泛采用的一种模型,我们可以将TCP/IP模型看作是OSI七层模型的精简版本,由以下4层组成:应用层传输层网络层网络接口层需要......
  • 深度学习-卷积神经网络-Faster RCNN anchor详解-53
    目录1.Anchor参考:https://zhuanlan.zhihu.com/p/86403390?utm_id=01.Anchor我第一次接触Anchor(中文叫做锚)的时候,比较懵逼的,什么是锚这个问题让思考了好久,这也是阻碍大家学习FasterRCNN最大的绊脚石索性我们就先把anchor理解为一个个按照固定比例(长宽、大小)预定义的框lib/ne......
  • 【专题】保险行业数字化洞察白皮书报告PDF合集分享(附原数据表)
    报告链接:https://tecdat.cn/?p=33203原文出处:拓端数据部落公众号近年来,"养老"、"三胎政策"、"医疗成本"等一系列备受关注的民生话题,使得保险服务备受瞩目,并逐渐渗透到每个人的生活中。自2020年以来,由于多种因素的影响,人们对健康的意识不断提高,这正在重新塑造中国消费者对保险的......
  • 【专题】2024中国ESG消费报告PDF合集分享(附原数据表)
    原文链接:https://tecdat.cn/?p=35253原文出处:拓端数据部落公众号消费者展现出了既有不变的坚持也有变化的需求。他们一直期望企业或品牌能够通过可持续产品与他们进行价值对话,例如产品配方的环境友好性、包装更新对生态利益的照顾以及循环再造的可能性等。这些具有可持续价值的......
  • 计算机网络概述
    1.互联网的2个特点:联通性、资源共享2.计算机网络:由若干节点和连接这些节点的链路组成3.互“连”网:有多个网络通过一些路由器相互连接起来,构成了个覆盖范围更大的计算机网络。4.互联网提供者:ISP互联网交换点:IXP5.计算机之间的通信:主机A的某个进程和主机B上的另一个进程进行通......
  • CentOS 设置系统时间与网络时间同步
    CentOS设置系统时间与网络时间同步一、Linux的时间分为(两种)SystemClock(系统时间)指当前LinuxKernel中的时间RealTimeClock(硬件时间,简称RTC)主板上有电池供电的时间二、查看系统时间的命令系统时间指令:#date设置系统时间的命令date-set(年/月/日时......
  • C++面试,实现memcpy,strcpy这2个函数的功能
    `strcpy`和`memcpy`都是用于内存复制的函数,但它们之间有几个关键的区别:1.**复制的对象**:-`strcpy`主要用于复制字符串,它将从源字符串的起始位置开始复制字符,直到遇到源字符串的空字符('\0'),然后将空字符也复制到目标字符串中,表示字符串的结束。-`memcpy`则是通用的内存复......
  • ROS 同一工作空间下两个功能包如何相互调用
    如何在同一工作空间下调用另一ros功能包本项目有两个ros功能包,a_pack和b_pack,a_pack调用了b_pack的函数,本项目为了展示如何配置b_pack和a_pack功能包,让a_pack可以调用b_pack。在c++编程中,常见用法是将b_pack编译成库文件,然后b_pack对外提供头文件,a_pack对b_pack的调用就是通过引......
  • docker系列教程04---容器网络
    一、Docker不启动时默认的网络情况这里先把Docker停掉了,然后咱们ifconfig看一眼:ifconfig ens33不用多说了,lo为回环。这里可能还会有一个virbr0,在CentOS7的安装过程中如果有选择相关虚拟化的的服务安装系统后,启动网卡时会发现有一个以网桥连接的私网地址的virbr0网卡(virbr......
  • Jitsi Meet 是一组开源项目,使用户能够使用和部署具有最先进视频质量和功能的视频会议
    JitsiMeet是一组开源项目,使用户能够使用和部署具有最先进视频质量和功能的视频会议平台。 为了在运行Docker和DockerCompose的机器上快速运行JitsiMeet,请执行以下步骤:下载并解压缩最新版本。不要克隆git仓库。如果您对运行测试映像感兴趣,请参阅下文:wget$(curl-sht......