【pom.xml】
<!-- Spring Boot web starter. -->
<!-- NOTE(review): the demo classes use Lombok's @Slf4j and @SneakyThrows, but no Lombok
     dependency is listed in this snippet - presumably it is declared elsewhere; confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<!-- Eclipse Paho MQTT v3 client; provides the org.eclipse.paho.client.mqttv3 types
     (MqttClient, MqttConnectOptions, ...) used by the demo classes. -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<!-- fastjson2 JSON library. NOTE(review): not referenced by the demo code shown here. -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.49</version>
</dependency>
【MyDemo5IMqttMessageListener.java】
package com.chz.myMqttV3.demo5;
/**
 * Per-subscription message listener that logs every message it receives.
 */
@Slf4j
public class MyDemo5IMqttMessageListener implements IMqttMessageListener
{
    /**
     * Logs the topic and the text payload of an incoming message.
     *
     * @param topic   topic the message arrived on
     * @param message the received message; its payload is decoded as UTF-8 text
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // Decode explicitly as UTF-8 (the demo publisher encodes with UTF-8) instead of
        // relying on the platform default charset, which differs between machines.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("messageArrived: topic={}, message={}", topic, payload);
    }
}
【MyDemo5MqttCallback.java】
package com.chz.myMqttV3.demo5;
/**
 * Connection callback used by the demo clients.
 *
 * <p>It logs connection loss, incoming messages and completed deliveries, and subscribes to the
 * configured topic filters every time a connection is established (see
 * {@link #connectComplete(boolean, String)}).
 */
@Slf4j
public class MyDemo5MqttCallback implements MqttCallbackExtended {

    /** QoS level requested for every subscription made by this callback. */
    private static final int SUBSCRIBE_QOS = 2;

    /** Pause between two manual reconnect attempts, in milliseconds. */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    private final MqttClient client;
    private final MqttConnectOptions options;
    private final String[] topics;

    /**
     * @param client  client this callback is registered on; used to reconnect and subscribe
     * @param options connect options reused for manual reconnect attempts
     * @param topics  topic filters to subscribe to after each connect; {@code null} or empty
     *                means "subscribe to nothing"
     */
    public MyDemo5MqttCallback(MqttClient client, MqttConnectOptions options, String[] topics)
    {
        this.client = client;
        this.options = options;
        // A missing topic list is treated like an empty one so connectComplete never hits an NPE.
        this.topics = topics == null ? new String[0] : topics;
    }

    /**
     * Logs the loss of the connection and, unless the client reconnects automatically,
     * keeps trying to reconnect until the client is connected again.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("connectionLost", throwable);
        if (options != null && options.isAutomaticReconnect()) {
            // Automatic reconnect is enabled in the connect options, so the Paho client
            // reconnects by itself; a manual connect() here would only compete with it.
            return;
        }
        while (!client.isConnected()) {
            log.info("emqx重新连接....................................................");
            try {
                client.connect(options);
            } catch (Exception e) {
                // A failed attempt must not end the retry loop; log it and try again later.
                log.warn("reconnect attempt failed, will retry", e);
            }
            if (client.isConnected()) {
                break;
            }
            try {
                Thread.sleep(RECONNECT_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and stop retrying.
                Thread.currentThread().interrupt();
                log.warn("reconnect loop interrupted", e);
                return;
            }
        }
    }

    /**
     * Logs a message delivered to the client-wide callback.
     *
     * @param topic   topic the message arrived on
     * @param message the received message; its payload is decoded as UTF-8 text
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("messageArrived: topic={}, message={}", topic, decode(message));
    }

    /**
     * Logs the topics and, when still available, the payload of a completed delivery.
     *
     * @param token delivery token of the completed publish; may be {@code null}
     */
    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if (token == null) {
            log.info("deliveryComplete: null");
            return;
        }
        MqttMessage message = token.getMessage();
        String[] tokenTopics = token.getTopics();
        String topic = tokenTopics == null ? null : Arrays.toString(tokenTopics);
        String str = message == null ? null : decode(message);
        log.info("deliveryComplete: topic={}, message={}", topic, str);
    }

