依赖文件
pymodbus==3.6.3
pyserial==3.5
import binascii
import math
import time
import serial
from pymodbus.client import ModbusSerialClient as ModbusClient
import struct
# # 设备串口配置
# port = '/dev/cu.usbserial-1130' # 根据实际情况修改为实际串口号
# baudrate = 9600
# parity = 'N'
# stopbits = 1
# bytesize = 8
# # 创建串口连接
# ser = serial.Serial(port, baudrate, parity=parity, stopbits=stopbits, bytesize=bytesize)
# # 创建Modbus客户端
# print("Modbus RTU Client Connected")
# client = ModbusClient(port=ser,baudrate=9600)
# print(client)
# # 假设从地址1开始写入固件数据,每个寄存器存储两个字节
# if client.connect():
# print("Modbus RTU Client Connected")
# else:
# print("Failed to connect to Modbus RTU Client")
# firmware_file_path = 'firmware.bin'
# with open(firmware_file_path, 'rb') as f:
# firmware_data = f.read()
# # 模拟将固件数据按每两个字节分组并写入Modbus寄存器
# register_address = 0x0001 # 起始寄存器地址
# for i in range(0, len(firmware_data), 2):
# if i + 1 < len(firmware_data): # 确保不超出范围
# register_value = struct.unpack('>H', firmware_data[i:i+2])[0] # 大端模式读取两个字节
# client.write_register(register_address, register_value)
# register_address += 1
# # 可能需要根据设备响应速度调整延时
# time.sleep(0.1)
# # 关闭Modbus客户端和串口连接
# client.close()
# ser.close()
# print("Firmware data sent via Modbus.")
# # 在这里添加向设备发送固件更新结束命令的逻辑,具体命令取决于设备实现
from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException, ConnectionException
import logging
import hashlib
# 配置日志记录
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
client = ModbusSerialClient(port='/dev/cu.usbserial-B001VTZP', baudrate=57600,
stopbits=1, bytesize=8, parity='N',slaveaddress=1) # 看文档,method='rtu'貌似没用
firmware_file_path = 'N00.bin'
def read_temperature_and_humidity(client):
try:
# 计算md5 数值是多少
# 计算分为多少包
# 发送查询modbus版本
# 发送OTA md5 文件
# 读取现在的版本信息
response = client.read_holding_registers(address=100, count=8, slave=1)
if not response.isError():
print("Register Values: ", response.registers)
else:
print("Failed to read registers")
# 读取 文件的md5 并下发给设备
with open(firmware_file_path, 'rb') as f:
firmware_data = f.read() # read file as bytes
readable_hash = hashlib.md5(firmware_data).hexdigest();
file_length = len(firmware_data)
file_length_bytes = file_length.to_bytes(4, byteorder='big')
file_length_hex_str = binascii.hexlify(file_length_bytes).decode()
file_length_byte_array = [file_length_hex_str[i:i+2] for i in range(0, len(file_length_hex_str), 2)]
hash_bytes = bytes.fromhex(readable_hash)
hex_string = binascii.hexlify(readable_hash.encode()).decode()
byte_array = [hex_string[i:i+2] for i in range(0, len(hex_string), 2)]
# 将文件长度的字节数组与MD5字节数组合并
combined_byte_array = file_length_byte_array + byte_array
print(type(combined_byte_array))
print(combined_byte_array)
byte_values = [int(x, 16) for x in combined_byte_array]
combined_bytes = bytes(byte_values)
#把数组写入write_registers
# 将合并后的字节数据按照寄存器大小(假设每个寄存器是两个字节)分割成整数列表
register_values = struct.unpack('>' + 'H' * (len(combined_bytes) // 2), combined_bytes)
print(register_values)
print("*****************")
write_response=client.write_registers(address=108, values=register_values, slave=1)
if not write_response.isError():
print("回复成功")
else:
print("回复失败")
# 下发分片任务
i = 0
register_address = 0x0001 # 起始寄存器地址
chunk_size_bytes = 200 # 每次发送的字节数量,需确保是寄存器数量的整数倍(每个寄存器2字节)
#需要分多少包
# mdobus_size = hex(int(math.ceil(len(firmware_data) / 200))).to_bytes(2, byteorder='big').hex()
mdobus_size = (math.ceil(len(firmware_data) / 200))
print("分片任务数:", mdobus_size)
while i * chunk_size_bytes < len(firmware_data):
# 假设 client 支持一次性写入多个寄存器,例如使用 write_registers 方法
# 打印数值
# 还剩200个字节则
xianzai_len=chunk_size_bytes*i
print(len(firmware_data))
print(xianzai_len)
print("第", i, "个分片,剩余", len(firmware_data)-chunk_size_bytes*i, "个字节")
if chunk_size_bytes*i<=len(firmware_data)-200:
chunk = firmware_data[i*chunk_size_bytes:(i+1)*chunk_size_bytes]
register_values = struct.unpack('>' + 'H' * (chunk_size_bytes // 2), chunk)
data_len=200;
else:
data_len=len(firmware_data)-chunk_size_bytes*i
chunk = firmware_data[i * chunk_size_bytes:i * chunk_size_bytes + data_len]
register_values = struct.unpack('>' + 'H' * (data_len // 2), chunk)
# 加上分包和总包数
# 将 mdobus_size 添加到 register_values 的开头
register_values_list=list(register_values)
register_values_list.insert(0, mdobus_size)
register_values_list.insert(0, 0)
register_values_list.insert(0, i+1)
register_values_list.insert(0, 0)
register_values_list.insert(0, data_len)
print("register_values_list", register_values_list)
register_values = tuple(register_values_list)
print("*****************")
print("register_values", register_values)
write_response=client.write_registers(address=126, values=register_values, slave=1)
if not write_response.isError():
print("Multiple registers written successfully")
else:
print("Failed to write multiple registers")
print(i * chunk_size_bytes)
print(len(firmware_data))
i += 1
print(i * chunk_size_bytes)
print("*************")
time.sleep(0.3)
# 可能需要根据设备响应速度调整延时
# i = 0
# register_address = 0x0001 # 起始寄存器地址
# chunk_size_bytes=200
# total_chunks = (len(firmware_data) + chunk_size_bytes - 1) // chunk_size_bytes # 计算总分片数
# chunk_size_bytes -= 2 # 减去分片序号和总分片数占用的两个字节
# while i * chunk_size_bytes < len(firmware_data):
# chunk = firmware_data[i*chunk_size_bytes:(i+1)*chunk_size_bytes]
# # 添加分片序号(高位字节在前)
# slice_number = i
# packet = slice_number.to_bytes(2, byteorder='big') + chunk
# # 假设 client 支持一次性写入多个寄存器,例如使用 write_registers 方法
# register_values = struct.unpack('>' + 'H' * ((chunk_size_bytes + 2) // 2), packet)
# print(register_values)
# write_response = client.write_registers(address=register_address, values=register_values, slave=1)
# if not write_response.isError():
# print("Multiple registers written successfully")
# else:
# print("Failed to write multiple registers")
# register_address += (chunk_size_bytes + 2) // 2 # 更新寄存器地址,因为每个寄存器2字节,且包含了序号信息
# i += 1
# # 可能需要根据设备响应速度调整延时
# time.sleep(0.1)
# # 最后一个数据块可能不足一个完整的chunk_size_bytes,需单独处理
# if i * chunk_size_bytes < len(firmware_data):
# remaining_chunk = firmware_data[i*chunk_size_bytes:]
# slice_number = i
# packet = slice_number.to_bytes(2, byteorder='big') + remaining_chunk
# register_values = struct.unpack('>' + 'H' * ((len(remaining_chunk) + 2) // 2), packet)
# write_response = client.write_registers(address=register_address, values=register_values, slave=1)
# if not write_response.isError():
# print("Remaining registers written successfully")
# else:
# print("Failed to write remaining registers")
# 读取寄存器地址0和1上的4个字节(两个寄存器)
# result = client.read_input_registers(address=0, count=3, unit=1) # 这个错了,这是读取输入寄存器的)0x04
# result = client.read_holding_registers(address=0, count=3, unit=1) # 这个才是读取输入寄存器的0x03 # unit参数错了,当前pymodbus版本没有这个参数,搞乌龙了,要不是用filelocator搜索函数用法,还真不知道- -
# result = client.read_holding_registers(
# address=0, count=2, slave=1) # 读取输入寄存器的0x03 # 读两个寄存器就ok,卖家说第三个寄存器是预留的,不用读
# if result.isError():
# # 处理错误
# print("读取错误:", result)
# return None, None
# 将读取到的结果转换为温度和湿度
# registers = result.registers
# temperature_reg = registers[0]
# humidity_reg = registers[1]
# # 检查是否有探头错误
# if temperature_reg == 0x8000 or humidity_reg == 0x8000:
# print("探头错误")
# return None, None
# # 计算实际的温度和湿度值
# temperature = temperature_reg * 0.1
# humidity = humidity_reg * 0.1
# # 格式化温度和湿度值,保留一位小数
# temperature = round(temperature, 1)
# humidity = round(humidity, 1)
return 1, 1
except ModbusException as e:
print("Modbus异常:", e)
return None, None
except Exception as e:
# 捕获除ModbusException之外的所有异常
print(f"An error occurred: {e}")
return None, None
def main():
try:
if client.connect(): # 尝试连接到Modbus服务器/设备
temperature, humidity = read_temperature_and_humidity(client)
# if temperature is not None and humidity is not None:
# print(f"温度: {temperature}°C, 湿度: {humidity}%RH")
client.close() # 关闭连接
else:
print("无法连接到Modbus设备")
except ConnectionException as e:
print("连接异常:", e)
if __name__ == "__main__":
main()
#导出 pip
pip freeze > requirements.txt
# 查看串口
···
ls /dev/cu.usbserial-*
···
标签:python,OTA,chunk,register,bytes,升级,values,print,size
From: https://blog.51cto.com/u_11777051/11869291