目录
1 MQTT
1.1 MQTT介绍
1.1.1 简介
MQTT
全称(Message Queue Telemetry Transport
):一种基于发布/订阅(publish/subscribe
)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing
)中的一个标准传输协议。
MQTT
是一种基于发布/订阅
模式的轻量级通讯协议,该协议构建在TCP/IP
协议上。 MQTT
最大的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽
占用的即时通讯协议,MQTT
在物联网、小型设备、移动应用等方面有广泛应用。
MQTT
协议将消息的发布者(publisher
)与订阅者(subscriber
)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ
有点类似。
TCP
协议位于传输层,MQTT
协议位于应用层,MQTT
协议构建于TCP/IP
协议上,也就是说只要支持TCP/IP
协议栈的地方,都可以使用MQTT
协议。
1.1.2 特点和应用
特点:
- 开放消息协议,简单易实现
- 发布订阅模式,一对多消息发布
- 基于
TCP/IP
网络连接,提供有序,无损,双向连接
- 2字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量
- 消息
QoS
支持,可靠传输保证
应用:
- 物联网M2M通信,物联网大数据采集
- Android消息推送,WEB消息推送
- 智能硬件、智能家具、智能电器
- 车联网通信,电动车站桩采集
- 智慧城市、远程医疗、远程教育
- 电力、石油与能源等行业市场
1.1.3 为什么要用 MQTT协议
MQTT
协议为什么在物联网(IOT
)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP
协议呢?
首先HTTP
协议它是一种同步协议
,客户端请求后需要等待服务器的响应。而在物联网(IOT
)环境中,设备会很受制于环境的影响,比如带宽低、网络延迟高、网络通信
不稳定等,显然异步消息协议更为适合IOT
应用程序。
HTTP
是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT
)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。
通常需要将一条命令或者消息,发送到网络上的所有设备上。HTTP
要实现这样的功能不但很困难,而且成本极高。
1.2 MQTT控制报文的结构
MQTT
通过交换一些预定义的MQTT
控制报文来工作,每条MQTT
命令消息的消息头都包含一个固定的报头
,有些消息会携带一个可变报文头
和一个负荷
。消息格式如下:
固定包头,存在于所有MQTT控制包
可变包头,存在于某些MQTT控制包
载荷,存在于某些MQTT控制包
1.2.1 固定报文头(Fixed Header)
MQTT
固定报文头最少有两个字节
,第一个字节包含消息类型(Message Type)
和QoS级别
等标志位。第二个字节开始是剩余长度
字段,该长度是后面的可变报文头加消息负载的总长度,该字段最多允许四个字节。
剩余长度
使用了一种可变长度的结构来编码,这种结构使用单一字节
表示0-127
的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。因此每个字节可以编码128个值,再加上一个标识位。剩余长度最多可以用四个字节来表示。
例如十进制的数字64可以被编码成一个单独的字节,十进制为64,八进制为0x40。十进制数字321(=65+2×128)被编码为两个字节,低位在前。第一个字节是65+128 = 193。注意最高位的128表示后面至少还有一个字节。第二个字节是2,表示2*127。(注:321 = 11000001 00000010,第一个字节是“标识符后面还有一个字节”+65,第二个字节是“标识符后面没有字节了”+256)。
1.2.2 可变报文头(Variable Header)
可变报文头主要包含协议名
、协议版本
、连接标志(Connect Flags)
、心跳间隔时间(Keep Alive timer)
、连接返回码(Connect Return Code)
、主题名(Topic Name)
等
1.2.3 有效负荷和消息类型
有效负荷(Payload),可以理解为消息主题(body)
当 MQTT
发送的消息类型是 CONNECT
(连接)、PUBLISH
(发布)、SUBSCRIBE
(订阅)、SUBACK
(订阅确认)、则会带有负荷。
各种类型消息的控制报文参考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html
MQTT
的消息类型(Message Type)(控制报文类型)
名字 | 值 | 报文流动方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客户端到服务端 | 客户端请求连接服务端 |
CONNACK | 2 | 服务端到客户端 | 连接报文确认 |
PUBLISH | 3 | 两个方向都允许 | 发布消息 |
PUBACK | 4 | 两个方向都允许 | QoS 1消息发布收到确认 |
PUBREC | 5 | 两个方向都允许 | 发布收到(保证交付第一步) |
PUBREL | 6 | 两个方向都允许 | 发布释放(保证交付第二步) |
PUBCOMP | 7 | 两个方向都允许 | QoS 2消息发布完成(保证交互第三步) |
SUBSCRIBE | 8 | 客户端到服务端 | 客户端订阅请求 |
SUBACK | 9 | 服务端到客户端 | 订阅请求报文确认 |
UNSUBSCRIBE | 10 | 客户端到服务端 | 客户端取消订阅请求 |
UNSUBACK | 11 | 服务端到客户端 | 取消订阅报文确认 |
PINGREQ | 12 | 客户端到服务端 | 心跳请求 |
PINGRESP | 13 | 服务端到客户端 | 心跳响应 |
DISCONNECT | 14 | 客户端到服务端 | 客户端断开连接 |
Reserved | 15 | 禁止 | 保留 |
1.2.4 消息质量(QoS)
消息质量(QoS
):
QoS 0
:最多分发一次。消息的传递完全依赖于底层的TCP/IP
协议,协议里没有定义应答和重试,消息要么只会到达服务端一次,要么根本没有到达。
只会发送一次,不管成不成功QoS 1
:至少分发一次。服务器的消息接收由PUBACK
消息进行确认,如果通信链路或发送设备异常,或者指定时间内没有收到确认消息,发送端会重发这条在消息头中设置了DUP位的消息。
未成功会继续发送,直到成功,可能会收到多次QoS 2
:只分发一次。这是最高级别的消息传递,消息丢失和重复都是不可接受的,使用这个服务质量等级会有额外的开销。
未成功会继续发送,但会保证只收到一次
通过下面的例子可以更深刻的理解上面三个传输质量等级。
比如目前流行的共享单车智能锁,智能锁可以定时使用QoS level 0
质量消息请求服务器,发送单车的当前位置,如果服务器没收到也没关系,反正过一段时间又会再发送一次。
之后用户可以通过App查询周围单车位置,找到单车后需要进行解锁,这时候可以使用QoS level 1
质量消息,手机App不断的发送解锁消息给单车锁,确保有一次消息能达到以解锁单车。
最后用户用完单车后,需要提交付款表单,可以使用QoS level 2
质量消息,这样确保只传递一次数据,否则用户就会多付钱了。
1.4 搭建MQTT服务
在Linux上搭建MQTT服务
打开EMQ官网:https://www.emqx.io/cn/products/broker
点击开始试用
选择服务器对应版本
复制下载命令到ssh工具中执行
下载完成
下载完成后执行安装命令
安装成功后执行命令:
sudo emqx start
出现以下信息表示启动成功
浏览器访问ip:18083
进入管理界面,默认账号为admin,密码为public
1.5 SpringBoot搭建提供端
1.5.1 pom.xml
<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.5.2 修改配置文件
spring:
application:
name: provider
#MQTT配置信息
mqtt:
#MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://ip:1883
#用户名
username: admin
#密码
password: public
#客户端id(不能重复)
client:
id: provider-id
#MQTT默认的消息推送主题,实际可在调用接口时指定
default:
topic: topic
server:
port: 8081
1.5.3 消息发布者客户端配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class MqttProviderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客户端对象
*/
private MqttClient client;
/**
* 客户端连接服务端
*/
public void connect(){
try {
//创建MQTT客户端对象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanSession(true);
//设置连接用户名
options.setUserName(username);
//设置连接密码
options.setPassword(password.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
//设置回调
client.setCallback(new MqttProviderCallBack());
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 发布消息
* @param qos 服务质量等级
* 0 只会发送一次,不管成不成功
* 1 未成功会继续发送,直到成功,可能会收到多次
* 2 未成功会继续发送,但会保证只收到一次
* @param retained 保留标志
* 如果设置为true,服务端必须存储这个应用消息和它的服务质量等级,当有订阅者订阅这个主题时,会把消息推送给这个订阅者
* 但服务端对同一个主题只会保留一条retained消息(最后收到的那条)
*/
public void publish(int qos,boolean retained,String topic,String message){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主题目的地,用于发布/订阅消息
MqttTopic mqttTopic = client.getTopic(topic);
//提供一种机制来跟踪消息的传递进度。
//用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度
MqttDeliveryToken token;
try {
//将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。
//一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
1.5.4 消息发布客户端回调
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
public class MqttProviderCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 与服务器断开连接的回调
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println(clientId + "与服务器断开连接");
}
/**
* 消息到达的回调
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
IMqttAsyncClient client = iMqttDeliveryToken.getClient();
System.out.println(client.getClientId() + "发布消息成功!");
}
}
1.5.5 创建控制器测试发布消息
import com.xdemo.mqttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SendController {
@Autowired
private MqttProviderConfig providerClient;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(int qos,boolean retained,String topic,String message){
try {
providerClient.publish(qos,retained,topic,message);
return "发送成功";
}catch (Exception e){
e.printStackTrace();
return "发送失败";
}
}
}
1.6 SpringBoot搭建消费端
在父工程下创建一个Springboot项目作为消息消费者,导入以下依赖
1.6.1 pom.xml
<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.6.2 配置文件
spring:
application:
name: consumer
#MQTT配置信息
mqtt:
#MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://ip:1883
#用户名
username: admin
#密码
password: public
#客户端id(不能重复)
client:
id: consumer-id
#MQTT默认的消息推送主题,实际可在调用接口时指定
default:
topic: topic
server:
port: 8082
1.6.3 消费者客户端配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class MqttConsumerConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客户端对象
*/
private MqttClient client;
/**
* 在bean初始化后连接到服务器
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客户端连接服务端
*/
public void connect(){
try {
//创建MQTT客户端对象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanSession(true);
//设置连接用户名
options.setUserName(username);
//设置连接密码
options.setPassword(password.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
//设置回调
client.setCallback(new MqttConsumerCallBack());
client.connect(options);
//订阅主题
//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
int[] qos = {1,1};
//主题
String[] topics = {"topic1","topic2"};
//订阅主题
client.subscribe(topics,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 断开连接
*/
public void disConnect(){
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅主题
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
1.6.4 消息消费者客户端回调
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttConsumerCallBack implements MqttCallback {
/**
* 客户端断开连接的回调
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("与服务器断开连接,可重连");
}
/**
* 消息到达的回调
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主题 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
1.6.5 控制器提供手动建立连接和断开连接方法
import com.xdemo.mqttconsumer.mqtt.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class TestController {
@Autowired
private MqttConsumerConfig client;
@Value("${spring.mqtt.client.id}")
private String clientId;
@RequestMapping("connect")
@ResponseBody
public String connect(){
client.connect();
return clientId + "连接到服务器";
}
@RequestMapping("disConnect")
@ResponseBody
public String disConnect(){
client.disConnect();
return clientId + "与服务器断开连接";
}
}
1.7 测试提供端和消费端
分别启动两个项目,可以在管理界面看到创建的两个客户端
调用发布消息接口发布消息
消费者控制台打印
客户端断线消息恢复
把消费者与服务端断开连接
再调用发布消息接口发送两条消息到topic1,然后再把消费者连接到服务端
控制台没有东西打印
修改消费者客户端配置,把setCleanSession改为false
重启项目,把消费者客户端断开连接,调用发布消息接口发布两条消息,再把消费者和服务端连接上
标签:实际操作,client,MQTT,详解,消息,import,org,客户端 From: https://www.cnblogs.com/jingzh/p/18200117