首页 > 编程语言 >Python 客户端类库之paho-mqtt学习总结

Python 客户端类库之paho-mqtt学习总结

时间:2024-09-22 21:13:07浏览次数:11  
标签:类库 调用 mqttc Python publish mqtt paho loop

实践环境

Python 3.9.13

paho-mqtt 2.1.0

简介

Eclipse Paho MQTT Python客户端类库实现了MQTT 协议版本 5.0, 3.1.1, 和3.1。

该类库提供一个客户端类,允许应用连接到MQTT代理并发布消息,订阅主题并检索发布的消息。同时还提供了一个写其它辅助函数,使向MQTT服务器发布一次性消息变得非常简单。

支持 Python 3.7+。

MQTT协议是一种机器对机器(M2M)/“物联网”连接协议。它被设计为一种极其轻量级的发布/订阅消息传输,对于需要小代码占用和/或网络带宽非常昂贵的远程连接非常有用。

安装

pip install paho-mqtt

已知限制

以下是已知的未实现的MQTT功能。

clean_sessionFalse时,会话仅存储在内存中,不会持久化。这意味着当客户端重新启动时(不仅仅是重新连接,通常是因为程序重新启动而重新创建对象),会话就会丢失。这可能会导致消息丢失。

客户端会话的以下部分丢失:

  • 已从服务器接收到但尚未完全确认的 QoS 2 消息。

    由于客户端会盲目确认任何PUBCOMP(QoS 2 事务的最后一条消息),因此它不会挂起,但会丢失此 QoS 2 消息。

  • 已发送到服务器但尚未完全确认的 QoS 1 和 QoS 2 消息。

    这意味着传递给 publish()的消息可能会丢失。这可以通过让传递给 publish() 的所有消息都有相应的on_publish() 调用或使用wait_for_publish来缓解。

    这也意味着代理在会话中可能有 QoS2 消息。由于客户端从一个空会话开始,它不知道它,并将重用mid。这还没有解决。

此外,当clean_sessionTrue时,此类库将在网络重新连接时重新发布 QoS > 0消息。这意味着 QoS > 0消息不会丢失。但标准规定,我们应该丢弃发送发布包的任何消息。设置为True意味着不符合标准,QoS 2 可能会被接收两次。

如果只需要一次交付的 QoS 2 保证,则应设置clean_session=False

用法与API

API详细在线文档:https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html

示例:https://github.com/eclipse/paho.mqtt.python/tree/master/examples

开始

下面是一个非常简单的示例,它订阅代理$SYS主题树并打印出结果消息:

# -*- coding:utf-8 -*-

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, reason_code, properties):
    '''客户端从服务器接收到 CONNACK 响应时的回调'''

    print(f"Connected with result code {reason_code}")  # 成功连接时 reason_code 值为 Success

    # 在on_connect()中执行订阅操作,意味着如果应用失去连接并且重新连接后,订阅将被续订。
    if reason_code == 'Success':
        client.subscribe('$SYS/#')

def on_disconnect(client, userdata, flags, reason_code, properties):
    print(f'Disconnected with result code {reason_code}')


def on_message(client, userdata, msg):
    '''从服务器收到 PUBLISH 消息时的回调。'''
    print(msg.topic + ' ' + str(msg.payload)) # 输出值形如 $SYS/broker/version b'mosquitto version 2.0.18'

mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_message = on_message
 
# client.username_pw_set('testacc', 'test1234') # 设置访问账号和密码

mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)

# 阻塞调用,处理网络流量、分派回调和处理重新连接
# 有其它提供线程接口和手动接口的loop*()函数可用
mqttc.loop_forever()

