最近项目上使用了mqtt协议来传输数据,之前没了解过,故简单学习下,本文作为学习记录以便之后复习使用。
1.什么是MQTT
MQTT 是一种基于发布/订阅模式的轻量级消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用而设计,可以用极少的代码为联网设备提供实时可靠的消息服务。MQTT 协议广泛应用于物联网、移动互联网、智能硬件、车联网、智慧城市、远程医疗、电力、石油与能源等领域。按照MQTT协议作者的说法,MQTT协议必须具备这几点。
- 简单容易实现
- 支持 QoS(设备网络环境复杂)
- 轻量且省带宽(因为那时候带宽很贵)
- 数据无关(不关心 Payload 数据格式)
- 有持续地会话感知能力(时刻知道设备是否在线
而MQTT协议凭借着轻量高效、可靠的消息传递、海量连接支持、安全的双向通信等优点已成为物联网行业的首选协议。
2.MQTT工作原理
2.1 基础概念
MQTT客户端:任何运行 MQTT客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT测试工具也是客户端。
MQTT Broker: MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。(而EMQX最为优秀的mqtt broker是首选的学习对象。本文也是基于emqx来进行学习介绍的。)
发布订阅模式:发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。
主题:MQTT 协议根据主题来转发消息。主题通过 /
来区分层级,MQTT 主题支持以下两种通配符:+
和 #
。1. +
:表示单层通配符,例如 a/+
匹配 a/x
或 a/y
。 2. #
:表示多层通配符,例如 a/#
匹配 a/x
、a/b/c/d
。【注意:通配符主题只能用于订阅,不能用于发布。】
QoS(Quality of Service):
MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。
- QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
- QoS 1:消息至少传送一次。
- QoS 2:消息只传送一次。
2.2 工作流程
- 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
- 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
- MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。
3.MQTT简单使用
EMQX优势的地方在于提供了Web形式的客户端(MQTTX)以及在线试用的Broker,这对于学习MQTT的朋友提供了极大的便利。
本文使用docker形式部署本地的emqx。部署步骤如下:
- docker pull emqx/emqx:5.1.6
- docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.1.6
- 浏览器输入你的IP:18083,默认账户密码为admin/public,首次登录需改密码。
页面打开的是EMQX 提供了一个内置的管理控制台,即 EMQX Dashboard。方便用户通过 Web 页面就能轻松管理和监控 EMQX 集群,并配置和使用所需的各项功能。这也是EMQX的优势。
在EMQX Dashboard中进入客户端认证页面,添加基于password_based方式认证机制,添加时也可以选择存储认证信息的数据源,使用非常方便。【可选】
下面已Python客户端(paho-mqtt)为例演示数据接收与发送,Broker使用本地部署的,当然也可已使用EMQX提供的试用Broker。直接上代码。
import random import time from paho.mqtt import client as mqtt_client broker = 'IP' port = 8083 topic = "/python/mqtt" client_id = f'python-mqtt-{random.randint(0, 1000)}' username = '账户' password = '密码' def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id, transport="websockets") # 使用websockets方式连接,默认端口8083 client.username_pw_set(username=username, password=password) client.on_connect = on_connect client.connect(host=broker, port=port) return client def publish(client: mqtt_client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) status = result[0] if status == 0: print(f"Send {msg} to topic {topic}") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_start() publish(client)
import random from paho.mqtt import client as mqtt_client broker = 'IP' port = 8083 topic = "/python/mqtt" client_id = f'python-mqtt-{random.randint(0, 1000)}' username = '账户' password = '密码' def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id, transport="websockets") client.username_pw_set(username=username, password=password) client.on_connect = on_connect client.connect(host=broker, port=port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(msg) print(f"Received {msg.payload.decode()} from {msg.topic} topic") client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever()
运行脚本后可以在控制台看到实时的看到Broker的使用情况,简直太方便了。
标签:入门,mqtt,Broker,MQTT,client,connect,简单,EMQX,客户端 From: https://www.cnblogs.com/wsmbszyn/p/17660570.html