首页 > 其他分享 >MQTT详解以及实际操作

MQTT详解以及实际操作

时间:2024-05-19 10:52:49浏览次数:24  
标签:实际操作 client MQTT 详解 消息 import org 客户端

目录

1 MQTT

1.1 MQTT介绍

1.1.1 简介

MQTT 全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。
MQTT是一种基于发布/订阅模式的轻量级通讯协议,该协议构建在TCP/IP协议上。 MQTT最大的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛应用。
MQTT协议将消息的发布者(publisher)与订阅者(subscriber)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ有点类似。
图片

TCP协议位于传输层,MQTT 协议位于应用层,MQTT 协议构建于TCP/IP协议上,也就是说只要支持TCP/IP协议栈的地方,都可以使用MQTT协议。

1.1.2 特点和应用

特点:

  • 开放消息协议,简单易实现
  • 发布订阅模式,一对多消息发布
  • 基于TCP/IP网络连接,提供有序,无损,双向连接
  • 2字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量
  • 消息QoS支持,可靠传输保证

应用:

  • 物联网M2M通信,物联网大数据采集
  • Android消息推送,WEB消息推送
  • 智能硬件、智能家具、智能电器
  • 车联网通信,电动车站桩采集
  • 智慧城市、远程医疗、远程教育
  • 电力、石油与能源等行业市场

1.1.3 为什么要用 MQTT协议

MQTT 协议为什么在物联网(IOT)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP协议呢?
首先HTTP协议它是一种同步协议,客户端请求后需要等待服务器的响应。而在物联网(IOT)环境中,设备会很受制于环境的影响,比如带宽低、网络延迟高、网络通信不稳定等,显然异步消息协议更为适合IOT应用程序。
HTTP是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。
通常需要将一条命令或者消息,发送到网络上的所有设备上。HTTP要实现这样的功能不但很困难,而且成本极高。

1.2 MQTT控制报文的结构

MQTT通过交换一些预定义的MQTT控制报文来工作,每条MQTT命令消息的消息头都包含一个固定的报头,有些消息会携带一个可变报文头和一个负荷。消息格式如下:

固定包头,存在于所有MQTT控制包
可变包头,存在于某些MQTT控制包
载荷,存在于某些MQTT控制包

1.2.1 固定报文头(Fixed Header)

MQTT固定报文头最少有两个字节,第一个字节包含消息类型(Message Type)QoS级别等标志位。第二个字节开始是剩余长度字段,该长度是后面的可变报文头加消息负载的总长度,该字段最多允许四个字节。

剩余长度使用了一种可变长度的结构来编码,这种结构使用单一字节表示0-127的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。因此每个字节可以编码128个值,再加上一个标识位。剩余长度最多可以用四个字节来表示。

例如十进制的数字64可以被编码成一个单独的字节,十进制为64,八进制为0x40。十进制数字321(=65+2×128)被编码为两个字节,低位在前。第一个字节是65+128 = 193。注意最高位的128表示后面至少还有一个字节。第二个字节是2,表示2*127。(注:321 = 11000001 00000010,第一个字节是“标识符后面还有一个字节”+65,第二个字节是“标识符后面没有字节了”+256)。

1.2.2 可变报文头(Variable Header)

可变报文头主要包含协议名协议版本连接标志(Connect Flags)心跳间隔时间(Keep Alive timer)连接返回码(Connect Return Code)主题名(Topic Name)

1.2.3 有效负荷和消息类型

有效负荷(Payload),可以理解为消息主题(body)
MQTT 发送的消息类型是 CONNECT(连接)、PUBLISH(发布)、SUBSCRIBE(订阅)、SUBACK(订阅确认)、则会带有负荷。

各种类型消息的控制报文参考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html

MQTT的消息类型(Message Type)(控制报文类型)

