首页 > 其他分享 >MQTT再学习 -- 安装MQTT客户端及测试

MQTT再学习 -- 安装MQTT客户端及测试

时间:2023-04-03 20:04:27浏览次数:59  
标签:include -- msg MQTT mosquitto lFirst NULL 客户端


上一篇文章我们已经讲了 MQTT 服务器的搭建,参看:MQTT再学习 -- 搭建MQTT服务器及测试

接下来我们看一下 MQTT 客户端。

一、客户端下载

首先,客户端也有多种,我们需要面临选择了。

参看:基于mqtt的消息推送(三)客户端实现

现有客户端sdk分析,基本分为两大类:一类移植自C类库,如Mosquitto,一类是用objc或者swift原生实现。
各种sdk对比如下,我选用的是MQTT-Client,使用swift的同学可以使用CocoaMQTT,这个sdk的作者同时也是服务端实现emqtt的作者。

Name

Type

Programming Language

Code

Paho

Original

C

Open-Source. Eclipse project

IBM

Original

C

Close Source. IBM SDK

Mosquitto

Original

C

Open-Source. Eclipse project

MQTTKit

Wrapper (Mosquitto)

Objective-C

Open-Source. Github

Marquette

Wrapper (Mosquitto)

Objective-C

Open-Source. Github

Moscapsule

Wrapper (Mosquitto)

Swift

Open-Source. Github

Musqueteer

Wrapper (Mosquitto)

Objective-C

MQTT-Client

Native

Objective-C

Open-Source. Github

MQTTSDK

Native

Objective-C

CocoaMQTT

Native

Swift

Open-Source. Github


我们上一篇就用到了 Mosquitto 就是它了。

如果你下载的是上面这个链接里的,比如说  org.eclipse.mosquitto-1.4.8.tar.gz 

安装的时候和上一篇文章是一样的。

不过遇到出现两个问题:

参看:mosquitto 使用时出现的一些问题及其解决办法

xsltproc:命令未找到
解决方法:sudo apt-get install xsltproc

make[1]: 正在进入目录 `/home/tarena/project/MQTT/org.eclipse.mosquitto-1.4.8/man'
xsltproc mosquitto.8.xml
warning: failed to load external entity "/usr/share/xml/docbook/stylesheet/docbook-xsl/manpages/docbook.xsl"
compilation error: file manpage.xsl line 3 element import
xsl:import : unable to load /usr/share/xml/docbook/stylesheet/docbook-xsl/manpages/docbook.xsl
compilation error: file mosquitto.8.xml line 4 element refentry
xsltParseStylesheetProcess : document is not a stylesheet
make[1]: *** [mosquitto.8] 错误 5
make[1]:正在离开目录 `/home/tarena/project/MQTT/org.eclipse.mosquitto-1.4.8/man'
make: *** [docs] 错误 2
解决方法:
sudo apt-get install docbook-xsl

二、自写客户端

看完客户端源码(其实先写的这个,源码还没看...),那么接下来我们自己来写一个客户端。

首先讲一下会用到的一些 C/UINX 的知识点,当然你要是基础很扎实再好不过。

这里用到了 消息队列、线程、fgets、共享库等一些知识点。


看到这里,不由得庆幸一下,看来我用了一年时间总结基础知识是没有白费的,虽然很多已经忘的差不多了。但是回过头来,再看就明白了。特么,我都佩服我自己,一篇文章写这么多,我都快看不下去了。

参看:UNIX再学习 -- XSI IPC通信方式

参看:UNIX再学习 -- 线程

参看:C语言再学习 -- 输入/输出

参看:UNIX再学习 -- 静态库与共享库

(1)自写源码

言归正传。 先将我写的源码贴出。当然是阉割版的,其中的 PM2.5 上一篇文章也提到了采集器不同协议也不一样。并且我手头没有,我木有什么办法呀啊。所以只能发送接收一个 hello world ,向世界问声好了。

#include <sys/types.h>
#include <sys/msg.h>
#include <sys/time.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <pthread.h>
#include "mosquitto.h"
#include "net_zslf.h"
#include <string.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/types.h>

//const char *mqtt_broker_address = "192.168.x.xx"; /* mqtt_broker ip address */
int mqtt_broker_port = 1883; /* mqtt_broker port number */
long msgtype = 10; /* pm sensor message type */
int msgsize = 100; /* pm sensor message size */

