首页 > 编程语言 >使用 python 升级OTA程序

使用 python 升级OTA程序

时间:2024-08-29 22:30:05浏览次数:17  
标签:python OTA chunk register bytes 升级 values print size

依赖文件

pymodbus==3.6.3
pyserial==3.5
import binascii
import math
import time
import serial
from pymodbus.client import ModbusSerialClient as ModbusClient
import struct

# # 设备串口配置
# port = '/dev/cu.usbserial-1130'  # 根据实际情况修改为实际串口号
# baudrate = 9600
# parity = 'N'
# stopbits = 1
# bytesize = 8
# # 创建串口连接
# ser = serial.Serial(port, baudrate, parity=parity, stopbits=stopbits, bytesize=bytesize)
# # 创建Modbus客户端
# print("Modbus RTU Client Connected")
# client = ModbusClient(port=ser,baudrate=9600)
# print(client)
# # 假设从地址1开始写入固件数据,每个寄存器存储两个字节
# if client.connect():
#     print("Modbus RTU Client Connected")
# else:
#     print("Failed to connect to Modbus RTU Client")
# firmware_file_path = 'firmware.bin'
# with open(firmware_file_path, 'rb') as f:
#     firmware_data = f.read()
# # 模拟将固件数据按每两个字节分组并写入Modbus寄存器
# register_address = 0x0001  # 起始寄存器地址
# for i in range(0, len(firmware_data), 2):
#     if i + 1 < len(firmware_data):  # 确保不超出范围
#         register_value = struct.unpack('>H', firmware_data[i:i+2])[0]  # 大端模式读取两个字节
#         client.write_register(register_address, register_value)
#         register_address += 1
#         # 可能需要根据设备响应速度调整延时
#         time.sleep(0.1)
# # 关闭Modbus客户端和串口连接
# client.close()
# ser.close()

# print("Firmware data sent via Modbus.")

# # 在这里添加向设备发送固件更新结束命令的逻辑,具体命令取决于设备实现



from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException, ConnectionException
import logging
import hashlib

# 配置日志记录
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
client = ModbusSerialClient(port='/dev/cu.usbserial-B001VTZP', baudrate=57600,
                      stopbits=1, bytesize=8, parity='N',slaveaddress=1)    # 看文档,method='rtu'貌似没用

firmware_file_path = 'N00.bin'


