首页 > 系统相关 >CH395+EMQX实现MQTT应用(Windows系统)

CH395+EMQX实现MQTT应用(Windows系统)

时间:2023-12-11 09:55:53浏览次数:45  
标签:socket Windows CH395 len topic TIM MQTT 数据包

  • MQTT协议

1.MQTT简介

MQTT是一种基于 发布/订阅 模式的轻量级消息协议,工作在TCP/IP协议族上。其最大的优点是用极少量的代码和有限的宽带为设备间提供实时可靠的消息服务。在物联网(IOT)和机器与机器(M2M)等方面有较广泛的应用。

2.MQTT特性

2.1 发布/订阅模式,提供一对多的消息发布,方便消息在传感器间传递,解耦Client/Server的模式,不在需要预先知道对端的存在,不用同时运行

2.2 提供服务质量管理(QoS):

  ①Qos0(至多一次):发送方发送的消息,接收方最多能收到一次。这一级别会发生消息丢失或者重复,消息的发布依赖于底层TCP/IP网络。这一级别可用于如传感器数据上发,丢掉一次无所谓的情况,因为之后会二次发送。

  ②Qos1(至少一次):发送方发送的消息,接收方最少能收到一次,因此消息可能会发生重复。

  ③Qos2(只有一次):确保消息只到达一次。这一级别在网络中开销最高,适用于一些对数据严格的场景。

2.3 MQTT有1字节固定报头,2字节心跳报文,可以最小化传输开销,有效减少网络流量。因此非常时候应用在物联网,传感器数据采集和嵌入式设备信息收集等领域。