名字 报文流动方向 描述
Reserved 0 禁止 保留
CONNECT 1 客户端到服务端 客户端请求连接服务端
CONNACK 2 服务端到客户端 连接报文确认
PUBLISH 3 两个方向都允许 发布消息
PUBACK 4 两个方向都允许 QoS 1消息发布收到确认
PUBREC 5 两个方向都允许 发布收到(保证交付第一步)
PUBREL 6 两个方向都允许 发布释放(保证交付第二步)
PUBCOMP 7 两个方向都允许 QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE 8 客户端到服务端 客户端订阅请求
SUBACK 9 服务端到客户端 订阅请求报文确认
UNSUBSCRIBE 10 客户端到服务端 客户端取消订阅请求
UNSUBACK 11 服务端到客户端 取消订阅报文确认
PINGREQ 12 客户端到服务端 心跳请求
PINGRESP 13 服务端到客户端 心跳响应
DISCONNECT 14 客户端到服务端 客户端断开连接
Reserved 15 禁止 保留

1.2.4 消息质量(QoS)

消息质量(QoS):

  • QoS 0:最多分发一次。消息的传递完全依赖于底层的TCP/IP协议,协议里没有定义应答和重试,消息要么只会到达服务端一次,要么根本没有到达。
    只会发送一次,不管成不成功
  • QoS 1:至少分发一次。服务器的消息接收由PUBACK消息进行确认,如果通信链路或发送设备异常,或者指定时间内没有收到确认消息,发送端会重发这条在消息头中设置了DUP位的消息。
    未成功会继续发送,直到成功,可能会收到多次
  • QoS 2:只分发一次。这是最高级别的消息传递,消息丢失和重复都是不可接受的,使用这个服务质量等级会有额外的开销。
    未成功会继续发送,但会保证只收到一次

通过下面的例子可以更深刻的理解上面三个传输质量等级。
比如目前流行的共享单车智能锁,智能锁可以定时使用QoS level 0质量消息请求服务器,发送单车的当前位置,如果服务器没收到也没关系,反正过一段时间又会再发送一次。
之后用户可以通过App查询周围单车位置,找到单车后需要进行解锁,这时候可以使用QoS level 1质量消息,手机App不断的发送解锁消息给单车锁,确保有一次消息能达到以解锁单车。
最后用户用完单车后,需要提交付款表单,可以使用QoS level 2质量消息,这样确保只传递一次数据,否则用户就会多付钱了。

1.4 搭建MQTT服务

在Linux上搭建MQTT服务
打开EMQ官网:https://www.emqx.io/cn/products/broker
点击开始试用
在这里插入图片描述
选择服务器对应版本
在这里插入图片描述

复制下载命令到ssh工具中执行
在这里插入图片描述
下载完成
下载完成后执行安装命令
在这里插入图片描述
安装成功后执行命令:

sudo emqx start

出现以下信息表示启动成功
在这里插入图片描述
浏览器访问ip:18083进入管理界面,默认账号为admin,密码为public
在这里插入图片描述

1.5 SpringBoot搭建提供端

1.5.1 pom.xml

<!--mqtt相关依赖-->
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

1.5.2 修改配置文件

spring:
  application:
    name: provider
  #MQTT配置信息
  mqtt:
    #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://ip:1883
    #用户名
    username: admin
    #密码
    password: public
    #客户端id(不能重复)
    client:
      id: provider-id
    #MQTT默认的消息推送主题,实际可在调用接口时指定
    default:
      topic: topic
server:
  port: 8081

