首页 > 编程语言 >Python UDP协议发送指定格式报文

Python UDP协议发送指定格式报文

时间:2024-01-23 17:24:59浏览次数:38  
标签:UDP socket Python checksum 报文 udp switch data def

 

 

import struct
import time
import socket
import threading

# udp 发送数据
def send_data(udp_socket, target_ip, target_port,send_msg):
        try:
            udp_socket.sendto(send_msg, (target_ip, target_port))
        except Exception as e:
            print(f"发送数据时出错: {e}")

# udp 接收数据
def receive_data(udp_socket):
    while True:
        try:
            recv_data, addr = udp_socket.recvfrom(1024)
            recv_msg = recv_data.decode('utf-8')
            print(f"从 {addr} 收到数据: {recv_msg}")
        except Exception as e:
            print(f"接收数据时出错: {e}")

# 计算校验位
def calculate_checksum(data):
    # 计算校验字节,即所有字节按位异或
    checksum = 0
    for byte in data:
        checksum ^= byte
    return bytes([checksum])


# 交通参与者类型
# 请参照算法方输出与协议规定将字典完善
def switch_participant_type(case_value):
    switch_dict = {
        'car': bytes([0x02]),
        'truck': bytes([0x03]),
        'default': bytes([0x00]),
    }
    return switch_dict.get(case_value, switch_dict['default'])


# 车辆颜色
# 请参照算法方输出与协议规定将字典完善
def switch_car_color(case_value):
    switch_dict = {
        'red': struct.pack('<8s', '红'.encode('utf-8')),
        'white': struct.pack('<8s', '白'.encode('utf-8')),
        'gray': struct.pack('<8s', '灰'.encode('utf-8')),
        'yellow': struct.pack('<8s', '黄'.encode('utf-8')),
        'pink': struct.pack('<8s', '粉'.encode('utf-8')),
        'purple': struct.pack('<8s', '紫'.encode('utf-8')),
        'green': struct.pack('<8s', '绿'.encode('utf-8')),
        'blue': struct.pack('<8s', '蓝'.encode('utf-8')),
        'brown': struct.pack('<8s', '棕'.encode('utf-8')),
        'black': struct.pack('<8s', '黑'.encode('utf-8')),
        'default': struct.pack('<8s', '无'.encode('utf-8')),
    }
    return switch_dict.get(case_value, switch_dict['default'])


# 车辆类型细分
# 请参照算法方输出与协议规定将字典完善
def switch_car_type_detail(case_value):
    switch_dict = {
        'car': struct.pack('<3s', 'K33'.encode('utf-8')),
        'truck': struct.pack('<3s', 'H51'.encode('utf-8')),
    }
    return switch_dict.get(case_value)



# 车道方向
# 请参照算法方输出与协议规定将字典完善
def switch_direction(case_value):
    switch_dict = {
        'w': bytes([0x40]),
        'e': bytes([0x04]),
        'n': bytes([0x01]),
        's': bytes([0x10]),
        'default': bytes([0x00]),
    }
    return switch_dict.get(case_value, switch_dict['default'])