3.MQTT实现方式

  3.1 使用MQTT协议通讯时,有三种身份:发布者(publish)、代理(Broker/服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息的代理是服务器,消息的发布者同时可以是订阅者。

  3.2 MQTT传输的消息分为主题(Topic)和负载(Payload)两部分:

   主题:即消息的类型,订阅者订阅某主题后,就会收到该主题的消息内容

   负载:即消息的具体内容

   3.3 MQTT客户端:是指一个使用MQTT协议的应用程序或设备,客户端可以:发布其他客户端可能会订阅的信息;订阅其他客户端发布的信息

    MQTT服务器:位于消息发布者和订阅者之间,服务器可以:接收客户端的连接;接收客户端发布的信息;处理客户端的订阅和退订;向订阅者转发消息。

4.MQTT数据包结构

  4.1 MQTT协议中,一个MQTT数据包由固定头、可变头、消息体三部分组成。

固定头(Fixed header):存在所有MQTT数据包中,表示数据包类型。固定头是不虚的,所有MQTT数据包中必须包含固定头。

可变头(Variable header):部分MQTT数据包有,由数据包类型所决定。

消息体(Payload):存在于部分MQTT数据包中,表示客户端收到的具体内容,也是由数据包类型决定。

  4.2 固定头

固定头包含两部分(如下图),首字节为第一字节,剩余消息报文长度是指当前数据包中剩余内容长度的字节数。

 
 
bit
7 6 5 4 3 2 1 0
Byte1 MQTT数据包类型 数据包类型标识
Byte2 剩余长度
    其中,Byte1中的MQTT数据包类型和标识:
数据包类型(说明) 7-4bit值 数据方向 bit3 bit2 bit1 bit0
保留 0000          
CONNECT  (客户端连接服务器) 0001 Client--->Server 0 0 0 0
CONNACK  (连接确认) 0010 Server--->Client 0 0 0 0
PUBLISH    (发布消息) 0011 Client<-->Server DUP Qos Qos RETAIN
PUBACK     (发布确认Qos1) 0100 Client<-->Server 0 0 0 0
PUBREC     (消息已接收  Qos2一阶段) 0101 Client<-->Server 0 0 0 0
PUBREL     (消息释放  Qos2二阶段) 0110 Client<-->Server 0 0 1 0
PUBCOMP  (发布结束  Qos2三阶段) 0111 Client<-->Server 0 0 0 0
SUBSCRIBE(客户端请求订阅) 1000 Client--->Server 0 0 1 0
SUBACK      (服务端确认订阅) 1001 Server--->Client 0 0 0 0
UNSUBACRIBE(客户端取消订阅) 1010 Client--->Server 0 0 1 0

 

        • 如果bit3 DUP位为1,表示数据包是重复的消息;如果为0则表示该数据包是第一次发布
        • 如果 bit1 和 bit2都为0,则表示Qos0;如果 bit1 为1,则表示Qos1;如果 bit2 为1,则表示Qos2;如果 bit1和 bit2都置1,则表示非法消息,会关闭当前连接。

4.3  可变头和消息体的相关报文解析可以查看官方文档

MQTT官方文档:MQTT Version 3.1.1 (oasis-open.org)

  • EMQX具体操作步骤

  1.EMQX下载: https://packages.emqx.net/emqx-ce/v4.3.22/emqx-windows-4.3.22.zip

  2.下载完成后会生成一个emqx文件夹

3.右击开始图标,选择Windows PowerShell(管理员)

4.①使用cd命令进入eqmx文件夹下的bin;

   ②进入bin文件后使用  .\emqx install ,出现成功则表示安装完成。

   ③使用 .\emqx console,会出现一个弹窗,点击确认即可,之后会打初始化一系列使用到的端口,如果最终成功,会出现EMQX is running now!

     5.打开浏览器输入localhost:18083即可进入EMQX登录界面,初始账号为admin,密码为public。

  6.当我们用395成功连接上EMQX服务器后,在客户端列表中能看到我们设备的信息,点击Client ID下的 admin1,能详细的看到我们设备的ip、端口等信息。

   7.在监视器界面能看到具体数据量,在主题界面能看到设备所发布的主题。

  •  相关代码说明

1.MQTT驱动文件

2. CH395.c

 1 /* CH395相关定义 */
 2 const UINT8 CH395IPAddr[4] = {192,168,1,100};                         /* CH395IP地址 */
 3 const UINT8 CH395GWIPAddr[4] = {192,168,1,1};                        /* CH395网关 */
 4 const UINT8 CH395IPMask[4] = {255,255,255,0};                        /* CH395子网掩码 */
 5 
 6 /* socket 相关定义*/
 7 const UINT8  Socket0DesIP[4] = {192,168,1,24};                      /* Socket 0目的IP地址 */
 8 const UINT16 Socket0DesPort = 1883;                                  /* Socket 0目的端口 */
 9 const UINT16 Socket0SourPort = 4000;                                 /* Socket 0源端口 */
10 
11 u8 publishValid = 0;
12 u16 timeCnt = 0;
13 char *username  = "user2";                         //Device name, unique for each device, available "/" for classification
14 char *password  = "user2";                         //Server login password
15 char *sub_topic = "topic_wch";                       //subscribed session name
16 char *pub_topic = "topic_wch";                       //Published session name
17 int pub_qos = 0;                                   //Publish quality of service
18 int sub_qos = 0;                                   //Subscription quality of service
19 char *payload = "WCHNET MQTT";                     //Publish content
20 //有效负载
21 u8 con_flag  = 0;                                  //Connect MQTT server flag
22 u8 pub_flag  = 0;                                  //Publish session message flag/
23 u8 sub_flag  = 0;                                  //Subscription session flag
24 u8 tout_flag = 0;                                  //time-out flag
25 u16 packetid = 5;                                  //package id

程序使用时,需要

①将Socket0DesIP改为相应电脑的本机IP

②username为用户名称,sub_topic为订阅的主题名,pub_topic为发布消息的主题名,pub_qos为发布消息的服务质量,sub_qos为订阅消息的服务质量,payload为负载。

 1 /*********************************************************************
 2  * @fn      TIM2_Init
 3  *
 4  * @brief   Initializes TIM2.
 5  *
 6  * @return  none
 7  */
 8 void TIM2_Init( void )
 9 {
10     TIM_TimeBaseInitTypeDef  TIM_TimeBaseStructure={0};
11 
12     RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2, ENABLE);
13 
14     TIM_TimeBaseStructure.TIM_Period = SystemCoreClock / 1000000 - 1;
15     TIM_TimeBaseStructure.TIM_Prescaler = WCHNETTIMERPERIOD * 1000 - 1;
16     TIM_TimeBaseStructure.TIM_ClockDivision = 0;
17     TIM_TimeBaseStructure.TIM_CounterMode = TIM_CounterMode_Up;
18     TIM_TimeBaseInit(TIM2, &TIM_TimeBaseStructure);
19     TIM_ITConfig(TIM2, TIM_IT_Update ,ENABLE);
20 
21     TIM_Cmd(TIM2, ENABLE);
22     TIM_ClearITPendingBit(TIM2, TIM_IT_Update );
23     NVIC_EnableIRQ(TIM2_IRQn);
24 }
25 
26 void TIM2_IRQHandler(void) __attribute__((interrupt("WCH-Interrupt-fast")));
27 void TIM2_IRQHandler(void)
28 {
29     if(!publishValid){            //Set the publishValid flag every 20s
30         timeCnt += 10;
31         if(timeCnt > 20000){
32             publishValid = 1;
33             timeCnt = 0;
34         }
35     }
36    // WCHNET_TimeIsr(WCHNETTIMERPERIOD);
37     TIM_ClearITPendingBit(TIM2, TIM_IT_Update);
38 }