int main(int argc, char** argv) 
{
	int gflags;
	int msgid;
	key_t key;
	pthread_t thread1, thread2;
	int ret;
	/* struct msqid_ds msg_ginfo, msg_sinfo; */
	char *msgpath = "home/tarena/project/MQTT/test/a.txt";
	  
	//消息队列的 键
	key = ftok(msgpath, 'a');
	gflags = IPC_CREAT;
        //创建消息队列
	msgid = msgget(key, gflags | 00666);
        if(msgid == -1)
        {
        DUG_PRINTF("msg create error\n");
		return -1;
        }
	/* msg_stat(msgid,msg_ginfo); */
	//创建消息队列发送线程
        ret = pthread_create(&thread1, NULL, &start_thread_msgsend, (void *)&msgid);
	if (ret != 0) {
    	perror("pthread msgsend create error\n");
    	return -1;
	}
        //创建消息队列接收线程
        ret = pthread_create(&thread2, NULL, &start_thread_msgrcv, (void *)&msgid);
	if (ret != 0){
        perror("pthread msgrcv create error\n");
    	return -1;
	}
	//线程等待 
        pthread_join(thread1, NULL);
	pthread_join(thread2, NULL);
	return 0;
}

//消息队列接收线程
void *start_thread_msgrcv(void *arg)
{
        int rflags = 0;
        int ret;
	int msgid = *(int *)(arg);
        MSG_data_buf msg_rbuf;  //消息队列类型
	struct mosquitto *mosq; //保存一个MQTT客户端连接的所有信息
	//下面的代码是从xml文件中读取
	FILE *fp; 
	char szFileBuff[1024] = {0};
	char serverADDR[16] = {0},devID[15] = {0},devName[15] = {0};
	char Longitude[15] = {0},Latitude[15] = {0},frequency[3] = {0};
	char *lFirst, *lEnd; 
	char devInfo[70];

	FILE *fp_re;
	char buffer_re[4];
	//打开xml文件 
	fp = fopen("/home/tarena/project/MQTT/test/deviceCfg.xml","r"); 
	if (fp==NULL) 
	{ 
	 	DUG_PRINTF("read XML file error!\n"); 
	} 
	//你只要知道while里面是获取xml信息的,至于这种操作有点6 
	while(fgets(szFileBuff, 1023, fp)) 
	{ 
		if ((lFirst = strstr(szFileBuff, "<serverADDR>")) != NULL) 
		{ 
			lEnd = strstr(lFirst + 1, "</serverADDR>"); 
			memcpy(serverADDR, lFirst + 12, lEnd - lFirst - 12); 
		} 
		if ((lFirst = strstr(szFileBuff, "<devID>")) != NULL) 
		{ 
			lEnd = strstr(lFirst + 1, "</devID>"); 
			memcpy(devID, lFirst + 7, lEnd - lFirst - 7); 
		 } 
		if ((lFirst = strstr(szFileBuff, "<devName>")) != NULL) 
		{ 
			lEnd = strstr(lFirst + 1, "</devName>"); 
			memcpy(devName, lFirst + 9, lEnd - lFirst - 9); 
		} 
		if ((lFirst = strstr(szFileBuff, "<Longitude>")) != NULL) 
		{ 
			lEnd = strstr(lFirst + 1, "</Longitude>"); 
			memcpy(Longitude, lFirst + 11, lEnd - lFirst - 11); 
		} 
		if ((lFirst = strstr(szFileBuff, "<Latitude>")) != NULL) 
		{ 
			lEnd = strstr(lFirst + 1, "</Latitude>"); 
			memcpy(Latitude, lFirst + 10, lEnd - lFirst - 10); 
		}
		//下面这个语句是用于分频率传送数据的
		if ((lFirst = strstr(szFileBuff, "<frequency>")) != NULL) 
		{ 
			lEnd = strstr(lFirst + 1, "</frequency>"); 
			memcpy(frequency, lFirst + 11, lEnd - lFirst - 11); 
		} 
		if ((lFirst = strstr(szFileBuff, "</display>")) != NULL) 
		{ 
			sprintf(devInfo, "&%s&%s&%s&%s&\n",devID,devName,Longitude,Latitude);
		 } 

	}
	fclose(fp);

	//这里是关键了,MQTT协议 要上演了  这部分为 pub 发布内容
	//MQTT 库初始化
	mosquitto_lib_init();
	//新建
	mosq = mosquitto_new(devID, true, NULL);
	//连接回调设置
	mosquitto_connect_callback_set(mosq, my_connect_callback);
	//断开回调设置
	mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
	//发布回调设置
	mosquitto_publish_callback_set(mosq, my_publish_callback);
	//MQTT连接 
	if(mosquitto_connect(mosq, serverADDR, mqtt_broker_port, 600) != MOSQ_ERR_SUCCESS)
	{
		DUG_PRINTF("mosquitto connection error\n");
		//销毁
		mosquitto_destroy(mosq);
		//清空
		mosquitto_lib_cleanup();
	}
	//这里开始 消息队列接收消息
	while (1)
	{
		//接收消息
		ret=msgrcv(msgid, &msg_rbuf, msgsize, msgtype, rflags);
		sleep(1);
		if (ret == -1)
		{
			DUG_PRINTF("read msg error\n");
		}
		DUG_PRINTF("%s\n", msg_rbuf.mtext);
		//将接收到的信息发布
		mosquitto_publish(mosq, NULL, "pmsensor", ret, (void *)msg_rbuf.mtext, 0, 0);
		sleep(5);
	}
	/* should never run below */
	//销毁
	mosquitto_destroy(mosq);
	//清空
	mosquitto_lib_cleanup();
	return NULL;
}

