首页 > 其他分享 >springboot集成mqtt

springboot集成mqtt

时间:2024-01-30 18:56:18浏览次数:38  
标签:集成 springboot options topic mqtt import 客户端 public String

SpringBoot集成MQTT(简单版)

一、docker安装emqx环境(Linux系统)

emqx:mqtt服务器(broker)

version: '3'
services:
  emqx:
    image: emqx/emqx
    container_name: emqx
    restart: always
    ports:
      - 8001:18083
      - 8002:1883
      - 8003:8083
      - 8004:8084
      - 8005:8883

端口介绍:

1883 MQTT TCP 协议端口,发送报文就是走的这个端口
8883 MQTT/TCP SSL 端口
8083 MQTT/WebSocket 端口
8084 MQTT/WebSocket with SSL 端口
8080 MQTT执行引擎HTTP API 端口
18083 EMQX Dashboard 管理控制台端口

二、Java代码

1、配置文件
# MQTT配置信息
mqtt:
  # MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开
  url: tcp://ip:1883
  # 默认的用户名
  username: admin
  # 默认的密码
  password: public
  # 客户端id(不能重复)
  consumerClientId: consumer-13579
  providerClientId: provider-24680
  # MQTT默认的消息推送主题,实际可在调用接口时指定
  defaultTopic: topic1111111
2、配置类
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;


@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@EnableConfigurationProperties(value = MqttProperties.class)
@Primary
public class MqttProperties {

    private String username;

    private String password;

    private String url;

    private String providerClientId;

    private String consumerClientId;

    private String defaultTopic;
}

3、MQTT客户端启动配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Configuration
@Slf4j
public class MqttConfig {

    @Resource
    private MqttProperties properties;

    @Resource
    private MqttConsumerCallBack consumerCallBack;