    /**
     * Subscribes to all configured topic filters once a connection has been established.
     *
     * @param reconnect whether this connection is the result of a reconnect
     * @param serverURI URI of the server the connection was made to
     */
    @SneakyThrows
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("connectComplete: reconnect={}, serverURI={}", reconnect, serverURI);
        if (topics.length == 0) {
            return;
        }
        int[] qosArr = new int[topics.length];
        Arrays.fill(qosArr, SUBSCRIBE_QOS);
        // One listener instance is shared by all topic filters.
        MyDemo5IMqttMessageListener[] listeners = new MyDemo5IMqttMessageListener[topics.length];
        Arrays.fill(listeners, new MyDemo5IMqttMessageListener());
        client.subscribe(topics, qosArr, listeners);
    }

    /** Decodes a message payload as UTF-8 text (the demo publisher encodes with UTF-8). */
    private static String decode(MqttMessage message) {
        return new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
    }
}
【MyDemo5MqttClient1Test.java】
package com.chz.myMqttV3.demo5;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Demo subscriber 1: connects to the broker and registers a callback configured with the
 * topic filter {@code device/1}.
 */
public class MyDemo5MqttClient1Test
{
    private static final String BROKER_URI = "tcp://192.168.44.228:1883";
    private static final String CLIENT_ID = "MyDemo5MqttClient1Test";

    /**
     * Creates the client, installs the callback and connects.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the connect fails
     */
    public static void main(String[] args) throws MqttException
    {
        MqttConnectOptions connectOptions = createConnectOptions();
        MqttClient mqttClient = new MqttClient(BROKER_URI, CLIENT_ID, new MemoryPersistence());
        String[] topicFilters = {"device/1"};
        mqttClient.setCallback(new MyDemo5MqttCallback(mqttClient, connectOptions, topicFilters));
        mqttClient.connect(connectOptions);
    }

    /** Builds the connect options used by this demo client. */
    private static MqttConnectOptions createConnectOptions()
    {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setUserName("admin");
        connectOptions.setPassword("public".toCharArray());
        connectOptions.setCleanSession(true);
        connectOptions.setAutomaticReconnect(true);
        connectOptions.setConnectionTimeout(20);
        connectOptions.setKeepAliveInterval(10);
        return connectOptions;
    }
}
注意消费的主题是【device/1】
【MyDemo5MqttClient2Test.java】
package com.chz.myMqttV3.demo5;
/**
 * Demo subscriber 2: connects to the broker and registers a callback configured with the
 * wildcard topic filter {@code device/#}.
 */
public class MyDemo5MqttClient2Test
{
    /**
     * Creates the client, installs the callback and connects.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the connect fails
     */
    public static void main(String[] args) throws MqttException
    {
        final String serverUri = "tcp://192.168.44.228:1883";
        final String clientId = "MyDemo5MqttClient2Test";

        // Credentials and session settings.
        MqttConnectOptions opts = new MqttConnectOptions();
        opts.setUserName("admin");
        opts.setPassword("public".toCharArray());
        opts.setCleanSession(true);
        opts.setAutomaticReconnect(true);

        // Timing settings, in seconds.
        opts.setConnectionTimeout(20);
        opts.setKeepAliveInterval(10);

        MqttClient subscriber = new MqttClient(serverUri, clientId, new MemoryPersistence());
        MyDemo5MqttCallback callback =
                new MyDemo5MqttCallback(subscriber, opts, new String[]{"device/#"});
        subscriber.setCallback(callback);
        subscriber.connect(opts);
    }
}
注意消费的主题是【device/#】
【MyDemo5MqttClient3Test.java】
package com.chz.myMqttV3.demo5;
/**
 * Demo subscriber 3: connects to the broker and registers a callback configured with the
 * topic filter {@code $queue/device/1} (what the article calls a cluster subscription).
 */
public class MyDemo5MqttClient3Test
{
    private static final String[] TOPIC_FILTERS = {"$queue/device/1"};

    /**
     * Creates the client, installs the callback and connects.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the connect fails
     */
    public static void main(String[] args) throws MqttException
    {
        MqttConnectOptions settings = newOptions();
        MqttClient consumer = new MqttClient(
                "tcp://192.168.44.228:1883",
                "MyDemo5MqttClient3Test",
                new MemoryPersistence());
        consumer.setCallback(new MyDemo5MqttCallback(consumer, settings, TOPIC_FILTERS));
        consumer.connect(settings);
    }