void *start_thread_msgsend(void *arg)
{
        /* 这里写 PM.25 采集 */
	int retval;	
	MSG_data_buf msg_sbuf; //消息队列类型
	int msgid = *(int *)(arg);
	int sflags=IPC_NOWAIT;  
	msg_sbuf.mtype = msgtype;
	strcpy(msg_sbuf.mtext,"hello world");
	//消息队列发送
	retval = msgsnd(msgid, &msg_sbuf,msgsize, sflags);
	if(retval == -1)		
	{
		DUG_PRINTF("message send error\n");	
	}
}

(2)源码分析

简单来看一下上面的代码

首先创建了一个消息队列

//消息队列的 键
	key = ftok(msgpath, 'a');
	gflags = IPC_CREAT;
    //创建消息队列
	msgid = msgget(key, gflags | 00666);
    if(msgid == -1)
    {
        DUG_PRINTF("msg create error\n");
		return -1;
    }

然后创建了两个线程

//创建消息队列发送线程
    ret = pthread_create(&thread1, NULL, &start_thread_msgsend, (void *)&msgid);
	if (ret != 0) {
    	perror("pthread msgsend create error\n");
    	return -1;
	}
    //创建消息队列接收线程
    ret = pthread_create(&thread2, NULL, &start_thread_msgrcv, (void *)&msgid);
	if (ret != 0){
        perror("pthread msgrcv create error\n");
    	return -1;
	}

先看一下发送消息队列线程

strcpy(msg_sbuf.mtext,"hello world");
//消息队列发送
retval = msgsnd(msgid, &msg_sbuf,msgsize, sflags);

无非将 hello world 替换成采集到的PM2.5数据。通过消息队列函数 msgsnd 发送。

再看一下接收消息队列线程


打开xml文件,从 xml 文件中获取信息


fp = fopen("/home/tarena/project/MQTT/test/deviceCfg.xml","r"); 
if (fp==NULL) 
{ 
	 DUG_PRINTF("read XML file error!\n"); 
}

然后 while循环里的获取信息,居然还有这种操作...  

然后就是关键了,MQTT 协议要上演了

//MQTT 库初始化
mosquitto_lib_init();
//新建
mosq = mosquitto_new(devID, true, NULL);
//连接回调设置
mosquitto_connect_callback_set(mosq, my_connect_callback);
//断开回调设置
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
//发布回调设置
mosquitto_publish_callback_set(mosq, my_publish_callback);

接收消息队列、发布内容

//接收消息
ret=msgrcv(msgid, &msg_rbuf, msgsize, msgtype, rflags);
//将接收到的信息发布
mosquitto_publish(mosq, NULL, "pmsensor", ret, (void *)msg_rbuf.mtext, 0, 0);

最后销毁清空 MQTT

//销毁
mosquitto_destroy(mosq);
//清空
mosquitto_lib_cleanup();

(3)项目下载

将 deviceCfg.xml 里的 IP 地址换成你自己的,服务器IP还是不能暴露滴。

下载:MQTT 客户端

(4)演示

MQTT再学习 -- 安装MQTT客户端及测试_客户端

MQTT再学习 -- 安装MQTT客户端及测试_消息队列_02