定时器设置,20秒给MQTT服务器上传一包数据,数据内容即为  WCHNET MQTT。

 1 /**********************************************************************************
 2 * Function Name  : InitCH395InfParam
 3 * Description    : 初始化CH395Inf参数
 4 * Input          : None
 5 * Output         : None
 6 * Return         : None
 7 **********************************************************************************/
 8 void InitCH395InfParam(void)
 9 {
10     memset(&CH395Inf,0,sizeof(CH395Inf));                            /* 将CH395Inf全部清零*/
11     memcpy(CH395Inf.IPAddr,CH395IPAddr,sizeof(CH395IPAddr));         /* 将IP地址写入CH395Inf中 */
12     memcpy(CH395Inf.GWIPAddr,CH395GWIPAddr,sizeof(CH395GWIPAddr));   /* 将网关IP地址写入CH395Inf中 */
13     memcpy(CH395Inf.MASKAddr,CH395IPMask,sizeof(CH395IPMask));       /* 将子网掩码写入CH395Inf中 */
14 }
15 
16 
17 /**********************************************************************************
18 * Function Name  : InitSocketParam
19 * Description    : 初始化socket
20 * Input          : None
21 * Output         : None
22 * Return         : None
23 **********************************************************************************/
24 void InitSocketParam(void)
25 {
26     memset(&SockInf,0,sizeof(SockInf));                              /* 将SockInf[0]全部清零*/
27     memcpy(SockInf.IPAddr,Socket0DesIP,sizeof(Socket0DesIP));        /* 将目的IP地址写入 */
28     SockInf.DesPort = Socket0DesPort;                                /* 目的端口 */
29     SockInf.SourPort = Socket0SourPort;                              /* 源端口 */
30     SockInf.ProtoType = PROTO_TYPE_TCP;                              /* TCP模式 */
31     SockInf.TcpMode = TCP_CLIENT_MODE;
32 }
33 /**********************************************************************************
34 * Function Name  : CH395SocketInitOpen
35 * Description    : 配置CH395 socket 参数,初始化并打开socket
36 * Input          : None
37 * Output         : None
38 * Return         : None
39 **********************************************************************************/
40 void CH395SocketInitOpen(void)
41 {
42     UINT8 i;
43 
44     /* socket 0为TCP 客户端模式 */
45     CH395SetSocketDesIP(0,SockInf.IPAddr);                           /* 设置socket 0目标IP地址 */
46     CH395SetSocketProtType(0,SockInf.ProtoType);                     /* 设置socket 0协议类型 */
47     CH395SetSocketDesPort(0,SockInf.DesPort);                        /* 设置socket 0目的端口 */
48     CH395SetSocketSourPort(0,SockInf.SourPort);                      /* 设置socket 0源端口 */
49 
50     i = CH395OpenSocket(0);                                          /* 打开socket 0 */
51     mStopIfError(i);                                                 /* 检查是否成功 */
52 
53     i = CH395TCPConnect(0);                                          /* TCP连接 */
54     mStopIfError(i);                                                 /* 检查是否成功 */                                              /* 检查是否成功 */
55 
56 }