说明:

  1. Client.username_pw_set(username: str | None, password: str | None = None*) → None

    为代理身份验证设置用户名和密码(可选)。

    必须在connect()之前调用才能生效。需要支持MQTT v3.1或更高版本的代理。

    • 参数:
      • username – 要进行身份验证的用户名。需要与客户端id没有关系。必须是字符串[MQTT-3.1.3-11]。设置为“None”可将客户端重置为不使用用户名/密码进行代理身份验证。
      • password – 用于身份验证的密码。可选,如果不需要,则设置为None。如果为字符串r,那么它将被编码为UTF-8。
  2. Client.connect(host: str, port: int = 1883, keepalive: int = 60, bind_address: str = '', bind_port: int = 0, clean_start: bool | Literal[3] = 3, properties: Properties|None = None) → MQTTErrorCode
    

    连接到远程代理。这是一个阻塞调用,用于建立底层连接并传输CONNECT数据包。请注意,在收到并处理CONNACK之前,连接状态不会更新(这需要一个正在运行的网络循环,请参阅loop_start, loop_forever, loop…)).
    参数

    • host– 远程代理的主机名或IP地址。

    • port– 要连接的服务器主机的网络端口。默认为1883。请注意,SSL/TLS上MQTT的默认端口是8883,因此如果使用TLS_set()可能需要提供端口。

    • keepalive - 设置心跳的时间,单位是秒。这个值告诉MQTT客户端,在没有接收到任何通信的情况下,多久应该发送一个PING请求给服务器,以保持连接,默认60秒。

    • clean_start -(仅限MQTT v5.0)TrueFalseMQTT_CLEAN_START_FIRST_ONLY。总是设置MQTT v5.0 clean_start标志、从不或仅在第一次成功连接时。设置clean_start标志后,MQTT会话数据(如未完成的消息和订阅)在成功连接时被清除。对于MQTT v3.1.1,Clientclean_session参数应用于类似的结果。

    • properties (Properties) –( 仅仅限MQTT v5.0)需要在MQTT连接包发送的的MQTT v5.0 属性。

客户端(Client)

Client类一般使用流程如下:

  1. 创建客户端实例
  2. 使用connect*() 函数之一连接到代理
  3. 调用其中一个loop*()函数来维护代理的网络流量
  4. 使用subscribe()订阅主题并接收消息
  5. 使用publish()将消息发布到代理
  6. 使用disconnect()断开与代理的连接

将调用回调以允许应用程序根据需要处理事件。这些回调如下所述。

网络循环

这些功能是Client背后的驱动力。如果它们没有被调用,传入的网络数据将不会被处理,传出的网络数据也不会被发送。管理网络环路有四种选择。这里描述了三个,第四个在下面的“外部事件循环支持”中描述。不要混合使用不同的loop函数。

loop_start() / loop_stop()
mqttc.loop_start()

while True:
    temperature = sensor.blocking_read()
    mqttc.publish("paho/temperature", temperature)

mqttc.loop_stop()

这些函数实现了网络循环的线程接口。在connect*()之前或之后调用loop_start()一次,会在后台运行一个线程来自动调用loop()。这释放了主线程,用于可能阻塞的其他工作。此调用还处理与代理的重新连接。调用loop_stop() 以停止后台线程。如果调用disconnect(),循环也会停止。

loop_forever()
mqttc.loop_forever(retry_first_connection=False)

这是网络循环的阻塞形式,在客户端调用disconnect()之前不会返回(即调用mqttc.disconnect()后会停止阻塞,继续运行其后的代码)。它会自动处理重新连接。
除了使用connect_async时的第一次连接尝试外,使用retry_first_connection=True 使其重试第一次连接。
警告:这可能会导致客户端保持连接到不存在的主机而不会出现失败。

loop()
run = True
while run:
    rc = mqttc.loop(timeout=1.0)
    if rc != 0:
        # need to handle error, possible reconnecting or stopping the application

定期调用以处理网络事件。此调用触发select()等待,直到网络套接字可用于读取或写入,如果套接字可用,则处理流入/流出的数据。此函数最多阻塞timeout秒。timeout不能超过客户端的keepalive值,否则代理会定期断开客户端的连接。
使用这种循环,需要自己处理重新连接策略。

回调

与paho-mqtt交互的接口包括各种回调,当发生某些事件时,类库会调用这些回调。

