首页 > 其他分享 >ruoyi整合mqtt

ruoyi整合mqtt

时间:2023-04-21 19:35:12浏览次数:31  
标签:String ruoyi private mqtt client 整合 org import public

https://www.cnblogs.com/SjhCode/p/mqtt.html

ruoyi整合mqtt

mqtt
报错客户机未连接32104,可能是没连接上,也可能是两个客户端clientID相同,也可能是同一台机子subscribe(Topic,Qos)订阅了多次
在消费时,需要对方的通道有发送测试信息,我们才能取出来消费,消费完出队。

 

本地测试:

https://mqttx.app/zh

下载MQTTX测试工具,创建新连接,创建订阅。

将配置文件中的信息对应填写,编写java代码启动项目;

在MQTTX中的发送窗口中,选好对应topic,写入json发送。

看到控制台有log输出则成功;

 

导入依赖

 
        <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
 

 配置

 
  mqtt:
    username: admin
    password: admin
    host-url: tcp://broker.emqx.io:1883   # 服务器的地址和端口,这个需要改
    clientID: test1000    # 这个改不改随意,但不同的客户端肯定不能一样
    default-topic: testtopic/1       # 默认主题
    timeout: 100
    keepalive: 100
   
package com.ruoyi.web.controller.Mqtt;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * @Classname MqttConfig
 * @Description mqtt相关配置信息
 * @Date 2020/3/5 11:00
 * @Created by bam
 */
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * 用户名
     */
    // @Value("username")
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接地址
     */
    private String hostUrl;
    /**
     * 客户Id
     */
    private String clientID;
    /**
     * 默认连接话题
     */
    private String defaultTopic;
    /**
     * 超时时间
     */
    private int timeout;
    /**
     * 保持连接数
     */
    private int keepalive;

    @Bean
    public MqttPushClient getMqttPushClient() {
//        System.out.println("hostUrl: "+ hostUrl);
//        System.out.println("clientID: "+ clientID);
//        System.out.println("username: "+ username);
//        System.out.println("password: "+ password);
//        System.out.println("timeout: "+timeout);
//        System.out.println("keepalive: "+ keepalive);
        mqttPushClient.connect(hostUrl, clientID, username, password, timeout, keepalive);
        System.out.println("MQTT服务启动成功");
        // 以/#结尾表示订阅所有以test开头的主题
        mqttPushClient.subscribe(defaultTopic, 0);
        return mqttPushClient;
    }


    public void setMqttPushClient(MqttPushClient mqttPushClient) {
        this.mqttPushClient = mqttPushClient;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getHostUrl() {
        return hostUrl;
    }

    public void setHostUrl(String hostUrl) {
        this.hostUrl = hostUrl;
    }

    public String getClientID() {
        return clientID;
    }

    public void setClientID(String clientID) {
        this.clientID = clientID;
    }

    public String getDefaultTopic() {
        return defaultTopic;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getKeepalive() {
        return keepalive;
    }

    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }
}
 

 

 
package com.ruoyi.web.controller.Mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.ruoyi.web.controller.Mqtt.MqttPushClient;

