首页 > 其他分享 >SpringBoot 结合官网对MQTT消息队列整合记录

SpringBoot 结合官网对MQTT消息队列整合记录

时间:2024-07-25 17:10:28浏览次数:9  
标签:String MQTT client MQ org import 官网 public SpringBoot

SpringBoot 结合官网对MQTT消息队列整合

首先是 maven Pom 的引入 Mqtt Client

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

其次编写一个Mqtt回调方法类 去实现 MqttCallback 接口 用于消息的接收


import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Base64;

/**
 * @author caicai
 *
 *
 * 消息回调显示
 */
@Slf4j
public class PushCallback implements MqttCallback {

    /**
     * 连接丢失
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {

    }
    /**
     * 消息到达
     * @param s
     * @param mqttMessage
     * @throws Exception
     */

    @Override
    public void messageArrived(java.lang.String s, MqttMessage mqttMessage) throws Exception {
        log.info("com.kxcd.testkfka.mqtt.tools.PushCallback.messageArrived===>接收到topic {} ",s);
        byte[] payload = mqttMessage.getPayload();
        String s1 = Base64.getEncoder().encodeToString(payload);
        log.info("PushCallback.messageArrived===>接收到消息内容 {} ",mqttMessage);
    }



    /**
     * 消息发送完成
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

为了代码的美观,这里写死配置,都定义的常量,如果真正使用的话,建议这里修改成 读取配置方式 例如 nacos

/**
 * @author caicai
 */
public class MqConfigConstant {

    public class MqClientConfig {
        // 用户名称
        public static final String MQ_USER_NAME = "test";
        // 用户密码
        public static final String MQ_PASS_WORD = "test";
        // 默认主题
        public static final String MQ_DEFAULT_TOPIC = "test";
        // 客户端ID
        public static final String MQ_CLIENT_ID = "TEST_MQ";
        // 连接地址
        public static final String MQ_CONNECT_URL = "tcp://47.102.199.23:1883";

    }
    public class ConstantNumber {
        // 默认消息质量
        public static final int MQ_QOS1 = 1;
    }
}

编写一个初始化配置类

这里本来是打算使用单例的模式进行配置的,写了一个单例的工具类,后面觉得这样方式 很麻烦 ,于是决定 修改成一次性初始化的

package com.xx.xx.mqtt.config;

import com.xx.xx.constant.MqConfigConstant;
import com.xxx.xx.mqtt.xx.PushCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * @author caicai
 */

@Component
public class MqClientConfig {

    private  MqttClient mqttClientInit;
    @Bean
    public MqttClient getMqttClient() {
        // 首先判断一下是否已经初始化过,如果已经初始化过,则直接返回
        if(mqttClientInit == null){
            synchronized (MqClientConfig.class){
                if(mqttClientInit == null){
                    try {
                        MemoryPersistence persistence = new MemoryPersistence();
                        MqttClient client = new MqttClient(MqConfigConstant.MqClientConfig.MQ_CONNECT_URL, MqConfigConstant.MqClientConfig.MQ_CLIENT_ID, persistence);
                        // 设置连接时协带参数
                        MqttConnectOptions connOpts = new MqttConnectOptions();
                        //  设置连接时候需要用的用户名和密码
                        connOpts.setUserName(MqConfigConstant.MqClientConfig.MQ_USER_NAME);
                        connOpts.setPassword(MqConfigConstant.MqClientConfig.MQ_PASS_WORD.toCharArray());
                        // 连接
                        client.connect(connOpts);
                        // 调用之前的回调方法类
                        client.setCallback(new PushCallback());
                        // 设置默认订阅 -这里我为了方便测试,加的。可以去掉
                        client.subscribe(MqConfigConstant.MqClientConfig.MQ_DEFAULT_TOPIC);
                        mqttClientInit = client;
                    }catch (Exception e){
                        System.out.println("初始化MqttClient失败");
                    }
                }
            }
        }
        return mqttClientInit;
    }
}

写提供实现服务,来方便后续的调用 例如跨服务调用

@Service
public class MqttService {
    @Resource
   private MqttClient mqttClient;
    public void setSubscribe(String subTopic) throws MqttException {
        mqttClient.subscribe(subTopic);
    }
    public void  setPublish(String pubTopic,String msg) throws MqttException {
        MqttMessage message = new MqttMessage(msg.getBytes());
        mqttClient.publish(pubTopic,message);
    }
    public void setPublish(String pubTopic,String msg,int qos) throws MqttException {
        MqttMessage message = new MqttMessage(msg.getBytes());
        message.setQos(qos);
        mqttClient.publish(pubTopic,message);
    }

}

总结 :一个主题可以多个客户端订阅,如果推送消息给这个主题里,两个客户端都会收到相关消息。其次客户端可以订阅,也可发送。

标签:String,MQTT,client,MQ,org,import,官网,public,SpringBoot
From: https://www.cnblogs.com/caicai920/p/18323657

相关文章