回调是在代码中定义的函数,用于实现对这些事件要求的操作。这可能只是打印收到的消息,也可能是更复杂的行为。

回调API是有版本的,所选版本是我们提供给客户端构造函数的CallbackAPIVersion。目前支持两个版本:

  • CallbackAPIVersion.VERSION1:这是paho-mqtt 2.0版本之前使用的历史版本。它是在引入CallbackAPIVersion之前使用的API。此版本已弃用,将在paho-mqtt 3.0版本中删除。
  • CallbackAPIVersion.VERSION2:此版本在协议MQTT 3.x和MQTT 5.x之间更为一致。它也更适用于MQTT 5.x,因为reason_code和属性始终在可获取时提供。建议所有用户升级到此版本。强烈建议MQTT 5.x用户使用。

存在以下回调:

  • on_connec():当收到代理返回CONNACK时被调用。调用可能是针对被拒绝的连接,请检查reason_code以查看连接是成功还是被拒绝。
  • on_connect_fail():当TCP连接建立失败时,由loop_forever()loop_start()调用。当直接使用connect()reconnect()时,不会调用此回调。它仅由loop_start()loop_forever()制造的自动(重新)连接后被调用
  • on_disconnect():当连接关闭时被调用。
  • on_message():收到代理返回的MQTT消息时被调用。
  • on_publish():当MQTT消息发送到代理时被调用。取决于QoS级别,回调在不同时刻被调用:
    • 对于QoS==0,一旦消息通过网络发送,就会调用它。这可能是在相应的publish()返回之前。
    • 对于QoS==1,当收到代理返回的对应消息的PUBACK时调用它
    • 对于QoS==2,当收到代理返回的对应消息的PUBCOMP时,会调用它
  • on_subscribe():当收到代理返回的SUBACK时被调用
  • on_unsubscribe:当收到代理返回的UNSUBACK时被调用
  • on_log():当类库记录一条消息时被调用
  • onSocket_openonSocket_closeonSocket_register_writeonSocket_unregister_write:用于外部循环支持(External event loop support)的回调。详见下文。

参阅在线文档查看有关每个回调的特征。

订阅示例
# -*- coding:utf-8 -*-

import paho.mqtt.client as mqtt

def on_subscribe(client, userdata, mid, reason_code_list, properties):
    # 由于我们只订阅了一个信道,reason_code_list只包含一个条目
    # print(reason_code_list) #输出: [ReasonCode(Suback, 'Granted QoS 0')]
    if reason_code_list[0].is_failure:
        print(f"Broker rejected you subscription: {reason_code_list[0]}")
    else:
        print(f"Broker granted the following QoS: {reason_code_list[0].value}")

def on_unsubscribe(client, userdata, mid, reason_code_list, properties):
    #注意,reason_code_list仅存在于MQTTv5中,在MQTTv3中,它将始终为空
    if len(reason_code_list) == 0 or not reason_code_list[0].is_failure:
        print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")
    else:
        print(f"Broker replied with failure: {reason_code_list[0]}")
    client.disconnect()

def on_message(client, userdata, message):
    # userdata是我们选择提供的数据结构,这里为一个列表(通过下方的 mqttc.user_data_set([])设置,该函数参数即为userdata参数值
    userdata.append(message.payload)
    # 假设只想处理10条消息
    if len(userdata) >= 10:
        client.unsubscribe("$SYS/#")

def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code.is_failure:
        print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
    else:
        # 应该始终在 on_connect 回调中订阅以确保在重新连接时订阅依旧存在。
        client.subscribe("$SYS/#")

mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe
mqttc.on_unsubscribe = on_unsubscribe

mqttc.user_data_set([]) # 设置 userdata
mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_forever() # 当调用client.disconnect()后继续执行以下代码

print(f"Received the following message: {mqttc.user_data_get()}")
发布示例
# -*- coding:utf-8 -*-

import time
import paho.mqtt.client as mqtt

