MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
1. 引入依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.14</version>
</dependency>
2. 创建配置文件类
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfiguration {
private String url;
private String clientId;
private String topics;
private String username;
private String password;
private String timeout;
private String keepalive;
}
在application.yaml中添加具体配置,这里使用的中间件是emqx
mqtt:
url: tcp://127.0.0.1:1883 #这里要写 tcp:// 不能是 mqtt,不然不是合法的 schema,源码中有
clientId: test_mqtt_client
topics: test/#
username: admin
password: public
timeout: 10
keepalive: 20
3. 创建生产端
/**
* MQTT 生产端
*/
@Configuration
public class Publisher {
@Resource
private MqttConfiguration mqttConfiguration;
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 建立MQTT连接,配置连接的参数选项
*/
@Bean
public MqttPahoClientFactory mqttOutClient() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
String[] mqttServerUrls = mqttConfiguration.getUrl().split(",");
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setServerURIs(mqttServerUrls);
mqttConnectOptions.setUserName(mqttConfiguration.getUsername());
mqttConnectOptions.setPassword(mqttConfiguration.getPassword().toCharArray());
// 接收离线消息
mqttConnectOptions.setCleanSession(false); //告诉代理客户端是否要建立持久会话 false为建立持久会话
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfiguration.getClientId() + "_outbound_iot",
mqttOutClient());
messageHandler.setDefaultTopic("test"); // 设置发送消息的默认主题,后续发送可以指定topic,所以这里无影响
messageHandler.setAsync(true);
return messageHandler;
}
}
4. 创建消费端
/**
* 实现了对 inboundtopic 中的主题监听,当有消息推送到 inboundtopic 主题上时可以接受
* MQTT 消费端
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class Consumer {
@Resource
private MqttConfiguration mqttProperties;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory mqttInClient() {
if (!mqttProperties.getEnabled()) {
return null;
}
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// 这里配置了订阅的主题
String[] mqttServerUrls = mqttProperties.getUrl().split(",");
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(mqttServerUrls);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setKeepAliveInterval(2);
//接受离线消息
options.setCleanSession(false);
factory.setConnectionOptions(options);
return factory;
}
/**
* 配置Client,监听Topic
*/
@Bean
public MessageProducer inbound() {
if (!mqttProperties.getEnabled()) {
return null;
}
String[] inboundTopics = mqttProperties.getTopics().split(",");
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId() + "_inbound",
mqttInClient(), inboundTopics);
// adapter.addTopic(); // 添加 TOPICS
adapter.setCompletionTimeout(1000 * 5);
adapter.setQos(1);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 通过通道获取数据,即处理 MQTT 发送过来的消息,可以通过 MQTTX 工具发送数据测试
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel") // 异步处理
public MessageHandler handler() {
return message -> {
Object payload = message.getPayload();
MessageHeaders messageHeaders = message.getHeaders();
Object qos = messageHeaders.get(MqttHeaders.RECEIVED_QOS);
String topic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
// 解析数据,分发到指定方法
String handMessage = "收到消息" + " topic ===> " + topic + "\nQOS ===> " + qos + "\n内容 ===> " + payload;
System.out.println(handMessage);
}
}
}
5. 发送mqtt消息
/**
* mqtt消息发送
*/
@Service
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateWayService {
void sendMessageToMqtt(String data);
void sendMessageToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
void sendMessageToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos);
void sendMessageToMqtt(byte[] data,@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos);
}
测试消息发送
/**
* mqtt消息测试
*/
@RestController
@RequestMapping("/mqtt")
@Validated
public class MqttController {
@Resource
private MqttGateWayService gateWay;
@PostMapping(value = "/sendMqtt")
public void sendMqtt(@RequestParam(value = "topic") String topic,
@RequestParam(value = "msg") String msg,
@RequestParam(value = "qos") int qos) {
gateWay.sendMessageToMqtt(msg, topic, qos);
}
}
6. 补充
在有些设备开发中,发送中文字符需要使用他指定的编码格式,例如GBK
发送时可以使用MqttGateWayService中的第四个方法
msg最好是String,我测试过用Map构建msg,用JSON.toJsonString(Map)转为sting发送,中文无法识别
@PostMapping(value = "/sendGBK")
public void sendMqtt(@RequestParam(value = "topic") String topic,
@RequestParam(value = "msg") String msg,
@RequestParam(value = "qos") int qos) {
gateWay.sendMessageToMqtt(msg.getBytes("GBK"), topic, qos);
}
标签:qos,Springboot,接入,mqtt,private,topic,Mqtt,public,String
From: https://www.cnblogs.com/eaglex3/p/18471489