一、问题引入
MQTT使用也有一段时间了,包括同步和异步的使用。
这里根据官方案例和本人的理解,记录以下学习过程。
二、解决过程
简要介绍编写 MQTT Producer的消息发布(异步)过程:
- 第1步:创建客户端
LIBMQTT_API int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context);
- 第2步:设置异步回调函数
LIBMQTT_API int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
- 第3步:设置客户端与服务器的连接选项属性
// (异步)连接属性初始化宏定义
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- 第4步:连接服务器
LIBMQTT_API int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options);
- 第5步:发布响应选项设置
#define MQTTAsync_responseOptions_initializer { {'M', 'Q', 'T', 'R'}, 1, NULL, NULL, 0, 0, NULL, NULL, \
MQTTProperties_initializer, MQTTSubscribe_options_initializer, 0, NULL}
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
// 发布选项设置
pub_opts.onSuccess = onSend;
pub_opts.onFailure = onSendFailure;
pub_opts.context = client;
- 第6步:设置消息发布选项属性
#define MQTTAsync_message_initializer { {'M', 'Q', 'T', 'M'}, 1, 0, NULL, 0, 0, 0, 0, MQTTProperties_initializer }
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.qos = 1;
pubmsg.payload = payload;
pubmsg.payloadlen = len;
pubmsg.retained = 1;
// pubmsg其他设置
- 第7步:发布消息
LIBMQTT_API int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName,
const MQTTAsync_message* msg, MQTTAsync_responseOptions* response);
- 第8步:释放内存
LIBMQTT_API void MQTTAsync_destroy(MQTTAsync* handle);
2-1 数据类型的声明
MQTT 生产者的同步和异步的函数API不同,结构体声明也有所不同。
异步模式下,所有的结构体和函数声明都加上前缀:MQTTAsync
异步连接选项MQTTAsync_connectOptions结构体成员(注意:和同步中的重复部分不再例举):
成员函数 | 范围 | 功能 |
---|---|---|
MQTTAsync_onSuccess* onSuccess |
如果连接成功完成,这个指向回调函数的指针会被调用 | |
MQTTAsync_onFailure* onFailure |
如果连接失败,这个指向回调函数的指针会被调用 | |
int automaticReconnect | 在连接丢失的情况下自动重连 | |
int minRetryInterval | 最小重试间隔时长 | |
int maxRetryInterval | 最大重试间隔时长 | |
MQTTProperties *connectProperties | MQTT v5.0的连接属性 | |
MQTTProperties *willProperties | MQTT v5.0的遗嘱属性 | |
MQTTAsync_onSuccess5* onSuccess5 | 如果连接完成,这个指向回调函数的指针会被调用 | |
MQTTAsync_onFailure5* onFailure5 | 如果连接失败,这个指向回调函数的指针会被调用 |
异步消息结构体MQTTAsync_message和同步消息结构体MQTTClient_message完全相同
MQTTAsync_message结构体的成员:
成员名称 | 取值范围 | 功能 |
---|---|---|
char struct_id[4] | 结构体识别序号 | |
int struct_version | 0~1 | 结构体号码 |
int payloadlen |
消息长度 | |
void* payload |
消息 | |
int qos |
0~2 | 消息质量 |
int retained |
True or False | 消息保留标志位 |
int dup | True or False | 消息副本标志,仅在接收QOS=1的消息时有效 |
int msgid | 消息标识符 | |
MQTTProperties properties | MQTT v5.0关联的消息属性 |
异步消息发布响应选项MQTTAsync_responseOptions结构体成员:
成员函数 | 范围 | 功能 |
---|---|---|
char struct_id[4] | 结构体识别序号 | |
int struct_version | 0~8 | 结构体号码 |
MQTTAsync_onSuccess* onSuccess |
如果连接成功完成,这个指向回调函数的指针会被调用 | |
MQTTAsync_onFailure* onFailure |
如果连接失败,这个指向回调函数的指针会被调用 | |
void* context |
||
MQTTAsync_token token |
从回调函数返回的令牌,可被用于跟踪请求的状态 | |
MQTTAsync_onSuccess5* onSuccess5 | 如果连接完成,这个指向回调函数的指针会被调用 | |
MQTTAsync_onFailure5* onFailure5 | 如果连接失败,这个指向回调函数的指针会被调用 | |
MQTTProperties properties | MQTT v5.0的输入属性 | |
MQTTSubscribe_options subscribeOptions | MQTT v5.0的订阅选项,仅在订阅时才能使用 | |
int subscribeOptionsCount | MQTT v5.0的订阅选项计数,仅在订阅多个才能使用 | |
MQTTSubscribe_options* subscribeOptionsList | MQTT v5.0的订阅选项数组,仅在订阅多个才能使用 |
2-2 回调函数
设置回调函数的原型:
LIBMQTT_API int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
其中包含3个回调函数:
- MQTTAsync_connectionLost* cl :用于处理是否断开连接(如果该值被设置位NULL,那么客户端将无法处理异常断开的情况)
其原型:typedef void MQTTAsync_connectionLost(void* context, char* cause);
实例:
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
-
MQTTAsync_messageArrived* ma :用于处理订阅主题的消息是否到达,这里为生产者无需额外处理。(该值不能设置为NULL,若这时为NULL会直接报错)
其原型:typedef int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
-
MQTTAsync_deliveryComplete* dc :用于处理消息是否分发完成(若不想检查是否分发成功,可以设置为NULL)
其原型:typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);
三、反思总结
使用异步发布消息,可以提高发布效率,经过测试:100 条/秒
是可以达到的。
四、参考引用
无
标签:异步,生产者,void,MQTTAsync,int,MQTT,NULL From: https://www.cnblogs.com/caojun97/p/17485960.html