首页 > 其他分享 >常用的物联网消息队列-Mqtt协议

常用的物联网消息队列-Mqtt协议

时间:2024-11-12 17:43:47浏览次数:3  
标签:fork String 队列 mqtt 联网 Mqtt client org import

EMQX 和 Mosquitto 都是广泛使用的 MQTT 消息代理,但它们在设计目标、功能和适用场景上有一些显著的区别。

Emqx使用教程

添加依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

添加配置

fork.mqtt.broker=tcp://127.0.0.1:1883
fork.mqtt.clientId=fork.application.forklift.userInfo
fork.mqtt.push.topic=application/2/device/fork/pushmsg
fork.mqtt.pull.topic=application/2/device/fork/pullmsg

 推送消息

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SendMessages {
    @Value("${fork.mqtt.broker}")
    private String mqttUrl;

    @Value("${fork.mqtt.clientId}")
    private String clientId;

    @Getter
    @Setter
    private static MqttClient client;

    @Value("${fork.mqtt.push.topic}")
    private String sendTopic;


    public void publish(String json){
        try{
            MemoryPersistence persistence = new MemoryPersistence();
            client = new MqttClient(mqttUrl, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);
            //自动重连
            connOpts.setAutomaticReconnect(true);
            client.connect(connOpts);
            MqttMessage message = new MqttMessage();
            message.setQos(1);
            message.setRetained(false);
            message.setPayload(json.getBytes());
            log.info("推送消息内容:{}", message);
            client.publish(sendTopic, message);
        }catch(Exception e){
            log.error("推送信息数据失败 >>> {}", e.getMessage(), e);
        }
    }
}

 接收消息

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Subscriber {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${fork.mqtt.broker}")
    private String mqttUrl;

    @Value("${fork.mqtt.clientId}")
    private String clientId;

    @Value("${fork.mqtt.pull.topic}")
    private String topic;

    @Value("${fork.mqtt.push.topic}")
    private String sendTopic;

    @Getter
    @Setter
    private static MqttClient client;

    @Bean
    public void receiveMessage() {
        //MemoryPersistence设置clientid的保存形式,默认为以内存保存
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            client = new MqttClient(mqttUrl, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);
            //自动重连
            connOpts.setAutomaticReconnect(true);

            connOpts.setKeepAliveInterval(60); // 心跳间隔,单位为秒
            connOpts.setConnectionTimeout(30); // 连接超时时间,单位为秒
            client.connect(connOpts);
            client.setCallback(new MqttCallback() {
                //接收消息
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // subscribe后得到的消息会执行到这里面
                    byte[] payloadBytes = message.getPayload();
                    String receivedData = new String(payloadBytes);
                    // 对接收到的数据进行处理...
                    try {
                        /*CarMqttCallback mqttCallback = JSON.parseObject(receivedData, CarMqttCallback.class);*/
                        log.info("消息-----{}",receivedData);
                    } catch (Exception e) {
                        log.info("接收mqtt数据错误!!!!!!!!");
                    }
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete----------" + token.isComplete());
                }

                @Override
                public void connectionLost(Throwable cause) {
                    logger.info("[MQTT] 连接断开,30S之后尝试重连..." + cause);
                    // 连接断开
                    while (true) {
                        try {
                            Thread.sleep(10000);
                            if (null != client) {
                                // 重新连接
                                client.reconnect();
                            }
                            break;
                        } catch (Exception e) {
                            e.printStackTrace();
                            continue;
                        }
                    }

                }
            });
            client.subscribe(topic, 1);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

}

 官方文档:EMQX 概览 | EMQX文档

标签:fork,String,队列,mqtt,联网,Mqtt,client,org,import
From: https://blog.csdn.net/weixin_40228547/article/details/143645612

