目的
- 在子网中进行udp广播,通过ICMP回文判断存活主机
分析
- 相比上个实验嗅探器,同样也开启混杂模式通过原始套接字抓取流经本机的流量,不同点在于多了个主动进行udp广播,以及对ICMP回文进行了解析确认
- 判断存活主机的依据:发送一个UDP数据报时,如果主机上的UDP端口未开启,会返回一个ICMP包来提示目标端口不可访问
- 注意,虽然ICMP和IP同属于网络层,但是ICMP的数据是借助于IP数据包进行发送的
代码
- udp广播,使用了ipaddress库遍历子网ip,进行子网广播很方便
def udp_sender():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
for ip in ipaddress.ip_network(SUBNET).hosts():
sender.sendto(bytes(MESSAGE, 'utf8'), (str(ip), 65212))
- 开启网卡的混杂模式,通过原始套接字抓取流经的IP数据报
- 解析IP头,判断是不是ICMP协议
- 解析ICMP数据内容,比对消息字段,确认是存活主机的回文
SUBNET = '192.168.43.0/24'
MESSAGE = 'Z5ONK0RULES'
class Scanner:
def __init__(self, host):
self.host = host
if os.name == 'nt':
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
self.socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
self.socket.bind((host, 0))
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if os.name == 'nt':
self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
def sniff(self):
hosts_up = set([f'{str(self.host)}'])
try:
while True:
raw_buffer = self.socket.recvfrom(65535)[0]
ip_header = IP(raw_buffer[0:20])
if ip_header.protocol == "ICMP":
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset+8]
icmp_header = ICMP(buf)
if icmp_header.code == 3 and icmp_header.type == 3:
src_address = ip_header.src_address
if ipaddress.ip_address(src_address) in ipaddress.IPv4Network(SUBNET):
if raw_buffer[len(raw_buffer) - len(MESSAGE) :] == bytes(MESSAGE, 'utf8'):
if str(src_address) != self.host and str(src_address) not in hosts_up:
hosts_up.add(str(src_address))
print(f'Host Up: {str(src_address)}')
except KeyboardInterrupt:
if os.name == 'nt':
self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
print('\nUser interrupted.')
if hosts_up:
print(f'\n\nSummary: Hosts up on {SUBNET}')
for host in sorted(hosts_up):
print(f'{host}')
print('')
sys.exit()
- 解析IP头
class IP:
def __init__(self, buff=None):
header = struct.unpack('<BBHHHBBH4s4s', buff)
self.ver = header[0] >> 4
self.ihl = header[0] & 0xF
self.tos = header[1]
self.len = header[2]
self.id = header[3]
self.offset = header[4]
self.ttl = header[5]
self.protocol_num = header[6]
self.sum = header[7]
self.src = header[8]
self.dst = header[9]
self.src_address = ipaddress.ip_address(self.src)
self.dst_address = ipaddress.ip_address(self.dst)
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}
try:
self.protocol = self.protocol_map[self.protocol_num]
except Exception as e:
print ('%s No protocol for %s' % (e, self.protocol_num))
self.protocol = str(self.protocol_num)
- 解析ICMP数据报
class ICMP:
def __init__(self, buff):
header = struct.unpack('<BBHHH', buff)
self.type = header[0]
self.code = header[1]
self.sum = header[2]
self.id = header[3]
self.seq = header[4]
效果
- 在windows主机上扫描同网段存活主机,成功扫到了我的ubuntu虚拟机和网关