本周 web 课简单了解了MQTT,在此记录下我的安装使用(用python写个订阅新闻的简单示例)流程,如有错误,欢迎指正!
1. MQTT 简介
MQTT 是一种轻量级的消息传递协议,专为低带宽、高延迟或不可靠的网络环境设计。
1.1 MQTT 的主要特点
-
轻量级和低带宽:MQTT 协议的头部非常小,这使得它在带宽有限的情况下依然有效。
-
使用 TCP/IP 进行消息传输:保证了消息的可靠传输。
-
发布-订阅模型:MQTT 使用发布-订阅消息模式,这意味着发送者(发布者)和接收者(订阅者)不直接进行通信。相反,消息通过主题交换,发布者将消息发布到主题,订阅者订阅它们感兴趣的主题并接收相应的消息。
-
支持三种消息发布服务质量(QoS):
- 0(最多一次)
- 1(至少一次)
- 2(仅一次)
1.2 核心组件
- Broker:MQTT 消息代理服务器,负责接收所有消息、过滤消息、决定谁是消息的接收者,以及将消息传输给客户端。
有许多MQTT代理可用于测试和实际应用。有免费的自托管代理,最流行的是
Mosquitto
,还有商业代理,如HiveMQ
。
Mosquitto
是一个免费的开源MQTT代理,可在Windows和Linux上运行。
- Client:任何可以与 Broker 建立连接的设备。客户端可以是发布消息的生产者,也可以是订阅消息的消费者。
对于MQTTv3.1.1,在几乎所有编程语言和主要操作系统Linux、Windows、Mac中都有客户端软件,来自Eclipse Paho项目。如:Paho Python客户端
1.3 消息流程
- 连接:客户端通过 TCP/IP 协议连接到 Broker。
- 发布(Publish):客户端向特定主题发送消息。
- 订阅(Subscribe):客户端订阅一个或多个主题,以接收来自这些主题的消息。
- 断开连接(Disconnect):结束会话时,客户端与 Broker 断开连接。
2. 在 Ubuntu 22.04 上安装 MQTT
2.1 安装 MQTT Broker (Mosquitto)
- 在终端运行以下命令安装 Mosquitto 服务器和客户端:
sudo apt-get update
sudo apt-get install mosquitto mosquitto-clients
命令执行后将安装 Mosquitto MQTT Broker 以及一些有用的客户端工具
2.2 安装 MQTT 客户端库
- 如果您还需要安装 MQTT 客户端库,如 Paho-MQTT,以便与 MQTT 服务器交互,可以使用 pip:
pip3 install paho-mqtt
3. MQTT 相关命令
注意:
- 这些命令都需要管理员权限,因此需要使用
sudo
- 在不同的 Linux 发行版上,服务管理工具可能有所不同。在较新的系统上,推荐使用
systemctl
,而在一些老旧的系统上,可能需要使用service
3.1 使用 systemctl
推荐用于新版系统,如 Ubuntu 16.04 及以后版本
- 启动 Mosquitto 服务
sudo systemctl start mosquitto
- 停止 Mosquitto 服务
sudo systemctl stop mosquitto
- 重启 Mosquitto 服务
sudo systemctl restart mosquitto
- 查看 Mosquitto 服务状态
sudo systemctl status mosquitto
- 设置 Mosquitto 服务在启动时自动运行
sudo systemctl enable mosquitto
- 禁用 Mosquitto 服务自动启动
sudo systemctl disable mosquitto
3.2 使用 service
老旧系统上的方法
- 启动 Mosquitto 服务
sudo service mosquitto start
- 停止 Mosquitto 服务
sudo service mosquitto stop
- 重启 Mosquitto 服务
sudo service mosquitto restart
- 查看 Mosquitto 服务状态
sudo service mosquitto status
4. 实现 MQTT 的 python 编程
4.1 需求
- 实现雅虎RSS生活类新闻的订阅功能
- 客户端可以通过订阅主题(
/rssnews
),获取最新的10条新闻消息(只包括新闻标题和时间)
4.2 代码实现
4.2.1 发布者
# mqtt 服务端
import time
from datetime import datetime
import feedparser
import paho.mqtt.client as mqtt
import pytz
# RSS源URL和MQTT Broker地址
BASE_URL = "https://news.yahoo.co.jp/rss/categories/life.xml"
BROKER_ADDRESS = "localhost"
UPDATE_INTERVAL = 60 # 更新间隔时间(秒)
def MQTTPublish():
# 创建一个 MQTT 客户端实例
client = mqtt.Client("NewsPublisher")
# 连接到 MQTT 服务器
client.connect(BROKER_ADDRESS)
print("MQTT 客户端已连接到服务器.")
# 持续运行,定期发布新闻
while True:
news_data = get_latest_news(BASE_URL)
client.publish("/rssnews", news_data)
print(f"已发布最新新闻到 /rssnews,等待 {UPDATE_INTERVAL} 秒后重新发布.")
time.sleep(UPDATE_INTERVAL)
# 获取最新的十条新闻
def get_latest_news(url):
feed = feedparser.parse(url)
formatted_news_list = ""
for index, item in enumerate(feed.entries[:10], start=1):
title = item.title
pub_date = convert_to_beijing_time(item.published)
formatted_news_list += f"{index}. 标题: {title}\n发布时间: {pub_date}\n\n"
return formatted_news_list
# 转换时间格式
def convert_to_beijing_time(gmt_time_str):
"""
将国际标准时间 (GMT/UTC) 的日期时间字符串转换为北京时间字符串
:param gmt_time_str: 包含日期时间的国际标准时间 (GMT/UTC) 字符串
:return: 包含日期时间的北京时间字符串,格式为"年-月-日 时:分"
"""
# 创建一个时区对象,用于表示中国北京时区
beijing_tz = pytz.timezone('Asia/Shanghai')
# 将日期时间字符串解析为 datetime 对象(原始字符串的格式为 RFC1123)
gmt_time = datetime.strptime(gmt_time_str, "%a, %d %b %Y %H:%M:%S GMT")
# 使用 astimezone 将时区从 UTC 转换为北京时区
beijing_datetime = gmt_time.replace(tzinfo=pytz.utc).astimezone(beijing_tz)
# 使用 strftime 格式化日期时间对象为指定格式
beijing_time_str = beijing_datetime.strftime('%Y年%m月%d日 %H:%M')
return beijing_time_str
if __name__ == '__main__':
MQTTPublish()
4.2.2 接收者
# mqtt 客户端
import paho.mqtt.client as mqtt
# 当客户端成功连接到 MQTT 服务器时调用的回调函数
def on_connect(client, userdata, flags, rc):
print(f"成功连接到 MQTT 服务器,返回码:{rc}")
client.subscribe("/rssnews")
print("已订阅主题 /rssnews")
# 当客户端接收到从 MQTT 服务器发送的消息时调用的回调函数
def on_message(client, userdata, msg):
print(f"从主题 {msg.topic} 收到消息:")
print(msg.payload.decode()) # 解码并打印消息内容
# MQTT 服务器地址
broker_address = "localhost"
# 创建一个 MQTT 客户端实例,并设置回调函数
client = mqtt.Client("NewsSubscriber")
client.on_connect = on_connect
client.on_message = on_message
# 连接到 MQTT 服务器
client.connect(broker_address)
print("正在连接到 MQTT 服务器...")
# 开始循环监听消息
client.loop_forever()
4.3 运行效果
- 发布者
- 接收者