因项目需要,IoT平台需要支持vc++2008接入。因为Paho的c++客户端不支持低版本vc++,所以不得不尝试通过c语言的库实现。
类库下载
从github下载c语言包。例如: eclipse-paho-mqtt-c-win32-1.3.12.zip
https://github.com/eclipse/paho.mqtt.c/releases
类库整合和配置
解压出来的c语言类库包可以看到很多库文件,详情如下图所示。不同的包适用于不同的场景,本案例用了异步客户端,所以用的是paho-mqtt3a这个系列。
配置步骤:
- 拷贝"paho-mqtt3a.dll"到vc++项目的根路径。
- 拷贝"paho-mqtt3a.lib"到vc++项目的"lib"目录(或其他自定义目录)。
- 拷贝"include"目录里面的头文件到vc++项目的"mqtt"目录(或其他自定义目录)。
- 在其他include目录的设置上, 把上一步的包含了mqtt头文件的".\mqtt"目录相对路径输入。 Project(right click)->Properties->Configuration Properties->C/C++->Additional Include Directories
- 在链接配置上, 输入mqtt库文件的相对路径"lib\paho-mqtt3a.lib"。 Project(right click)->Properties->Configuration Properties->Linker->Input->Additional Dependencies
客户端帮助类
这个帮助类会把MQTT客户端封装成单例。为了提高可靠性,启用了buffer消息文件持久化和断线自动重连。
MqttHelper.h
#pragma once

#include <stddef.h>

#include "MQTTAsync.h"

// Connection settings used by MqttHelper.cpp. Replace the placeholders with
// the values of your own broker.
#define TIMEOUT  10000L                  // not referenced by the code shown here
#define QOS      2                       // QoS used by Subcribe() and the 3-argument Publish()
#define ADDRESS  "mqtt://xxxx.com:1883"  // broker URI handed to MQTTAsync_create()
#define USER     "demo"
#define PASSWORD "demo"

/**
 * Singleton wrapper around the Paho MQTT C asynchronous client (MQTTAsync).
 *
 * Obtain the single instance with MqttHelper::getInstance(); the class can be
 * neither constructed nor copied from outside.
 */
class MqttHelper
{
private:
    // Private constructor to prevent instantiation. Both members are given a
    // defined value so that Close() can test `client` against NULL.
    MqttHelper() : client(NULL), Connected(false) {}

    // Declared but intentionally not defined (C++03 idiom): copying the
    // singleton is rejected at compile time (outside the class) or link time.
    MqttHelper(const MqttHelper&);             // Disable copy constructor
    MqttHelper& operator=(const MqttHelper&);  // Disable assignment operator

    MQTTAsync client;  // Paho client handle; NULL while no client exists

public:
    // Set by Connect() once the connect request has been accepted by the
    // library.
    bool Connected;

    /** Returns the process-wide instance (created on first use). */
    static MqttHelper& getInstance()
    {
        static MqttHelper instance;
        return instance;
    }

public:
    /** Creates the client if necessary and starts connecting. Returns false on failure. */
    bool Connect();

    /** Disconnects and destroys the client. */
    void Close();

    /** Subscribes to `topic` with the default QOS. Returns false if the request was rejected. */
    bool Subcribe(char* topic);

    /** Publishes `payloadLength` bytes of `payload` to `topic` with the default QOS. */
    bool Publish(char* topic, void* payload, int payloadLength);

    /** Publishes `payloadLength` bytes of `payload` to `topic` with the given `qos`. */
    bool Publish(char* topic, void* payload, int payloadLength, int qos);

    // Handlers invoked from the C callbacks that MqttHelper.cpp registers with
    // the Paho library. They are public because those callbacks are free functions.
    int  OnMsgReceived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
    void OnMsgPublished(void* context, MQTTAsync_token token);
    void OnConnected(void* context, MQTTAsync_successData* response);

private:
    /** Builds the MQTT client id ("<computer name>.<executable name>"). */
    char* GetClientId();
};
MqttHelper.cpp
#include "StdAfx.h"
#include "MqttHelper.h"

#include <windows.h>
#include <stdio.h>
#include <wchar.h>

