首页 > 编程语言 >python实现发送udp数据包数据

python实现发送udp数据包数据

时间:2023-08-19 16:46:59浏览次数:48  
标签:udp python self 发送 time message 数据包 dq

需求:python发送udp数据包数据,支持host、port、valueFile等启动参数,其中valueFile是json格式的文件,要解析编码完成转换。

分模块实现:

发送端:

导入所需的模块和类,例如sys、logging、argparse、json、socket以及datetime和timedelta。

定义一个Sender类,变量MULTI_TABLE_HEADER和SINGLE_TABLE_HEADER分别表示多表模式和单表模式下的消息头;变量interval表示发送消息的时间间隔,默认为5秒;变量is_multi_table表示是否使用多表模式,默认为True;变量udp_host和udp_port分别表示UDP服务器的主机和端口,默认为'localhost'和12345;变量value_file表示JSON值文件的路径,默认为'value.json'。

    MULTI_TABLE_HEADER = "80000000"
    SINGLE_TABLE_HEADER = "80000001"

    interval = 5
    is_multi_table = True
    udp_host = 'localhost'
    udp_port = 12345
    value_file = 'resources/value.json'

方法send_message_to_udp_server用于将消息发送到UDP服务器,使用socketAPI。

    def send_message_to_udp_server(self,host, port, message):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # message = b'Hello, UDP!'
        sock.sendto(message, (host, port))
        sock.close()

方法run是主要的运行逻辑。它读取指定的JSON文件,并构造一个DqMessage对象。然后循环发送消息到UDP服务器,发送完成后等待一定时间再次发送,直到程序停止运行,同时保存日志信息。

    def run(self):

        # 读取valueFile指定的JSON文件
        with open(self.value_file) as file:
            dq_params = json.load(file)

        # 构造DqMessage对象
        dq_message = DqMessage()
        dq_message.header = self.MULTI_TABLE_HEADER if self.is_multi_table else self.SINGLE_TABLE_HEADER
        dq_message.dataList = [
            DqParam(dq_param['tableCode'], dq_param['seq'], dq_param['type'], dq_param['value'])
            for dq_param in dq_params
        ]
        print(dq_message.dataList)

        # 设置日志输出格式和保存路径
        logging.basicConfig(filename='send_msg.log', format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

        while True:
            # 获取当前时间
            start_time = datetime.now()

            # 构造并发送数据包
            message = dq_message.to_byte_buf(self.is_multi_table)
            self.send_message_to_udp_server(self.udp_host, self.udp_port, message)

            # 输出日志信息
            logging.info(f"发送内容16进制:{message}")
            logging.info(f"UDP消息已发送至 {self.udp_host}:{self.udp_port}")

            # 计算下一次发送的时间点
            end_time = datetime.now()
            elapsed_time = (end_time - start_time).total_seconds()
            sleep_time = max(self.interval - elapsed_time, 0)
            sleep_time = min(sleep_time, self.interval)
            next_send_time = end_time + timedelta(seconds=sleep_time)

            # 输出日志信息
            logging.info(f"下一次发送时间:{next_send_time.strftime('%Y-%m-%d %H:%M:%S')}")

            # 等待到下一次发送的时间点
            while datetime.now() < next_send_time:
                pass

数据封装模块:Dqmessage和Dqparam是实现编码转换和数据封装的模块,不与展示,根据数据具体实现。

接收端验证主要部分:

    def receive_udp_message(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((self.host, self.port))
     while True:
            data, addr = sock.recvfrom(1024)  # 接收数据包和发送方地址
            print('Received message:', data)
            # 输出日志信息
            logging.info(f"接收消息:{data}")
            hex_str = binascii.hexlify(data).decode('utf-8')
            print('verified message:', hex_str)
            logging.info(f"验证消息:{hex_str}")

 

标签:udp,python,self,发送,time,message,数据包,dq
From: https://www.cnblogs.com/rossxp/p/17642602.html

相关文章

  • Python - 引用 内存地址 地址传递 值传递
    内存地址是数据在物理内存中的存储位置;Java常量池驻留,int是-5~256  __new__方法用于定义创建对象时执行的操作,__new__方法仅仅为用户提供一个创建对象时干预的入口,真正执行创建对象的操作由object类中的__new__方法完成。del操作的实质是断开变量与内存数据之间的引用,操......
  • Python练习:输入一个整数,输出该数二进制表示中1的个数。
      Python3整数对象存储为无符号数加上符号位标志,所以不存在“负数”补码形式,因此,计算“1”的数量需要按去符号后的无符号数:cnt=bin(n).count('1')另外,Python3无长整,整数长度原则上不限,所以不能以假定的32位处理。    补码+原码=2**321#-*-coding:ut......
  • centos安装python3.7
    1.安装openssl11https://www.cnblogs.com/navysummer/p/17642466.html2.安装依赖yuminstallzlib-develbzip2-develncurses-develsqlite-develreadline-develtk-develgccmakelibffi-devel-y 3.安装python3.7 wgethttps://www.python.org/ftp/python/3.7.17/Pyt......
  • python+playwright 学习-72 设置window.navigator.webdriver属性为false 跳过网站反爬
    前言有些网站有反爬机制,比如用代码启动的浏览器会被检测到,需要人机验证,用脚本去点击或者滑动滑动虽然能滑动,但是会认证失败。用playwright和selenium启动的浏览器都会用个webdriver属性。浏览器会根据这个属性判断是否是人工正常操作。window.navigator.webdriver属性人......
  • Python3 - 时间处理与定时任务
    无论哪种编程语言,时间肯定都是非常重要的部分,今天来看一下python如何来处理时间和python定时任务,注意咯:本篇所讲是python3版本的实现,在python2版本中的实现略有不同,有时间会再写一篇以便大家区分。1.计算明天和昨天的日期12345678910111213#!/usr/bin/envpython#coding=utf-8#获......
  • Python中可用分号 `;` 将多行代码写在一行上
    Python中可用分号;将多行代码写在一行上━━━━━━━━━━━━━━━━━━━━━━例如:print('Hello');print('World');print('!')这将输出三行文本,分别是Hello、World和!。不过,这种写法会降低代码的可读性,不利于代码的维护和调试。因此,除非特殊情况下需要,一般不......
  • Python小项目:利用tkinter开发测手速小游戏
    1简介利用tkinter开发测手速的小游戏,大家10s内可以点击鼠标多少次呢?想测试一下吗?来试试测手速下游戏吧!仅供娱乐!试玩视频:视频链接游戏截图:2代码分模块介绍2.1导入需要的包fromtkinterimport*fromtkinter.messageboximport*2.2定义全局变量def__init__(self,......
  • python列表添加元素
    列表添加元素df_test=pd.DataFrame({'a':[1,2],'b':[3,None]})df_test.columns+'c'#每个元素拼接'c'!不能直接添加元素!['a','b']+['c']#不能每个元素拼接,也不能直接加字符串,需要列表+列表进行扩展,且append输出为None#np.array不......
  • Python通过matplotlib包和gif包生成gif动画
    ✅作者简介:热爱科研的算法开发者,Python、Matlab项目可交流、沟通、学习。......
  • python DLL load failed while importing numpy_ops 异常
    安装https://aka.ms/vs/17/release/vc_redist.x64.exe原文章地址:ImportError:DLLloadfailedwhileimportingnumpy_ops:Thespecifiedmodulecouldnotbefound·Issue#2773·Significant-Gravitas/Auto-GPT·GitHub......