首页 > 其他分享 >SpringBoot:Springboot整合Mqtt并处理问题

SpringBoot:Springboot整合Mqtt并处理问题

时间:2024-01-25 16:55:06浏览次数:34  
标签:SpringBoot mqtt springframework topic String Mqtt import org Springboot

搭建mqtt服务

Docker搭建MQTT服务:https://www.cnblogs.com/nhdlb/p/17960641

项目结构

这是我的项目结构,主要有两个模块 base-modules(业务模块)、base-utils(工具模块) 组成,其中base-mqtt服务为工具模块,用于提供给其他业务模块引用依赖的。

base-mqtt模块

pom.xml

这里我的Springboot版本为2.7.17

    <dependencies>
        <!--  lombok依赖  -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--mqtt相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
    </dependencies>

MqttConfig 配置类

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * @Author: 
 * @Date: 2024/1/24 11:00
 * @Description:
 */
@Data
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {

    /**
     * 订阅的bean名称
     */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
    /**
     * 发布的bean名称
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

    private final String subStrP = "-provider";
    private final String subStrC = "-consumer";

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String url;

    @Value("${spring.mqtt.clientId}")
    private String clientId;

    @Value("${spring.mqtt.topic}")
    private String topics;

    /**
     * MQTT连接器选项
     *
     * @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions}
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        // 设置连接的用户名
        options.setUserName(username);
        // 设置连接的密码
        options.setPassword(password.toCharArray());
        options.setServerURIs(url.split(","));
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(100);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
        options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);
        return options;
    }

    /**
     * MQTT管道适配器
     * @param factory
     * @return
     */
//    @Bean
//    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
//        return new MqttPahoMessageDrivenChannelAdapter(clientId, factory, topics.split(","));
//    }

    /**
     * MQTT客户端
     *
     * @return {@link MqttPahoClientFactory}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT信息通道(生产者)
     *
     * @return {@link org.springframework.messaging.MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT信息通道(消费者)
     *
     * @return {@link org.springframework.messaging.MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息处理器(生产者)
     *
     * @return {@link org.springframework.messaging.MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                clientId+subStrP,
                mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(topics.split(",")[0]);
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }

    /**
     * MQTT消息订阅绑定(消费者)
     *
     * @return {@link org.springframework.integration.core.MessageProducer}
     */
    @Bean
    public MessageProducer inbound() {
        // 可以同时消费(订阅)多个Topic
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId+subStrC, mqttClientFactory(),
                        topics.split(","));
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(0);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

}

IMqttSender 消息发布类

import com.higentec.mqtt.config.MqttConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * MQTT生产者消息发送接口
 * <p>MessagingGateway要指定生产者的通道名称</p>
 * @author
 */
@Configuration
// 启动Springboot时这里会有问题,原因为@ComponentScan扫描不到这个注解!!!
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {

    /**
     * 发送信息到MQTT服务器
     *
     * @param data 发送的文本
     */
    void sendToMqtt(String data);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param message 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    String message);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param qos 对消息处理的几种机制。
     *  0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     *  1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     *  2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param message 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String message);
}

重点:

启动Springboot时这里会有问题,原因:@ComponentScan扫描不到@MessagingGateway注解!!!

@MessagingGateway的源码,可以看到这样一些注释!!

大概意思是: @MessagingGateway注解需要配合 @Configuration注解 使用,并通过 @IntegrationComponentScan注解扫描才能识别到;而@ComponentScan注解无法扫描到;@IntegrationComponentScan注解用于扫描特定于Spring Integration的组件,它们都带有Spring Integration 4.0。

解决办法:

在业务模块的启动类加上@IntegrationComponentScan注解

/**
 * 设备管理模块
 *
 */
@SpringBootApplication
@EnableFeignClients(basePackages = "com.higentec.feign")
@ComponentScan({"com.higentec"})
// 用于扫描MQTT接口类
@IntegrationComponentScan({"com.higentec.mqtt"})
public class DeviceApplication {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(DeviceApplication.class);
        application.setApplicationStartup(new BufferingApplicationStartup(2048));
        application.run(args);
        System.out.println("= = = = >>>>>> 设备管理模块启动成功 ********** ");
    }
}

