手把手搭建自己私有的MQTT服务器,完成设备上云 原创 ds小龙哥 DS小龙哥 嵌入式技术资讯 2024年09月19日 13:21 重庆 5人听过 一、前言 在众多物联网通信协议中,MQTT(Message Queuing Telemetry Transport)因其轻量、高效的特点而被广泛应用于各种物联网场景。它不仅能够满足设备低功耗的需求,还支持设备与云端之间稳定可靠的消息传递,是实现设备上云的理想选择之一。本文介绍利用EMQX服务器搭建自己私有的MQTT服务器,构建一套完整的物联网解决方案,涵盖从设备接入到数据存储,再到数据转发及上位机开发等多个关键环节。 EMQX,全称为Erlang/Enterprise Middleware MQTT Broker,是一款基于Erlang/OTP平台开发的开源物联网消息中间件。它专为大规模物联网应用设计,能够处理海量并发连接,并提供稳定的消息发布/订阅服务。作为一款高性能的MQTT协议服务器,EMQX不仅支持标准的MQTT v3.1、v3.1.1以及最新的v5.0版本协议,还提供了丰富的扩展功能来满足不同场景下的需求。 EMQX的核心优势在于其卓越的性能表现和高度可伸缩性。单个EMQX集群可以轻松管理数百万级别的设备连接,同时保持低延迟的消息传递能力。这使得EMQX成为构建大型物联网系统时的理想选择之一。此外,通过灵活配置规则引擎,用户可以根据业务逻辑定制化处理接收到的数据流,实现复杂事件处理、数据转换等功能。例如,当特定条件被触发时,可以自动执行预设的动作或将信息转发给其他系统进行进一步分析。 安全性方面,EMQX支持多种认证机制如用户名密码验证、客户端证书验证等,以确保只有授权用户才能访问敏感资源;同时也提供了TLS/SSL加密通信能力,保障了数据传输过程中的安全性和完整性。对于需要严格遵守行业标准的企业来说,这些特性尤为重要。 在集成度方面,EMQX展现了极高的灵活性与兼容性。无论是与其他数据库系统的对接(如MySQL, PostgreSQL, MongoDB等),还是与各种云服务提供商(如阿里云、AWS)的无缝衔接,EMQX都能很好地适应并促进整个生态系统的健康发展。EMQX还配备了详细的文档资料和技术支持服务,帮助开发者快速上手并解决遇到的问题。 图片 本文章里用到的全部工具软件都可以在这里下载(放在网盘里了)。 https://pan.quark.cn/s/145a9b3f7f53 二、准备服务器 EMQX-服务器部署目前支持以下操作系统: RedHat CentOS RockyLinux AmazonLinux Ubuntu Debian macOS Linux Windows(旧版本是支持的,新版不支持Windows系统,官网没有下载链接了) 前期测试阶段,华为云、阿里云、腾讯云这些平台都有免费试用的服务器,可以领取一个月ECS服务器试用测试。 也可以自己本地Windows电脑上搭建服务器,这个我在另一篇文章里有详细介绍,还有对应的视频(Windows系统搭建)。 搭建自己的MQTT服务器、实现设备上云(windows+EMQX) 我下面是领取了华为云一个月的ECS服务器搭建此项目的服务器用于进行整体项目的测试。 2.1 登录官网 https://www.huaweicloud.com/ 图片 2.2 领取一个月的服务器 【1】选择服务器 图片 配置过程中,系统选择ubuntu18.04。 【2】返回弹性服务器的控制台 服务器领取配置好之后回到主界面。 图片 【3】点击服务器名字,可以进入到详情页面。 图片 2.3 配置安全组 要确保MQTT服务器常用的几个端口已经开放出出来。 图片 2.4 安装FinalShell Windows下安装 FinalShell 终端,方便使用SSH协议远程登录到云服务器。(当然,使用其他方式登录也是一样的) 图片 2.5 远程登录到云服务器终端 【1】新建连接,选择SSH连接。 图片 【2】填入IP地址、用户名、密码 这里的主机就是填服务器的公网IP地址,密码就是创建服务器输入的密码,用户名直接用root。 图片 【3】点击连接服务器 图片 【4】第一次登录会弹出提示框,选择接受并保存 图片 【5】接下来可以看到服务器已经登录成功了。 图片 三、Linux下安装EMQX 本章节将介绍如何在 Ubuntu 系统中下载安装并启动 EMQX。 支持的 Ubuntu 版本: Ubuntu 22.04 Ubuntu 20.04 Ubuntu 18.04 2.1 官网地址 链接:https://www.emqx.io/docs/zh/v5.2/deploy/install-ubuntu.html 2.2 通过Apt源安装 EMQX 支持通过 Apt 源安装,免除了用户需要手动处理依赖关系和更新软件包等的困扰,具有更加方便、安全和易用等优点。 在命令行终端,复制下面的命令过去,按下回车键。 【1】通过以下命令配置 EMQX Apt 源: curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash 【2】运行以下命令安装 EMQX: sudo apt-get install emqx 【3】运行以下命令启动 EMQX: sudo systemctl start emqx 过程如下: 图片 图片 2.3 EMQX常用的命令 sudo systemctl emqx start 启动 sudo systemctl emqx stop 停止 sudo systemctl emqx restart 重启 四、配置EMQX-MQTT服务器 4.1 登录EMQX内置管理控制台 EMQX 提供了一个内置的管理控制台,即 EMQX Dashboard。方便用户通过 Web 页面就能轻松管理和监控 EMQX 集群,并配置和使用所需的各项功能。 在浏览器里输入: http://122.112.225.194:18083 就可以访问EMQX的后台管理页面。可以管理以连接的客户端或检查运行状态。 这里面的IP地址,就是自己ECS云服务器的公网IP地址。 打开浏览器后,输入地址后打开的效果: 图片 默认用户名和密码: 用户名:admin 密码:public 第一次登录会提示你修改新密码,如果不想设置,也可以选择跳过(公网服务器部署,还是要修改密码安全些)。 下面修改新密码: 图片 登录成功的页面显示如下: 图片 4.2 MQTT配置 这里可以配置MQTT的一些参数,根据自己的需求进行配置。 图片 4.3 测试MQTT通信 新建一个客户端,点击连接。 图片 连接之后,然后点击订阅,和发布,如果下面消息能正常的接收。说明MQTT服务器通信是已经正常,没问题了。 并且在这个页面也可以看到主题发布和主题订阅的格式。 图片 4.4 MQTT客户端登录服务器测试 接下来就打开我们自己的MQTT客户端登录MQTT服务器进行测试数据的通信。 端口选择: 1883 根据软件参数填入参数,登录,进行主题的发布和订阅。 图片 说明:目前还没有配置客户端认证,现在只要IP和端口输入正确,MQTT三元组可以随便输入,都可以登录上服务器的,服务器没有对三元组做校验。 EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQX。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQX 将根据匿名认证启用情况决定是否允许客户端连接。 然后打开EMQX的管理后台,可以看到我们的设备已经登录服务器了,名字为test1。 图片 在订阅主题的页面也可以看到我们客户端设备订阅的主题。 图片 4.5 客户端认证配置 EMQX 默认配置中启用了匿名认证,任何客户端都能接入 EMQX。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQX 将根据匿名认证启用情况决定是否允许客户端连接。 在正式产品里肯定是要启用认证的,不然任何设备都能接入。 下面就介绍如何配置 客户端认证。 【1】打开客户端认证页面 图片 【2】选择密码认证 图片 【3】选择内置数据库 图片 【4】设置认证方式(都可以默认,不用改),直接点击创建。 图片 【5】创建成功后,点击用户管理 图片 【6】添加用户 图片 图片 【7】添加成功 图片 【8】添加完毕之后,打开MQTT客户端可以进行测试。 登录的时候,MQTT用户名和密码必须输入正确,按照上一步添加的信息进行如实填写,否则是无法登录服务器的。 图片 4.6 客户端授权配置 客户端授权页面可以配置每个客户端(设备)的主题发布,订阅权限。限制它是否可以发布主题,订阅主题。如果有需要就可以进行配置。 http://127.0.0.1:18083/#/authorization/detail/built_in_database?tab=users 【1】创建数据源 图片 【2】选择内置数据库 图片 【3】完成创建 图片 【4】点击权限管理 图片 【5】选择客户端ID,点击添加 图片 【6】配置权限 图片 4.7 数据转发(集成) 在集成选项里,可以对设备数据处理。比如:转发到自己的HTTP服务器,转发到自己其他的MQTT服务器,创建规则,某些事件触发某些动作等等。 图片 选择数据桥接。 可以把数据发送端自己的HTTP服务器,或者发送到其他的MQTT服务器。 图片 选择HTTP服务 (如果自己有HTTP服务器,可以将数据转发给自己的HTTP服务器)。 图片 五、MQTT客户端消息互发测试 5.1 添加2个设备 为了方便测试设备间互相订阅主题,数据收发,在客户端认证页面至少添加2个设备。我这里分别添加了test1和test2。 图片 5.2 设备间测试 设备A订阅设备B的主题,设备B订阅设备A的主题,实现数据互发。 图片 设备A的MQTT信息: MQTT服务器地址:122.112.225.194 MQTT服务器端口号:1883 MQTT客户端ID:AAA MQTT用户名:test1 MQTT登录密码:12345678 订阅主题:BBB/# 发布主题:AAA/1 发布的消息:{ "msg": "我是AAA设备" } 设备B的MQTT信息: MQTT服务器地址:122.112.225.194 MQTT服务器端口号:1883 MQTT客户端ID:BBB MQTT用户名:test2 MQTT登录密码:12345678 订阅主题:AAA/# 发布主题:BBB/1 发布的消息:{ "msg": "我是BBB设备" } 五、单片机设备上云 只要是MQTT客户端能正常上云通信了,那么单片机也是一样的。 上位机也可以采用MQTT协议接入服务器,订阅设备的主题,就可以实时接收设备的消息(当然,也可以采用HTTP协议接入)。 六、数据桥接 EMQX支持将设备上传的数据转发到其他地方,比如,自己的HTTP服务器。方便自己服务器进行其他的处理。 通过数据桥接,用户可以实时地将消息从 EMQX 发送到外部数据系统,或者从外部数据系统拉取数据并发送到 EMQX 的某个主题。而 EMQX Dashboard 提供了可视化创建数据桥接的能力,只需在页面中配置相关资源即可。 本章节就介绍如何搭建自己的HTTP服务器。配置EMQX转发数据到自己的HTTP服务器,保存处理数据。 6.1 搭建HTTP服务器 我这里直接使用python写代码搭建一个HTTP服务器。ECS服务器上默认没有安装python3,需要先安装一下。 【1】安装python3 root@emqx:~/emqx# apt install python3 Reading package lists... Done Building dependency tree Reading state information... Done python3 is already the newest version (3.6.7-1~18.04). 0 upgraded, 0 newly installed, 0 to remove and 1 not upgraded. 【2】编写代码 from flask import Flask, json, request app = Flask(__name__) @app.route('/', methods=['POST']) def print_messages(): reply= {"result": "ok", "message": "success"} print("got post request: ", request.get_data()) return json.dumps(reply), 200 if __name__ == '__main__': app.run(host='0.0.0.0', port=8000) 将以上代码保存到一个名为 server.py 的文件中。 这段代码创建了一个使用 Flask 框架的 Web 服务器,可以接收根路径的 POST 请求。当接收到 POST 请求时,服务器会将请求的数据打印到终端,并返回一个 JSON 格式的响应给客户端。服务器将在本地运行,并监听默认的 8000 端口。 【3】运行程序 # 安装 flask 依赖 pip install flask pip3 install flask # 启动服务 python3 server.py 在命令行中执行 python3 server.py,就可以启动一个简单的HTTP服务器,可以接收并处理POST请求。当有POST请求发生时,服务器将返回收到的POST数据。可以根据自己的需要,进一步扩展处理POST请求的逻辑。 运行示例: (代码可以在本地写好上传到服务器,也可以直接 vim server.py 打开编辑器直接编写 ) 图片 可以通过发送 POST 请求到 http://your-server-ip:8000/ 的方式来测试这个服务器。 比如: http://122.112.225.194:8000/ 6.2 数据转发配置 【1】在集成选项里,可以对设备数据处理,将数据转发到自己的HTTP服务器。 图片 【2】选择Webhook。 Webhook,使用 Webhook 来转发数据到 HTTP 服务; 使用 Webhook 其实就是将 EMQX 接收并处理后的数据发送到一个 HTTP 服务上,再根据预设好的 HTTP 服务来处理和集成业务数据。 同样用户需要有一个预先搭建好的 HTTP 服务,需要在配置信息页面填写 HTTP 请求的服务地址,选择一个请求方法 POST、GET、PUT 或 DELETE,配置请求头,将需要发送的数据使用模板语法填写到请求体(body)中即可。 图片 【3】选配置Webhook 触发器选择所有消息和事件,URL里填自己的服务器地址。 图片 【4】点击测试。测试服务器是否OK。 图片 【6】没问题就直接点击保存 图片 【7】创建成功 图片 【8】保存之后。会自动创建规则和数据桥接。非常方便。 图片 图片 图片 6.3 测试转发效果 【1】打开MQTT客户端,发送数据测试。 图片 【2】看python服务器的终端,可以看到收到了EMQX服务器转发过来的数据。 如果自己接下来想要进行其他的操作,服务器写代码进行对应的处理即可。 图片 【3】 点击这个服务可以看到已经触发转发的详情。 图片 图片 七、API接口说明 一般开发一套完整的物联网产品。一般会分为设备端,服务器,上位机部分。 设备端:就是硬件端。采集本地传感器的数据上传到服务器,或者接收服务器下发的指令完成某些控制。 比如:STM32 + ESP8266 + 各种传感器 就是一个硬件设备端。可以通过ESP8266联网上传数据。 服务器:也就是MQTT服务器端,比如: 自己采用EMQX搭建的MQTT服务器,或者采用阿里云、华为云、OneNet这些平台的IOT服务器。 上位机:上位机就是给用户使用的,用于远程控制设备,查看设备。比如:微信小程序、手机APP、电脑上位机、web网页端等等。 那么这个章节,就介绍利用EMQX提供的API接口与MQTT客户端设备进行通信,完成数据上传,命令下发等功能,可以利用此接口完成上位机的开发。 7.1 查看全部的API接口 帮助文档地址: https://www.emqx.io/docs/zh/v5.0/admin/api.html#%E8%AE%A4%E8%AF%81 EMQX 提供了管理监控 REST API,这些 API 遵循 OpenAPI (Swagger) 3.0 规范。 EMQX 在 REST API 上做了版本控制,EMQX 5.0.0 以后的所有 API 调用均以 /api/v5 开头。 EMQX 服务启动后,可以访问 http://localhost:18083/api-docs/index.html (opens new window)来查看 API 的文档。还可以直接在 Swagger UI 上尝试执行一些 API。 比如: 我的EMQX服务器是在华为云ECS服务器上搭建,公网IP是: 122.112.225.194 那我访问API文档的地址就是下面这样的格式: 在浏览器里打开即可。 http://122.112.225.194:18083/api-docs/index.html 访问效果如下: 图片 7.2 创建API密匙 【1】登录EMQX的后台管理页面: http://122.112.225.194:18083/ 【2】找到菜单里的 系统设置选项-->API密匙。 图片 【3】创建密匙。 图片 【4】填写密匙名称 图片 【5】创建成功 图片 【6】得到API Key 和 Secret Key API Key : f072a6e9758b8cdf Secret Key : LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG 7.3 测试API: 获取节点信息 上一步已经创建好API的访问密匙,这里就以 获取节点信息为例,调用获取节点信息的API接口,获取节点 信息。 接口在API文档里的介绍: 图片 根据前面的API访问路径规则说明;那么,获取节点信息的API完整访问路径为: http://122.112.225.194:18083/api/v5/nodes 接下来就用python写一份代码,测试一下接口是否可以正常访问。python代码直接放服务器运行(主要是我本地没有安装python环境,云服务器的环境是已经安装OK的,测试方便)。 【1】在云服务器上创建一个python文件,方便测试代码 图片 【2】创建之后FinaShell自动上传到服务器 图片 【3】编辑代码 在这里双击要编辑的文件,就可以打开文件进行编辑。默认采用内置的编辑器,也可以选择自己电脑上的外置编辑器。 图片 【4】代码编辑完成,按下键盘快捷键Ctrl + S 保存文件内容,保存之后文件内容会自动同步到服务器。 图片 保存后提示,自动上传。 图片 写入的代码如下: import urllib.request import json import base64 username = 'f072a6e9758b8cdf' password = 'LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG' url = 'http://122.112.225.194:18083/api/v5/nodes' req = urllib.request.Request(url) req.add_header('Content-Type', 'application/json') auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode() req.add_header('Authorization', auth_header) with urllib.request.urlopen(req) as response: data = json.loads(response.read().decode()) print(data) 【5】执行代码,返回结果 通过返回信息来看,节点信息获取是没有问题的。 root@emqx:~/emqx# python3 http_api_test.sh [{'connections': 0, 'edition': 'Opensource', 'live_connections': 0, 'load1': 0.0, 'load15': 0.0, 'load5': 0.0, 'log_path': '/var/log/emqx', 'max_fds': 1048576, 'memory_total': '3.66G', 'memory_used': '612.59M', 'node': '[email protected]', 'node_status': 'running', 'otp_release': '25.3.2-2/13.2.2', 'process_available': 2097152, 'process_used': 543, 'role': 'core', 'sys_path': '/usr/lib/emqx', 'uptime': 84000040, 'version': '5.3.1-alpha.1'}] root@emqx:~/emqx# python3 http_api_test.sh [{'connections': 0, 'edition': 'Opensource', 'live_connections': 0, 'load1': 0.0, 'load15': 0.0, 'load5': 0.0, 'log_path': '/var/log/emqx', 'max_fds': 1048576, 'memory_total': '3.66G', 'memory_used': '613.23M', 'node': '[email protected]', 'node_status': 'running', 'otp_release': '25.3.2-2/13.2.2', 'process_available': 2097152, 'process_used': 543, 'role': 'core', 'sys_path': '/usr/lib/emqx', 'uptime': 84008046, 'version': '5.3.1-alpha.1'}] 图片 7.4 在线调试(获取主题列表) 在编写代码之前,可以先测试下API接口的效果,可以直接在Swagger UI界面直接调试API。 地址: http://122.112.225.194:18083/api-docs/index.html#/ 例如:以获取以订阅主题列表的API接口为例。 【1】在Swagger UI界面上找到对应的API接口。 图片 【2】点击API说明,展开详情 图片 【3】点击右边的试试看按钮。 图片 【4】点击执行 图片 【5】然后会弹出提示框,让你填入用户名和密码。 这个用户名和密码就是前面创建API密匙生成的API Key(用户名) 和 Secret Key(密码) 。 API Key : f072a6e9758b8cdf Secret Key : LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG 图片 【6】根据提示输入用户名和密码,再点击登录。 图片 【7】再次点击执行,就可以看到接口返回的数据了。 并且在页面上也写出了,请求的信息。使用curl命令行给出详细的请求过程,参考这个就可以自己写代码了。 图片 【8】API调用,curl命令行执行代码如下: curl -X 'GET' \ 'http://122.112.225.194:18083/api/v5/topics?node=emqx%40127.0.0.1&page=1&limit=50' \ -H 'accept: application/json' 【9】python代码实现 import requests url = 'http://122.112.225.194:18083/api/v5/topics?node=emqx%40127.0.0.1&page=1&limit=50' headers = {'accept': 'application/json'} response = requests.get(url, headers=headers) if response.status_code == 200: data = response.json() # 在这里处理返回的数据 print(data) else: print("请求失败,状态码:", response.status_code) 7.5 在线调试(发布主题) API里也支持发布主题,利用HTTP协议发布主题消息,如果设备端订阅了该主题,就可以收到API接口发布的消息。 【1】先找到发布主题的API接口 图片 【2】点击API名字,展开详情 图片 【3】点击右边的Try it out按钮。 图片 【4】参数填写说明 因为这个接口是发送主题的,需要填参数,填自己需要发布什么主题,什么消息。 出来的框框里就是发布信息,根据自己需要修改。 图片 topic就是发布的主题。 payload 就是发布的消息内容。只要MQTT客户端订阅了这个主题,就可以收到发布的消息。 这个主题的名字可以随便改的。我这里就用默认的名字和内容测试。 图片 { "payload_encoding": "plain", "topic": "api/example/topic", "qos": 0, "payload": "hello emqx api", "properties": { "payload_format_indicator": 0, "message_expiry_interval": 0, "response_topic": "some_other_topic", "correlation_data": "string", "user_properties": { "foo": "bar" }, "content_type": "text/plain" }, "retain": false } 【5】MQTT客户端登录。 打开MQTT客户端,登录服务器,订阅api/example/topic主题。 图片 【6】在API调试页面,点击执行 图片 【7】执行之后,在MQTT客户端的就可以收到API下发的消息了。 图片 【8】API接口调用,curl命令行执行的代码如下: curl -X 'POST' \ 'http://122.112.225.194:18083/api/v5/publish' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "payload_encoding": "plain", "topic": "api/example/topic", "qos": 0, "payload": "hello emqx api", "properties": { "payload_format_indicator": 0, "message_expiry_interval": 0, "response_topic": "some_other_topic", "correlation_data": "string", "user_properties": { "foo": "bar" }, "content_type": "text/plain" }, "retain": false }' 【9】Python代码实现 import requests import json url = 'http://122.112.225.194:18083/api/v5/publish' headers = { 'accept': 'application/json', 'Content-Type': 'application/json' } data = { "payload_encoding": "plain", "topic": "api/example/topic", "qos": 0, "payload": "hello emqx api", "properties": { "payload_format_indicator": 0, "message_expiry_interval": 0, "response_topic": "some_other_topic", "correlation_data": "string", "user_properties": { "foo": "bar" }, "content_type": "text/plain" }, "retain": False } response = requests.post(url, headers=headers, data=json.dumps(data)) if response.status_code == 200: print("消息发布成功") else: print("消息发布失败,状态码:", response.status_code) ds小龙哥 阅读 1.6万 修改于2024年09月19日
标签:手把手,上云,MQTT,API,服务器,EMQX,客户端,图片 From: https://www.cnblogs.com/cheyunhua/p/18515632