1.5.3 消息发布者客户端配置

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class MqttProviderConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 客户端对象
     */
    private MqttClient client;

    /**
     * 客户端连接服务端
     */

    public void connect(){
        try {
            //创建MQTT客户端对象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接到服务端都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(username);
            //设置连接密码
            options.setPassword(password.toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(new MqttProviderCallBack());
            client.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
    * 发布消息
    * @param qos  服务质量等级
    *  0 只会发送一次,不管成不成功
    *  1 未成功会继续发送,直到成功,可能会收到多次
    *  2 未成功会继续发送,但会保证只收到一次
    * @param retained  保留标志
    * 如果设置为true,服务端必须存储这个应用消息和它的服务质量等级,当有订阅者订阅这个主题时,会把消息推送给这个订阅者
   * 但服务端对同一个主题只会保留一条retained消息(最后收到的那条)
   */
   public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主题目的地,用于发布/订阅消息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度。
        //用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。
            //一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

1.5.4 消息发布客户端回调

import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;


@Configuration
public class MqttProviderCallBack implements MqttCallback {

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /**
     * 与服务器断开连接的回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println(clientId + "与服务器断开连接");
    }

    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }

    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        IMqttAsyncClient client = iMqttDeliveryToken.getClient();
        System.out.println(client.getClientId() + "发布消息成功!");
    }
}

1.5.5 创建控制器测试发布消息

import com.xdemo.mqttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SendController {

    @Autowired
    private MqttProviderConfig providerClient;

    @RequestMapping("/sendMessage")
    @ResponseBody
    public String sendMessage(int qos,boolean retained,String topic,String message){
        try {
            providerClient.publish(qos,retained,topic,message);
            return "发送成功";
        }catch (Exception e){
            e.printStackTrace();
            return "发送失败";
        }
    }
}

1.6 SpringBoot搭建消费端

在父工程下创建一个Springboot项目作为消息消费者,导入以下依赖

1.6.1 pom.xml

<!--mqtt相关依赖-->
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

1.6.2 配置文件

spring:
  application:
    name: consumer
  #MQTT配置信息
  mqtt:
    #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://ip:1883
    #用户名
    username: admin
    #密码
    password: public
    #客户端id(不能重复)
    client:
      id: consumer-id
    #MQTT默认的消息推送主题,实际可在调用接口时指定
    default:
      topic: topic
server:
  port: 8082

1.6.3 消费者客户端配置

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;


@Configuration
public class MqttConsumerConfig {
    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 客户端对象
     */
    private MqttClient client;

    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客户端连接服务端
     */
    public void connect(){
        try {
            //创建MQTT客户端对象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接到服务端都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(username);
            //设置连接密码
            options.setPassword(password.toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(new MqttConsumerCallBack());
            client.connect(options);
            //订阅主题
            //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
            int[] qos = {1,1};
            //主题
            String[] topics = {"topic1","topic2"};
           //订阅主题
            client.subscribe(topics,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 断开连接
     */
    public void disConnect(){
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    /**
     * 订阅主题
     */
    public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

1.6.4 消息消费者客户端回调

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttConsumerCallBack implements MqttCallback {
    /**
     * 客户端断开连接的回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("与服务器断开连接,可重连");
    }

    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主题 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }

    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

1.6.5 控制器提供手动建立连接和断开连接方法

import com.xdemo.mqttconsumer.mqtt.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class TestController {
    @Autowired
    private MqttConsumerConfig client;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @RequestMapping("connect")
    @ResponseBody
    public String connect(){
        client.connect();
        return clientId + "连接到服务器";
    }

    @RequestMapping("disConnect")
    @ResponseBody
    public String disConnect(){
        client.disConnect();
        return clientId + "与服务器断开连接";
    }
}

1.7 测试提供端和消费端

分别启动两个项目,可以在管理界面看到创建的两个客户端
在这里插入图片描述

调用发布消息接口发布消息
在这里插入图片描述

消费者控制台打印
在这里插入图片描述

客户端断线消息恢复

把消费者与服务端断开连接
在这里插入图片描述

再调用发布消息接口发送两条消息到topic1,然后再把消费者连接到服务端
在这里插入图片描述

控制台没有东西打印
在这里插入图片描述

修改消费者客户端配置,把setCleanSession改为false

图片

重启项目,把消费者客户端断开连接,调用发布消息接口发布两条消息,再把消费者和服务端连接上

图片

标签:实际操作,client,MQTT,详解,消息,import,org,客户端
From: https://www.cnblogs.com/jingzh/p/18200117

相关文章

  • 任意文件上传漏洞详解
    当文件上传接口可以上传任意文件,但是不解析,文件上传后的路径可控。这种情况下有两种方法1、上传.htaccess和.user.ini配置文件。2、当知道网站根路径的情况下,可以上传到其他目录下。3、当不知道网站根路径的情况下,可以通过上传计划任务的方式实现命令执行。文件上传漏洞的定义文......
  • VirtualBox虚拟机远程桌面连接设置详解(包含登录密码设置)
    一、安装VirtualBox虚拟机下载与安装:访问VirtualBox官方网站,根据您的操作系统(如Linux、Windows、Mac等)下载对应版本的VirtualBox安装包。安装下载的VirtualBox软件。创建虚拟机:打开VirtualBox,点击“新建”按钮,按照向导创建新的虚拟机。例如,命名为“Winxp”。根据您的......
  • C 语言中的 sscanf 详解
    一、函数介绍函数原型:intsscanf(constchar*str,constchar*format,...);返回值:成功返回匹配成功的模式个数,失败返回-1。RETURNVALUEThesefunctionsreturnthenumberofinputitemssuccessfullymatchedandassigned,whichcanbefewerthanprovi......
  • XML Schema 复杂元素类型详解:定义及示例解析
    在XMLSchema(XSD)中,复杂元素是指包含其他元素和/或属性的XML元素。复杂元素可以分为四种类型:空元素:仅包含其他元素和/或属性的元素。仅包含其他元素的元素:不包含文本内容,只包含其他子元素的元素。仅包含文本的元素:不包含其他子元素,只包含文本内容的元素。既包含其他元素......
  • 一款基于C#开发的通讯调试工具(支持Modbus RTU、MQTT调试)
    前言今天大姚给大家分享一款基于C#、WPF、Prism、MaterialDesign、HandyControl开发的通讯调试工具(支持ModbusRTU、MQTT调试,界面色彩丰富):Wu.CommTool。工具特点工具界面色彩丰富。支持ModbusRTU、MQTT服务器、MQTT客户端。ModbusRTU自动解析数据帧。智能防粘包,速度快也......
  • 一对一视频app开发,RabbitMQ数据隔离步骤详解
    一对一视频app开发,RabbitMQ数据隔离详解一、自动创建影子队列因为SpringAMQP中的中的关键方法是私有的,无法通过继承的方式进行实现对以配置好的队列进行扩展,所以需要自定义该类,来实现对自动创建影子队列,并和交换器进行绑定代码实现改造RabbitListenerAnnotationBeanP......
  • IKNP协议详解
    一起学习OTextension的重要文章:ExtendingObliviousTransfersEfficiently.作者是YuvalIshai,JoeKilian,KobbiNissim,andErezPetrank,发表在2003的Crypto上.目录1.简介2.具体协议2.1协议流程2.2协议设计原理COT和ROT2.2.1第一步:\(OT^k_m\impliesCOT^m_k\)......
  • Redis 的安装与配置详解【Redis系列一】
    〇、前言关于Redis在日常开发中还是用的比较多的,特别是在秒杀、消息队列、排行榜等数据交互时效要求较高的场景,Redis都可以轻松应对。本文将针对Redis进行简单介绍,以及如何安装,并罗列下全部配置项。后续还将另行发文汇总Redis的常用数据结构和常见问题等。一、什么是Re......
  • 第四节:MySQL主从集群搭建、扩容与数据迁移、半同步复制详解
    一.        二.        三.         !作       者:Yaopengfei(姚鹏飞)博客地址:http://www.cnblogs.com/yaopengfei/声     明1:如有错误,欢迎讨论,请勿谩骂^_^。声     明2:原创博客请在转载......
  • NumPy 分割与搜索数组详解
    NumPy分割数组NumPy提供了np.array_split()函数来分割数组,将一个数组拆分成多个较小的子数组。基本用法语法:np.array_split(array,indices_or_sections,axis=None)array:要分割的NumPy数组。indices_or_sections:指定分割位置的整数列表或要包含每个子数组的元素数......