首页 > 其他分享 >搭建多协议的串口服务器流程:RS-232、RS-485和TCP/IP、MQTT网络协议(代码示例)

搭建多协议的串口服务器流程:RS-232、RS-485和TCP/IP、MQTT网络协议(代码示例)

时间:2024-08-26 18:21:46浏览次数:14  
标签:示例 self RS MQTT mqtt 模块 串口 data

一、项目概述

在物联网(IoT)和自动化控制的快速发展中,串口通信作为一种经典的通信方式,依然发挥着重要作用。本项目旨在构建一个支持多种协议的串口服务器,能够通过串口接收和发送数据,并通过网络协议(如TCP/IP、MQTT等)与其他设备和系统进行交互。

项目的目标和用途

本项目的目标是提供一个灵活的串口服务器,支持多种串口协议(如RS-232、RS-485)以及多种网络协议,使得不同的设备能够无缝对接,解决不同设备间的通信不畅问题。该串口服务器可以广泛应用于工业自动化、智能家居、远程监控等领域,提升系统的互联互通能力。

项目解决的问题和带来的价值

通过构建这个串口服务器,用户可以实现以下目标:

  • 设备互联:无论是工业设备、传感器还是其他通信设备,都可以通过统一的串口服务器进行数据交换。

  • 协议转化:支持多种协议的转换,简化设备间的通信复杂性。

  • 易于维护:模块化设计,方便后续的功能扩展和维护。

  • 实时监控:通过网络协议,用户可以实时监控设备状态,提高系统的可靠性。


二、系统架构

在系统架构设计中,我们需要考虑到项目的需求和目标,选择合适的硬件和软件技术栈。

系统架构设计c

我们选择了基于Linux的嵌入式设备作为串口服务器的运行平台,使用C/C++进行底层串口操作,Python处理高层协议解析。系统的主要组件包括:

  • 串口通信模块:负责与外部设备进行数据交换。

  • 协议解析模块:负责解析不同的网络协议(如TCP/IP、MQTT等)。

  • 数据处理模块:实现数据的缓存和异步处理。

  • 用户接口模块:提供CLI和REST API用于配置和监控。

技术栈选择

  • 单片机:选择树莓派作为硬件平台,具备良好的串口支持和网络功能。

  • 通信协议:支持RS-232和RS-485串口协议,TCP/IP和MQTT网络协议。

架构图

串口服务器 串口通信模块 协议解析模块 数据处理模块 用户接口模块 外部设备 网络协议 缓存系统 CLI/REST API

三、环境搭建

在进行开发之前,需要搭建合适的开发环境。

环境安装步骤和配置

  1. 安装操作系统:
  • 下载和安装树莓派操作系统(Raspberry Pi OS)。

  • 配置网络连接。

  1. 安装开发工具:
  • 安装C/C++编译器:

    sudo apt-get install build-essential
    
  • 安装Python和相关库:

    sudo apt-get install python3 python3-pip
    pip3 install pyserial flask paho-mqtt
    
  1. 配置串口:
  • 确保串口设备已连接,并配置相应的权限:

    sudo usermod -a -G dialout $(whoami)
    

配置示例和注意事项

  • 确保串口设备的波特率、数据位、停止位等参数设置正确。

  • 使用ls /dev/tty*命令检查可用的串口设备。


四、代码实现

在这一部分,我们将根据之前设计的系统架构逐步实现各个功能模块。代码实现包括串口通信模块、协议解析模块和数据处理模块。每个模块都将附带详细的代码说明和时序图,以便更好地理解其逻辑和功能。

1. 串口通信模块

1.1 功能描述

串口通信模块负责与外部设备进行数据交换。它能够读取串口数据和发送数据,同时确保数据的完整性和有效性。我们将使用Python中的pySerial库来实现这一功能。

1.2 代码实现

import serial
import time

class SerialCommunication:
    def __init__(self, port, baudrate):
        """
        初始化串口通信模块。

        :param port: 串口设备的名称,如'/dev/ttyUSB0'(Linux)或'COM3'(Windows)。
        :param baudrate: 串口通信的波特率(如9600)。
        """
        self.serial_port = serial.Serial(port, baudrate, timeout=1)
        time.sleep(2)  # 等待串口稳定

    def read_data(self):
        """
        从串口读取数据。

        :return: 读取到的数据(字符串格式)。
        """
        if self.serial_port.in_waiting > 0:  # 检查是否有可读取的数据
            return self.serial_port.read_until().decode('utf-8').strip()
        return None

    def send_data(self, data):
        """
        向串口发送数据。

        :param data: 要发送的数据(字符串格式)。
        """
        self.serial_port.write(data.encode('utf-8'))
        print(f"发送: {data}")

    def close(self):
        """
        关闭串口连接。
        """
        self.serial_port.close()
        print("串口已关闭。")

