首页 > 其他分享 >MQTT 订阅者(异步)代码解读

MQTT 订阅者(异步)代码解读

时间:2023-08-02 12:11:07浏览次数:51  
标签:异步 订阅 void MQTTAsync MQTT context rc NULL opts

一、问题引入

官方给出了MQTT Client的异步订阅的例子,对于消息的订阅就无需讲究什么同步了。

二、解决过程

2-1 MQTT 订阅者程序流程

  • 第一步:创建客户端
LIBMQTT_API int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context);
  • 第二步:设置异步回调函数
LIBMQTT_API int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
									 MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
  • 第三步:设置客户端与服务器的连接选项属性
/* 连接属性的宏初始化 */
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}

MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;

  • 第四步:连接服务器
LIBMQTT_API int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options);
  • 第五步:主题消息的处理和连接的异常处理
void onConnect(void* context, MQTTAsync_successData* response);
void onConnectFailure(void* context, MQTTAsync_failureData* response);

void onSubscribe(void* context, MQTTAsync_successData* response);
void onSubscribeFailure(void* context, MQTTAsync_failureData* response);

void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
void onDisconnect(void* context, MQTTAsync_successData* response);

/* 连接的异常处理 */
void connlost(void *context, char *cause);

/* 主题消息的处理 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
  • 第六步:断开连接并释放内存
LIBMQTT_API int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options);

LIBMQTT_API void MQTTAsync_destroy(MQTTAsync* handle);

2-2 订阅者数据结构

发布者和订阅者的异步模式共用一套数据结构,具体参考MQTT 生产者(异步)代码解读

2-3 订阅者的回调函数

  • 处理连接异常的函数
void connlost(void *context, char *cause)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;

	printf("\nConnection lost\n");
	if (cause)
		printf("     cause: %s\n", cause);

	printf("Reconnecting\n");
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	conn_opts.onSuccess = onConnect;
	conn_opts.onFailure = onConnectFailure;
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", rc);
		finished = 1;
	}
}
  • 处理订阅主题消息的函数
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    printf("Message arrived\n");
    printf("     topic: %s\n", topicName);
    printf("   message: %.*s\n", message->payloadlen, (char*)message->payload);
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}

此处对比一下发布者--处理发布主题消息的函数

int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{
	/* not expecting any messages */
	return 1;
}

发布者仅将主题消息发送出去,不期待任何到来的消息。

这时你可能要问:“订阅者的订阅主题函数呢?”,是的,此时它该登场了。它包含在 onConnect函数中,如下所示

void onConnect(void* context, MQTTAsync_successData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
	int rc;

	printf("Successful connection\n");

	printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
	opts.onSuccess = onSubscribe;
	opts.onFailure = onSubscribeFailure;
	opts.context = client;
	if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start subscribe, return code %d\n", rc);
		finished = 1;
	}
}

三、反思总结

应用扩展

若出现 Broker 和 Subscriber 断开连接,如何实现它们之间的重连呢?

其实,MQTT Lib 已经考虑了这样的情形,在 MQTTAsync_connectOptions conn_opts 结构中添加了数据成员:

  • automaticReconnect
  • minRetryInterval
  • maxRetryInterval

在连接Broker之前,设置自动重连回调函数,自动重连函数的具体实现

标签:异步,订阅,void,MQTTAsync,MQTT,context,rc,NULL,opts
From: https://www.cnblogs.com/caojun97/p/17585056.html

相关文章

  • 异步实现邮件发送
    问题描述:在写接口的时候,遇到一个问题,前端要求直接返回结果再去运行其他代码。问题分析:因为经费紧张,本次使用的是网易发送邮件,也就是用你的账号给其他人发送邮件这种,这也存在一些问题,就是不能在短时间内大量发送邮件信息,于是我加了一个定时器。限制为每半分钟发送一次,后台在......
  • java中使用异步方式调用接口@Async
    @Async使用:1、首先在启动类上开启注解@EnableAsync2、然后需要异步操作的方法上加上@Async*/publicclassAsyncTest{@Asyncpublicvoidtest()throwsInterruptedException{//做处理Thread.sleep(1000);}/**如果需要返回值的话,通过AsyncResult进行封装*/@AsyncpublicF......
  • 微信小程序如何清除订阅消息消息列表
    问题: 如果订阅授权框选择【总是保持以上选择】,无论是“允许”还是“拒绝”,都会进入:小程序右上角--->管理--->订阅消息的通知管理列表里,并且再次调用wx.requestSubscribeMessage()不会再弹出授权框。但是这样对于开发阶段不是很友好,那开发阶段怎么清除通知管理列表呢?wx.......
  • Unity 将UnityWebRequest改为async/await异步
    花了一点时间,对UnityWebRequest进行了简单封装,使用起来更方便一些,顺便实现了post接口轮询、重试的功能usingSystem;usingSystem.Collections;usingSystem.Collections.Generic;usingSystem.Runtime.CompilerServices;usingSystem.Text;usingSystem.Threading.Tasks;......
  • C#中Socket编程,异步实现Server端定时发送消息
    在最近项目需求中,要求服务端定时向客服端发送消息。由于客户端从机的特性,只能接收Server发送的消息后回复,不能主动向服务端发送消息。起初,并未使用异步的方法进行编程,使用了Accept()、Revice()等方法。由于从机不能主动发送消息的特性,并未考虑到从机断电不能接收到Server消息的情......
  • ESP01S刷MQTT固件
    刷个ESP01S把我卡了将近一天,特难受,记录一下正确的烧录固件方式。之前刷了固件总是没一点返回值(输入AT\r\n),只是在电源重接时才会接收到一些乱码,总以为是CH340接出的3.3V电压不稳,换成PL2303HXD串口也没是一样没反应,改成用MCU里提供的电源照样没反应,其实就是刷固件的某些细节弄......
  • AJAX--关于什么情况下使用同步或异步
    一、什么是异步?什么是同步1.ajax请求1和ajax请求2,同步并发,就是异步2.ajax请求1和ajax请求2,只要发生等待就是同步二、异步或者同步代码上的实现xhr1.open("请求方式","url",false)第三个参数为false时,表示ajax请求1不支持异步,也就是说在ajax请求1发送之后,会影响其他ajax请......
  • 异步通信点灯
    目录前言一、串口通信二、异步通信和同步通信三、异步通信的端口四、实现异步通信1.设置引脚2.设置异步通信3.完整代码4.编写测试代码5.串口发送字符串6.通过串口控制LED亮灭五、使用中断串口通信来控制LED1.打开串口的中断六、上位机总结前言前面我们学习了PWM定时器脉冲来进行......
  • springboot整合mqtt 消费端
    用到的工具:EMQX,mqttx,idea工具使用都很简单,自己看看就能会。订阅端config代码:packagecom.example.demo.config;importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;imp......
  • Android多线程及异步处理问题
    1、问题提出1)为何需要多线程?2)多线程如何实现?3)多线程机制的核心是啥?4)到底有多少种实现方式?2、问题分析1)究其为啥需要多线程的本质就是异步处理,直观一点说就是不要让用户感觉到“很卡”。eg:你点击按钮下载一首歌,......