因为MQTT协议是基于TCP/IP协议,所以初始化395后,需要设置一个socket为tcp模式。

 1 /*********************************************************************
 2  * @fn      MQTT_Connect
 3  *
 4  * @brief   Establish MQTT connection.
 5  *
 6  * @param   username - user name.
 7  *          password - password
 8  *
 9  * @return  none
10  */
11 void MQTT_Connect(char *username, char *password)
12 {
13     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
14     u32 len;
15     u8 buf[200];
16 
17     data.clientID.cstring = "admin1";
18     data.keepAliveInterval = 2000;
19     data.cleansession = 1;
20     data.username.cstring = username;
21     data.password.cstring = password;
22 
23     len = MQTTSerialize_connect(buf,sizeof(buf),&data);//
24     Transport_SendPacket(buf,len);
25 }

MQTT连接函数

17行 设置设备id,

18行 为心跳间隔,

19行 设置Clean Session标志,当设置为0时,表示创建一个持久对话,在客户端断开连接时,会话仍会保持并保存离线消息,直到会话超时注销。

    当设置成1时,表示创建一个新的临时对话,当客户端断开时,会话自动注销。

(此部分为报文类型CONNECT时,可变头中的相关内容,详细报文内容可以查阅官方手册)

 1 /*********************************************************************
 2  * @fn      MQTT_Subscribe
 3  *
 4  * @brief   MQTT subscribes to a topic.
 5  *
 6  * @param   topic - Topic name to subscribe to.
 7  *          req_qos - quality of service
 8  *
 9  * @return  none
10  */
11 void MQTT_Subscribe( char *topic,int req_qos)
12 {
13     MQTTString topicString = MQTTString_initializer;
14     u32 len;
15     u32 msgid = 1;//
16     u8 buf[200];
17 
18     topicString.cstring = topic;
19     len = MQTTSerialize_subscribe(buf,sizeof(buf),0,msgid,1,&topicString,&req_qos);//
20     Transport_SendPacket(buf,len);//
21 }
22 /*********************************************************************
23  * @fn      MQTT_Unsubscribe
24  *
25  * @brief   MQTT unsubscribe from a topic.
26  *
27  * @param   topic - Topic name to unsubscribe to.
28  *
29  * @return  none
30  */
31 void MQTT_Unsubscribe(char *topic)
32 {
33     MQTTString topicString = MQTTString_initializer;
34     u32 len;
35     u32 msgid = 1;
36     u8 buf[200];
37 
38     topicString.cstring = topic;
39     len = MQTTSerialize_unsubscribe(buf,sizeof(buf),0,msgid,1,&topicString);
40     Transport_SendPacket(buf,len);
41 }
42 /*********************************************************************
43  * @fn      MQTT_Publish
44  *
45  * @brief   MQTT publishes a topic.
46  *
47  * @param   topic - Published topic name.
48  *          qos - quality of service
49  *          payload - data buff
50  *
51  * @return  none
52  */
53 void MQTT_Publish(char *topic, int qos, char *payload)
54 {
55     MQTTString topicString = MQTTString_initializer;
56     u32 payloadlen;
57     u32 len;
58     u8 buf[1024];
59 
60     topicString.cstring = topic;
61     payloadlen = strlen(payload);
62     len = MQTTSerialize_publish(buf,sizeof(buf),0,qos,0,packetid++,topicString,payload,payloadlen);
63     Transport_SendPacket(buf,len);
64 }