def on_publish(client, userdata, mid, reason_code, properties):
    '''reason_code和properties将仅出现在MQTTv5中。在MQTTv3中始终未设置
    使用不存在`uncaked_publish`中的`mid`调用`on_publish()`。这是由于不可避免的竞争情形:
    * publish() 返回已发送消息的mid。
    * 主线程将publish()返回的mid添加到uncaked_publish中
    * loop_start线程调用on_publish()
    虽然不太可能(因为on_publish()将在网络往返后调用),但是这是一种可能发生的竞争情形
    避免竞争情形的最佳解决方案是使用publish()中的msg_info。还可以尝试使用已确认的mid列表,而不是从待处理列表中删除
    但是请记住,mid可以重复使用!
    reason_code和properties将仅出现在MQTTv5中。在MQTTv3中始终未设置
    '''
    try:
        userdata.remove(mid)
    except KeyError:
        print("on_publish() is called with a mid not present in unacked_publish")
        print("This is due to an unavoidable race-condition:")
        print("* publish() return the mid of the message sent.")
        print("* mid from publish() is added to unacked_publish by the main thread")
        print("* on_publish() is called by the loop_start thread")
        print("While unlikely (because on_publish() will be called after a network round-trip),")
        print(" this is a race-condition that COULD happen")
        print("")
        print("The best solution to avoid race-condition is using the msg_info from publish()")
        print("We could also try using a list of acknowledged mid rather than removing from pending list,")
        print("but remember that mid could be re-used !")

unacked_publish = set()
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_publish = on_publish

mqttc.user_data_set(unacked_publish)
mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_start()

# 应用生产一些消息
msg_info = mqttc.publish("paho/test/topic", "my message", qos=1)
unacked_publish.add(msg_info.mid)

msg_info2 = mqttc.publish("paho/test/topic", "my message2", qos=1)
unacked_publish.add(msg_info2.mid)

# 等待所有消息被发布
while len(unacked_publish):
    time.sleep(0.1)

# 由于上述描述的竞争状态, 以下等待所有消息发布完成的方式更安全
msg_info.wait_for_publish()
msg_info2.wait_for_publish()

mqttc.disconnect()
mqttc.loop_stop()

说明:

  1. Client.max_inflight_messages_set(inflight: int) → None

    设置一次可以通过其网络流的QoS>0的消息的最大数量(可以简单理解为允许多大数量的QoS>0的消息被同时进行传输处理)。默认值为20。

  2. Client.max_queued_messages_set(queue_size:int)→ Client
    设置传出消息队列中的最大消息数量。0表示无限制。

  3. MQTTMessageInfo.wait_for_publish(timeout: float | None = None) → None

    阻塞,直到与此对象关联的消息被发布,或者直到超时发生。如果timeoutNone,则永远不会超时。将超时设置为正数秒,例如1,2,以启用超时。
    抛出:

    • ValueError–如果消息因传出队列已满而未排队。
    • RuntimeError-如果消息因其他原因未发布。
  4. 实践过程中发现,采用多线程并发发布消息时,如果服务器因为限流的原因不返回消息确认,那么运行一小段时间后,出现消息无法发布成功的情况(不报错,但是消息无法抵达broker),通过合理的参数调用以上三个函数,可以缓解这个问题。

Logger

客户端会发出一些日志消息,这些消息在故障排除过程中可能很有用。启用日志最简单的方法是调用enable_logger()。可以提供自定义记录器或使用默认记录器

示例:

import logging
import paho.mqtt.client as mqtt

logging.basicConfig(level=logging.DEBUG)

mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.enable_logger()

mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)
mqttc.loop_start()

# Do additional action needed, publish, subscribe, ...
[...]

还可以定义一个on_log回调,它将接收所有日志消息的副本。例子:

import paho.mqtt.client as mqtt

def on_log(client, userdata, paho_log_level, messages):
    if paho_log_level == mqtt.LogLevel.MQTT_LOG_ERR:
        print(message)

mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_log = on_log

mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)
mqttc.loop_start()

# Do additional action needed, publish, subscribe, ...
[...]

Paho日志级别和标准日志级别的对应关系如下:

Paho logging
MQTT_LOG_ERR logging.ERROR
MQTT_LOG_WARNING logging.WARNING
MQTT_LOG_NOTICE logging.INFO (no direct equivalent)
MQTT_LOG_INFO logging.INFO
MQTT_LOG_DEBUG logging.DEBUG

外部事件循环支持

为了支持其他网络循环,如asyncio(参见示例),类库公开了一些方法和回调来支持这些用例。

存在以下循环方法:

  • loop_read:应该在套接字可读取时调用。
  • loop_write:应该在套接字可写并且类库需要写入数据时调用。
  • loop_misc:应每隔几秒钟调用一次,以处理消息重试和ping。

用伪代码表示如下:

while run:
    if need_read:
        mqttc.loop_read()
    if need_write:
        mqttc.loop_write()
    mqttc.loop_misc()

    if not need_read and not need_write:
        # But don't wait more than few seconds, loop_misc() need to be called regularly
        wait_for_change_in_need_read_or_write()
    updated_need_read_and_write()

棘手的部分是实现updated_need_read_and_write并等待条件变更。为了支持这一点,存在以下方法:

  • socket:当TCP连接打开时返回socket对象。此调用对于基于select循环特别有用。请参阅examples/loop_select.py

  • want_write():如果有数据等待写入,则返回True。这接近于上述伪代码的need_writew,但还是应该检查套接字是否可写。

  • 回调函数on_socket_*

    • on_socket_open:在套接字打开时调用。
    • on_socket_open:在套接字打开时调用。
    • on_socket_close:当套接字即将关闭时调用。
    • on_socket_register_write:当客户端想要在套接字上写入数据时调用
    • on_socket_unregister_write:当套接字上没有更多数据要写入时调用。

    回调对于事件循环特别有用,在事件循环中,可以注册或注销用于读写的套接字。请参阅examples/loop_asyncio.py 获取示例。

回调总是按以下顺序调用:

  • on_socket_open

  • 0或者更多次:

    • on_socket_register_write
    • on_socket_unregister_write
  • on_socket_close

全局辅助函数

客户端模块还提供了一些全局辅助函数。

topic_matches_sub(sub, topic)可用于检查主题(topic)是否与订阅(subscription)匹配。
例如:

主题foo/bar 将与订阅foo/#+/bar匹配
主题non/matching 将不匹配订阅non/+/+

发布

此模块提供了两个辅助函数single()multiple(),允许以一次性方式直接发布消息。换句话说,它们对于有一个/多个消息要发布到代理,然后断开连接而不需要其他任何东西的情况非常有用。

提供的两个函数是single()multiple()

这两个函数都支持MQTT v5.0,但目前不允许在连接或发送消息时设置任何属性。

Single

发布一条消息到代理,然后彻底断开连接。

例子:

import paho.mqtt.publish as publish

publish.single("paho/test/topic", "payload", hostname="mqtt.eclipseprojects.io")

Multiple

发布多条消息到代理,然后彻底断开连接。

例子:

from paho.mqtt.enums import MQTTProtocolVersion
import paho.mqtt.publish as publish

