首页 > 其他分享 >MQTT 生产者(异步)代码解读

MQTT 生产者(异步)代码解读

时间:2023-06-20 09:45:27浏览次数:317  
标签:异步 生产者 void MQTTAsync int MQTT NULL

一、问题引入

MQTT使用也有一段时间了,包括同步和异步的使用。

这里根据官方案例和本人的理解,记录以下学习过程。

二、解决过程

简要介绍编写 MQTT Producer的消息发布(异步)过程:

  • 第1步:创建客户端
LIBMQTT_API int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context);
  • 第2步:设置异步回调函数
LIBMQTT_API int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
									 MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
  • 第3步:设置客户端与服务器的连接选项属性
// (异步)连接属性初始化宏定义
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}

MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  • 第4步:连接服务器
LIBMQTT_API int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options);
  • 第5步:发布响应选项设置
#define MQTTAsync_responseOptions_initializer { {'M', 'Q', 'T', 'R'}, 1, NULL, NULL, 0, 0, NULL, NULL, \
MQTTProperties_initializer, MQTTSubscribe_options_initializer, 0, NULL}

MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;

// 发布选项设置
pub_opts.onSuccess = onSend;
pub_opts.onFailure = onSendFailure;
pub_opts.context = client;
  • 第6步:设置消息发布选项属性
#define MQTTAsync_message_initializer { {'M', 'Q', 'T', 'M'}, 1, 0, NULL, 0, 0, 0, 0, MQTTProperties_initializer }

MQTTAsync_message pubmsg = MQTTAsync_message_initializer;

pubmsg.qos = 1;
pubmsg.payload = payload;
pubmsg.payloadlen = len;
pubmsg.retained = 1;
// pubmsg其他设置
  • 第7步:发布消息
LIBMQTT_API int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, 
        const MQTTAsync_message* msg, MQTTAsync_responseOptions* response);
  • 第8步:释放内存
LIBMQTT_API void MQTTAsync_destroy(MQTTAsync* handle);

2-1 数据类型的声明

MQTT 生产者的同步和异步的函数API不同,结构体声明也有所不同。

异步模式下,所有的结构体和函数声明都加上前缀:MQTTAsync

异步连接选项MQTTAsync_connectOptions结构体成员(注意:和同步中的重复部分不再例举):

成员函数 范围 功能
MQTTAsync_onSuccess* onSuccess 如果连接成功完成,这个指向回调函数的指针会被调用
MQTTAsync_onFailure* onFailure 如果连接失败,这个指向回调函数的指针会被调用
int automaticReconnect 在连接丢失的情况下自动重连
int minRetryInterval 最小重试间隔时长
int maxRetryInterval 最大重试间隔时长
MQTTProperties *connectProperties MQTT v5.0的连接属性
MQTTProperties *willProperties MQTT v5.0的遗嘱属性
MQTTAsync_onSuccess5* onSuccess5 如果连接完成,这个指向回调函数的指针会被调用
MQTTAsync_onFailure5* onFailure5 如果连接失败,这个指向回调函数的指针会被调用

异步消息结构体MQTTAsync_message和同步消息结构体MQTTClient_message完全相同
MQTTAsync_message结构体的成员:

成员名称 取值范围 功能
char struct_id[4] 结构体识别序号
int struct_version 0~1 结构体号码
int payloadlen 消息长度
void* payload 消息
int qos 0~2 消息质量
int retained True or False 消息保留标志位
int dup True or False 消息副本标志,仅在接收QOS=1的消息时有效
int msgid 消息标识符
MQTTProperties properties MQTT v5.0关联的消息属性

异步消息发布响应选项MQTTAsync_responseOptions结构体成员:

成员函数 范围 功能
char struct_id[4] 结构体识别序号
int struct_version 0~8 结构体号码
MQTTAsync_onSuccess* onSuccess 如果连接成功完成,这个指向回调函数的指针会被调用
MQTTAsync_onFailure* onFailure 如果连接失败,这个指向回调函数的指针会被调用
void* context
MQTTAsync_token token 从回调函数返回的令牌,可被用于跟踪请求的状态
MQTTAsync_onSuccess5* onSuccess5 如果连接完成,这个指向回调函数的指针会被调用
MQTTAsync_onFailure5* onFailure5 如果连接失败,这个指向回调函数的指针会被调用
MQTTProperties properties MQTT v5.0的输入属性
MQTTSubscribe_options subscribeOptions MQTT v5.0的订阅选项,仅在订阅时才能使用
int subscribeOptionsCount MQTT v5.0的订阅选项计数,仅在订阅多个才能使用
MQTTSubscribe_options* subscribeOptionsList MQTT v5.0的订阅选项数组,仅在订阅多个才能使用

2-2 回调函数

设置回调函数的原型:

LIBMQTT_API int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
  								 MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);