/** * * @author bam * 2020年3月5日 * MqttPushClient.java * */ @Component public class MqttPushClient { private static final Logger logger = LoggerFactory.getLogger(com.ruoyi.web.controller.Mqtt.MqttPushClient.class); @Autowired private com.ruoyi.web.controller.Mqtt.PushCallback pushCallback; @Value("${spring.mqtt.default-topic}") private String topic ; private static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { com.ruoyi.web.controller.Mqtt.MqttPushClient.client = client; } /** * 客户端连接 * * @param host ip+端口 * @param clientID 客户端Id * @param username 用户名 * @param password 密码 * @param timeout 超时时间 * @param keepalive 保留数 */ public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) { MqttClient client; try { client = new MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(true); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); com.ruoyi.web.controller.Mqtt.MqttPushClient.setClient(client); subscribe(topic,2); try { client.setCallback(pushCallback); client.connect(options); } catch (Exception e) { try { client.reconnect(); }catch (Exception e1){ e1.printStackTrace(); } e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 发布 * * @param qos 连接方式 * @param retained 是否保留 * @param topic 主题 * @param pushMessage 消息体 */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null == mTopic) { logger.error("topic not exist"); } MqttDeliveryToken token; try { logger.info(mTopic.toString()); token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅某个主题 * * @param topic 主题 * @param qos 连接方式 */ public void subscribe(String topic, int qos) { logger.info("开始订阅主题" + topic); try { MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }
 

 

package com.ruoyi.web.controller.Mqtt;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.web.controller.Mqtt.MqttPushClient;
import com.ruoyi.web.controller.Mqtt.MqttConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.ruoyi.common.websocket.*;

import javax.annotation.Resource;
import java.lang.reflect.Method;


/**
 * @Classname PushCallback
 * @Description 消费监听类
 * @Date 2019/4/11 23:31
 * @Created by Jack
 */
@Component
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;

    @Resource
    private WebSocket webSocket;

    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,一般在这里面进行重连
        logger.info("连接断开,可以做重连");
        if (null != client) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // subscribe后得到的消息会执行到这里面
        logger.info("接收消息主题 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));

        String msg= new String(mqttMessage.getPayload());
        String data = JSONObject.parseObject(msg).getString("data");
//        webSocket.sendMessage(data);

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("消息发送成功---------" + iMqttDeliveryToken.isComplete());
    }
}

标签:String,ruoyi,private,mqtt,client,整合,org,import,public
From: https://www.cnblogs.com/chuangsi/p/17341506.html

相关文章

  • ruoyi整合WebSocket
    https://www.cnblogs.com/SjhCode/p/WebSocket.html ruoyi整合WebSocket这里使用WebSocket目的:向前端推送实时消息,配合ActiveMQ接入三方使用的导入maven依赖 <!--WebSocket--><dependency><groupId>org.java-websocket</groupId><......
  • MQTT-发布与订阅的报文
    MQTT发布订阅流程在MQTT发布/订阅模式中,一个客户端既可以是发布者,也可以是订阅者,也可以同时具备这两个身份。当客户端发布一条消息时,它会被发送到代理,然后代理将消息路由到该主题的所有订阅者。当客户端订阅一个主题时,它会收到代理转发到该主题的所有消息发布消息报文-Publish......
  • springboot框架快速整合websocket
    1、【pom.xml】<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>2、【MsgType.java】/***@authorJHL*2019-08-109:56*/publicenumM......
  • 若依RuoYi框架浅析 基础篇①——日志logs本地保存
    文章目录日志保存位置在/home/ruoyi/logs/[root@iZ2ze30dygwd6yh7gu6lskZlogs]#cd/home/ruoyi/logs/[root@iZ2ze30dygwd6yh7gu6lskZlogs]#lssys-error.2021-02-28.logsys-error.logsys-info.2021-02-28.logsys-info.2021-03-01.logsys-info.logsys-user.2021-0......
  • MQTT基础介绍
    MQTT与HTTP的区别HTTP协议是客户端与服务端直连请求与响应MQTT是基于发布订阅模型的轻量级的消息传输协议MQTT能力发布:Publish订阅:Subscribe代理:Broker,管理通信执行模式:客户端发送消息到broker,broker将消息发送给订阅过的客户端MQTT通信模式一对一:点对点通信......
  • MQTT报文分析
    一、问题引入MQTT属于应用层协议,基于TCP/IP架构实现,那么它的报文是如何定义的呢?或许可以像分析http协议那样,利用抓包工具:wireshark分析报文。二、解决过程......
  • 项目-mqtt阿里云传输图像,AD,10路输入,8路输出
     程序下载1,配置阿里云物联网平台参数 2,下载程序   3,正常运行阿里云物联网平台会显示设备在线  提示:上报开关量等数据设备发布的主题为: /a1ykoHAGGPL/${deviceName}/user/update上报摄像头数据设备发布的主题为:  /a1ykoHAGGPL/${deviceName}/use......
  • 整合swagger2
    添加配置类importcom.google.common.base.Predicates;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importspringfox.documentation.builders.ApiInfoBuilder;importspringfox.documentation.build......
  • ztree初始化时选中(ruoyi版)
    ruoyi版本:4.6.0问题描述将后台传入的参数放到$.tree中,当ztree的Node中checked为true时,Node默认为选中,目前前台调用代码varurl=ctx+"获得List<Ztree>的URL";varoptions={url:url,expandLevel:2,beforeClick:function(treeId,treeNode,clickFlag){......
  • Kubuesphere部署Ruoyi(三):持久化存储配置
    按照如下教程配置NFS先服务器:https://kubesphere.io/zh/docs/v3.3/reference/storage-system-installation/nfs-server/后客户端:https://kubesphere.io/zh/docs/v3.3/installing-on-linux/persistent-storage-configurations/install-nfs-client/按照链接操作以后,在客户端上......