msgs = [{'topic':"paho/test/topic", 'payload':"multiple 1"},
    ("paho/test/topic", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="mqtt.eclipseprojects.io", protocol=MQTTProtocolVersion.MQTTv5)

订阅

此模块提供了两个辅助函数simple()callback(),以允许直接订阅和处理消息。

这两个函数都支持MQTT v5.0,但目前不允许在连接或发送消息时设置任何属性。

Simple

订阅一组主题并返回收到的消息。这是一个阻塞函数。
例子:

import paho.mqtt.subscribe as subscribe

msg = subscribe.simple("paho/test/topic", hostname="mqtt.eclipseprojects.io")
print("%s %s" % (msg.topic, msg.payload))

使用回调(Callback)

订阅一组主题,并使用用户提供的回调处理收到的消息。

例子:

import paho.mqtt.subscribe as subscribe

def on_message_print(client, userdata, message):
    print("%s %s" % (message.topic, message.payload))
    userdata["message_count"] += 1
    if userdata["message_count"] >= 5:
        # it's possible to stop the program by disconnecting
        client.disconnect()

subscribe.callback(on_message_print, "paho/test/topic", hostname="mqtt.eclipseprojects.io", userdata={"message_count": 0})

参考连接

https://github.com/eclipse/paho.mqtt.python

https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html

标签:类库,调用,mqttc,Python,publish,mqtt,paho,loop
From: https://www.cnblogs.com/shouke/p/18417803

相关文章

  • 使用Postman测试MQTT协议接口
    MQTT概述MQTT(MessageTelemetryTransport)是一种用于物联网(IoT)的消息传递协议。它的使用范围从家庭自动化和可穿戴设备的小型设备到大型工业机械的自动化。它是一种轻量级技术,以发布/订阅模式为模型,其中连接到单个代理的客户端可以将消息发布到不同的主题,并订阅主题以接收来自......
  • appium+python自动化代码示例
    fromselenium.webdriver.common.byimportByimporttime#设置Appium连接参数caps={"platformName":"Android","platformVersion":"7.1.2",#根据夜神模拟器的Android版本进行修改"deviceName":"127.0.0.......
  • 开放食物营养库python SDK套件:openfoodfacts-python
    官网源码:GitHub-openfoodfacts/openfoodfacts-python:......
  • Python语法进阶之路
    一、Python基础1.1注释定义和作用对代码解释说明,增强可读性单行注释#多行注释"""这是一个多行注释""" 1.2变量及变量类型 定义和作用计算机目的是计算,编程是为了更方便计算,计算对象就是变量,可以在程序运行过程中,临时存储数据 变量基本使用变量初始化变......
  • python如何操作mysql
    首先要通过pipinstallpymysql安装一个pymysql模块直接上代码:importpymysql#连接数据库conn=pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test')#创建游标cursor=conn.cursor()#创建一张表#sql="createtablemy......
  • python-爬虫入门
    前言:由于个人负责的运维组,其中有个同事每回在某个项目发版更新后,需手动在k8s容器平台web界面上复制出几百个微服务的名称以及镜像版本等信息,用来更新微服务清单,个人决定抽时间写个爬虫脚本自动完成手动执行的任务。由于公司信息需保密,这里介绍个简单入门的爬虫脚本做为范例。Pyth......
  • 强化学习基础:主要算法框架与Python实现示例
    创作不易,您的打赏、关注、点赞、收藏和转发是我坚持下去的动力!强化学习(ReinforcementLearning,RL)是一种通过与环境交互来学习策略的机器学习方法。RL主要包含以下几个关键组件:状态(State)、动作(Action)、奖励(Reward)、策略(Policy)和价值函数(ValueFunction)。常见的强化学习主流......
  • Python语言的基本要素
    1.Python的语句(1)程序语句中,所有的字符都必须是英文字符(半角),不能是中文字符(全角)。(2)"#"后是注释,注释的内容不执行。("ctr+/"可加多行注释)2.常量(1)常量表示固定不变的数据(值)(2)常见常量有:数值型(整数)、字符串、布尔型(true表示真、false表示假)、空(None表示啥也不是)3.变量(1)变量是......
  • 书生大模型实战(从入门到进阶)L0-Python
    目录Python实现WordCountVscode连接InternStudiodebugdebug单个python文件在vscode使用命令行进行debug本文是对书生大模型L0-Python部分的学习和实现,学习地址如下:学习地址:‬​​​​⁠​​‌⁠‍⁠​‬​​​​​​​⁠‬​​​⁠​​‌​​​​​‍​​​⁠​​​学......
  • 【python】石头剪刀布,模拟十次并统计获胜次数
    解决问题下面是一个使用Python编写的剪刀、石头、布游戏的程序,包含玩家与计算机对战和模拟计算机对战10次的功能。importrandomdefget_computer_choice():  returnrandom.randint(0,2)defget_user_choice():  choice=input("请输入剪刀(0)、石头(1)、布(......