首页 > 系统相关 >Linux环境下使用Eclipse Paho C 实现(MQTT Client)同步模式发布和订阅Message

Linux环境下使用Eclipse Paho C 实现(MQTT Client)同步模式发布和订阅Message

时间:2024-03-24 10:00:27浏览次数:17  
标签:MQTTClient 订阅 Linux Eclipse MQTT Client 消息 rc

目录

概述

1 同步模式和异步模式

1.1 同步模式

1.2 异步模式

2 下载和安装paho.mqtt.c

3 同步方式发布和订阅消息功能实现

3.1 MQTT Client参数配置

3.2 初始化MQTT Client

3.3 发布消息功能

3.4 订阅消息功能

3.5 解析订阅的信息

4 编译和测试

4.1 编译代码

4.2 运行

5 验证MQTT Client功能

5.1 EMQX服务器上查看MQTT Client

5.2 MQTT.fx发布Topic

5.3 MQTT.fx订阅的主题

6 完整代码


概述

本文主要介绍在linux环境(ubuntu)环境下,下载和安装Eclipse Paho C MQTT 软件包,还编写一个范例实现同步发布Message的功能,并使用基于EMQX的服务验证其功能,还是用MQTT.fx订阅消息,已验证发布消息功能的可靠性。

1 同步模式和异步模式

1.1 同步模式

在同步模式下,客户机应用程序在单个线程上运行。使用MQTTClient_publish()和MQTTClient_publishMessage()函数发布消息。要确定QoS1或QoS2(请参阅服务质量)消息已成功交付,应用程必须调用MQTTClient_waitForCompletion()函数。同步发布示例中显示了显示同步发布的示例。在同步模式下接消息使用MQTTClient_receive()函数。客户机应用程序必须相对频繁地调用 MQTTClient_receive()MQTTClient_yield(),以便允许处理确认和MQTT“ping”,从而保持与服务器的网络连接处于活动状态。

总结同步模式应用方法

1)客户机应用程序在单个线程上运行

2)使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息

3)使用MQTTClient_waitForCompletion()确认消息是否发布成功

4)使用MQTTClient_receive()接收消息

5)必须频繁调用MQTTClient_receive()和MQTTClient_yield(),以确认消息

1.2 异步模式

在异步模式下,客户机应用程序在多个线程上运行。主程序调用客户端库中的函数来发布和订阅,就像同步模式一样。但是,握手和维护网络连接的处理是在后台执行的。使用调用MQTTClient_setCallbacks()(参见MQTTClient_messageArrived()、MQTTClient_connectionLost()和MQTTClient_deliveryComplete())向库注册的回调,向客户端应用程序提供状态通知和消息接收。然而,这个API不是线程安全的——在没有同步的情况下,不可能从多个线程调用它。可以为此使用MQTTAsync API来实现这些功能。

总结异步模式应用方法

1)客户机应用程序在多个线程上运行

2)主程序调用客户端库中的函数来发布和订阅,使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息;使用MQTTClient_publishMessage订阅消息

3)使用调用MQTTClient_setCallbacks(),向客户端应用程序提供状态通知和消息接收

异步模式的详细使用范例,参看文章:

Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步订阅Message-CSDN博客

Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步方式发布Message-CSDN博客

2 下载和安装paho.mqtt.c

登录mqtt官网,点击Software,可以看见如下页面,选择Eclipse Paho C进入下载页面

https://mqtt.org/

下载paho.mqtt.c

笔者选择使用命令直接安装该软件包,具体操作步骤如下:

Step -1: 下载软件包执行命令:

git clone https://github.com/eclipse/paho.mqtt.c.git

step-2: 进入paho.mqtt.c目录,执行make

cd paho.mqtt.c
make

系统会自动编译代码,等待编译结果。

编译完成后,会自动生成build文件,这时可以安装

step-3 : 执行如下命令就可以安装软件

sudo make install

3 同步方式发布和订阅消息功能实现

3.1 MQTT Client参数配置

初始化MQTT Client,必须配置一些参数,包括broker的IP地址,订阅的topic等,具体参数如下表所示:

参数功能介绍:

参数名称参数值描述
ADDRESStcp://192.168.1.11:1883mqtt broker的IP地址
CLIENTIDmqtt_ubuntu_asys设备ID
TOPICMQTTAsync发布的Topic
SUBTOPICswitch订阅的Topic
PAYLOAD12.56Topic下的payload
QOS1服务质量等级=1
TIMEOUT10000L超时计数
USERNAMEmqtt_ubuntu_user终端认证username
PASSWORD123456终端认证username对应的password

在代码中定义这些参数的位置:

3.2 初始化MQTT Client

初始化MQTT终端需要完成以下2个步骤:

step-1: 创建MQTT Client

step-2: 连接服务器

具体实现代码如下:

代码66行:创建MQTT Client,需要传入服务器IP和Client ID信息

代码73行: 心跳包时间间隔设置为20s

代码74行: 清除会话 标记设置为1,不接受离线消息

