首页 > 其他分享 >mqtt-emqx:保留消息的简单例子

mqtt-emqx:保留消息的简单例子

时间:2024-06-04 21:34:16浏览次数:10  
标签:String MqttConnectOptions client public mqtt 例子 new emqx options

【pom.xml】

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.12.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.49</version>
</dependency>

【MyDemo5IMqttMessageListener.java】

package com.chz.myMqttV3.demo5;

@Slf4j
public class MyDemo5IMqttMessageListener implements IMqttMessageListener
{
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));
    }
}

【MyDemo5MqttCallback.java】

package com.chz.myMqttV3.demo5;

@Slf4j
public class MyDemo5MqttCallback implements MqttCallbackExtended {

    private MqttClient client;
    private MqttConnectOptions options;
    private String[] topics;

    public MyDemo5MqttCallback(MqttClient client, MqttConnectOptions options, String[] topics)
    {
        this.client = client;
        this.options = options;
        this.topics = topics;
    }

    @SneakyThrows
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("connectionLost", throwable);
        while (!client.isConnected()) {
            log.info("emqx重新连接....................................................");
            client.connect(options);
            Thread.sleep(1000);
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));
    }

    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if( token!=null ){
            MqttMessage message = token.getMessage();
            String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
            String str = message==null ? null : new String(message.getPayload());
            log.info("deliveryComplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliveryComplete: null");
        }
    }

    @SneakyThrows
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("connectComplete: reconnect={}, serverURI={}", reconnect, serverURI);

        if( topics.length > 0 ){
            int[] qosArr = new int[topics.length];
            Arrays.fill(qosArr, 2);

            MyDemo5IMqttMessageListener[] listeners = new MyDemo5IMqttMessageListener[topics.length];
            Arrays.fill(listeners, new MyDemo5IMqttMessageListener());

            client.subscribe(topics, qosArr, listeners);
        }
    }
}

【MyDemo5MqttClient1Test.java】

package com.chz.myMqttV3.demo5;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyDemo5MqttClient1Test
{
    public static void main(String[] args) throws  MqttException 
    {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo5MqttClient1Test", new MemoryPersistence());
        client.setCallback(new MyDemo5MqttCallback(client, options, new String[]{"device/1"}));
        client.connect(options);
    }
}

注意消费的主题是【device/1】
【MyDemo5MqttClient2Test.java】

package com.chz.myMqttV3.demo5;

public class MyDemo5MqttClient2Test
{
    public static void main(String[] args) throws  MqttException
    {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo5MqttClient2Test", new MemoryPersistence());
        client.setCallback(new MyDemo5MqttCallback(client, options, new String[]{"device/#"}));
        client.connect(options);
    }
}

注意消费的主题是【device/#】

【MyDemo5MqttClient3Test.java】

package com.chz.myMqttV3.demo5;

public class MyDemo5MqttClient3Test
{
    public static void main(String[] args) throws  MqttException 
    {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo5MqttClient3Test", new MemoryPersistence());
        client.setCallback(new MyDemo5MqttCallback(client, options, new String[]{"$queue/device/1"}));
        client.connect(options);
    }
}

注意消费的主题是【$queue/device/1】

【MyDemo5MqttClient4Test.java】

package com.chz.myMqttV3.demo5;

public class MyDemo5MqttClient4Test
{
    public static void main(String[] args) throws  MqttException 
    {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo5MqttClient4Test", new MemoryPersistence());
        client.setCallback(new MyDemo5MqttCallback(client, options, new String[]{"$share/g1/device/1"}));
        client.connect(options);
    }
}

注意消费的主题是【$share/g1/device/1】
【MyDemo5MqttSenderTest.java】

package com.chz.myMqttV3.demo5;

public class MyDemo5MqttSenderTest
{
    public static void main(String[] args) throws UnknownHostException, MqttException, InterruptedException 
    {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo5MqttSenderTest", new MemoryPersistence());
        client.setCallback(new MyDemo5MqttCallback(client, options, new String[]{}));
        client.connect(options);

        for( int i=0; i<1; i++ ){
            String topic = "device/1";
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setRetained(true);    // 注意这一句,表示这个消息是保留消息
            String msg = "I am MyMqttClient3Test, at node [192.168.44.230:1883]:" + i;
            mqttMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8));
            client.publish(topic, mqttMessage);
            System.out.println("send: " + msg);
            Thread.sleep(1000L);
        }
    }
}