base-device模块

pom.xml

    <dependencies>
        <!-- 引入mqtt工具模块 -->
        <dependency>
            <groupId>com.higentec</groupId>
            <artifactId>base-mqtt</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
     </dependencies>

application.yml

spring:
  #MQTT配置信息
  mqtt:
    #MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://127.0.0.1:1883
    #用户名
    username: admin
    #密码
    password: admin
    #客户端id(不能重复)
    clientId: ${spring.application.name}
    #MQTT默认的消息推送主题,实际可在调用接口时指定
    topic: test,weigh

MqttEventListener 消息监听类

import com.alibaba.fastjson2.JSONObject;
import com.higentec.device.enums.DeviceType;
import com.higentec.mqtt.config.MqttConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class MqttEventListener {

    // 自定义的业务service
    @Autowired
    private MyDeviceService deviceService;

    /**
     * MQTT消息处理器(消费者)
     * @return {@link org.springframework.messaging.MessageHandler}
     */
    @Bean
    @Async
    @ServiceActivator(inputChannel = MqttConfig.CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return message -> {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            String source = message.getPayload().toString();
            issue(topic, source);
        };
    }

    /**
     * 方法描述: 消息分发
     *
     * @param topic
     * @param source
     */
    public void issue(String topic, String source) {
        DeviceType deviceType = DeviceType.parseDeviceType(topic);
        if (deviceType != null) {
            switch (deviceType) {
                // 称重设备
                case WEIGH:
                    WeighEntity weigh = JSONObject.parseObject(source, WeighEntity.class);
                    deviceService.insertWeigh(weigh);
                    System.out.println("称重设备 " + weigh.toJSONString());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + topic);
            }
        }
    }

}

DeviceType 枚举类

import java.util.HashMap;
import java.util.Map;

public enum DeviceType {

    WEIGH("weigh", "称重设备"),
    ;

    private final String topic;

    private final String desc;

    DeviceType(String topic, String desc) {
        this.topic = topic;
        this.desc = desc;
        InnerClass.commandMap.put(topic, this);
    }

    private static class InnerClass {
        static Map<String, DeviceType> commandMap = new HashMap<>();
    }

    public static DeviceType parseDeviceType(String topic) {
        return InnerClass.commandMap.get(topic);
    }

    String getTopic() {
        return topic;
    }

    String getDesc() {
        return desc;
    }
}

启动类

/**
 * 设备管理模块
 *
 */
@SpringBootApplication
@EnableFeignClients(basePackages = "com.higentec.feign")
@ComponentScan({"com.higentec"})
// 用于扫描MQTT接口类
@IntegrationComponentScan({"com.higentec.mqtt"})
public class DeviceApplication {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(DeviceApplication.class);
        application.setApplicationStartup(new BufferingApplicationStartup(2048));
        application.run(args);
        System.out.println("= = = = >>>>>> 设备管理模块启动成功 ********** ");
    }
}

接口类

/**
 * 接口类Controller
 */
@Tag(name = "设备信息")
@RestController
@RequestMapping("/device/base")
public class DeviceBaseInfoController extends BaseController {
    
    // 引入消息发布工具类
    @Resource
    private IMqttSender mqttSender;

    /**
     * 测试mqtt发送消息
     */
    @Operation(summary = "测试mqtt发送消息")
    @GetMapping(value = "/mqtt")
    public void getInfo(@RequestParam("topic") String topic, @RequestParam("message") String message) {
        mqttSender.sendToMqtt(topic,message);
    }
}

 

 文章整合至:https://blog.csdn.net/kumubajie/article/details/122939003https://www.pianshen.com/article/90891585920/https://blog.csdn.net/qq_40674081/article/details/106815495https://www.cnblogs.com/xct5622/p/15094017.html

标签:SpringBoot,mqtt,springframework,topic,String,Mqtt,import,org,Springboot
From: https://www.cnblogs.com/nhdlb/p/17987529