标签:include,--,msg,MQTT,mosquitto,lFirst,NULL,客户端
From: https://blog.51cto.com/u_15979522/6167170

相关文章

  • FFmpeg再学习 -- FFmpeg+SDL+MFC实现图形界面视频播放器
    继续看雷霄骅的课程资料-基于FFmpeg+SDL的视频播放器的制作最后一篇,主要是想学一下MFC创建和配置。一、创建MFC工程文件->新建->项目->VisualC++->MFC应用程序应用程序类型,选择基于对话框生成效果如下:二、设置控件找到“工具箱”,就可以将相应的控件拖拽至应用程序对话框......
  • 图像和流媒体 -- Sapera 安装遇到的问题
    一、下载安装包参看:GenieNanoM1930-NIR点击软件及例程下载二、安装遇到的问题(1)Installationdirectorymustbeonalocalharddrive解决方法:clsicacls%temp%/reset/T/Q/Cpause以上文件复制到txt中将后缀名修改为bat以管理员执行即可。windows自身权限的的问题。(2)安......
  • Problem B. Harvest of Apples 组合数求和(莫队没怎么看懂)
    ProblemB.HarvestofApplesTimeLimit:4000/2000MS(Java/Others)    MemoryLimit:262144/262144K(Java/Others)TotalSubmission(s):3775    AcceptedSubmission(s):1450 ProblemDescriptionTherearenapplesonatree,numberedfrom1ton.Count......
  • MQTT再学习 -- 漫谈MQTT协议
    MQTT服务器搭建我们已经完成了,现在回过头来看协议。参看:MQTT官网参看:MQTT_V3.1_Protocol_Specific参看:MQTT协议中文版参看:MQTT协议中文版上面这几篇文章,已经说明了一切。下面着重讲一下MQTT的消息格式和主要特征。一、什么是MQTT首先你要知道什么是MQTT。额,这个很重要。官网是......
  • 日常生活小技巧 -- 惠普 Windows10 进入安全模式
    今天手贱,是真的很贱。将用户模式从管理员组改为标准用户方法是:WIN+R打开controluserpasswords2然后出现了用户账户控制,你要允许此应用对你的设备进行更改吗?最关键的是没有“是”选项。试了各种方法都不成功。比如网上说的进入命令提示符(管理员)或者更改用户账户控制设置,都会弹出......
  • IDEA Spring-boot 使用@Component注解的工具类,用@Autowired注入 @Service或者@Reposit
    IDEASpring-boot使用@Component注解的工具类,用@Autowired注入@Service或者@Repository会空指针(使用@PostContruct)原文链接:https://blog.csdn.net/ld_secret/article/details/104627597/使用idea编译器时,对于spring-boot的项目,大都使用注解,那么:一、现象:@Component标注的U......
  • 2019牛客暑期多校训练营(第四场) K numbers
    链接:https://ac.nowcoder.com/acm/contest/884/K?&headNav=acm&headNav=acm来源:牛客网 题目描述300iqlovesnumberswhoaremultipleof300.Onedayhegotastringconsistedofnumbers.Hewantstoknowhowmanysubstringsinthestringaremultiplesof300whe......
  • 电梯演讲和原型展示(医学文献)
    .各位领导:当前医疗行业中医学文献检索的痛点:大量的医学文献依靠工作人员人工阅读记录并筛选,效率低下,且造成巨大的人力消耗与浪费。对于工作人员有极高的专业判断水平要求。常规系统的检索功能无法识别到医学影像图片中的文字,导致大量的重要医学文献与信息无法及时提供。我们的......
  • 图像和流媒体 -- I 帧,B帧,P帧,IDR帧的区别
    参看:什么是I帧,P帧,B帧参看:H264编码原理以及I帧B帧P帧一、H246简介  H264是新一代的编码标准,以高压缩高质量和支持多种网络的流媒体传输著称,在编码方面,我理解的他的理论依据是:参照一段时间内图像的统计结果表明,在相邻几幅图像画面中,一般有差别的像素只有10%以内的点,亮度差值变......
  • mp4v2再学习 -- H264视频编码成MP4文件
    一、H264视频编码成MP4文件参看:H264视频编码成MP4文件参看:mp4v2在VS2010下的编译与在项目中的使用最近做项目需要将H264文件封装为mp4文件,从网上找到了MP4V2库,下载下来后不知道从何下手,官方网站https://code.google.com/p/mp4v2/在windows下的编译过程介绍的很简短,对刚刚开始使用VS......