上面三个函数分别为订阅主题,取订主题,发布消息的功能函数,直接调用即可。

 1 /**********************************************************************************
 2 * Function Name  : CH395SocketInterrupt
 3 * Description    : CH395 socket 中断,在全局中断中被调用
 4 * Input          : sockindex
 5 * Output         : None
 6 * Return         : None
 7 **********************************************************************************/
 8 void CH395SocketInterrupt(UINT8 sockindex)
 9 {
10     int qos, payloadlen;
11    MQTTString topicName;
12    UINT8  sock_int_socket;
13    UINT8 i;
14    UINT32 len;
15 //   UINT16 tmp;
16    unsigned short packetid;
17    unsigned char retained, dup;
18    unsigned char *payload;
19 
20    sock_int_socket = CH395GetSocketInt(sockindex);                   /* 获取socket 的中断状态 */
21    if(sock_int_socket & SINT_STAT_SENBUF_FREE)                       /* 发送缓冲区空闲,可以继续写入要发送的数据 */
22    {
23    }
24    if(sock_int_socket & SINT_STAT_SEND_OK)                           /* 发送完成中断 */
25    {
26    }
27    if(sock_int_socket & SINT_STAT_RECV)                              /* 接收中断 */
28    {
29        len = CH395GetRecvLength(sockindex);                          /* 获取当前缓冲区内数据长度 */
30      #if CH395_DEBUG
31             printf("receive len = %d\n",len);
32      #endif
33             if(len == 0)return;
34             if(len > 512)len = 512;                                       /* MyBuffer缓冲区长度为512 */
35             CH395GetRecvData(sockindex,len,MyBuffer);                     /* 读取数据 */
36 //            for(tmp =0; tmp < len; tmp++)                                 /* 将所有数据按位取反 */
37 //            {
38 //               MyBuffer[tmp] = ~MyBuffer[tmp];
39 //            }
40             switch(MyBuffer[0] >>4)//判断数据包类型
41             {
42             case CONNACK:
43                 printf("CONNACK\r\n");
44                  con_flag = 1;
45                  MQTT_Subscribe(sub_topic, sub_qos);
46                  break;
47             case PUBLISH:
48                 MQTTDeserialize_publish(&dup,&qos,&retained,&packetid,&topicName,
49                                                     &payload,&payloadlen,MyBuffer,len);
50                 msgDeal(payload, payloadlen);
51                 break;
52 
53             case SUBACK:
54                 sub_flag = 1;
55                 printf("SUBACK\r\n");
56                 break;
57 
58             default:
59 
60                 break;
61             }
62             memset(MyBuffer, 0 ,sizeof(MyBuffer));
63          //   CH395SendData(sockindex,MyBuffer,len);
64 
65    }
66 
67    /*
68    **产生断开连接中断和超时中断时,CH395默认配置是内部主动关闭,用户不需要自己关闭该Socket,如果想配置成不主动关闭Socket需要配置
69    **SOCK_CTRL_FLAG_SOCKET_CLOSE标志位(默认为0),如果该标志为1,CH395内部不对Socket进行关闭处理,用户在连接中断和超时中断时调用
70    **CH395CloseSocket函数对Socket进行关闭,如果不关闭则该Socket一直为连接的状态(事实上已经断开),就不能再去连接了。
71    */
72    if(sock_int_socket & SINT_STAT_CONNECT)                          /* 连接中断,仅在TCP模式下有效*/
73   {
74 //   CH395SetKeepLive(sockindex,1);                               /*打开KEEPALIVE保活定时器*/
75 //   CH395SetTTLNum(sockindex,40);                                /*设置TTL*/
76        MQTT_Connect(username, password);
77        printf("TCP Connect Success\r\n");
78   }
79 
80     if(sock_int_socket & SINT_STAT_DISCONNECT)                        /* 断开中断,仅在TCP模式下有效 */
81     {
82            con_flag = 0;
83             printf("TCP Disconnect\r\n");
84     }
85    if(sock_int_socket & SINT_STAT_TIM_OUT)                           /* 超时中断 */
86    {
87        printf("time out \n");
88        con_flag = 0;
89 //      i = CH395CloseSocket(sockindex);
90 //     mStopIfError(i);
91    }
92 }

在socket中断的接收中断中,判断服务器发给395的数据包的类型,具体协议包类型可以看MQTT协议中的类型表格,在连接中断中发送连接函数,在断开中断和超时中断中将连接标志位置0。

 1     while(1)
 2     {
 3      //   WCHNET_MainTask();
 4         if(CH395_INT_WIRE==RESET)                                    /* 当中断引脚电平变化,进入全局中断 */
 5         {
 6             CH395GlobalInterrupt();
 7         }
 8         if (publishValid == 1) {
 9                     publishValid = 0;
10 
11                     if(con_flag)
12                         {
13 
14                         MQTT_Publish(pub_topic, pub_qos, payload);
15                         }
16         //            if(con_flag) MQTT_Pingreq();                                   //heartbeat packet
17                 }
18     }

在main函数的while循环中,当 publishValid标志为1,则表示20s间隔时间已到,在此条件下如果连接标志 con_flag为1,则上传消息。


 

完整工程链接:

 https://files.cnblogs.com/files/blogs/808422/EXAM_mqtt.zip?t=1702258990&download=true

