首页 > 其他分享 >MQTT

MQTT

时间:2024-10-11 13:23:23浏览次数:13  
标签:String mqtt springframework MQTT import org payload

安装

服务端

EMQX

客户端

MQTTX
image

Java集成SrpingBoot

pom.xml

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>

application.yml

mqtt:
  broker:
    # 设备MQTT应用配置
    url: tcp://your_server_ip:1883
    username: your_username
    password: your_password
    # 自定义约定client_id
    clientId: your_client_id
  topics:
    # 默认监听
    default: your/default_topic
    # 自定义监听
    subscriptions:
      - your/topic1
      - your/topic2

# application.properties配置如下
# MQTT 设备应用配置
# mqtt.broker.url=tcp://your_server_ip:1883
# mqtt.broker.username=your_username
# mqtt.broker.password=your_password
# mqtt.broker.clientId=terminalTcpServer
# MQTT 监听的默认主题和自定义订阅主题
# mqtt.topics.default=your/default_topic
# mqtt.topics.subscriptions=your/topic1,your/topic2

MqttConfig配置类

import com.baoer.terminaltcpserver.mqtt.utils.MqttUtil;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.beans.factory.annotation.Autowired;

@Configuration
public class MqttConfig {

//    @Autowired
//    private MqttUtil mqttUtil;

    @Value("${mqtt.broker.url}")
    private String brokerUrl;

    @Value("${mqtt.broker.username}")
    private String username;

    @Value("${mqtt.broker.password}")
    private String password;

    @Value("${mqtt.broker.clientId}")
    private String clientId;

    @Value("${mqtt.topics.default}")
    private String defaultTopic;

    @Value("${mqtt.topics.subscriptions}")
    private String[] topics;

    // 配置MQTT客户端工厂
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // (接收消息:步骤一)定义一个消息通道,用于接收消息
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // (接收消息:步骤二)配置消息驱动通道适配器,用于接收指定主题的消息
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), topics);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // (接收消息:步骤三)配置消息处理器,用于处理接收到的消息
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler(MqttUtil mqttUtil) {
        return message -> {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            String payload = message.getPayload().toString();
            System.out.println("接收到消息: 主题 = " + topic + ", 内容 = " + payload);

            // 发布接收到的消息事件
            mqttUtil.publishReceivedMessage(topic, payload);
        };
    }

    // 配置MQTT消息发送处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic); // 设置默认主题
        return messageHandler;
    }

    // 定义一个消息通道,用于发送消息
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

MqttUtil 工具类

import com.baoer.terminaltcpserver.mqtt.config.MqttMessageEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class MqttUtil {

    @Autowired
    private MessageChannel mqttOutboundChannel;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    /**
     * 发送消息到默认主题
     * @param payload 消息内容
     */
    public void sendMessage(String payload) {
        mqttOutboundChannel.send(MessageBuilder.withPayload(payload).build());
    }

    /**
     * 发送消息到指定主题
     * @param topic 主题
     * @param payload 消息内容
     */
    public void sendMessage(String topic, String payload) {
        mqttOutboundChannel.send(MessageBuilder.withPayload(payload)
                .setHeader("mqtt_topic", topic)
                .build());
    }

    /**
     * (接收消息:步骤四)发布接收到的MQTT消息事件
     * @param topic 主题
     * @param payload 消息内容
     */
    public void publishReceivedMessage(String topic, String payload) {
        MqttMessageEvent event = new MqttMessageEvent(this, topic, payload);
        eventPublisher.publishEvent(event);
    }
}

MqttMessageEvent消息事件类

import org.springframework.context.ApplicationEvent;

public class MqttMessageEvent extends ApplicationEvent {

    private final String topic;
    private final String payload;

    public MqttMessageEvent(Object source, String topic, String payload) {
        super(source);
        this.topic = topic;
        this.payload = payload;
    }

    public String getTopic() {
        return topic;
    }

    public String getPayload() {
        return payload;
    }
}

MqttMessageListener消息监听器

import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    /**
     * (接收消息:步骤五)事件监听器中处理消息
     * 说明:此方法将接收到上方 MqttUtil.publishReceivedMessage() 发布的事件并处理这些消息
     *
     * @param event 接收到的事件
     */
    @EventListener
    public void handleMqttMessage(MqttMessageEvent event) {
        String topic = event.getTopic();
        String payload = event.getPayload();
        System.out.println("处理接收到的消息: 主题 = " + topic + ", 内容 = " + payload);

        // 根据不同的主题进行不同的处理
        if ("your/topic1".equals(topic)) {
            handleTopic1Message(payload);
        } else if ("your/topic2".equals(topic)) {
            handleTopic2Message(payload);
        }
    }

    private void handleTopic1Message(String payload) {
        // 处理来自 topic1 的消息
        System.out.println("处理 topic1 消息: " + payload);
    }

    private void handleTopic2Message(String payload) {
        // 处理来自 topic2 的消息
        System.out.println("处理 topic2 消息: " + payload);
    }
}

