安装emqx
https://blog.csdn.net/weixin_41542513/article/details/134328627
springboot整合mqtt
1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2、MqttConfig
用于配置客户端信息
package com.tan.mqtt;
import com.tan.mqtt.logic.LogicService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.*;
@Slf4j
@Configuration
public class MqttConfig {
@Resource
private LogicService logicService;
// 要是你下载了mqttx。mqttx上面的协议对应java的协议关系
// mqtt=>tcp
// mqtts=>ssl(这个我没试过)
// ws=>ws
// wss=>wss
@Value("${spring.mqtt.host:tcp://127.0.0.1:1883}")
private String serverUrl;
@Value("${spring.mqtt.username:username}")
private String username;
@Value("${spring.mqtt.password:password}")
private String password;
/**
* 创建mqtt线程池
*/
@Bean
public ThreadPoolTaskExecutor mqttExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16); // 根据实际需求调整
executor.setMaxPoolSize(32);
executor.setQueueCapacity(1000); // 队列容量
executor.setThreadNamePrefix("MQTT-Executor-");
executor.initialize();
return executor;
}
/**
* 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
*
* @return factory
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(username);
options.setPassword(password.toCharArray());
// 设置代理端的URL地址,可以是多个
options.setServerURIs(new String[]{serverUrl});
options.setMaxInflight(1000);
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new ExecutorChannel(mqttExecutor());
}
/**
* 入站
*/
@Bean
public MessageProducer inboundRtk() {
// Paho客户端消息驱动通道适配器,主要用来订阅主题
String clientId = UUID.randomUUID().toString().replaceAll("-", "");
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
mqttClientFactory(), "topic/#");
adapter.setCompletionTimeout(5000);
// Paho消息转换器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// 按字节接收消息
adapter.setConverter(defaultPahoMessageConverter);
adapter.setQos(2); // 设置QoS
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInputChannel())
.handle(message -> {
String payload = message.getPayload().toString();
String topic = message.getHeaders().get("mqtt_receivedTopic") + "";
//处理topic接收的参数
logicService.handle(topic, payload);
})
.get();
}
/**
* 出站通道
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 出站
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outbound() {
// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
String clientId = UUID.randomUUID().toString().replaceAll("-", "");
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。
messageHandler.setDefaultTopic("command");
messageHandler.setDefaultQos(2);
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
}
3、MqttGateway
用户发送消息
package com.tan.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
4、LogicService
工厂
package com.tan.mqtt.logic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class LogicService {
@Autowired
private List<BasicDemo> basicDemos; // 这个就是把BasicDemo类型的bean都注入进来。还有个map的注入方式https://www.cnblogs.com/9zhe/p/17924891.html 这个文章上有
public void handle(String topic, String massage) {
for (BasicDemo basicDemo : basicDemos) {
if (basicDemo.judge(topic)) {
basicDemo.test(massage);
break;
}
}
}
}
5、BasicDemo
public interface BasicDemo {
/**
* 判断是否使用这个bean的业务处理
*/
Boolean judge(String topic);
/**
* 业务处理
*/
void test(String message);
}
6、Demo1
使用这个bean的话,topic就得包含demo1。需要监听多个topic就去实现BasicDemo接口。
package com.tan.mqtt.logic;
import com.tan.mqtt.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
@Service
public class Demo1 implements BasicDemo {
@Autowired
private MqttGateway mqttGateway;
@Override
public Boolean judge(String topic) {
return topic.contains("demo1");
}
@Override
public void test(String message) {
mqttGateway.sendToMqtt("aa/cc", 2, "demo1");
}
}
其他
emqx官网教程网站:https://www.emqx.com/zh/mqtt-guide
坑
1、MaxInflight
Sets the "max inflight". please increase this value in a high traffic environment.
The default value is 10设置“最大飞行”。请在高流量环境中增加此值。
默认值为10最大飞行:完成应答前,最多允许同时投递的 QoS 1 和 QoS 2 消息数量。
MqttConfig注意一下这个参数