首页 > 其他分享 >ping实现

ping实现

时间:2022-11-20 21:26:08浏览次数:28  
标签:__ return 实现 ping packet time data

ping实现

实现一

# encoding:utf-8
import time
import struct
import socket
import select
import sys



def chesksum(data):
    n=len(data)
    m=n % 2
    sum=0
    for i in range(0, n - m ,2):
        sum += (data[i]) + ((data[i+1]) << 8)#传入data以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
    if m:
        sum += (data[-1])
    #将高于16位与低16位相加
    sum = (sum >> 16) + (sum & 0xffff)
    sum += (sum >> 16) #如果还有高于16位,将继续与低16位相加
    answer = ~sum & 0xffff
    #  主机字节序转网络字节序列(参考小端序转大端序)
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer

def request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body):
    #  把字节打包成二进制数据
    imcp_packet = struct.pack('>BBHHH32s',data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body)
    icmp_chesksum = chesksum(imcp_packet)  #获取校验和
    #  把校验和传入,再次打包
    imcp_packet = struct.pack('>BBHHH32s',data_type,data_code,icmp_chesksum,data_ID,data_Sequence,payload_body)
    return imcp_packet


def raw_socket(dst_addr,imcp_packet):
    '''
       连接套接字,并将数据发送到套接字
    '''
    #实例化一个socket对象,ipv4,原套接字,分配协议端口
    rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))
    #记录当前请求时间
    send_request_ping_time = time.time()
    #发送数据到网络
    rawsocket.sendto(imcp_packet,(dst_addr,80))
    #返回数据
    return send_request_ping_time,rawsocket,dst_addr


def reply_ping(send_request_ping_time,rawsocket,data_Sequence,timeout = 2):
    while True:
        #开始时间
        started_select = time.time()
        #实例化select对象,可读rawsocket,可写为空,可执行为空,超时时间
        what_ready = select.select([rawsocket], [], [], timeout)
        #等待时间
        wait_for_time = (time.time() - started_select)
        #没有返回可读的内容,判断超时
        if what_ready[0] == []:  # Timeout
            return -1
        #记录接收时间
        time_received = time.time()
        #设置接收的包的字节为1024
        received_packet, addr = rawsocket.recvfrom(1024)
        #获取接收包的icmp头
        #print(icmpHeader)
        icmpHeader = received_packet[20:28]
        #反转编码
        type, code, checksum, packet_id, sequence = struct.unpack(
            ">BBHHH", icmpHeader
        )

        if type == 0 and sequence == data_Sequence:
            return time_received - send_request_ping_time

        #数据包的超时时间判断
        timeout = timeout - wait_for_time
        if timeout <= 0:
            return -1
def dealtime(dst_addr,sumtime,shorttime,longtime,accept,i,time):
    sumtime+=time
    print(sumtime)
    if i==4:
        print("{0}的Ping统计信息:".format(dst_addr))
        print("数据包:已发送={0},接收={1},丢失={2}({3}%丢失),\n往返行程的估计时间(以毫秒为单位):\n\t最短={4}ms,最长={5}ms,平均={6}ms".format(i+1,accept,i+1-accept,(i+1-accept)/(i+1)*100,shorttime,longtime,sumtime))
def ping(host):
    send, accept, lost = 0, 0, 0
    sumtime, shorttime, longtime, avgtime = 0, 1000, 0, 0
    #TODO icmp数据包的构建
    data_type = 8 # ICMP Echo Request
    data_code = 0 # must be zero
    data_checksum = 0 # "...with value 0 substituted for this field..."
    data_ID = 0 #Identifier
    data_Sequence = 1 #Sequence number
    payload_body = b'abcdefghijklmnopqrstuvwabcdefghi' #data

    # 将主机名转ipv4地址格式,返回以ipv4地址格式的字符串,如果主机名称是ipv4地址,则它将保持不变
    dst_addr = socket.gethostbyname(host)
    print("正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host,dst_addr))
    for i in range(0,4):
        send=i+1
        #请求ping数据包的二进制转换
        icmp_packet = request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence + i,payload_body)
        #连接套接字,并将数据发送到套接字
        send_request_ping_time,rawsocket,addr = raw_socket(dst_addr,icmp_packet)
        #数据包传输时间
        times = reply_ping(send_request_ping_time,rawsocket,data_Sequence + i)
        if times > 0:
            print("来自 {0} 的回复: 字节=32 时间={1}ms".format(addr,int(times*1000)))

            accept+=1
            return_time=int(times * 1000)
            sumtime += return_time
            if return_time > longtime:
                longtime = return_time
            if return_time < shorttime:
                shorttime = return_time
            time.sleep(0.7)
        else:
            lost+=1
            print("请求超时。")

        if send == 4:
            print("{0}的Ping统计信息:".format(dst_addr))
            print("\t数据包:已发送={0},接收={1},丢失={2}({3}%丢失),\n往返行程的估计时间(以毫秒为单位):\n\t最短={4}ms,最长={5}ms,平均={6}ms".format(
                i + 1, accept, i + 1 - accept, (i + 1 - accept) / (i + 1) * 100, shorttime, longtime, sumtime/send))



if __name__ == "__main__":
    # if len(sys.argv) < 2:
    #     sys.exit('Usage: ping.py <host>')
    # ping(sys.argv[1])
    i=input("请输入要ping的主机或域名\n")
    ping(i)
    # host='www.baidu.com'
    # ping(host)

实现二

#!/usr/bin/python3.6.4
#!coding:utf-8
__author__ = 'Rosefinch'
__date__ = '2018/5/31 22:27'

import time
import struct
import socket
import select
import sys