其中包含3个回调函数:

  • MQTTAsync_connectionLost* cl :用于处理是否断开连接(如果该值被设置位NULL,那么客户端将无法处理异常断开的情况)
    其原型:typedef void MQTTAsync_connectionLost(void* context, char* cause);

实例:

void connlost(void *context, char *cause)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;

	printf("\nConnection lost\n");
	printf("     cause: %s\n", cause);

	printf("Reconnecting\n");
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", rc);
		finished = 1;
	}
}
  • MQTTAsync_messageArrived* ma :用于处理订阅主题的消息是否到达,这里为生产者无需额外处理。(该值不能设置为NULL,若这时为NULL会直接报错)
    其原型:typedef int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);

  • MQTTAsync_deliveryComplete* dc :用于处理消息是否分发完成(若不想检查是否分发成功,可以设置为NULL)
    其原型:typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);

三、反思总结

使用异步发布消息,可以提高发布效率,经过测试:100 条/秒是可以达到的。

四、参考引用

标签:异步,生产者,void,MQTTAsync,int,MQTT,NULL
From: https://www.cnblogs.com/caojun97/p/17485960.html

相关文章

  • MQTT 生产者(同步)代码解读
    一、问题引入官方给出了MQTTClient的同步和异步发布的例子,本随笔就是同步发布的example。同步和异步都有一套API函数和结构体。同步发布消息算是最简单的案例了,这里总结一下代码。二、解决过程简要介绍编写MQTTProducer的消息发布(同步)过程:第1步:创建客户端LIBMQTT_APIi......
  • 异步操作的方法和技术
    异步操作是一种编程模式,用于处理那些可能耗时的任务,以确保应用程序在执行这些任务的同时能够继续响应其他操作。下面是一些常见的异步操作的方法和技术:回调函数(Callbacks):这是一种传统的异步编程模式,其中函数在完成任务后调用预定义的回调函数。回调函数允许在异步操作完成后执......
  • std::thread 四:异步(async)
     *:如果std::async中传递参数std::lunnch::deferred,就需要等待调用get()或者wait()才会执行,并且代码非子线程运行,而是在主线程中执行 #include<iostream>#include<thread>#include<mutex>#include<list>#include<future>usingnamespacestd;intmyThre......
  • 后台用异步线程调用的场景与常用方式
    一.异步执行的场景:完成业务后,发短信、发邮件、微信公众号等消息推送提示的功能,可以采用异步执行。在导入数量量过大等情况下,可以使用异步导入的方式,提高导入时间等。...等等二.实现的方式:1.springboot中,进行线程池配置,然后用@Async标识异步执行方法即可,如下:(需要注意的@Enable......
  • IU5365具有NTC及电池过放电电压保护功能,3A异步降压型铅酸电池充电管理IC
    IU5365E是一款异步降压型铅酸电池充电管理IC。集成功率MOS,具有最大3A的充电电流能力,充电电流可以通过外部电阻灵活可调。IU5365E通过设置合适的外围电阻,具有对电池的NTC保护功能。IU5365E通过外部电阻,可独立调节过充电压。IU5365E具有完善的保护功能,包括输入欠压和过压保护、电池充......
  • c#中tcp异步
    usingSystem;usingSystem.Collections.Generic;usingSystem.ComponentModel;usingSystem.Data;usingSystem.Drawing;usingSystem.Linq;usingSystem.Net.Sockets;usingSystem.Text;usingSystem.Threading;usingSystem.Threading.Tasks;usingSystem.Windows.Forms;n......
  • MQTT Broker 比较与选型——开源与商业服务器/服务对比
    MQTTBroker比较与选型——开源与商业服务器/服务对比  编程  2020-03-20  2020-03-21  评论数: 2开源MQTTBroker对比截止2021年,物联网行业里可选的MQTTBroker有很多,除了经典的Mosquitto和AWS、Azure,百度云、阿里云、IBM等几个提供物联网MQTT接入服务的产品外......
  • log4xx/log4j异步日志配置示例
    <?xmlversion="1.0"encoding="UTF-8"?><!DOCTYPElog4j:configurationSYSTEM"log4j.dtd"><log4j:configurationxmlns:log4j='http://jakarta.apache.org/log4j/'debug="false"><append......
  • 资源异步释放问题
    问题SQLSession上缓存了10MB内存,用于加速当前Session上的一些操作。但是,当租户资源紧张、Session数量较多时,这10MB的内存就成了负担。我们观察到,此时虽然Session数量多,但是实际活跃的Session却不多。所以,可以引入一种异步回收内存的机制来释放非活跃Session上的内存......
  • 异步编程 asynico、async、await最佳实践
    使用异步函数:Asynico是为了处理异步操作而设计的,因此使用异步函数而不是同步函数是最佳实践之一。使用async关键字将函数定义为异步函数,并使用await关键字来等待异步操作的结果。示例:importasyncioasyncdefmy_async_function():#异步操作awaitasyncio.sleep(1)......