SpringBoot集成MQTT(简单版)
一、docker安装emqx环境(Linux系统)
emqx:mqtt服务器(broker)
version: '3'
services:
emqx:
image: emqx/emqx
container_name: emqx
restart: always
ports:
- 8001:18083
- 8002:1883
- 8003:8083
- 8004:8084
- 8005:8883
端口介绍:
1883 MQTT TCP 协议端口,发送报文就是走的这个端口
8883 MQTT/TCP SSL 端口
8083 MQTT/WebSocket 端口
8084 MQTT/WebSocket with SSL 端口
8080 MQTT执行引擎HTTP API 端口
18083 EMQX Dashboard 管理控制台端口
二、Java代码
1、配置文件
# MQTT配置信息
mqtt:
# MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开
url: tcp://ip:1883
# 默认的用户名
username: admin
# 默认的密码
password: public
# 客户端id(不能重复)
consumerClientId: consumer-13579
providerClientId: provider-24680
# MQTT默认的消息推送主题,实际可在调用接口时指定
defaultTopic: topic1111111
2、配置类
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@EnableConfigurationProperties(value = MqttProperties.class)
@Primary
public class MqttProperties {
private String username;
private String password;
private String url;
private String providerClientId;
private String consumerClientId;
private String defaultTopic;
}
3、MQTT客户端启动配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Configuration
@Slf4j
public class MqttConfig {
@Resource
private MqttProperties properties;
@Resource
private MqttConsumerCallBack consumerCallBack;
/**
* 客户端对象
*/
private MqttClient providerClient;
private MqttClient consumerClient;
/**
* 在bean初始化后连接到服务器
*/
@PostConstruct
public void init(){
connectProvider();
connectConsumer();
}
/**
* 生产客户端连接服务端
*/
public void connectProvider(){
// try{
// // 创建MQTT客户端对象-发布者
// providerClient = new MqttClient(properties.getUrl(), properties.getProviderClientId(), new MemoryPersistence());
// MqttConnectOptions options = getMqttConnectOptions();
// // 服务器每次连接重新保存信息
// options.setCleanSession(true);
// // 设置回调
// providerClient.setCallback(new MqttProviderCallBack());
// providerClient.connect(options);
// } catch(MqttException e){
// log.error("mqtt发布端连接服务器失败", e);
// }
}
/**
* 消费客户端连接服务端
*/
public void connectConsumer(){
try {
// 创建MQTT客户端对象-订阅者
consumerClient = new MqttClient(properties.getUrl(), properties.getConsumerClientId(), new MemoryPersistence());
MqttConnectOptions options = getMqttConnectOptions();
// 服务器每次连接重新保存信息
options.setCleanSession(true);
// 设置回调,new的对象在spring中拿bean麻烦,这里直接初始化
consumerClient.setCallback(consumerCallBack);
// 设置自动重新连接,因为异常原因断开连接后进行重连
options.setAutomaticReconnect(true);
consumerClient.connect(options);
// 订阅主题,消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
consumerClient.subscribe(TopicDetails.topics, TopicDetails.qos);
log.info("mqtt消费端与服务器连接成功");
} catch (MqttException e) {
log.error("mqtt消费端连接服务器失败", e);
}
}
/**
* 发布消息
* @param qos
* @param retained
* @param topic
* @param message
*/
public void publish(int qos,boolean retained,String topic,String message){
try {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
// 主题的目的地,用于发布/订阅信息
MqttTopic mqttTopic = providerClient.getTopic(topic);
// 提供一种机制来跟踪消息的传递进度
// 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
MqttDeliveryToken token;
// 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
// 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 获取mqtt连接
* @return
*/
public MqttConnectOptions getMqttConnectOptions(){
// 连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
// 设置为true表示每次连接服务器都是以新的身份
options.setCleanSession(true);
// 设置连接用户名
options.setUserName(properties.getUsername());
// 设置连接密码
options.setPassword(properties.getPassword().toCharArray());
// 设置超时时间,单位为秒
options.setConnectionTimeout(100);
// 设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
// 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic",(properties.getProviderClientId() + "与服务器断开连接").getBytes(),0,false);
return options;
}
}
/**
* mqtt主题
*/
public class TopicDetails {
public static final String[] topics = new String[]{
// 此处添加topic,需要在下面的 qos 属性里面也要对应加一个
// "topic_3/+/test",
"topic_1",
"topic_2"
};
public static final int[] qos = new int[]{
// 此处数量保持和 topics 一致
1,
1
};
/**
* 判断当前主题是否为我们需要的主题
* @param topic
* @return
*/
public static boolean checkTopic(String topic){
for (String s : topics) {
if(s.equals(topic)){
return true;
}
}
return false;
}
}
4、消费数据回调
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* mqtt数据消费回调
*/
@Component
@Slf4j
public class MqttConsumerCallBack implements MqttCallback{
// 业务代码
@Resource
private IMqttService mqttService;
@Resource
private MqttConfig mqttConfig;
/**
* 客户端断开连接的回调
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt消费端与服务器断开连接:{}", throwable.getMessage());
mqttConfig.connectConsumer();
}
/**
* 消息到达的回调
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String content = new String(message.getPayload());
log.info("接收消息主题:{},接收消息内容:{}", topic, content);
// System.out.println(String.format("接收消息主题 : %s",topic));
// System.out.println(String.format("接收消息Qos : %d",message.getQos()));
// System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
// System.out.println(String.format("接收消息retained : %b",message.isRetained()));
// 数据存储
mqttService.senMessage(topic, content);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// 消息发布成功的回调,消费端无
}
}
5、生产数据回调
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* mqtt生产发布消息回调
*/
@Slf4j
public class MqttProviderCallBack implements MqttCallback{
/**
* 客户端断开连接的回调
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt生产端与服务器断开连接,可重连");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 消息到达的回调,发布端无
}
/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info(String.format("接收消息成功"));
}
}
三、测试工具
推荐使用:MQTT.fx
标签:集成,springboot,options,topic,mqtt,import,客户端,public,String From: https://www.cnblogs.com/xy20211005/p/17997751