测试

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttUtil mqttUtil;

    @RequestMapping("/send")
    @ResponseBody
    public String sendMessage() {
        mqttUtil.sendMessage("your/topic1", "Hello from Spring Boot");
        return "Message sent!";
    }
}

调用接口发送成功
image
使用MQTT X查看是否接受成功,并发送测试
image
查看EMQX Dashboard中MQTTX工具的连接
image

运行流程

1.在 件中配置MQTT服务器的URL、用户名、密码、client_id、默认主题和订阅的主题。
2.启动MQTT Broker。
3.确保硬件设备连接到MQTT Broker,并发布和订阅消息。
4.启动Spring Boot应用程序,确保它可以发送和接收MQTT消息。
5.在控制台中查看接收到的消息,并根据不同的主题进行处理(设备上行消息要用不同的topic主题发布)。

参考:
https://blog.csdn.net/weixin_43822632/article/details/141194093

标签:String,mqtt,springframework,MQTT,import,org,payload
From: https://www.cnblogs.com/aeolian/p/18457763

相关文章

  • Nodered学习记录-MQTT
    安装EMQXEMQX(以前称为EMQ)是一个开源的、高度可扩展且高可用的分布式MQTT消息代理,专为物联网(IoT)、机器对机器(M2M)通信和移动应用程序设计。它支持MQTT和其他IoT协议如CoAP/LwM2M,能够处理数百万并发连接,并提供强大的消息路由能力。通过docker安装官方文档$dockerpullem......
  • .NET 开源高性能 MQTT 类库
    阅读目录前言项目介绍功能说明功能特点应用场景使用方法项目地址总结最后前言随着物联网(IoT)技术的迅猛发展,MQTT(消息队列遥测传输)协议凭借其轻量级和高效性,已成为众多物联网应用的首选通信标准。MQTTnet作为一个高性能的.NET开源库,为.NET平台上的MQTT客户端......
  • STM32F103C8T6+ESP8266+MQTT+EMQX完成数据上传和点灯环节
    本文参考以下文章:【最简单】STM32+ESP8266+MQTT+EMQX完成数据上传和点灯环节_stm32如何连接emqx-CSDN博客STM32+ESP8266通过MQTT协议连接本地EMQX(保姆级教学!资料开放!)_stm32驱动esp8266本地emqx-CSDN博客 一、在windows环境下或Ubuntu搭建EMQX云平台 参看这篇文章:搭建自己的M......
  • Linux安装MQTT 服务器(图文教程)
    MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,专为低带宽和不稳定的网络环境设计,非常适合物联网(IoT)应用。官网地址:https://www.emqx.com/一、版本选择根据自己的操作系统进行下载即可,推荐使用rpm安装方式。下载地址:https://www.emqx.com/zh/downloads-and-i......
  • 阿里云ecs使用nginx部署mqtt服务的tcp转发
    一、什么是MQTT?MQTT(MessageQueuingTelemetryTransport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。二、为什么MQTT是适用于物联网的最......
  • ARMxy 嵌入式工业计算机中 MQTT 通信协议的热门案例探讨
    MQTT通信协议案例1案例说明案例功能:使用node-red工具与libmosquitto(MQTTversion3.1.1clientlibrary)的API通过MQTT代理服务器通信。基于MQTT通信协议,实现发布和订阅消息功能。程序流程图如下图2案例测试本案例使用设备node-red工具与上位机tto工具通信。请......
  • 实现高效物联网通信:MQTT协议深入解析
    MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,最初由IBM于1999年开发,目的是为了监控远程设备的传感器和嵌入式系统之间的通信。它的目标是提供一种简单、高效、可靠的消息传递机制,以满足低带宽、不稳定网络环境下的通信需求。MQTT是适用于物联网的最佳协议M......
  • HomeAssistant|【实战篇】基于MQTT的零代码、少配置,设备高效接入方法
    HomeAssistant是一个智能家居自动化平台,允许跨生态跨平台的设备连接到一起,做统一管理和设备自动化等功能,十分强大和灵活;在前面两篇关于MQTT接入HomeAssistant的文章【入门篇】和【高级篇】中,我们已经了解到如何把一个MQTT设备接入到HA,并且也知道了一些比较高级的用法,但是文......
  • Python 客户端类库之paho-mqtt学习总结
    实践环境Python3.9.13paho-mqtt2.1.0简介EclipsePahoMQTTPython客户端类库实现了MQTT协议版本5.0,3.1.1,和3.1。该类库提供一个客户端类,允许应用连接到MQTT代理并发布消息,订阅主题并检索发布的消息。同时还提供了一个写其它辅助函数,使向MQTT服务器发布一次性消息变......
  • 使用Postman测试MQTT协议接口
    MQTT概述MQTT(MessageTelemetryTransport)是一种用于物联网(IoT)的消息传递协议。它的使用范围从家庭自动化和可穿戴设备的小型设备到大型工业机械的自动化。它是一种轻量级技术,以发布/订阅模式为模型,其中连接到单个代理的客户端可以将消息发布到不同的主题,并订阅主题以接收来自......