相关文章

  • springBoot自定义参数注解
    springBoot自定义参数注解前置条件:新建一个springboot项目1.新建一个标记注解@Authimportjava.lang.annotation.ElementType;importjava.lang.annotation.Retention;importjava.lang.annotation.RetentionPolicy;importjava.lang.annotation.Target;/***@authorwa......
  • 解决跨域问题的8种方法,含网关、Nginx和SpringBoot~
    跨域问题是浏览器为了保护用户的信息安全,实施了同源策略(Same-OriginPolicy),即只允许页面请求同源(相同协议、域名和端口)的资源,当JavaScript发起的请求跨越了同源策略,即请求的目标与当前页面的域名、端口、协议不一致时,浏览器会阻止请求的发送或接收。解决跨域问题方案跨域问题......
  • Springboot整合logback
    Springboot整合logback1、引入maven依赖<!--slf4j日志门面--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.26</version>&......
  • SpringBoot开启动态定时任务并手动、自动关闭
    场景需求:在执行某个方法的两小时之后进行某个操作涉及:定时任务、哈希表需要注意:业务逻辑层是单一实例的,所以在定时任务类内操作业务逻辑层的某个属性和在业务逻辑层内操作的都是同一个。疑问:ThreadPoolTaskScheduler线程池需不需要规定线程数量?定时任务类@Componentpublicc......
  • Java21 + SpringBoot3整合Redis,使用Lettuce连接池,推荐连接池参数配置,封装Redis操作
    目录前言相关技术简介Redis实现步骤引入maven依赖修改配置文件定义Redis配置类定义Redis服务类,封装Redis常用操作使用Redis服务类总结前言近日心血来潮想做一个开源项目,目标是做一款可以适配多端、功能完备的模板工程,包含后台管理系统和前台系统,开发者基于此项目进行裁剪和扩展......
  • 搞起来,使用 SpringBoot 框架徒手撸一个安全、可靠的本地缓存工具
    在实现本地缓存的时候,我们经常使用线程安全的ConcurrentHashMap来暂存数据,然后加上SpringBoot自带的@Scheduled定时刷新缓存。虽然这样可以实现本地缓存,但既不优雅也不安全。那看一下我的思路,首先看一张图!1.每个处理器都有缓存名字、描述信息、缓存初始化顺序等信息,所以应该定义一......
  • 策略模式【结合springboot实现】
    Hello!~大家好啊,很高兴我们又见面了,今天我们一起学习设计模式–【策略模式】初次对此模式不懂的,或者想偷懒的,我强烈建议大家跟着我的一起把概念和代码一起敲一遍!~为啥子??因为我就是这样学会的,哈哈哈!1.首先我们看下此模式的整体UML图selector:选择器又叫做上下文conte......
  • 如何查找SpringBoot应用中的请求路径(不使用idea)
    背景昨天有个同事向我咨询某个接口的物理表是哪个,由于公司业务较多、这块业务的确不是我负责的,也没有使用idea不能全局搜索(eclipse搜不到jar内的字符串),也就回复了不清楚。除了自己写代码输出servlet的路径和类外,发现了一个我之前没用过的方法:SpringBootActuator,分享给大家。......
  • SpringBoot可视化接口开发工具magic-api
    magic-api简介magic-api是一个基于Java的接口快速开发框架,编写接口将通过magic-api提供的UI界面完成,自动映射为HTTP接口,无需定义Controller、Service、Dao、Mapper、XML、VO等Java对象。在SpringBoot中使用1、添加magic-api相关依赖<!--接口快速开发框架 magic-api--><depen......
  • SpringBoot实现RequestBodyAdvice和ResponseBodyAdvice接口
    SpringBoot提供了一种机制,允许开发者在请求体(RequestBody)和响应体(ResponseBody)被处理之前和之后执行自定义逻辑。这通过RequestBodyAdvice和ResponseBodyAdvice接口实现。RequestBodyAdvice:此类用于在请求体被处理之前执行自定义逻辑。例如,你可以使用它来解析请求体,或者......