Practice 1
import paramiko
import time
from ncclient import manager
from ncclient.xml_ import to_ele
from pysnmp.hlapi import *
import socket
class Device:
def init(self,ip,name):
self.ip = ip
self.name = name
self.vty = None
self.ssh_session = None
def ssh_connect(self):
self.ssh_session = paramiko.SSHClient()
self.ssh_session.set_missing_host_key_policy(paramiko.AutoAddPolicy)
self.ssh_session.connect(hostname=self.ip,port=22,username='huawei',password='Admin@123')
self.vty = self.ssh_session.invoke_shell()
def ssh_close(self):
self.ssh_session.close()
def ssh_send(self,cmd):
self.vty.send((cmd+'\n').encode('utf-8'))
time.sleep(0.5)
res = self.vty.recv(6666)
time.sleep(0.5)
res = (res.decode('utf-8'))
print(res)
return res
def config_by_file(self,path):
self.ssh_connect()
with open(path,'r') as file:
lines = file.readlines()
self.ssh_send('sys imm')
for line in lines:
line = line.replace('\n','')
self.ssh_send(line)
self.ssh_send('q')
self.ssh_close()
def set_ge(self,int,ip,mask):
self.ssh_connect()
self.ssh_send('sys imm')
self.ssh_send(f'int {int}')
self.ssh_send('undo sh')
self.ssh_send('undo ports')
self.ssh_send(f'ip address {ip} {mask}')
self.ssh_send('q')
self.ssh_send('q')
self.ssh_close()
def put_file(self,local,remote):
self.ssh_connect()
tran = paramiko.Transport(self.ip,22)
tran.connect(username='huawei',password='Admin@123')
sftp = paramiko.SFTPClient.from_transport(tran)
sftp.put(local,remote)
time.sleep(1)
self.ssh_close()
def set_defalut_route(self):
self.ssh_connect()
self.ssh_send('sys imm')
self.ssh_send('ip route-static 0.0.0.0 0 NULL0')
self.ssh_send('q')
self.ssh_close()
def get_device_info(self):
res = getCmd(
SnmpEngine(),
UsmUserData(
userName='admin',
authKey='Huawei@123',
privKey='Huawei@123',
authProtocol=usmHMACSHAAuthProtocol,
privProtocol=usmAesCfb128Protocol,
),
UdpTransportTarget((self.ip,161)),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
)
return next(res)
def write_commands(self,local):
self.ssh_connect()
res = ''
res += '\n###############################\n'
res += self.ssh_send("display clock")
res += '\n###############################\n'
res += self.ssh_send('display device')
res += '\n###############################\n'
res += self.ssh_send('display health')
res += '\n###############################\n'
res += self.ssh_send('display power')
res += '\n###############################\n'
res += self.ssh_send('display fan')
res += '\n###############################\n'
self.ssh_close()
with open(local,'a') as file:
file.write(self.name+ ':\n')
file.writelines(res)
file.write('\n------------------------------\n')
def disable_ntp(self):
try:
with manager.connect(host=self.ip,
port=830 ,
username='huawei' ,
password='Admin@123' ,
allow_agent= False,
hostkey_verify= False,
look_for_keys= False,
device_params={'name':'huawei'}) as m:
res = ''
with open('test/disable__ntp.txt','r') as file:
lines = file.readlines()
for line in lines:
res += line
content = to_ele(res)
res = m.rpc(content)
if '<ok/>' in str(res):
print('success')
else:
print('Failed!')
except Exception as e :
print('Error:',e)
if name == 'main':
host1 = Device('192.168.56.10','CE1')
host2 = Device('192.168.56.20','CE2')
host_list = []
host_list.append(host1)
host_list.append(host2)
host1.set_ge('vlanif 1','192.168.255.1','24')
host1.set_ge('vlanif 1','192.168.255.2','24')
host1.set_defalut_route()
host1.set_ge('ge 1/0/8','10.1.1.1','24')
host1.set_ge('ge 1/0/9','10.1.2.1','24')
for host in host_list:
host.config_by_file('sftp.txt')
for host in host_list:
host.put_file('snmp.txt','/snmp.txt')
for host in host_list:
host.config_by_file('snmp.txt')
for host in host_list:
res = host.get_device_info()[-1][-1]
print(f'{host.name}:\n{res}')
for host in host_list:
host.write_commands('log_2.txt')
for host in host_list:
host.config_by_file('netconf.txt')
for host in host_list:
host.disable_ntp()
client:
client_ip = '192.168.56.20'
client_port = 6666
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.connect((client_ip,client_port))
msg = (b'z'.zfill(1024)).encode()
sock.send(msg)
res = sock.recv(1024)
print(res.decode())
sock.close()
server:
server_ip = '192.168.56.10'
server_port = 6666
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.bind((server_ip,server_port))
sock.listen(5)
while True:
sock_client,sock_ip = sock.accept()
print(sock_ip,' has been connected!')
res = sock_client.recv(1024)
if res:
print(res)
msg = ('success').encode()
sock_client.send(msg)
sock_client.close()
socket:
UDP
import socket
server_ip = '127.0.0.1'
server_port = 6666
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.bind((server_ip,server_port))
while True:
data,client_ip = sock.recvfrom(1024)
print(client_ip,'has connected')
print(data.decode())
msg = 'connected has been created'
sock.sendto(msg.encode(),client_ip)
UDP
import socket
server_ip = '127.0.0.1'
server_port = 6666
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
msg = b'a'.zfill(1024)
sock.sendto(msg,(server_ip,server_port))
res = sock.recv(1024)
print(res.decode())
sock.close()
标签:Network,res,self,sock,host,Program,ssh,ip,Log From: https://www.cnblogs.com/heydom/p/18458095