首页 > 其他分享 >springboot整合mqtt

springboot整合mqtt

时间:2024-07-18 15:08:55浏览次数:16  
标签:springboot integration springframework mqtt 整合 org import String

安装emqx

https://blog.csdn.net/weixin_41542513/article/details/134328627

springboot整合mqtt

1、引入依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
2、MqttConfig

用于配置客户端信息

package com.tan.mqtt;

import com.tan.mqtt.logic.LogicService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.*;

@Slf4j
@Configuration
public class MqttConfig {

    @Resource
    private LogicService logicService;

    // 要是你下载了mqttx。mqttx上面的协议对应java的协议关系
    // mqtt=>tcp
    // mqtts=>ssl(这个我没试过)
    // ws=>ws
    // wss=>wss
    @Value("${spring.mqtt.host:tcp://127.0.0.1:1883}")
    private String serverUrl;

    @Value("${spring.mqtt.username:username}")
    private String username;

    @Value("${spring.mqtt.password:password}")
    private String password;

    /**
     * 创建mqtt线程池
     */
    @Bean
    public ThreadPoolTaskExecutor mqttExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(16);  // 根据实际需求调整
        executor.setMaxPoolSize(32); 
        executor.setQueueCapacity(1000);  // 队列容量
        executor.setThreadNamePrefix("MQTT-Executor-");
        executor.initialize();
        return executor;
    }

    /**
     * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
     *
     * @return factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        // 设置代理端的URL地址,可以是多个
        options.setServerURIs(new String[]{serverUrl});
        options.setMaxInflight(1000);
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new ExecutorChannel(mqttExecutor());
    }

    /**
     * 入站
     */
    @Bean
    public MessageProducer inboundRtk() {
        // Paho客户端消息驱动通道适配器,主要用来订阅主题
        String clientId = UUID.randomUUID().toString().replaceAll("-", "");
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                mqttClientFactory(), "topic/#");
        adapter.setCompletionTimeout(5000);
        // Paho消息转换器
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        // 按字节接收消息
        adapter.setConverter(defaultPahoMessageConverter);
        adapter.setQos(2); // 设置QoS
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }


    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInputChannel())
                .handle(message -> {
                    String payload = message.getPayload().toString();
                    String topic = message.getHeaders().get("mqtt_receivedTopic") + "";
                    //处理topic接收的参数
                    logicService.handle(topic, payload);
                })
                .get();
    }

    /**
     * 出站通道
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * 出站
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outbound() {
        // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
        String clientId = UUID.randomUUID().toString().replaceAll("-", "");
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
        messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。
        messageHandler.setDefaultTopic("command");
        messageHandler.setDefaultQos(2);
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        messageHandler.setConverter(defaultPahoMessageConverter);
        return messageHandler;
    }

}

3、MqttGateway

用户发送消息

package com.tan.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

4、LogicService

工厂

package com.tan.mqtt.logic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class LogicService {
    @Autowired
    private List<BasicDemo> basicDemos; // 这个就是把BasicDemo类型的bean都注入进来。还有个map的注入方式https://www.cnblogs.com/9zhe/p/17924891.html 这个文章上有

    public void handle(String topic, String massage) {
        for (BasicDemo basicDemo : basicDemos) {
            if (basicDemo.judge(topic)) {
                basicDemo.test(massage);
                break;
            }
        }
    }
}
5、BasicDemo
public interface BasicDemo {
    /**
     * 判断是否使用这个bean的业务处理
     */
    Boolean judge(String topic);

    /**
     * 业务处理
     */
    void test(String message);
}

6、Demo1

使用这个bean的话,topic就得包含demo1。需要监听多个topic就去实现BasicDemo接口。

package com.tan.mqtt.logic;


import com.tan.mqtt.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;

@Service
public class Demo1 implements BasicDemo {
    @Autowired
    private MqttGateway mqttGateway;

    @Override
    public Boolean judge(String topic) {
        return topic.contains("demo1");
    }

    @Override
    public void test(String message) {
        mqttGateway.sendToMqtt("aa/cc", 2, "demo1");
    }
}

其他

emqx官网教程网站:https://www.emqx.com/zh/mqtt-guide

1、MaxInflight

Sets the "max inflight". please increase this value in a high traffic environment.
The default value is 10

设置“最大飞行”。请在高流量环境中增加此值。
默认值为10

最大飞行:完成应答前,最多允许同时投递的 QoS 1 和 QoS 2 消息数量。