代码75行: 配置设备终端用户

代码76行: 配置设备终端用户password

代码78行: MQTT连接Broker

3.3 发布消息功能

要实现发布消息功能,需要将payload及其相关参数填到MQTTClient_message定义的数据结构中,下面介绍整个public message 函数的功能。

代码39行: 装载payload

代码40行:payload的字符长度

代码41行:消息服务等级参数

代码42行:配置为保留消息

代码44行:使用MQTTClient_publishMessage函数发布消息

代码53行:使用MQTTClient_waitForCompletion等待消息发布完毕

3.4 订阅消息功能

要取消订阅的Topic,调用MQTTClient_unsubscribe函数可以实现该功能。实现范例如下:

3.5 解析订阅的信息

代码69行: 使用MQTTClient_receive接收订阅的消息

代码70行: 通过检测rc的值,并判断topic的值是否有效,以确定是否要解析消息

4 编译和测试

4.1 编译代码

使用如下命令编译代码

 gcc test_03_Synchronous.c -lpaho-mqtt3c -lpthread

4.2 运行

执行.out文件后,可以看见,MQTT Client订阅和发布消息成功了,服务器端收到消息后,token值会自动加1

5 验证MQTT Client功能

5.1 EMQX服务器上查看MQTT Client

在ubuntu上运行MQTT Client后,EMQX服务器会显示MQTT Client的运行状态,登录EMQX服务器可以看见

在订阅管理面板上,也可以看见mqtt_ubuntu_asys订阅了Topic为"switch"

5.2 MQTT.fx发布Topic

使用MQTT.fx发布Topic为switch的消息,Client ID 为mqtt_ubuntu_asys的客户端订阅了该消息,那么当MQTT.fx发布消息之后,mqtt_ubuntu_asys会收到该消息,并在终端上打印出来。

要使用MQTT.fx MQTT Client工具订阅MQTTsync,首先保证MQTT.fx能正常连接至EMQX服务器

使用MQTT.fx发布Topic为switch的消息

{ 
   "switch": false 
}

使用MQTT.fx发布Topic为switch的消息

{ 
    "switch": false 
}

在EMQX的保留信息页面,查看MQTT.fx发布Topic为switch的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端

5.3 MQTT.fx订阅的主题

在EMQX的订阅管理页面,查看MQTT.fx订阅Topic为MQTTsync的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端发布的消息

在MQTT.fx上查看Topic为MQTTsync的消息

6 完整代码

创建test_03_Synchronous.c,编写如下代码:

/***************************************************************
Copyright  2024-2029. All rights reserved.
文件名  : test_03_Synchronous.c
作者    : [email protected]
版本    : V1.0
描述    : mqtt同步发布和订阅消息
日志    : 初版V1.0 2024/03/13
​
***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
​
#include "MQTTClient.h"
 
#define ADDRESS     "tcp://192.168.1.11:1883"
#define CLIENTID    "mqtt_ubuntu_asys"
#define TOPIC       "MQTTsync"
#define SUBTOPIC    "switch"
​
#define PAYLOAD     "12.56"
#define QOS         1
#define TIMEOUT     10000L
​
#define USERNAME    "mqtt_ubuntu_user"
#define PASSWORD    "123456" 
​
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
static MQTTClient_message pubmsg = MQTTClient_message_initializer;
static MQTTClient client;
static MQTTClient_deliveryToken deliveredtoken, temptoken;
int count;
​
​
MQTTClient_deliveryToken user_publicMsg( void )
{
    MQTTClient_deliveryToken token;
    int rc;
        
    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = (int)strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 1;
    
    if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
    {
         printf("Failed to publish message, return code %d\n", rc);
         exit(EXIT_FAILURE);
    }
 
    printf("Waiting for up to %d seconds for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    printf("Message with delivery token %d delivered\n", token);
    
    return token;
}
​
int receive_subMessage( void )
{
    int topicLen;
    int rc;
    char *topic = SUBTOPIC;
    MQTTClient_message *message = NULL;
    
    rc = MQTTClient_receive( client, &topic, &topicLen, &message, TIMEOUT);
    if( rc == MQTTCLIENT_SUCCESS && topic!= NULL ){
        printf("Message arrived \n");
        printf("     topic: %s\n", topic);
        printf("   message: %.*s\n", message->payloadlen, (char*)message->payload);
    }
    printf("MQTTClient receive message, return code %d\n", rc);
    
    return rc;
}
​
​
/* 线程  */
void thread_subMsg(void)
{
    usleep(1000000L);
    count++;
}
​
​
int main(int argc, char* argv[])
{
    int current_cnt;
    pthread_t id;
    int ret;
    int rc;
 
    deliveredtoken = 0;
    if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
    {
         printf("Failed to create client, return code %d\n", rc);
         exit(EXIT_FAILURE);
    }
 
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.username = USERNAME;   //用户名
    conn_opts.password = PASSWORD;   //密码
    
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
    
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", 
            SUBTOPIC, CLIENTID, QOS);
            
    if ((rc = MQTTClient_subscribe(client, SUBTOPIC, QOS)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to subscribe, return code %d\n", rc);
        rc = EXIT_FAILURE;
    }
    
    ret = pthread_create(&id,NULL,(void *) thread_subMsg,NULL);
    if(ret!=0){
        printf ("Create pthread error!\n");
        exit (1);
    }
​
    while(1)
    {
        receive_subMessage();
        temptoken = user_publicMsg(); 
        
        if(count != current_cnt )
        {
            current_cnt = count;
            if(temptoken != deliveredtoken){
                deliveredtoken = temptoken;
            }
        }
    }
 
    if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
        printf("Failed to disconnect, return code %d\n", rc);
    MQTTClient_destroy(&client);
    
    return rc;
}
​