// ---------------------------------------------------------------------------
// C callback trampolines.
//
// The Paho C library only accepts plain function pointers. Each trampoline
// receives the `context` pointer registered in MqttHelper::Connect() -- which
// is the MqttHelper instance -- and forwards to the matching member function.
// ---------------------------------------------------------------------------

void OnSuccessCallback(void* context, MQTTAsync_successData* response)
{
    MqttHelper* mqttHelper = static_cast<MqttHelper*>(context);
    mqttHelper->OnConnected(context, response);
}

int OnMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
    MqttHelper* mqttHelper = static_cast<MqttHelper*>(context);
    return mqttHelper->OnMsgReceived(context, topicName, topicLen, message);
}

void OnDeliveryComplete(void* context, MQTTAsync_token token)
{
    MqttHelper* mqttHelper = static_cast<MqttHelper*>(context);
    mqttHelper->OnMsgPublished(context, token);
}

/**
 * Creates the MQTT client (first call only) and starts an asynchronous connect.
 *
 * @return true if the connect request was accepted by the library (or a
 *         previous call already succeeded); false if the client could not be
 *         created, the callbacks could not be registered, or the connect
 *         request was rejected.
 *
 * NOTE(review): `Connected` is set as soon as MQTTAsync_connect() accepts the
 * request, not when the broker handshake completes; completion is reported
 * through OnConnected(). Confirm that this is the intended meaning of the flag.
 */