def read_temperature_and_humidity(client):
    try:
        # 计算md5 数值是多少 
        # 计算分为多少包
        # 发送查询modbus版本
        # 发送OTA md5 文件

        # 读取现在的版本信息
        response = client.read_holding_registers(address=100, count=8, slave=1)
        if not response.isError():
            print("Register Values: ", response.registers)
        else:
            print("Failed to read registers")

        # 读取 文件的md5 并下发给设备
        with open(firmware_file_path, 'rb') as f:
                firmware_data = f.read() # read file as bytes
                readable_hash = hashlib.md5(firmware_data).hexdigest();
                file_length = len(firmware_data)
                file_length_bytes = file_length.to_bytes(4, byteorder='big')
                file_length_hex_str = binascii.hexlify(file_length_bytes).decode()
                file_length_byte_array = [file_length_hex_str[i:i+2] for i in range(0, len(file_length_hex_str), 2)]
        hash_bytes = bytes.fromhex(readable_hash)
        hex_string = binascii.hexlify(readable_hash.encode()).decode()
        byte_array = [hex_string[i:i+2] for i in range(0, len(hex_string), 2)]
        # 将文件长度的字节数组与MD5字节数组合并
        combined_byte_array = file_length_byte_array + byte_array
        print(type(combined_byte_array))
        print(combined_byte_array)
        byte_values = [int(x, 16) for x in combined_byte_array]
        combined_bytes = bytes(byte_values)
        #把数组写入write_registers
            # 将合并后的字节数据按照寄存器大小(假设每个寄存器是两个字节)分割成整数列表
        register_values = struct.unpack('>' + 'H' * (len(combined_bytes) // 2), combined_bytes)
        print(register_values)
        print("*****************")
        write_response=client.write_registers(address=108, values=register_values, slave=1)
        if not write_response.isError():
            print("回复成功")
        else:
            print("回复失败")
        # 下发分片任务
        i = 0
        register_address = 0x0001  # 起始寄存器地址
        chunk_size_bytes = 200  # 每次发送的字节数量,需确保是寄存器数量的整数倍(每个寄存器2字节)
        #需要分多少包 
        # mdobus_size = hex(int(math.ceil(len(firmware_data) / 200))).to_bytes(2, byteorder='big').hex()
        mdobus_size = (math.ceil(len(firmware_data) / 200))
        print("分片任务数:", mdobus_size)
        while i * chunk_size_bytes < len(firmware_data):
           
            
            # 假设 client 支持一次性写入多个寄存器,例如使用 write_registers 方法
            # 打印数值
            # 还剩200个字节则
            xianzai_len=chunk_size_bytes*i
            print(len(firmware_data))
            print(xianzai_len)
            print("第", i, "个分片,剩余", len(firmware_data)-chunk_size_bytes*i, "个字节")
            if chunk_size_bytes*i<=len(firmware_data)-200:
                chunk = firmware_data[i*chunk_size_bytes:(i+1)*chunk_size_bytes]
                register_values = struct.unpack('>' + 'H' * (chunk_size_bytes // 2), chunk)
                data_len=200;
            else:
                data_len=len(firmware_data)-chunk_size_bytes*i
                chunk = firmware_data[i * chunk_size_bytes:i * chunk_size_bytes + data_len]
                register_values = struct.unpack('>' + 'H' * (data_len // 2), chunk)
                
            # 加上分包和总包数
            # 将 mdobus_size 添加到 register_values 的开头
            register_values_list=list(register_values)
            register_values_list.insert(0, mdobus_size)
            register_values_list.insert(0, 0)
            register_values_list.insert(0, i+1)
            register_values_list.insert(0, 0)
            register_values_list.insert(0, data_len)
            print("register_values_list", register_values_list)
            register_values = tuple(register_values_list)
            print("*****************")
            print("register_values", register_values)
            write_response=client.write_registers(address=126, values=register_values, slave=1)
            if not write_response.isError():
                print("Multiple registers written successfully")
            else:
                print("Failed to write multiple registers")
            print(i * chunk_size_bytes)
            print(len(firmware_data))
            i += 1
            print(i * chunk_size_bytes)
            print("*************")
            time.sleep(0.3)


        # 可能需要根据设备响应速度调整延时
            

#         i = 0
#         register_address = 0x0001  # 起始寄存器地址
#         chunk_size_bytes=200
#         total_chunks = (len(firmware_data) + chunk_size_bytes - 1) // chunk_size_bytes  # 计算总分片数
#         chunk_size_bytes -= 2  # 减去分片序号和总分片数占用的两个字节

#         while i * chunk_size_bytes < len(firmware_data):
#             chunk = firmware_data[i*chunk_size_bytes:(i+1)*chunk_size_bytes]
            
#             # 添加分片序号(高位字节在前)
#             slice_number = i
#             packet = slice_number.to_bytes(2, byteorder='big') + chunk
            
#             # 假设 client 支持一次性写入多个寄存器,例如使用 write_registers 方法
#             register_values = struct.unpack('>' + 'H' * ((chunk_size_bytes + 2) // 2), packet)
            
#             print(register_values)

#             write_response = client.write_registers(address=register_address, values=register_values, slave=1)

#             if not write_response.isError():
#                 print("Multiple registers written successfully")
#             else:
#                 print("Failed to write multiple registers")

#             register_address += (chunk_size_bytes + 2) // 2  # 更新寄存器地址,因为每个寄存器2字节,且包含了序号信息

#             i += 1

#         # 可能需要根据设备响应速度调整延时
#             time.sleep(0.1)

# # 最后一个数据块可能不足一个完整的chunk_size_bytes,需单独处理
#         if i * chunk_size_bytes < len(firmware_data):
#             remaining_chunk = firmware_data[i*chunk_size_bytes:]
#             slice_number = i
#             packet = slice_number.to_bytes(2, byteorder='big') + remaining_chunk
#             register_values = struct.unpack('>' + 'H' * ((len(remaining_chunk) + 2) // 2), packet)
#             write_response = client.write_registers(address=register_address, values=register_values, slave=1)

#             if not write_response.isError():
#                 print("Remaining registers written successfully")
#             else:
#                 print("Failed to write remaining registers")


        # 读取寄存器地址0和1上的4个字节(两个寄存器)
        # result = client.read_input_registers(address=0, count=3, unit=1)  # 这个错了,这是读取输入寄存器的)0x04
        # result = client.read_holding_registers(address=0, count=3, unit=1)  # 这个才是读取输入寄存器的0x03  # unit参数错了,当前pymodbus版本没有这个参数,搞乌龙了,要不是用filelocator搜索函数用法,还真不知道- -
        # result = client.read_holding_registers(
        #     address=0, count=2, slave=1)  # 读取输入寄存器的0x03 # 读两个寄存器就ok,卖家说第三个寄存器是预留的,不用读

        # if result.isError():
        #     # 处理错误
        #     print("读取错误:", result)
        #     return None, None

        # 将读取到的结果转换为温度和湿度
        # registers = result.registers
        # temperature_reg = registers[0]
        # humidity_reg = registers[1]

        # # 检查是否有探头错误
        # if temperature_reg == 0x8000 or humidity_reg == 0x8000:
        #     print("探头错误")
        #     return None, None

        # # 计算实际的温度和湿度值
        # temperature = temperature_reg * 0.1
        # humidity = humidity_reg * 0.1

        # # 格式化温度和湿度值,保留一位小数
        # temperature = round(temperature, 1)
        # humidity = round(humidity, 1)

        return 1, 1

    except ModbusException as e:
        print("Modbus异常:", e)
        return None, None
    except Exception as e:
        # 捕获除ModbusException之外的所有异常
        print(f"An error occurred: {e}")
        return None, None


def main():
    try:
        if client.connect():  # 尝试连接到Modbus服务器/设备
            temperature, humidity = read_temperature_and_humidity(client)
            # if temperature is not None and humidity is not None:
            #     print(f"温度: {temperature}°C, 湿度: {humidity}%RH")
            client.close()  # 关闭连接
        else:
            print("无法连接到Modbus设备")
    except ConnectionException as e:
        print("连接异常:", e)


if __name__ == "__main__":
    main()
#导出 pip
pip freeze > requirements.txt
# 查看串口
···
ls /dev/cu.usbserial-*
···

标签:python,OTA,chunk,register,bytes,升级,values,print,size
From: https://blog.51cto.com/u_11777051/11869291

相关文章

  • Python中的“for循环”:探索其无限潜力
    引言for循环是任何Python程序员工具箱中的必备技能之一。无论是在处理数据时需要遍历数组,还是在编写Web应用时循环处理请求,亦或是进行复杂的算法实现,for循环都能派上大用场。通过掌握for循环的不同用法,我们可以更高效地解决问题,写出更加优雅且高效的代码。基础语法介绍核心概念......
  • Python中的`while`循环:探索无限可能
    引言while循环允许我们重复执行一段代码块,直到指定条件不再满足为止。这种机制非常适合处理那些不确定具体重复次数的任务场景,比如读取文件直到末尾、定时任务执行等。掌握好while循环,不仅能让你的代码更加高效、简洁,还能帮助你在面对复杂问题时找到更优的解决方案。基础语法介绍......
  • Python实现图片的拼接
    Python实现图片的拼接Python中有多种方法可以实现图片拼接,下面是一个使用Pillow库的示例:首先,你需要安装Pillow库:pipinstallpillow然后,可以使用以下代码实现图片拼接:fromPILimportImage#读取两张图片img1=Image.open('image1.jpg')img2=Image.open('image2.jpg'......
  • Python的requests库详细介绍
    Pythonrequests库是一个用于发送HTTP请求的简单而强大的库,它可以让你轻松地在Python中处理HTTP请求。这个库是Python中处理HTTP请求的标准工具,因其简洁的API和强大的功能而广受欢迎。1.安装requestspipinstallrequests2.基本用法2.1导入库importr......
  • Python深度学习股价预测、量化交易策略:LSTM、GRU深度门控循环神经网络|附代码数据
    全文链接:https://tecdat.cn/?p=37539原文出处:拓端数据部落公众号 分析师:ShuoZhang本文以上证综指近22年的日交易数据为样本,构建深度门控循环神经网络模型,从股价预测和制定交易策略两方面入手,量化循环神经网络在股票预测以及交易策略中的效果,结合一个Python深度学习股价预测......
  • python实现RC4加解密算法
    目录RC4算法简介RC4算法的加密和解密流程RC4算法的Python实现代码解释RC4算法的应用场景RC4的安全性分析总结RC4(RivestCipher4)是一种流加密算法,由RonRivest于1987年设计。RC4广泛应用于各种加密协议,如SSL/TLS和WEP/WPA等。RC4算法因其简单、高效的特点受到广泛关......
  • Python实现SM4加解密算法
    目录SM4算法简介SM4算法的加密和解密流程SM4算法的Python实现代码解释总结SM4算法是中国国家密码局设计的块密码算法,广泛应用于无线局域网标准和其他安全通信系统中。以下是SM4算法的详细介绍,包括加密解密流程和Python的完整实现。SM4算法简介SM4是一种对称分组......
  • Python入门阶段---------容易错的点
    Python中笔试中容易出错的知识点汇总前言一、Python的变量命名规范二、List列表1.返回值2.列表中添加新元素3.列表中append()和extend()的区别三、字符串中的切片四、字典中键值对1.字典中的键也分类型2.字典中的查总结前言本文总结了Python入门中容易混......
  • Python实现等距映射(ISOMAP)降维算法
    目录Python实现等距映射(ISOMAP)降维算法的博客引言ISOMAP算法原理ISOMAP的优势与局限Python实现ISOMAP算法1.创建ISOMAP类2.在瑞士卷数据集上应用ISOMAP3.结果分析总结运行结果Python实现等距映射(ISOMAP)降维算法的博客引言在高维数据处理中,降维是一种常用的技......
  • python基础(11文件读取)
    python系列文章目录python基础(01变量&数据类型&运算符)python基础(02序列共性)python基础(03列表和元组)python基础(04字符串&字典)python基础(05集合set)python基础(06控制语句)python基础(07函数)python基础(08类和对象)python基础(09闭包&装饰器)python基础(10异常处理)文章......