标签:MQTTClient,订阅,Linux,Eclipse,MQTT,Client,消息,rc
From: https://blog.csdn.net/mftang/article/details/136706932

相关文章

  • Linux进程查看与杀死进程
    Linux进程查看与杀死进程摘要:在Linux操作系统中,我们经常需要查看当前正在运行的进程,有时也需要结束某个不响应或占用资源过多的进程。本文将详细介绍如何在Linux中使用命令行工具查看和杀死进程。一、进程查看ps命令:ps是"processstatus"的缩写,用于显示系统中的当前进程......
  • 工作中常用到的Linux命令
    系统,用户信息操作相关命令查看主机ip地址ifconfig获取用户信息id修改用户密码passwd查看链接用户who创建新用户账号useradd删除用户账号userdel修改用户账号的属性usermod查看系统发行版本cat/proc/version说明适用于所有版本。示例[root@vps ~]# cat /pro......
  • 【Linux】详谈命令行参数&&环境变量
    目录一、浅谈命令行参数二、环境变量2.1环境变量的内涵以及理解2.2PATH环境变量:2.3输入程序名就能运行我们的程序2.4系统中的环境变量2.5导出环境变量 三、main函数的第三个参数3.1获得环境变量的三种方法 四、本地变量一、浅谈命令行参数        我......
  • 在Linux中,如何判断一个进程是否存活,如果不存活,如何告实现警?
    在Linux中,判断一个进程是否存活并实现告警,可以通过一系列步骤来完成。以下是详细的步骤说明:1.判断进程是否存活使用ps命令:ps命令是Linux中用来查看当前进程状态的常用命令。你可以结合grep来过滤出特定进程的信息。例如,要查看名为"example_process"的进程是否存在,可以运行以下......
  • 在Linux中,有一个文件,10行9列,如何打印最后一列,如何打印最前一列?
    在Linux中,打印文本文件的某一列可以使用awk或cut命令。这里分别演示如何打印一个10行9列文件的最后一列和最前一列:1.打印最后一列:使用awk命令:awk'{print$NF}'filename.txt这里的NF是内部变量,表示当前行的字段数量,所以$NF就是最后一个字段,也就是最后一列。使用cut命令:......
  • 在Linux中,如何查看内核版本?内核版本信息包含什么?
    在Linux中查看内核版本有多种方法,下面列举了几种常用且详细的命令:方法一:uname命令仅查看内核版本:uname-r这个命令会打印出当前系统运行的内核版本号,例如:4.15.0-72-generic。查看详细系统信息:uname-a这个命令会输出所有与内核相关的详细信息,包括内核名称、主机名、......
  • 01.绝对路径和相对路径(Linux基本概念)
    基础认知:        电脑的目录结构是一颗多叉树。不管是Linux还是windows,目录结构都是一样的。所以我们在查找某个目录或者文件的时候,本质就是在多叉树结点的查找。多叉树示例图如下:                ​​​​​​​        ​​​​​​​  ......
  • 在Linux中,内存怎么看?磁盘状态怎么看?
    在Linux系统中,查看内存和磁盘状态主要依靠一系列命令行工具来进行。以下是分别查看内存和磁盘状态的详细说明:1.查看内存状态:free命令free-h#或者free-m用以显示内存使用状况,单位可以是人类友好的KB,MB,GB等这个命令会显示系统的总内存、已使用内存、空闲内存、缓......
  • 在Linux中,什么是Linux操作系统,它的特点是什么?
    Linux操作系统是一种免费使用和自由传播的类UNIX操作系统,其内核最初由林纳斯·本纳第克特·托瓦兹于1991年10月5日首次发布。它主要受到Minix和Unix思想的启发,是一个基于POSIX的多用户、多任务、支持多线程和多CPU的操作系统。Linux操作系统的主要特点如下:开源性:Linux操作系统的......
  • Linux和Windows时间不一致问题
    问题描述装过双系统或者虚拟机装Linux的人都知道,Linux的时间和Windows往往是不同步的,在编写跨平台程序的时候特别是对时间敏感的代码就带来很大的困扰解决办法这个问题可以在Linux下解决先用命令查看时区timedatectl如果系统刚刚装好,没有设置好时区,有可能默认的是America/......