标签:socket,Windows,CH395,len,topic,TIM,MQTT,数据包
From: https://www.cnblogs.com/wchwchlq/p/17851635.html

相关文章

  • Windows安装JDK
    Windows安装JDK1.安装JDK选择安装目录安装过程中会出现两次安装提示。第一次是安装jdk,第二次是安装jre。建议两个都安装在同一个java文件夹中的不同文件夹中。(不能都安装在java文件夹的根目录下,jdk和jre安装在同一文件夹会出错)(如下图所示)2:安装jdk随意选择目录只需把默认......
  • 在Windows电脑上使用多开工具提升工作效率的方法
    提升工作效率的利器——在Windows电脑上使用多开工具导言:在现代社会,电脑已经成为我们生活和工作中不可或缺的工具。然而,对于一些需要同时处理多个任务的人来说,单一窗口的限制可能会影响工作效率。在这种情况下,多开工具成为一个强大的助手,可以极大地提升工作效率。本文将介绍如何......
  • Hadoop 配置Windows 客户端
    1.根据Hadoop版本下载Windows依赖,并放置到非中文目录下https://github.com/cdarlint/winutils2.配置环境变量HADOOP_HOME->放置的目录地址PATH->追加%HADOOP_HOME%\bin3.测试环境双击winutils.exe,如出现运行错误,则需要安装相关的运行库解决。......
  • Windows 12发布时间曝光!系统需求大幅提高 老电脑恐难更新
    多方消息显示,微软正在准备发布“突破性”的以人工智能为中心的新一代Windows版本,内部代号“HudsonValley”(哈德逊河谷)。WC最新报道称,“HudsonValley”将于2024年下半年推出。微软已经在WindowsInsiderCanary频道中测试下一版本Windows的早期代码和平台工作。据悉,新版Window......
  • Windows提权3
    本次学习通过msf提权当我们进行渗透时,已经成功上传了木马并返回了shell,这个时候为了后续渗透需要对目标机器进行提权当前用户为admin,我们想要提权到system最高权限当目标是Windowsserver2003,管理员用户运行木马时,直接使用getsystem提权  切换到普通用户运行木马,则无法直......
  • 多开软件对Windows电脑上的网络性能有何影响?
    多开软件对Windows电脑上的网络性能的影响随着计算机和互联网的普及,人们对网络性能的要求越来越高。在使用Windows电脑时,我们经常需要同时打开多个应用程序,这就需要考虑到多开软件对网络性能的影响。本文将探讨多开软件对Windows电脑上的网络性能可能产生的影响,并提出一些优化建......
  • Redis和Springboot在Windows上面设置开机启动的方法
    Redis和Springboot在Windows上面设置开机启动的方法背景同事遇到一个问题Windows晚上自动更新服务然后第二天Springboot开发的程序没有启动起来.所以基于此想创建一个开机启动的服务设置很早之前自己研究过Winsw等工具但是感觉对springboot比较复杂的应用使用起来比......
  • 仿windows12网盘,私有云盘部署教程,支持多种网盘
    仿windows12网盘,私有云盘部署教程,支持多种网盘资源宝分享:www.httple.net宝塔部署方式:1.验证是否安装jdk,没有安装请看安装教程推荐安装jdk8(注意您是yum还是apt安装自行选择)一、登录ssl终端获取jdk版本yumsearchjava|grepjdkapt-cachesearchjava8二、执行安装命令yum......
  • Windows 11 cmd命令行修改背景色、设置指定图片、桌面背景
    前言全局说明Windows11cmd命令行修改背景色、设置指定图片、桌面背景一、找到设置--外观可以自定义图片,也可以使用桌面背景图片(二选一)如果设置图片位置或高、宽,没有达到你想要的,可以在“拉伸模式”、“图像对齐”设置二、设置不透明度1.设置背景100%透明度效果......
  • Windows 11命令提示符cmd,默认路径修改
    前言全局说明cmd命令提示符终端,启动默认目录是当前用户的目录下。为了方便使用,默认修改成其他路径一、cmd默认路径默认路径是%USERPROFILE%,如果为空“就使用父路径”二、设置成环境变量值路径%TEMP%设置后,记得点保存效果:如何获取系统变量和系统变量列表:https://......