  • SpringBootApplication入口调用service类方法
    要在publicstaticvoidmain(String[]args)中调用Service的方法,需要在Application类中手动获取Spring容器,并从中获取Service的实例。示例如下:启动入口程序@SpringBootApplicationpublicclassRouteApplication{publicstaticvoidmain(String[]args){......
  • SpringBoot3.x整合Druid数据库连接池
    引入依赖<!--Druid数据库连接池--><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-3-starter</artifactId><version>1.2.21</version></dependency......
  • springboot+vue前后端分离项目:导出功能报错Request processing failed: cn.hutool.cor
    1.报错截图: 2.hutool官网,推荐引入poi-ooxml依赖 3.mvn仓库找到依赖 4.用最新版依赖 5.复制到本项目pom.xml,刷新maven 解决......
  • springboot自学(2)Bean的加载控制
    Bean的加载控制Bean的加载控制指根据特定情况对bean进行选择性加载以达到适用于项目的目标。那么哪些加载方式适用于编程的形式加载控制呢    控制的演示  注解形式控制bean加载使用@conditional注解的派生注解设置各种组合条件控制bean的加载 有诸多......
  • Springboot网络安全宣传小程序 毕业设计源码70468
                         摘 要随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,网络安全宣传小程序被用户普遍使用,为方便用户能够......
  • SpringBoot + MyBatis 实现 MySQL 主从复制动态数据源切换
    概述在项目中的某些场景中,需要对数据库进行一些优化。常用的有如下的实现方法:读写分离、引入缓存技术、主从复制、分库分表等。今天来简单介绍一些如何在程序中实现动态切换数据源,可能某台服务器性能比较好,让流量多的方法执行切换到此数据源去操作等等。当然这种思想也可以扩展......
  • Java毕业设计:基于Springboot+vue的电影院管理系统
    【辰兮要努力】:hello你好我是辰兮,很高兴你能来阅读,昵称是希望自己能不断精进,向着优秀程序员前行!博客来源于项目以及编程中遇到的问题总结,偶尔会有读书分享,我会陆续更新Java前端、后台、数据库、项目案例等相关知识点总结,感谢你的阅读和关注,希望我的博客能帮助到更多的人,分享......
  • Java毕业设计:基于SpringBoot+Vue的养老院系统
    一、选题背景意义......
  • 错误 1 error LNK2019: 无法解析的外部符号 _MQTTClient_create,该符号在函数 "protect
    前言全局说明错误1errorLNK2019:无法解析的外部符号_MQTTClient_create,该符号在函数"protected:virtualint__thiscallCmfc_mqttclientpoweronoffDlg::OnInitDialog(void)"(?OnInitDialog@Cmfc_mqttclientpoweronoffDlg@@MAEHXZ)中被引用一、说明环境:Windows1......
  • go对接mqtt
    在Go语言中对接MQTT服务,你可以使用 paho.mqtt.golang 这个库,这是一个EclipsePahoMQTT客户端的Go语言实现。以下是一些基本步骤和示例代码,帮助你开始使用Go语言对接MQTT服务。1. 安装MQTT客户端库:使用Go的包管理器安装 paho.mqtt.golang 库:gogetgithub.com/eclipse/......