    /** Assembles the connect options for this demo client. */
    private static MqttConnectOptions newOptions()
    {
        MqttConnectOptions settings = new MqttConnectOptions();
        settings.setUserName("admin");
        settings.setPassword("public".toCharArray());
        settings.setCleanSession(true);
        settings.setAutomaticReconnect(true);
        settings.setConnectionTimeout(20);
        settings.setKeepAliveInterval(10);
        return settings;
    }
}
注意消费的主题是【$queue/device/1】
【MyDemo5MqttClient4Test.java】
package com.chz.myMqttV3.demo5;
/**
 * Demo subscriber 4: connects to the broker and registers a callback configured with the
 * topic filter {@code $share/g1/device/1} (what the article calls a cluster subscription).
 */
public class MyDemo5MqttClient4Test
{
    private static final String SERVER_URI = "tcp://192.168.44.228:1883";
    private static final String CLIENT_ID = "MyDemo5MqttClient4Test";
    private static final String TOPIC_FILTER = "$share/g1/device/1";

    /**
     * Creates the client, installs the callback and connects.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the connect fails
     */
    public static void main(String[] args) throws MqttException
    {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        applyCredentials(connOpts);
        connOpts.setCleanSession(true);
        connOpts.setAutomaticReconnect(true);
        connOpts.setConnectionTimeout(20);
        connOpts.setKeepAliveInterval(10);

        MqttClient sharedConsumer = new MqttClient(SERVER_URI, CLIENT_ID, new MemoryPersistence());
        sharedConsumer.setCallback(
                new MyDemo5MqttCallback(sharedConsumer, connOpts, new String[]{TOPIC_FILTER}));
        sharedConsumer.connect(connOpts);
    }

    /** Sets the user name and password on the given options. */
    private static void applyCredentials(MqttConnectOptions connOpts)
    {
        connOpts.setUserName("admin");
        connOpts.setPassword("public".toCharArray());
    }
}
注意消费的主题是【$share/g1/device/1】
【MyDemo5MqttSenderTest.java】
package com.chz.myMqttV3.demo5;
/**
 * Demo publisher: connects to the broker, publishes one retained message to
 * {@code device/1}, then disconnects and releases the client.
 */
public class MyDemo5MqttSenderTest
{
    /**
     * Publishes the retained demo message.
     *
     * @param args unused
     * @throws MqttException        if connecting, publishing, disconnecting or closing fails
     * @throws InterruptedException if the pause after publishing is interrupted
     */
    public static void main(String[] args) throws UnknownHostException, MqttException, InterruptedException
    {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);
        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo5MqttSenderTest", new MemoryPersistence());
        // No topics: the publisher only needs the callback for logging.
        client.setCallback(new MyDemo5MqttCallback(client, options, new String[]{}));
        try {
            client.connect(options);
            for( int i=0; i<1; i++ ){
                String topic = "device/1";
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setRetained(true); // Note: this marks the message as a retained message
                String msg = "I am MyMqttClient3Test, at node [192.168.44.230:1883]:" + i;
                mqttMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8));
                client.publish(topic, mqttMessage);
                System.out.println("send: " + msg);
                Thread.sleep(1000L);
            }
        } finally {
            // Always release the connection and the client's resources once publishing is
            // done (or has failed), instead of leaving the client open.
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
            } finally {
                client.close();
            }
        }
    }
}
先运行【MyDemo5MqttSenderTest】,等启动完毕之后(消息已经发出去了)关掉进程。
然后再启动【MyDemo5MqttClient1Test、MyDemo5MqttClient2Test、MyDemo5MqttClient3Test、MyDemo5MqttClient4Test】。
看运行输出的日志:
可见【device/1】和【device/#】这两个广播订阅的方式可以消费到数据。
可见【$queue/device/1】和【$share/g1/device/1】这两个集群订阅的方式无法消费到数据。
总结:
- 【广播订阅】可以消费到保留消息
- 【集群订阅】无法消费到保留消息
这种情况想来也是合理的,因为【广播订阅】时每一个消费者都需要消费到消息,而【集群订阅】时一个分组里面只有一个消费者可以消费到消息。所以当集群分组里面一个消费者启动时,如果分组里面已经有了其它的消费者,这个保留消息就已经被同一个分组里面的其它消费者消费过了。另外,MQTT 5.0 规范对共享订阅也有明确说明:会话首次建立共享订阅时,服务端不会向其发送保留消息,因此即使分组里面只有一个消费者,也收不到保留消息。
标签:String,MqttConnectOptions,client,public,mqtt,例子,new,emqx,options From: https://blog.csdn.net/chenhz2284/article/details/139422833