以下为使用 Python 实现批量 ping 的多种方法及代码示例:
方法一:
import subprocess
filepath = 'E:\\Python\\tools\\AutoMatic\\hosts.txt'
with open(filepath, 'r') as f:
hosts = f.readlines()
for host in hosts:
result = subprocess.check_output(('ping', '-n', '1', host.strip()))
print(result)
运行效果如下:
运行结果
将回显中的关键信息提取出来并以更易读的格式输出:
import subprocess
filepath = 'E:\\Python\\tools\\AutoMatic\\hosts.txt'
with open(filepath, 'r') as f:
hosts = f.readlines()
for host in hosts:
result = subprocess.check_output(('ping', '-n', '1', host.strip()))
result_str = result.decode('gbk')
lines = result_str.split('\r\n')
for line in lines:
if '来自' in line:
parts = line.split('来自 ')
ip = parts[1].split(' 的回复')[0]
if '时间<' in line:
time_part = line.split('时间<')[1].split('ms')[0]
print(f"主机:{host.strip()}, IP:{ip}, 响应时间:{time_part}ms")
else:
print(f"主机:{host.strip()}, IP:{ip}, 响应时间未知")
方法二:
import asyncio
import datetime
import ipaddress
import re
import socket
from prettytable import PrettyTable
class MultiICMP:
def __init__(self, hosts, pool_size=150, timeout=1, SType='ICMP', EMode=1, isEcho=True):
self.hosts = hosts
self.timeout = timeout
self.pool_size = pool_size
self.SType = SType
self.EMode = EMode
self.isEcho = isEcho
self.result = ()
async def ICMP(self, ip):
result = await asyncio.create_subprocess_shell("ping -n 2 -w 35 {ip}".format(ip=ip), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await result.communicate()
date = stdout.decode('gbk')
if self.SType == 'ICMP':
res = {'Destination': ip, 'IP': "", 'TTL': '', 'MIN': '', 'MAX': '', 'AVERAGE': '', 'STATE': 'DOWN'}
elif self.SType == 'ARP':
self.EMode = 2
res = {'Destination': ip, 'IP': "", 'TTL': '', 'MIN': '', 'MAX': '', 'AVERAGE': '', 'STATE': 'DOWN', 'MAC': ''}
if "正在 Ping" in date:
date2 = date.splitlines()
r = re.compile(r'(?:Ping).*具有').search(date2(1)).group()
if '(' in r:
r = r.replace('Ping ', '').split('(')
res('IP') = r(1).split(')')(0)
else:
res('IP') = res('Destination')
if 'TTL' in date:
if 'TTL' in date2(2):
res('TTL') = date2(2).split('TTL=')(1)
else:
res('TTL') = date2(3).split('TTL=')(1)
r = date2(8).split(',')
res('MIN') = r(0).split('=')(1)
res('MAX') = r(1).split('=')(1)
res('AVERAGE') = r(2).split('=')(1)
res('STATE') = 'UP'
self.result.append(res)
else:
if self.EMode > 1:
self.result.append(res)
elif self.EMode == 3:
self.result.append(res)
def addMac(self):
import subprocess
result = subprocess.Popen("arp -a", shell=True, stdout=subprocess.PIPE).stdout.read().decode('gbk')
k = result.splitlines()
arptable = {}
for f in k:
if '态' in f:
date2 = f.split()
arptable(date2(0)) = date2(1)
for i in self.result:
if i('IP') in arptable:
i('MAC') = arptable(i('IP'))
方法三:
import ping3
def ping_ip(ip):
result = ping3.ping(ip)
if result is not None:
print(f"{ip} is reachable.")
else:
print(f"{ip} is unreachable.")
def batch_ping(ips):
for ip in ips:
ping_ip(ip)
if __name__ == "__main__":
ips = ("192.168.0.1", "192.168.0.2", "192.168.0.3")
batch_ping(ips)
方法四:
import subprocess
def ping_host(host, is_windows):
"""发送一个ping请求到指定的主机,并返回ping的结果。"""
# 根据操作系统类型选择ping命令和参数
if is_windows:
params = ('ping', '-n', '1', host)
else:
params = ('ping', '-c', '1', host)
# 发送ping请求并获取输出
result = subprocess.run(params, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
output = result.stdout + result.stderr
# 分析ping输出
if is_windows:
# 对于Windows系统,检查是否有"请求超时"这样的输出
return "请求超时" not in output
else:
# 对于UNIX-like系统,检查是否有"64 bytes from"或"0% packet loss"这样的输出
return ("64 bytes from" in output) or ("0% packet loss" in output) or (not output)
# 显示操作系统选择菜单
print("请选择您的操作系统:")
print("1. Windows")
print("2. Linux/macOS")
# 获取用户输入
choice = input("请输入对应的序号:")
# 根据用户输入确定操作系统类型
is_windows = choice == "1"
# 读取iplist.txt文件中的IP地址,"iplist.txt"换成自己的ip字典文件的文件名
with open('iplist.txt', 'r') as file:
ips = file.read().splitlines()
# 对每个IP地址执行ping操作,并输出结果
for ip in ips:
if ping_host(ip, is_windows):
print(f"ping {ip} 是通的")
else:
print(f"ping {ip} 不通")
这些方法可以根据不同的需求和场景选择使用。
Python 批量 ping 方法一的代码解析
在第一种方法中,通常会使用一些内置模块和特定的函数来实现批量 ping 操作。例如,可能会用到 subprocess
模块来执行系统的 ping
命令,并通过读取命令的输出结果来判断目标主机的连通性。代码可能会先定义一个读取网段或 IP 列表的函数,然后通过循环逐个对这些网段或 IP 进行 ping
操作。在处理 ping
结果时,会根据返回的状态码或特定的输出字符串来判断是否连通。比如,如果返回的代码为 0 ,可能表示连通,否则表示不通。同时,还会考虑对结果的存储和处理,以便后续的分析和统计。
为了更好地理解,我们来看一个简单的示例代码:
import subprocess
def check_alive(ip):
result = subprocess.call('ping -w 1000 -n 1 %s' %ip,stdout=subprocess.PIPE,shell=True)
if result == 0:
h = subprocess.getoutput('ping ' ip)
returnnum = h.split('平均 = ')(1)
info = ('\\033(32m%s\\033(0m 能ping通,延迟平均值为:%s' %(ip,returnnum))
print('\\033(32m%s\\033(0m 能ping通,延迟平均值为:%s' %(ip,returnnum))
else:
with open('notong.txt','a') as f:
f.write(ip)
info = ('\\033(31m%s\\033(0m ping 不通!' % ip)
print('\\033(31m%s\\033(0m ping 不通!' % ip)
if __name__ == '__main__':
print("开始批量ping所有IP!")
with open('ip.txt', 'r') as f: # ip.txt为本地文件记录所有需要检测连通性的ip
for i in f:
这段代码首先定义了一个 check_alive
函数来执行 ping
操作并处理结果,然后在主程序中读取一个包含 IP 地址的文件进行批量 ping
。
Python 批量 ping 方法二的代码详解
第二种方法可能会采用不同的模块和策略。比如,使用 ping3
模块来发送 ping
请求。在代码中,会先安装所需的模块,然后定义相关的函数来执行具体的 ping
操作。可能会通过设置一些参数,如超时时间、发送次数等,来控制 ping
的行为。在处理结果时,根据模块返回的值来判断主机的连通性,并进行相应的输出或记录。
以下是一个可能的示例:
import ping3
def ping(host):
response_time = ping3.ping(host)
if response_time is not None:
print(f"Ping {host} 成功,响应时间为 {response_time} 毫秒")
else:
print(f"Ping {host} 失败")
ping("example.com")
这个示例简单地展示了如何使用 ping3
模块对单个主机进行 ping
测试。
Python 批量 ping 方法三的实现细节
在第三种方法中,可能会利用线程池或进程池来提高批量 ping
的效率。通过将 ping
任务分配到多个线程或进程中并行执行,可以大大减少执行时间。代码中会涉及到线程或进程的创建、任务的分配和结果的收集。
比如,可能会像下面这样:
import time
import threading
import subprocess
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
def get_ip_list(net_segment, ip_num):
# 创建一个队列
IP_QUEUE = Queue()
ip_list = ()
list_segment = net_segment.split('.')
ip_index = 1
# 将需要 ping 的 ip 加入队列
for i in range(1, 254):
list_segment(-1) = str(ip_index + i)
addr = ('.').join(list_segment)
IP_QUEUE.put(addr)
# 定义一个执行 ping 的函数
def ping_ip(ip):
res = subprocess.call('ping -n 2 -w 5 %s' % ip, stdout=subprocess.PIPE)
# linux 系统将 '-n' 替换成 '-c'
# 打印运行结果
print(ip, True if res == 0 else False)
if res!= 0:
if lock.acquire():
if len(ip_list) < ip_num:
ip_list.append(ip)
lock.release()
# 创建一个最大任务为 100 的线程池
pool = ThreadPoolExecutor(max_workers=100)
lock =
这种方式通过合理利用线程资源,能够同时对多个 IP 进行 ping
操作。
Python 批量 ping 方法四的代码解读
第四种方法可能会结合一些更高级的技术或模块,以实现更复杂或更优化的批量 ping
功能。可能会涉及到网络协议的底层处理,或者对 ping
结果进行更细致的分析和统计。
例如:
import asyncio
import datetime
import ipaddress
import re
import socket
from prettytable import PrettyTable
class MultiICMP:
def __init__(self, hosts, pool_size=150, timeout=1, SType='ICMP', EMode=1, isEcho=True):
self.hosts = hosts
self.timeout = timeout
self.pool_size = pool_size
self.SType = SType
self.EMode = EMode
self.isEcho = isEcho
self.result = ()
async def ICMP(self, ip):
result = await asyncio.create_subprocess_shell("ping -n 2 -w 35 {ip}".format(ip=ip), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await result.communicate()
date = stdout.decode('gbk')
if self.SType == 'ICMP':
res = {'Destination': ip, 'IP': "", 'TTL': '', 'MIN': '', 'MAX': '', 'AVERAGE': '', 'STATE': 'DOWN'}
elif self.SType == 'ARP':
self.EMode = 2
res = {'Destination': ip, 'IP': "", 'TTL': '', 'MIN': '', 'MAX': '', 'AVERAGE': '', 'STATE': 'DOWN','MAC': ""}
这段代码使用了异步编程的方式来执行 ping
操作,提高了程序的并发性能。
总之,使用 Python 实现批量 ping
有多种方法,每种方法都有其特点和适用场景。您可以根据具体的需求选择合适的方法来实现高效、准确的批量 ping
功能。