一、问题引入
官方给出了MQTT Client的异步订阅的例子,对于消息的订阅就无需讲究什么同步了。
二、解决过程
2-1 MQTT 订阅者程序流程
- 第一步:创建客户端
LIBMQTT_API int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context);
- 第二步:设置异步回调函数
LIBMQTT_API int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
- 第三步:设置客户端与服务器的连接选项属性
/* 连接属性的宏初始化 */
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- 第四步:连接服务器
LIBMQTT_API int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options);
- 第五步:主题消息的处理和连接的异常处理
void onConnect(void* context, MQTTAsync_successData* response);
void onConnectFailure(void* context, MQTTAsync_failureData* response);
void onSubscribe(void* context, MQTTAsync_successData* response);
void onSubscribeFailure(void* context, MQTTAsync_failureData* response);
void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
void onDisconnect(void* context, MQTTAsync_successData* response);
/* 连接的异常处理 */
void connlost(void *context, char *cause);
/* 主题消息的处理 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
- 第六步:断开连接并释放内存
LIBMQTT_API int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options);
LIBMQTT_API void MQTTAsync_destroy(MQTTAsync* handle);
2-2 订阅者数据结构
发布者和订阅者的异步模式共用一套数据结构,具体参考MQTT 生产者(异步)代码解读
2-3 订阅者的回调函数
- 处理连接异常的函数
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
if (cause)
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
- 处理订阅主题消息的函数
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
此处对比一下发布者--处理发布主题消息的函数
int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{
/* not expecting any messages */
return 1;
}
发布者仅将主题消息发送出去,不期待任何到来的消息。
这时你可能要问:“订阅者的订阅主题函数呢?”,是的,此时它该登场了。它包含在 onConnect函数中
,如下所示
void onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
printf("Successful connection\n");
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
opts.onSuccess = onSubscribe;
opts.onFailure = onSubscribeFailure;
opts.context = client;
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
finished = 1;
}
}
三、反思总结
应用扩展
若出现 Broker 和 Subscriber 断开连接,如何实现它们之间的重连呢?
其实,MQTT Lib
已经考虑了这样的情形,在 MQTTAsync_connectOptions conn_opts
结构中添加了数据成员:
- automaticReconnect
- minRetryInterval
- maxRetryInterval
在连接Broker之前,设置自动重连回调函数,自动重连函数的具体实现
标签:异步,订阅,void,MQTTAsync,MQTT,context,rc,NULL,opts From: https://www.cnblogs.com/caojun97/p/17585056.html