def chesksum(data):
    """
    校验
    """
    n = len(data)
    m = n % 2
    sum = 0 
    for i in range(0, n - m ,2):
        sum += (data[i]) + ((data[i+1]) << 8)#传入data以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
    if m:
        sum += (data[-1])
    #将高于16位与低16位相加
    sum = (sum >> 16) + (sum & 0xffff)
    sum += (sum >> 16) #如果还有高于16位,将继续与低16位相加
    answer = ~sum & 0xffff
    #主机字节序转网络字节序列(参考小端序转大端序)
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer 

    '''
    连接套接字,并将数据发送到套接字
    '''
def raw_socket(dst_addr,imcp_packet):
    rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))
    send_request_ping_time = time.time()
    #send data to the socket
    rawsocket.sendto(imcp_packet,(dst_addr,80))
    return send_request_ping_time,rawsocket,dst_addr

    '''
    request ping
    '''
def request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body):
    #把字节打包成二进制数据
    imcp_packet = struct.pack('>BBHHH32s',data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body)
    icmp_chesksum = chesksum(imcp_packet)#获取校验和
    imcp_packet = struct.pack('>BBHHH32s',data_type,data_code,icmp_chesksum,data_ID,data_Sequence,payload_body)
    return imcp_packet
    '''
    reply ping
    '''
def reply_ping(send_request_ping_time,rawsocket,data_Sequence,timeout = 2):
    while True:
        started_select = time.time()
        what_ready = select.select([rawsocket], [], [], timeout)
        wait_for_time = (time.time() - started_select)
        if what_ready[0] == []:  # Timeout
            return -1
        time_received = time.time()
        received_packet, addr = rawsocket.recvfrom(1024)
        icmpHeader = received_packet[20:28]
        type, code, checksum, packet_id, sequence = struct.unpack(
            ">BBHHH", icmpHeader
        )
        if type == 0 and sequence == data_Sequence:
            return time_received - send_request_ping_time
        timeout = timeout - wait_for_time
        if timeout <= 0:
            return -1

    '''
    实现 ping 主机/ip
    '''
def ping(host):
    data_type = 8 # ICMP Echo Request
    data_code = 0 # must be zero
    data_checksum = 0 # "...with value 0 substituted for this field..."
    data_ID = 0 #Identifier
    data_Sequence = 1 #Sequence number
    payload_body = b'abcdefghijklmnopqrstuvwabcdefghi' #data
    dst_addr = socket.gethostbyname(host)#将主机名转ipv4地址格式,返回以ipv4地址格式的字符串,如果主机名称是ipv4地址,则它将保持不变
    print("正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host,dst_addr))
    for i in range(0,4):
        icmp_packet = request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence + i,payload_body)
        send_request_ping_time,rawsocket,addr = raw_socket(dst_addr,icmp_packet)
        times = reply_ping(send_request_ping_time,rawsocket,data_Sequence + i)
        if times > 0:
            print("来自 {0} 的回复: 字节=32 时间={1}ms".format(addr,int(times*1000)))
            time.sleep(0.7)
        else:
            print("请求超时。")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit('Usage: ping.py <host>')
    ping(sys.argv[1])

标签:__,return,实现,ping,packet,time,data
From: https://www.cnblogs.com/startstart/p/16909560.html

相关文章

  • Dubbo-Activate实现原理
    前言在Dubbo中有Filter使用,对于Filter来说我们会遇到这样的问题,Filter自身有很多的实现,我们希望某种条件下使用A实现,另外情况下使用B实现,这个时候我们前面介绍@SPI和@Adap......
  • 基于close channel广播机制来实现TimingWheel
    代码packagemainimport( "fmt" "sync" "time")typeTimingWheelstruct{ sync.Mutex intervaltime.Duration ticker*time.Ticker quitchanstruct{......
  • Dockerfile基础实现
    Dockerfile实现1)Dockerfile概述我们目前都是手动拉取镜像,手动进行配置,手动安装依赖,手动编译安装,创建用户……这个过程类似于命令行使用ansible模块(繁琐,不方便重复执行......
  • socket模块实现网络编程及struct模块解决黏包问题
    目录一、socket模块1、简介2、基于文件类型的套接字家族3、基于网络类型的套接字家族二、socket代码简介三、socket代码优化1.聊天内容自定义2.让聊天循环起来3.用户输入的......
  • 十大排序算法的各种编程语言的实现
    排序算法可以分为内部排序和外部排序,内部排序是数据记录在内存中进行排序,而外部排序是因排序的数据很大,一次不能容纳全部的排序记录,在排序过程中需要访问外存。常见的内部排......
  • Kubernetes 1.25.4数据平面自带nginx负载均衡实现高可用
    1、环境准备要点:1、使用一个FQDN统一作为APIServer的接入点;2、加入集群之前,每个节点都将该FQDN解析至第一个Master;3、加入集群之后,每个Master节点将该FQDN都解析至自身......
  • 45.自定义函数实现分组统计
    #自定义函数实现分组统计#能过自定义的函数实现分组统计importpandasaspddf=pd.read_excel('电脑配件销售记录.xlsx')#print(df.head()))#回顾知识点#p......
  • 005.实现用户登录View层
    1.login.html<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>慕课网OA办公系统</title><!--引入样式--><linkrel="style......
  • 004.实现用户登录Controller层
    1. com.imooc.oa.controller创建servletpackagecom.imooc.oa.controller;importcom.fasterxml.jackson.annotation.JsonInclude;importcom.fasterxml.jackson.dat......
  • 《ASP.NET Core技术内幕与项目实战》精简集-EFCore2.9:泛型仓储实现IRepository
    本节内容,部分为补充内容,部分涉及到5.2(P131-133)。主要NuGet包:如前章节所述 仓储模式,将数据访问层抽象出来,隐藏了底层对数据源的CRUD操作,这样在应用层或控制器中,我们直接......