# 数据打包
def pack_data_frame(data):
    # 定义常量值
    FRAME_START = bytes([0xC0])  # 帧开始
    VERSION = bytes([0x01])  # 版本号
    SENDER_ID = bytes([0x02])  # 发送方ID
    RECEIVER_ID = bytes([0x01])  # 接收方ID
    DATA_LINK_CODE = bytes([0x03])  # 数据链路码
    OPERATION_TYPE = bytes([0x82])  # 操作类型-主动发送
    OBJECT_ID = bytes([0x01])  # 对象标识:目标检测结果
    RESERVED = bytes([0x01] * 5)  # 保留字段

    # 计算数据帧长度
    data_length = 21 + 1 * 61  # 数据包长度为 固定长度 + 参与者*固定长度
    data_length_bytes = struct.pack('<H', data_length)

    # 其他字段初始化
    data_frame_number = bytes([0x00])  # 数据包序号,初始值为0
    total_frame_count = bytes([0x00])  # 数据帧总数量,初始值为0
    current_frame_number = bytes([0x00])  # 当前数据帧序号,初始值为0

    # 构造识别帧时间,使用系统当前时间
    current_time = int(time.time() * 1000)  # 获取当前时间戳(毫秒级)
    minutes = (current_time // (1000 * 60)) % 60  # 分钟
    seconds = (current_time // 1000) % 60  # 秒
    milliseconds = current_time % 1000  # 毫秒
    recognition_frame_time = struct.pack('<BBHH', minutes, seconds, milliseconds, 0)

    # 交通参与者,默认1个
    num_participants = bytes([0x01])

    # 构造数据内容
    # '<I': 是格式字符串,表示不同字段的数据类型和顺序。每个字符对应一个字段的数据类型。具体解释如下:
    # <: 小端字节序
    # I: 无符号整型(4字节)
    # B: 无符号字节(1字节)
    # H: 无符号短整型(2字节)
    # f: 浮点数 (4字节)
    # 5s: 字符串,5个字节
    participant_id = struct.pack('<I', 0)  # 交通参与者id
    world_x = struct.pack('<I', 0)  # 统一坐标系 x 坐标
    world_y = struct.pack('<I', 0)  # 统一坐标系 y 坐标
    participant_type = bytes([0x00])  # 交通参与者类型
    longitude = struct.pack('<f', 0.0)  # 经度坐标
    latitude = struct.pack('<f', 0.0)  # 纬度坐标
    lane_no = bytes([0x00])  # 车道号
    lane_attribute = bytes([0x00])  # 车道属性
    direction = bytes([0x00])  # 车道方向
    participant_length = bytes([0x00])  # 交通参与者长度
    participant_width = bytes([0x00])  # 交通参与者宽度
    participant_height = bytes([0x00])  # 交通参与者高度
    participant_v = struct.pack('<H', 0)  # 交通参与者速度
    participant_a = struct.pack('<H', 0)  # 交通参与者加速度
    participant_angle = struct.pack('<H', 0)  # 交通参与者航向角
    plate_no = struct.pack('<8s', '00000000'.encode('utf-8'))  # 车牌号
    plate_color = bytes([0x01])  # 号牌颜色
    car_color = struct.pack('<8s', '无'.encode('utf-8'))  # 车辆颜色
    car_type_detail = struct.pack('<3s', 'K33'.encode('utf-8'))   # 车辆类型细分

    for index, value in enumerate(data):
        # 交通参与者id
        if index == 1:
            participant_id = struct.pack('<I', value)
        # 统一坐标系 x 坐标
        if index == 2:
            world_x = struct.pack('<I', value)
        # 统一坐标系 y 坐标
        if index == 3:
            world_y = struct.pack('<I', value)
        # 车道方向
        if index == 4:
            direction = switch_direction(value)
        # 交通参与者类型
        if index == 5:
            participant_type = switch_participant_type(value)
            car_type_detail = switch_car_type_detail(value)
        # 车辆颜色
        if index == 6:
            car_color = switch_car_color(value)
        # 交通参与者航向角
        if index == 7:
            participant_angle = struct.pack('<H', 0)

    # 数据合并
    data_content = (
            participant_id +
            world_x +
            world_y +
            participant_type +
            longitude +
            latitude +
            lane_no +
            lane_attribute +
            direction +
            participant_length +
            participant_width +
            participant_height +
            participant_v +
            participant_a +
            participant_angle +
            plate_no +
            plate_color +
            car_color +
            car_type_detail
    )


    # 计算校验字节
    checksum_data = VERSION + SENDER_ID + RECEIVER_ID + DATA_LINK_CODE + \
                    OPERATION_TYPE + OBJECT_ID + data_length_bytes + \
                    data_frame_number + total_frame_count + current_frame_number + \
                    RESERVED + recognition_frame_time + num_participants + data_content

    frame_checksum = calculate_checksum(checksum_data)

    # 帧结束
    frame_end = bytes([0xC0])

    # 组装数据帧
    data_frame = (
            FRAME_START + VERSION + SENDER_ID + RECEIVER_ID + DATA_LINK_CODE +
            OPERATION_TYPE + OBJECT_ID + data_length_bytes +
            data_frame_number + total_frame_count + current_frame_number +
            RESERVED + recognition_frame_time + data_content +
            frame_checksum + frame_end
    )

    return data_frame





def main():
    # 创建一个UDP套接字
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # 将套接字绑定到特定的地址和端口
    local_address = ('192.168.124.28', 10050)
    udp_socket.bind(local_address)

    # 设置发送数据的目标地址和端口
    target_ip = '192.168.124.34'
    target_port = 6666

    # 启动一个线程用于接收数据
    receive_thread = threading.Thread(target=receive_data, args=(udp_socket))
    receive_thread.start()




    # 示例数据
    input_data = [
        [1, 3, 1426, 1438, "w", "truck", "red", "init"],
        [1, 5, 1274, 1263, "w", "car", "blue", "init"],
        [1, 7, 3099, 872, "w", "car", "blue", "init"],
        [1, 2030, 2618, 634, "w", "car", "blue", "init"],
        [1, 2029, 2352, 402, "w", "car", "black", "init"]
    ]

    # 声明一个空数组,用于存储每次循环生成的数据帧二进制表示
    data_frames_binary = []

    # 转换并输出每条数据的二进制表示
    for entry in input_data:
        data_frame_binary = pack_data_frame([entry])
        data_frames_binary.append(data_frame_binary)

    # 在主线程中发送所有数据
    for data_frame_binary in data_frames_binary:
        send_data(udp_socket,target_ip,target_port,data_frame_binary)


if __name__ == '__main__':
    main()

 

标签:UDP,socket,Python,checksum,报文,udp,switch,data,def
From: https://www.cnblogs.com/onecyl/p/17982938

相关文章

  • 【python】SSTI模版注入
    0x00  PythonVene环境及介绍venv虚拟环境:创建和管理虚拟环境的模块首先aptupdate更新一下包管理安装你当前版本的python-venv选择一个目录,安装venv虚拟环境。我取的名是flask-venv。如何选中当前的venv呢?执行以下命令可以发现多了一个前缀flask-venv在当前目录安装f......
  • python安装包(模块)的八种方法
    1.使用easy_installeasy_install这应该是最古老的包安装方式了,目前基本没有人使用了。下面是easy_install的一些安装示例#通过包名,从PyPI寻找最新版本,自动下载、编译、安装$easy_installpkg_name#通过包名从指定下载页寻找链接来安装或升级包$easy_install-fhttp:/......
  • 通过Python计算有效降水量
    有效降水量是指能够提供给作物蒸发蒸腾,从而减少作物对灌溉水需求的雨量,可美国农业部土壤保持局推荐的方法进行逐日计算,本文将介绍其在Python环境中的实现方式。其公式如下:\[P_e=\sum_{i=1}^NP_{ei}=\begin{cases}\sum_{i=1}^N\frac{P_i(4.17-0.2P_i)}{4.17}&\quad\text{($P_i<8......
  • python简单去除视频水印
    只能去除固定水印位置。如果水印位置会变,那么后面的就去除不掉。而且如果视频第一帧没有水印出现,那么后面也没办法去掉。。。。。。。。文件夹格式为importosimportsysimportcv2importnumpyfrommoviepyimporteditorVIDEO_PATH='video'OUTPUT_PATH='output'......
  • ubuntu 安装python 3.1 具体步骤
    安装Python3.1的步骤如下:打开终端(Terminal)。输入以下命令来安装Python3.1的依赖:sudoapt-getupdatesudoapt-getinstallbuild-essentialcheckinstallsudoapt-getinstalllibreadline-gplv2-devlibncursesw5-devlibssl-devlibsqlite3-devtk-devlibgdb......
  • Python的sort自定义compare函数
    记住这个规律:1.无论是什么,都先左侧的大,ifx>y:固定写法。2.大的放右边就return1;如果想要大的放左边就return-1;这里记忆是【-1,1】坐标轴上,-1在左侧,1在右侧 #默认sort是左小-右大,的return1#要排序大的,就右侧大的return1defcompare(x,y):ifx>y:......
  • python截取视频中的一段成gif
    frommoviepy.editorimportVideoFileClipvideo=VideoFileClip('xxxx.mp4')#剪辑从第1秒到第6秒的视频片段clipped_video=video.subclip(0,6)#保存成gifclipped_video.write_gif('output.gif')#加载GIF文件clip=VideoFileClip('output.gif')#......
  • python文字转语音
    abc.txt我喜欢唱跳rap&篮球importosimportpyttsx3#创建一个TTS引擎engine=pyttsx3.init()voices=engine.getProperty('voices')forvinvoices:print("ID:",v.id)print("Name:",v.name)print("Languages:",......
  • python随机生成图片验证码第二篇
    Python生成随机验证码,需要使用PIL模块.安装: pip3installpillow基本使用1.创建图片fromPILimportImageimg=Image.new(mode='RGB',size=(120,30),color=(255,255,255))#在图片查看器中打开#img.show()#保存在本地withopen('code.png','wb')asf......
  • Java开发者的Python进修指南:JSON利器之官方json库、demjson和orjson的实用指南
    JSONJSON作为目前最流行的传输格式,在Python中也有相应的实现方式。由于JSON格式的文本可以跨平台并且简单易用,因此被广泛传播。因此,我们今天的主要讨论内容是如何熟练地应用Python的JSON库来处理将JSON映射到文本,以及如何从文本映射到对象中。现在,让我们开始探讨这个话题。官方j......