bool MqttHelper::Connect()
{
    if (Connected) {
        return true;
    }

    // Create the client only once so that a retry after a failed connect does
    // not leak the previous handle.
    if (client == NULL) {
        int createRc = MQTTAsync_create(&client, ADDRESS, GetClientId(),
                                        MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
        if (createRc != MQTTASYNC_SUCCESS) {
            client = NULL;
            return false;
        }

        // The context must be `this`: the trampolines above cast it back to
        // MqttHelper*.
        if (MQTTAsync_setCallbacks(client, this, NULL,
                                   OnMessageArrived, OnDeliveryComplete) != MQTTASYNC_SUCCESS) {
            MQTTAsync_destroy(&client);
            client = NULL;
            return false;
        }
    }

    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    conn_opts.keepAliveInterval  = 20;
    conn_opts.cleansession       = 0;
    conn_opts.username           = USER;
    conn_opts.password           = PASSWORD;
    conn_opts.automaticReconnect = 1;
    conn_opts.onSuccess          = OnSuccessCallback;
    conn_opts.context            = this;   // handed to OnSuccessCallback
    conn_opts.MQTTVersion        = MQTTVERSION_3_1_1;

    int rc = MQTTAsync_connect(client, &conn_opts);
    if (rc != MQTTASYNC_SUCCESS) {
        return false;
    }

    Connected = true;
    return true;
}

/**
 * Disconnects and destroys the client, and clears `Connected` so that a later
 * Connect() creates a fresh client instead of returning early.
 */
void MqttHelper::Close()
{
    if (client != NULL) {
        MQTTAsync_disconnect(client, NULL);
        MQTTAsync_destroy(&client);
        client = NULL;
    }
    Connected = false;
}

/**
 * Subscribes to `topic` using the default QOS.
 *
 * @return true if the subscribe request was accepted by the library.
 */
bool MqttHelper::Subcribe(char* topic)
{
    int rc = MQTTAsync_subscribe(client, topic, QOS, NULL);
    return rc == MQTTASYNC_SUCCESS;
}

/** Publishes with the default QOS; see the four-argument overload. */
bool MqttHelper::Publish(char* topic, void* payload, int payloadLength)
{
    return Publish(topic, payload, payloadLength, QOS);
}

/**
 * Publishes `payloadLength` bytes starting at `payload` to `topic` as a
 * non-retained message with the given `qos`.
 *
 * @return true if the message was accepted by the library for sending.
 */
bool MqttHelper::Publish(char* topic, void* payload, int payloadLength, int qos)
{
    MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pub_msg = MQTTAsync_message_initializer;

    pub_msg.payload    = payload;
    pub_msg.payloadlen = payloadLength;
    pub_msg.qos        = qos;
    pub_msg.retained   = 0;

    int rc = MQTTAsync_sendMessage(client, topic, &pub_msg, &pub_opts);
    return rc == MQTTASYNC_SUCCESS;
}

/**
 * Builds the client id "<computer name>.<executable file name>".
 *
 * The wide-character Win32 APIs are called explicitly so the function builds
 * in both UNICODE and ANSI project configurations. "unknown" is substituted
 * for any part that cannot be determined, and for the whole id if the
 * conversion to the ANSI code page fails.
 *
 * @return pointer to a static buffer owned by this function; the caller must
 *         not free it. The buffer is overwritten by the next call.
 */
char* MqttHelper::GetClientId()
{
    // Longest possible id in characters, including the terminating NUL:
    // computer name (<= MAX_COMPUTERNAME_LENGTH) + '.' + file name (< MAX_PATH).
    enum { kClientIdChars = MAX_COMPUTERNAME_LENGTH + 1 + MAX_PATH };

    // Up to three bytes per UTF-16 unit leaves room for multi-byte code pages.
    static char clientId[kClientIdChars * 3];

    WCHAR computerName[MAX_COMPUTERNAME_LENGTH + 1];
    DWORD computerNameChars = sizeof(computerName) / sizeof(computerName[0]);
    if (!GetComputerNameW(computerName, &computerNameChars)) {
        lstrcpyW(computerName, L"unknown");
    }

    WCHAR path[MAX_PATH];
    const WCHAR* processName = L"unknown";
    // The size argument is a character count, not a byte count.
    if (GetModuleFileNameW(NULL, path, MAX_PATH) != 0) {
        // A truncated result is not guaranteed to be NUL-terminated on older
        // Windows versions, so terminate it explicitly.
        path[MAX_PATH - 1] = L'\0';
        const WCHAR* lastSlash = wcsrchr(path, L'\\');
        processName = (lastSlash != NULL) ? lastSlash + 1 : path;
    }

    WCHAR wideId[kClientIdChars];
    lstrcpyW(wideId, computerName);
    lstrcatW(wideId, L".");
    lstrcatW(wideId, processName);

    int written = WideCharToMultiByte(
        CP_ACP,                     // same code page as before, so existing ids stay stable
        0,                          // default conversion behaviour
        wideId,                     // source string
        -1,                         // source is NUL-terminated
        clientId,                   // destination buffer
        (int)sizeof(clientId),      // destination size in bytes
        NULL,                       // default replacement character
        NULL);                      // not interested in whether it was used
    if (written <= 0) {
        lstrcpyA(clientId, "unknown");
    }
    return clientId;
}

/**
 * Called for every message received on a subscribed topic. Prints the topic
 * and payload, then releases the message and the topic name.
 *
 * @return 1, which tells the library the message has been handled.
 */
int MqttHelper::OnMsgReceived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
    printf("Message arrived\n");
    printf(" topic: %s\n", topicName);
    // The payload is not NUL-terminated, so print exactly payloadlen bytes.
    printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}

/** Called when delivery of a published message has completed; logs the token. */
void MqttHelper::OnMsgPublished(void* context, MQTTAsync_token token)
{
    printf("Message delivered\n");
    printf(" token: %d\n", token);
}

/** Called when the connect request started by Connect() has succeeded. */
void MqttHelper::OnConnected(void* context, MQTTAsync_successData* response)
{
    printf("client connected\n");

    // Subscribe here if the application needs incoming messages, for example:
    //
    //   if (Subcribe("demo/in")) {
    //       printf("subscribe success\n");
    //   } else {
    //       printf("subscribe failed\n");
    //   }
}
把以上代码整合到你的项目,然后就可以通过调用"MqttHelper::getInstance().Publish("demo/test", byteData, strlen(byteData))"来发送消息了。
用MQTTX辅助调试
- 下载并安装MQTTX。
https://mqttx.app/
- 创建连接到MQTT broker.
- 连接并订阅主题"demo/#"。
转载请注明出处: https://i.cnblogs.com/posts/edit;postId=17611371