MqttConfig注意一下这个参数

2、要是并发大的话并且消息QoS都是2的话注意一下这边的配置

标签:springboot,integration,springframework,mqtt,整合,org,import,String
From: https://www.cnblogs.com/9zhe/p/18309555

相关文章

  • SSM 整合(Spring + MyBatis;Spring + Spring MVC)
    1.SSM整合(Spring+MyBatis;Spring+SpringMVC)文章目录1.SSM整合(Spring+MyBatis;Spring+SpringMVC)2.引入相关依赖3.SSM整合3.1创建包结构4.Spring整合+MyBatis4.1编写jdbc.properties4.2编写DataSourceConfig数据源配置4.3编写MyBatisConf......
  • springboot博客交流平台-计算机毕业设计源码56406
    摘要博客交流平台作为一种重要的网络平台,为用户提供了展示自我、分享经验和与他人互动的空间。在国内外,研究者们关注博客交流平台的各个方面,并取得了显著的进展。研究内容主要包括用户体验和界面设计、社交化和互动性、多媒体内容支持、移动设备适配和跨平台体验、数据分析......
  • springboot访问多个mysql数据库配置多数据源
    一、参考地址:https://github.com/baomidou/dynamic-datasource二、使用方法引入dynamic-datasource-spring-boot-starter或者dynamic-datasource-spring-boot3-starter。spring-boot1.5.x2.x.x点击查看代码<dependency><groupId>com.baomidou</groupId><art......
  • Socket、WebSocket 和 MQTT 的区别
    Socket协议定义:操作系统提供的网络通信接口,抽象了TCP/IP协议,支持TCP和UDP。特点:通用性:不限于Web应用,适用于各种网络通信。协议级别:直接使用TCP/UDP,需要手动管理连接和数据传输。实现复杂性:需要编写代码处理连接、数据传输和错误。使用场景:实时通信(聊天应用)、文件传输......
  • 24年最新版工作流形AI绘画ComfyUI整合包一键安装教程(附安装包)
    今天我们带来了ComfyUI的整合安装包安装教程,可以创建工作流一键生成图片。如果你是一个初学者,建议从AI绘画StableDiffusion保姆级入门教程,看完连老奶奶都能上手!开始。ComfyUI简介ComfyUI是一个基于节点工作流的StableDiffusion用户界面。它通过将StableDiffusion......
  • 2024最新的AI绘画工具 Stable Diffusion 整合包安装教程,SD安装分享(附整合包)
    大家好,我是灵魂画师向阳自从AI绘画开始进入大众视野之后,AI绘画工具StableDiffusion技术以其创新的人工智能能力而著称,它拥有根据用户输入的文字描述来创造细致且富有表现力的图像的独特本领。SD不仅能够生成图像,还能执行图像修复、扩展以及在文本指导下的图像变换等多样......
  • SpringBoot整合MyBatis+MySQL
    一、添加mysql驱动mysqlmysql-connector-java二、添加MyBatis依赖org.mybatis.spring.bootmybatis-spring-boot-starter3.0.1三、添加配置spring:datasource:name:xx-datasourcedriverClassName:com.mysql.cj.jdbc.Driverurl:jdbc:mysql://127.0.0.1:3306/xx-......
  • SpringBoot 跨域请求处理全攻略:从原理到实践
    文章目录SpringBoot如何处理跨域请求?你能说出几种方法?跨域请求概述跨域解决方案1.使用@CrossOrigin注解2.使用WebMvcConfigurer配置类3.使用过滤器(Filter)4.使用SpringSecurity处理CORS5.使用SpringCloudGateway处理CORS补充1.预检请求(PreflightRequests)2.其......
  • SpringBoot与Thymeleaf模板技术整合
    以下是一个简单的SpringBoot整合Thymeleaf的入门案例:1.创建一个SpringBoot项目,并添加Thymeleaf依赖。org.springframework.bootspring-boot-starter-thymeleaforg.springframework.bootspring-boot-starter-web2.在src/main/resources/templates目录下创建一个HTML模......
  • 整合Maven后加载不到Jar包解决方案
    简介:在整合Maven项目时,有时可能会遇到无法加载Jar包的问题。本文将提供解决此问题的方法和步骤,帮助您顺利解决加载不到Jar包的困境。当您在整合Maven项目后遇到无法加载Jar包的问题时,这通常是由于以下原因之一导致的:1、Maven依赖未正确配置:确保您的pom.xml文件中正确配置了......