1.3 代码说明

  • __init__:构造函数,初始化串口通信模块,设置串口的端口和波特率,并添加延迟确保串口稳定。

  • read_data:从串口读取数据,使用in_waiting检查是否有可读数据,读取数据直到换行符。

  • send_data:向串口发送数据,先将数据编码为字节格式,然后发送。

  • close:关闭串口连接,释放资源。

1.4 时序图

用户 SerialCommunication External Device 发送数据("Hello Device") 写入("Hello Device") 返回数据("Hello Host") 读取数据() 用户 SerialCommunication External Device

2. 协议解析模块

2.1 功能描述

协议解析模块负责处理接收到的网络协议数据,并将其转发到串口通信模块。我们将实现TCP和MQTT协议解析。

2.2 代码实现

import socket
import paho.mqtt.client as mqtt

class ProtocolParser:
    def __init__(self, mqtt_broker, mqtt_port):
        """
        初始化协议解析模块。

        :param mqtt_broker: MQTT代理的地址。
        :param mqtt_port: MQTT代理的端口。
        """
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.on_message = self.on_message

        # 连接到MQTT代理
        self.mqtt_client.connect(mqtt_broker, mqtt_port)
        self.mqtt_client.loop_start()  # 开始循环处理网络事件

    def on_message(self, client, userdata, msg):
        """
        MQTT消息回调函数。

        :param client: 客户端实例。
        :param userdata: 用户定义的数据。
        :param msg: 接收到的消息。
        """
        print(f"接收到MQTT消息: {msg.payload.decode()}")
        # 处理接收到的消息
        self.handle_message(msg.payload.decode())

    def handle_message(self, message):
        """
        处理接收到的消息。

        :param message: 接收到的消息(字符串格式)。
        """
        # 这里可以添加协议解析逻辑
        print(f"处理消息: {message}")
        # 例如将消息发送到串口
        serial_comm.send_data(message)
            def start_tcp_server(self, host='0.0.0.0', port=5000):
        """
        启动TCP服务器。

        :param host: 主机地址。
        :param port: 端口号。
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_socket:
            tcp_socket.bind((host, port))
            tcp_socket.listen()
            print(f"TCP服务器已启动,监听地址 {host}:{port}")
            conn, addr = tcp_socket.accept()
            with conn:
                print(f"连接来自: {addr}")
                while True:
                    data = conn.recv(1024)
                    if not data:
                        break
                    print(f"接收到TCP数据: {data.decode()}")
                    self.handle_message(data.decode())

    def subscribe_to_mqtt(self, topic):
        """
        订阅指定的MQTT主题。

        :param topic: 要订阅的主题。
        """
        self.mqtt_client.subscribe(topic)
        print(f"已订阅MQTT主题: {topic}")

2.3 代码说明

  • on_message:MQTT消息的回调函数,当接收到消息时触发,并调用handle_message处理消息。

  • handle_message:处理接收到的消息并将其发送到串口通信模块。

  • start_tcp_server:启动TCP服务器,接收来自TCP客户端的数据,并通过handle_message处理。

  • subscribe_to_mqtt:订阅指定的MQTT主题,以便接收相关消息。

2.4 时序图

用户 TCP Server ProtocolParser SerialCommunication External Device 发送数据("Hello Device") 数据接收("Hello Device") 处理消息("Hello Device") 写入("Hello Device") 返回数据("Hello Host") 读取数据() 返回数据("Hello Host") 用户 TCP Server ProtocolParser SerialCommunication External Device

3. 数据处理模块

3.1 功能描述

数据处理模块负责缓存和异步处理从串口或网络接收到的数据,确保数据的完整性和实时性。该模块将实现数据的缓冲机制,以便在网络延迟或串口传输过程中有效管理数据流。

3.2 代码实现

import threading
import queue

class DataProcessor:
    def __init__(self):
        """
        初始化数据处理模块。
        """
        self.data_queue = queue.Queue()  # 创建一个数据队列
        self.is_running = True
        self.thread = threading.Thread(target=self.process_data)
        self.thread.start()  # 启动数据处理线程

    def process_data(self):
        """
        处理数据的线程函数。
        """
        while self.is_running:
            try:
                data = self.data_queue.get(timeout=1)  # 从队列中获取数据
                print(f"处理数据: {data}")
                # 这里可以添加进一步的数据处理逻辑
                self.send_to_serial(data)  # 将数据发送到串口
            except queue.Empty:
                continue  # 如果队列为空,则继续等待

    def send_to_serial(self, data):
        """
        将数据发送到串口。

        :param data: 要发送的数据(字符串格式)。
        """
        serial_comm.send_data(data)

    def add_data(self, data):
        """
        向数据队列添加数据。

        :param data: 要添加的数据(字符串格式)。
        """
        self.data_queue.put(data)
        print(f"添加数据到队列: {data}")

    def stop(self):
        """
        停止数据处理线程。
        """
        self.is_running = False
        self.thread.join()  # 等待线程结束
        print("数据处理线程已停止。")

3.3 代码说明

  • __init__:构造函数,初始化数据队列,并启动数据处理线程。

  • process_data:处理数据的线程函数,从队列中获取数据并进行处理。

  • send_to_serial:将接收到的数据发送到串口通信模块,确保数据及时发送。

  • add_data:向数据队列添加数据,使用队列的put方法将数据放入队列中。

  • stop:设置停止标志并等待数据处理线程结束,以确保资源的正常释放。

3.4 时序图

用户 DataProcessor SerialCommunication 添加数据("测试数据") 发送数据("测试数据") 发送成功 用户 DataProcessor SerialCommunication

4. 整合与测试

在实现了各个模块后,我们将整合所有模块,进行测试,确保整个串口服务器能够正常工作,并支持多种协议。

4.1 整合代码示例

以下是如何整合之前实现的模块,以启动整个串口服务器的示例代码:

import time

# 创建串口通信实例
serial_comm = SerialCommunication(port='/dev/ttyUSB0', baudrate=9600)

# 创建协议解析实例
mqtt_broker = 'mqtt.example.com'
mqtt_port = 1883
protocol_parser = ProtocolParser(mqtt_broker, mqtt_port)

# 创建数据处理实例
data_processor = DataProcessor()

# 启动TCP服务器
tcp_thread = threading.Thread(target=protocol_parser.start_tcp_server)
tcp_thread.start()

# 订阅MQTT主题
protocol_parser.subscribe_to_mqtt("test/topic")

try:
    while True:
        # 从串口读取数据并添加到数据处理器
        serial_data = serial_comm.read_data()
        if serial_data:
            data_processor.add_data(serial_data)
        time.sleep(1)  # 主线程休眠,避免占用CPU
except KeyboardInterrupt:
    print("服务器正在关闭...")
finally:
    # 清理资源
    serial_comm.close()
    data_processor.stop()
    protocol_parser.mqtt_client.loop_stop()
    tcp_thread.join()
    print("服务器已停止。")

4.2 代码说明

  • 整合:我们创建了串口通信、协议解析和数据处理的实例,并启动TCP服务器和MQTT订阅。

  • 主循环:在主线程中,我们持续读取串口数据并将其添加到数据处理器中。

  • 关闭流程:在捕获到中断信号后,确保所有模块都能正常关闭,释放资源。

4.3 测试

  • 串口通信测试:使用串口工具(如PuTTY)向串口发送数据,检查串口服务器是否能够正确接收并处理数据。

  • TCP服务器测试:使用telnet或socket客户端连接到TCP服务器,发送数据,确保TCP服务器能正确接收并转发数据。

  • MQTT测试:使用MQTT客户端向指定主题发布消息,检查串口服务器是否能够接收到并处理这些消息。


5. 项目总结

通过本项目,我们成功构建了一个支持多种协议的串口服务器。主要实现了以下功能:

  • 串口通信模块:能够与外部设备进行数据交换,支持数据的读取和发送。

  • 协议解析模块:支持TCP和MQTT协议的解析,能够接收来自网络的数据并进行处理。

  • 数据处理模块:使用队列和线程管理数据流,确保数据的实时性和完整性。

在实现过程中,我们使用了Python的多线程和异步编程特性,以提高系统的性能和响应速度。最终的系统架构灵活且可扩展,能够适应多种设备和协议的需求。

未来可以进一步扩展以下功能:

  • 增加更多协议支持:如HTTP、WebSocket等。

  • 数据存储:将接收到的数据存储到数据库或文件中,便于后续分析和查询。

  • 图形用户界面:为用户提供更直观的操作界面,方便配置和监控。

标签:示例,self,RS,MQTT,mqtt,模块,串口,data
From: https://blog.csdn.net/qq_40431685/article/details/141568702

相关文章

  • 【流式编程】Stream.of()用法解析及使用示例
    Stream.of()是Java8引入的StreamAPI中的一个静态方法,用于从给定的元素创建一个顺序流(SequentialStream)。这个方法非常灵活,允许你直接从一组元素中创建一个流,而不需要这些元素已经存在于某个集合或数组中。这对于快速创建和操作流非常有用。用法解析Stream.of()......
  • 1047 Student List for Course【超简单思路,map,vector,对于超时问题】
    ZhejiangUniversityhas40,000studentsandprovides2,500courses.Nowgiventheregisteredcourselistofeachstudent,youaresupposedtooutputthestudentnamelistsofallthecourses.InputSpecification:Eachinputfilecontainsonetestcase.Fo......
  • 【跨域问题解决】Access to XMLHttpRequest at xxx from origin xxx has been blocked
    这个错误是由于浏览器的同源策略(CORS,Cross-OriginResourceSharing)导致的。当从一个源(origin)向另一个源请求资源时,如果这两个源的协议、域名或端口号不同,就会触发CORS策略。解决方法要解决这个问题,你需要在你的后端服务中添加CORS支持,以便它允许来自你的请求。这通常......
  • 体育数据API纳米足球数据API:足球数据接口文档API示例①
    纳米体育数据的数据接口通过JSON拉流方式获取200多个国家的体育赛事实时数据或历史数据的编程接口,无请求次数限制,可按需购买,接口稳定高效;覆盖项目包括足球、篮球、网球、电子竞技、奥运等专题、数据内容。纳米数据API2.0版本包含http协议以及websocket协议,主要通过http获取数......
  • [ARC182C] Sum of Number of Divisors of Product 题解
    题目链接点击打开链接题目解法我怎么不会分治/fn首先把\(x\)分解成\(\prodp_i^{x_i}(0\lei\le5)\)的形式,正因数个数为\(\prod(x_i+1)\)有一个很牛的想法是:合并两个\(x_i\)序列(假设一个叫\(x_0,...,x_5\),另一个叫\(y_0,...,y_5\))先不考虑后面的\(+1\)(可以最后......
  • 腾讯地图SDK Android版开发 8 覆盖物示例2动画
    腾讯地图SDKAndroid版开发8覆盖物示例2动画动画相关的类和接口帧动画Animation动画Marker接口继承关系Animation接口类及其子接口类AnimationableIAnimationSetTencentMapComponent地图组件接口类Marker动画示例界面布局MapMarkAnimate类常量成员变量初始值创建......
  • 网站提示505 HTTP Version Not Supported:服务器不支持请求的HTTP版本怎么办
    当遇到“505HTTPVersionNotSupported”错误时,这意味着服务器不支持客户端请求中使用的HTTP版本。这种情况通常发生在客户端尝试使用较新的HTTP版本,而服务器仅支持老版本的协议时。解决方案检查客户端使用的HTTP版本确认客户端使用的HTTP版本。如果客户端使用的是HTTP/......
  • 网站提示5xx Server Errors(服务器错误状态码)怎么办
    当遇到“5xxServerErrors”时,这意味着服务器在处理请求时遇到了错误,这些错误通常与服务器端的问题有关。5xx系列的状态码包括但不限于:500InternalServerError:服务器遇到了一个未曾预料的状况,导致它无法完成对请求的处理。501NotImplemented:服务器不支持请求的功能或API......
  • 【K8s】专题十二(3):Kubernetes 存储之 PersistentVolumeClaim
    本文内容均来自个人笔记并重新梳理,如有错误欢迎指正!如果对您有帮助,烦请点赞、关注、转发、订阅专栏!专栏订阅入口Linux专栏 | Docker专栏 | Kubernetes专栏往期精彩文章【Docker】(全网首发)KylinV10下MySQL容器内存占用异常的解决方法【Docker】(全网首发)Kyli......
  • Access-Control-Allow- 设置 跨域资源共享 CORS 详解
    跨域访问的项目常在过滤器或者拦截器中添加如下配置   response.setHeader("Access-Control-Allow-Origin","*");response.setHeader("Access-Control-Allow-Methods","POST,OPTIONS,GET");response.setHeader("Access-Control-Max-Age&qu......