首页 > 其他分享 >mqtt-emqx:设置遗嘱消息

mqtt-emqx:设置遗嘱消息

时间:2024-06-08 22:29:24浏览次数:15  
标签:String public mqtt client 遗嘱 new emqx message options

【pom.xml】

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.12.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.49</version>
</dependency>

【MyDemo6MqttCallback.java】

package com.chz.myMqttV3.demo6;

@Slf4j
public class MyDemo6MqttCallback implements MqttCallbackExtended {

    private MqttClient client;
    private MqttConnectOptions options;
    private String[] topics;

    public MyDemo6MqttCallback(MqttClient client, MqttConnectOptions options, String[] topics)
    {
        this.client = client;
        this.options = options;
        this.topics = topics;
    }

    @SneakyThrows
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("connectionLost", throwable);
        while (!client.isConnected()) {
            log.info("emqx重新连接....................................................");
            client.connect(options);
            Thread.sleep(1000);
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));
    }

    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if( token!=null ){
            MqttMessage message = token.getMessage();
            String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
            String str = message==null ? null : new String(message.getPayload());
            log.info("deliveryComplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliveryComplete: null");
        }
    }

    @SneakyThrows
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("connectComplete: reconnect={}, serverURI={}", reconnect, serverURI);

        if( topics.length > 0 ){
            int[] qosArr = new int[topics.length];
            Arrays.fill(qosArr, 2);

            MyDemo6MqttMessageListener[] listeners = new MyDemo6MqttMessageListener[topics.length];
            Arrays.fill(listeners, new MyDemo6MqttMessageListener());

            client.subscribe(topics, qosArr, listeners);
        }
    }
}

【MyDemo6MqttMessageListener.java】

package com.chz.myMqttV3.demo6;

@Slf4j
public class MyDemo6MqttMessageListener implements IMqttMessageListener
{
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));
    }
}

【MyDemo6MqttClient1Test.java】

package com.chz.myMqttV3.demo6;

public class MyDemo6MqttClient1Test
{
    public static void main(String[] args) throws  MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo6MqttClient1Test", new MemoryPersistence());
        client.setCallback(new MyDemo6MqttCallback(client, options, new String[]{"device/#"}));
        client.connect(options);
    }
}

【MyDemo6MqttSenderTest.java】

package com.chz.myMqttV3.demo6;

public class MyDemo6MqttSenderTest
{
    public static void main(String[] args) throws UnknownHostException, MqttException, InterruptedException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);
        // 这里设置遗嘱消息
        options.setWill("device/1", "I am MyDemo6MqttSenderTest, I am dead!!!".getBytes(), 1, false);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo6MqttSenderTest", new MemoryPersistence());
        client.setCallback(new MyDemo6MqttCallback(client, options, new String[]{}));
        client.connect(options);
    }
}

启动【MyDemo6MqttSenderTest、MyDemo6MqttClient1Test】,等两个进程都正常启动完之后,将【MyDemo6MqttSenderTest】进程杀掉。会发现【MyDemo6MqttClient1Test】自动收到消息【I am MyDemo6MqttSenderTest, I am dead!!!】
在这里插入图片描述

标签:String,public,mqtt,client,遗嘱,new,emqx,message,options
From: https://blog.csdn.net/chenhz2284/article/details/139536948

相关文章

  • vue 连接mqtt
    下载mqtt服务:npminstallmqttconstmqttConfig={//你的MQTT服务器配置protocolId:'MQTT',protocolVersion:4,clean:true,clientId:'xxxx',reconnectPeriod:1000,connectTimeout:60*1000,//will:{//topic:�......
  • MQTTX使用
    windows10-EMQX安装及配置使用教程一、下载安装1.1下载1.2安装1.3设置开机自启动二、连接MQTT2.1MQTT下载安装2.1.1下载2.1.2安装及配置三、EMQX常用命令  本文介绍的是在windows10系统下的emqx的安装、配置及使用教程。一、下载安装1.1下载下载链接:emqx官网-版本......
  • java mqtt自动重连注意点
    1、在使用Java的 org.eclipse.paho.client.mqttv3 MQTT客户端库时,options.setAutomaticReconnect(false) 的设置是用来指定在连接丢失后,客户端是否应该自动尝试重新连接。将其设置为 false 意味着如果连接丢失,客户端不会自动尝试重新连接。然而,即使设置了自动重连为 fa......
  • mqtt-emqx:保留消息的简单例子
    【pom.xml】<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.12.RELEASE</version></dependency><dependency><grou......
  • golang使用阿里MQTT的通信记录
    背景:我们有业务场景就是手机App可以操作物连网设备,一年之中总会有一两次,手机无法操作设备,于是我们就需要将服务器重新启动就正常了,使用的是阿里MQTT服务。猜测:我一直怀疑,这个通信系统中的可能有BUG,消息丢失无法送达或者在传递过中发生了错乱无法正确收到消息。分析:仔细研究发现......
  • [MQTT]服务器EMQX搭建SSL/TLS连接过程(wss://)
    目录......
  • MQTT5.0
    文章目录一、MQTT5介绍1.1什么是MQTT1.2MQTT5历史1.3MQTT5设计目标1.4MQTT5应用场景二、为什么要用MQTT5为更健壮的系统更好地处理错误云原生计算的更多可扩展性更大的灵活性和更容易的集成三、MQTT5topic主题cleanSession使用场景概念QoSQos选择使用QoS0:​使......
  • 【Java代码调用华为云IoT MQTT】
    目录欢迎关注微信公众号:数据科学与艺术作者WX:superhe199下面是使用Java代码调用华为云IoTMQTT:importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;publicclassHuaweiCloudMqttExample{publi......
  • MQTT和kafka搭配使用 集成 emq iot 物联网
    MQTT历史MQTT协议于1999年发明,用于石油和天然气行业。工程师需要一种协议来实现最小带宽和最小电池损耗,以通过卫星监控石油管道。最初,该协议被称为MessageQueuingTelemetryTransport(消息队列遥测传输),得名于首先支持其初始阶段的IBM产品MQ系列。2010年,IBM发布了......
  • 使用EMQX搭建MQTT服务
    简介:EMQX是一款开源的大规模分布式MQTT消息服务器,功能丰富,专为物联网和实时通信应用而设计。EMQX5.0单集群支持MQTT并发连接数高达1亿条,单服务器的传输与处理吞吐量可达每秒百万级MQTT消息,同时保证毫秒级的低时延。EMQX支持多种协议,包括MQTT(3.1、3.1.1......