先运行【MyDemo5MqttSenderTest】,等启动完毕之后(消息已经发出去了)关掉进程。
然后再启动【MyDemo5MqttClient1Test、MyDemo5MqttClient2Test、MyDemo5MqttClient3Test、MyDemo5MqttClient4Test】。
看运行输出的日志:
在这里插入图片描述
在这里插入图片描述
可见【device/1】和【device/#】这两个广播订阅的方式可以消费到数据。

在这里插入图片描述
在这里插入图片描述
可见【$queue/device/1】和【$share/g1/device/1】这两个集群订阅的方式无法消费到数据。

总结:

  • 【广播订阅】可以消费到保留消息
  • 【集群订阅】无法消费到保留消息

这种情况想来也是合理的,因为【广播订阅】时每一个消费者都需要消费到消息,而【集群消费】时一个分组里面只有一个消费者可以消费到消息。所以当集群分组里面一个消费者启动时,如果分组里面已经有了其它的消费者,这个保留消息就已经被同一个分组里面的其它消费者消费过了。

标签:String,MqttConnectOptions,client,public,mqtt,例子,new,emqx,options
From: https://blog.csdn.net/chenhz2284/article/details/139422833

相关文章

  • golang使用阿里MQTT的通信记录
    背景:我们有业务场景就是手机App可以操作物连网设备,一年之中总会有一两次,手机无法操作设备,于是我们就需要将服务器重新启动就正常了,使用的是阿里MQTT服务。猜测:我一直怀疑,这个通信系统中的可能有BUG,消息丢失无法送达或者在传递过中发生了错乱无法正确收到消息。分析:仔细研究发现......
  • [MQTT]服务器EMQX搭建SSL/TLS连接过程(wss://)
    目录......
  • MQTT5.0
    文章目录一、MQTT5介绍1.1什么是MQTT1.2MQTT5历史1.3MQTT5设计目标1.4MQTT5应用场景二、为什么要用MQTT5为更健壮的系统更好地处理错误云原生计算的更多可扩展性更大的灵活性和更容易的集成三、MQTT5topic主题cleanSession使用场景概念QoSQos选择使用QoS0:​使......
  • JAVA SMTP例子
    一、SimpleMailSender.javapackageorg.fh.util.mail;importjava.util.Date;importjava.util.Properties;importjavax.mail.Address;importjavax.mail.BodyPart;importjavax.mail.Message;importjavax.mail.Multipart;importjavax.mail.Session;importjavax......
  • 【跟着例子学MySQL】学以致用 -- 综合练习
    文章目录前言回顾租赁系统数据库练习高级练习前言举例子,是最简单有效的学习方法。本系列文章以一个贯穿始终的场景,结合多个实例讲解MySQL的基本用法。❔为什么要写这个系列?模仿是最好的老师,实践是检验成果的方法。本系列以实操样例和应用场景为核心,将MySQL基本......
  • 使用 .NET Core 实现微服务(带例子)
    使用.NETCore实现微服务使用.NETCore实现微服务架构涉及几个关键步骤,包括服务划分、API网关、服务通信和容器化部署。下面是一个简化的示例,展示如何使用.NETCore实现一个基本的微服务架构。步骤1:创建独立的微服务定义微服务每个微服务都是一个独立的ASP.NE......
  • 一个生动的例子——通过ERC20接口访问Tether合约
    生动的例子USDT:符合ERC20标准的美元稳定币,Tether合约获得测试网上Tether合约地址通过自己写的ERC20接口访问这个合约Tether合约地址:0xdAC17F958D2ee523a2206206994597C13D831ec7IERC20.sol//SPDX-License-Identifier:GPL-3.0pragmasolidity>=0.8.2<0.9.0;inter......
  • 【Java代码调用华为云IoT MQTT】
    目录欢迎关注微信公众号:数据科学与艺术作者WX:superhe199下面是使用Java代码调用华为云IoTMQTT:importorg.eclipse.paho.client.mqttv3.*;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;publicclassHuaweiCloudMqttExample{publi......
  • PyCharm创建vue例子
     1.新增文件MyVue.vue 文件内容如下:<template><divclass="hello"><h1>{{msg}}</h1></div></template><script>exportdefault{name:'MyVue',data(){return{msg:......
  • golang kafka例子
    packagemain//生产者代码import( "fmt" "github.com/IBM/sarama" "time")//基于sarama第三方库开发的kafkaclientvarbrokers=[]string{"127.0.0.1:9092"}vartopic="hello_kafka0"//同步消息模式funcsyncProducer(conf......