    /**
     * 客户端对象
     */
    private MqttClient providerClient;
    private MqttClient consumerClient;

    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connectProvider();
        connectConsumer();
    }

    /**
     * 生产客户端连接服务端
     */
    public void connectProvider(){
//        try{
//            // 创建MQTT客户端对象-发布者
//            providerClient = new MqttClient(properties.getUrl(), properties.getProviderClientId(), new MemoryPersistence());
//            MqttConnectOptions options = getMqttConnectOptions();
//            // 服务器每次连接重新保存信息
//            options.setCleanSession(true);
//            // 设置回调
//            providerClient.setCallback(new MqttProviderCallBack());
//            providerClient.connect(options);
//        } catch(MqttException e){
//            log.error("mqtt发布端连接服务器失败", e);
//        }
    }

    /**
     * 消费客户端连接服务端
     */
    public void connectConsumer(){
        try {
            // 创建MQTT客户端对象-订阅者
            consumerClient = new MqttClient(properties.getUrl(), properties.getConsumerClientId(), new MemoryPersistence());
            MqttConnectOptions options = getMqttConnectOptions();
            // 服务器每次连接重新保存信息
            options.setCleanSession(true);
            // 设置回调,new的对象在spring中拿bean麻烦,这里直接初始化
            consumerClient.setCallback(consumerCallBack);
            // 设置自动重新连接,因为异常原因断开连接后进行重连
            options.setAutomaticReconnect(true);
            consumerClient.connect(options);
            // 订阅主题,消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
            consumerClient.subscribe(TopicDetails.topics, TopicDetails.qos);
            log.info("mqtt消费端与服务器连接成功");
        } catch (MqttException e) {
            log.error("mqtt消费端连接服务器失败", e);
        }
    }

    /**
     * 发布消息
     * @param qos
     * @param retained
     * @param topic
     * @param message
     */
    public void publish(int qos,boolean retained,String topic,String message){
        try {
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(retained);
            mqttMessage.setPayload(message.getBytes());
            // 主题的目的地,用于发布/订阅信息
            MqttTopic mqttTopic = providerClient.getTopic(topic);
            // 提供一种机制来跟踪消息的传递进度
            // 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
            MqttDeliveryToken token;
            // 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            // 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取mqtt连接
     * @return
     */
    public MqttConnectOptions getMqttConnectOptions(){
        // 连接设置
        MqttConnectOptions options = new MqttConnectOptions();
        // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
        // 设置为true表示每次连接服务器都是以新的身份
        options.setCleanSession(true);
        // 设置连接用户名
        options.setUserName(properties.getUsername());
        // 设置连接密码
        options.setPassword(properties.getPassword().toCharArray());
        // 设置超时时间,单位为秒
        options.setConnectionTimeout(100);
        // 设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        options.setKeepAliveInterval(20);
        // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
        options.setWill("willTopic",(properties.getProviderClientId() + "与服务器断开连接").getBytes(),0,false);
        return options;
    }
}

/**
 * mqtt主题
 */
public class TopicDetails {

    public static final String[] topics = new String[]{
            // 此处添加topic,需要在下面的 qos 属性里面也要对应加一个
//            "topic_3/+/test",
            "topic_1",
            "topic_2"
    };

    public static final int[] qos = new int[]{
            // 此处数量保持和 topics 一致
            1,
            1
    };

    /**
     * 判断当前主题是否为我们需要的主题
     * @param topic
     * @return
     */
    public static boolean checkTopic(String topic){
        for (String s : topics) {
            if(s.equals(topic)){
                return true;
            }
        }
        return false;
    }

}

4、消费数据回调
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * mqtt数据消费回调
 */
@Component
@Slf4j
public class MqttConsumerCallBack implements MqttCallback{

	// 业务代码
    @Resource
    private IMqttService mqttService;
    @Resource
    private MqttConfig mqttConfig;

    /**
     * 客户端断开连接的回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("mqtt消费端与服务器断开连接:{}", throwable.getMessage());
        mqttConfig.connectConsumer();
    }

    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String content = new String(message.getPayload());
        log.info("接收消息主题:{},接收消息内容:{}", topic, content);
//        System.out.println(String.format("接收消息主题 : %s",topic));
//        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
//        System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
//        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
        // 数据存储
        mqttService.senMessage(topic, content);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // 消息发布成功的回调,消费端无
    }
}

5、生产数据回调
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * mqtt生产发布消息回调
 */
@Slf4j
public class MqttProviderCallBack implements MqttCallback{

    /**
     * 客户端断开连接的回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("mqtt生产端与服务器断开连接,可重连");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // 消息到达的回调,发布端无
    }

    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info(String.format("接收消息成功"));
    }
}

三、测试工具

推荐使用:MQTT.fx

标签:集成,springboot,options,topic,mqtt,import,客户端,public,String
From: https://www.cnblogs.com/xy20211005/p/17997751

相关文章

  • springboot项目启动时候初始化一些数据
    最近在看缓存预热的问题的时候,其中有一种解决方法,就是在项目启动的时候就自动加载到缓存中那缓存我就不说了,就关于项目启动的时候,可以初始化一些数据,以下为两种初始化的方式,可以参考1、编写类去实现ApplicationRunner接口,实现run()方法。2、编写类去实现CommandLineRunner接口,......
  • 7000字详解Spring Boot项目集成RabbitMQ实战以及坑点分析
    本文给大家介绍一下在SpringBoot项目中如何集成消息队列RabbitMQ,包含对RibbitMQ的架构介绍、应用场景、坑点解析以及代码实战。最后文末有免费领取龙年红包封面以及腾讯云社区答题领奖福利,欢迎大家领取。我将使用waynboot-mall项目作为代码讲解,项目地址:https://github.co......
  • Java 系统学习 | Springboot 数据验证
    本篇使用Springboot3框架,IDEA2022编辑器,java17版本。在上一篇的基础上进行优化添加依赖在pom.xml中添加依赖,记得更新maven<!--validation依赖--><dependency><groupId>org.springframework.boot</groupId><artifactI......
  • Springboot开发者的福音!免费好用的一站式IDE解决方案来了!SpringToolSuite4登场!
    SpringToolSuite4介绍最近由于工作原因,需要自己编写springboot应用(不是特别复杂),代码量不是很大,但是在选择IDE上却浪费了我很多时间!如果大家跟我一样,在开发springboot应用的过程中遇到如下两个问题:苦于Idea的版权问题讨厌在VisualStudio中安装各种令人头疼的插件那么我们不妨试一下......
  • SpringBoot实现分页的四种方式
    一自己封装Page对象实现二使用sql实现分页2.1场景分析前段传递给给后台什么参数?当前页码currentPage每页显示条数pageSize后台给前端返回什么数据?当前页数据List总记录数totalCount、2.2前段代码<template><el-pagination@size-change="handleSizeChan......
  • SpringBootTest
    引入依赖pom文件中添加以下依赖<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-test</artifactId><scope>test</scope>&l......
  • 最全的项目部署+持续集成解决方案:Jenkins + git + docker
    最全的项目部署+持续集成解决方案:Jenkins+git+docker:https://blog.csdn.net/m0_45806184/article/details/126408527?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-126408527-blog-128137274.235^v43^control&spm=1001.21......
  • SpringBoot中集成Minio高性能分布式存储文件服务入门
    场景若依前后端分离版手把手教你本地搭建环境并运行项目:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108465662参考上面搭建项目。MinioMinio是基于Go语言编写的对象存储服务,适合于存储大容量非结构化的数据,例如图片、音频、视频、日志文件、备份数据和容器/......
  • 小项目:使用MQTT上传温湿度到Onenet服务器
    前言我们之前分别编写了DHT11、ESP8266和MQTT的代码,现在我们将它们仨整合在一起,来做一个温湿度检测小项目。这个项目可以实时地将DHT11传感器获取到的温湿度数据上传到OneNET平台。通过登录OneNET,我们随时随地可以查看温湿度数据。这种环境监测项目的应用场景有很多,其中......
  • 解决 JUnit 版本引起的 SpringBoot 测试环境加载问题
    SpringBoot项目初始化后尝试自己编写测试类时报错空指针异常,在此记录下解决方法,如有错误,欢迎指正!1.问题描述1.1报错信息在执行SpringBoot单元测试时遇到如下报错信息:java.lang.NullPointerException atcom.thr.usercenter.SampleTest.testSelect(SampleTest.java:25......