相关文章

  • atrm——删除待执行任务队列中的指定任务
    转自于:https://github.com/jaywcjlove/linux-command,后不赘述atrm删除待执行任务队列中的指定任务补充说明atrm命令用于删除待执行任务队列中的指定任务。语法atrm(选项)(参数)选项-V:显示版本号。参数任务号:指定待执行队列中要删除的任务。实例删除已经排队的任......
  • 2024 年产品经理必看的项目管理工具:互联网行业产品经理有哪些选择?
    一、产品经理与项目管理工具的紧密关联产品经理在项目管理中扮演着至关重要的角色。他们负责产品从概念到上市的全过程,需要掌控各个环节的进度,确保产品按时按质交付,满足用户需求。在这个过程中,项目管理工具成为了产品经理的得力助手。产品经理需要进行项目规划,明确项目的目标、......
  • 云消息队列 Kafka 版全面升级:经济、弹性、稳定,成本比自建最多降低 82%
    作者:娜米本文整理于2024年云栖大会阿里云智能集团产品专家张凤婷带来的主题演讲《云消息队列Kafka版全面升级:经济、弹性、稳定》云原生消息产品十年磨一剑消息产品的演进可以大致分为三个主要阶段:起步阶段:初期,市场上缺乏能够支撑大规模业务场景的优秀消息产品,无论是商......
  • [阻塞队列]
    目录1.阻塞队列2.阻塞队列的优点(1)实现服务器之间的"低耦合".(2)实现"削峰填谷"的功能.3.阻塞队列代码举例4.自己实现阻塞队列1.阻塞队列我们知道,标准库中原有的队列Queue及其子类,都是线程不安全的,所以java封装了一个名为"阻塞队列"(BlockingQueue)......
  • 郝玩的数据结构1——单调栈,单调队列
    栈和队列是很郝咏的Stl,那么,我们手搓——用数组模拟故事背景:辣鸡老o在学单调栈&单调队列——我栈top为栈顶,易得出出栈即top--,入栈++top=(ovo)......——完全不会讲,那么上马:点击查看代码#include<bits/stdc++.h>usingnamespacestd;constintN=114514;intstk[N],top=0;......
  • 【MQTT】代理服务比较RabbitMQ、Mosquitto 和 EMQX
    前言目前要处理大量设备同时频繁发送数据的情况,MQTT协议确实是一个更优的选择,因为它特别适合需要低带宽和高效能的物联网应用,下面是对目前主流协议的对比数据截止日期:2024年11月10日基础设施后端:springcloud项目设备端:IOT设备,每秒上报数据对比项特性RabbitMQMosqui......
  • 迟来的2023秋招总结,互联网&银行&国企&腾讯
    复制自本人知乎首先现在是2023年四月中旬,毕业的事情暂时告一段落,于是想吐槽顺便记录一下。23秋招其实是在2022年,而2022实在是这几年来行情最差的一年。犹记得20、21年秋天各个大厂号称“史上最大规模的秋招“,hc多开的薪资也高,一年比一年倒挂。当时我还觉得23年会更美好,没想到现......
  • 代码随想录算法训练营第十天 | 232.用栈实现队列+225. 用队列实现栈+20. 有效的括号+1
    加入训练营有点晚,没跟上任务就开始有些偷懒没有真的认真开始,从第十天开始下决心认真完成每天任务还有博客记录学习过程。栈与队列理论基础首先是学习了栈和队列的理论基础,队列 先进先出,栈 先进后出。栈 以底层容器完成其所有的工作,且对外接口统一,有.push,.pop等,不提供......
  • web组态--新一代全流程低代码物联网平台
    实际完成效果​​​​​​​​​1.添加应用图纸 登录by组态后台:http://www.byzt.net:90​​点击组态管理-画面管理,先新建一个组态画面,填写画面名称,保存,进入组态画面。​​选择画面管理,点击图示位置编辑画面,来构建组态。​​开始画组态图。​​2.组态绘......
  • 10. 基于 Redis 实现消息队列
    消息队列在分布式系统中非常重要,能够有效解耦系统的各个模块,提供异步处理能力和缓冲能力。Redis作为一个高性能的内存数据库,除了缓存和持久化存储,它还能充当轻量级的消息队列。使用Redis处理消息队列有助于提高系统的吞吐量和可扩展性。一、使用场景消息队列的应用场景......