【pom.xml】
<!-- Spring Boot web starter. NOTE(review): none of the demo classes shown alongside this
     fragment reference Spring types; confirm whether it is actually needed. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<!-- Eclipse Paho MQTT v3 client. The demo classes' MqttClient, MqttConnectOptions and
     MqttCallbackExtended types presumably come from this artifact. -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<!-- fastjson2 JSON library. NOTE(review): not referenced by the demo classes shown here. -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.49</version>
</dependency>
<!-- NOTE(review): the demo classes use @Slf4j and @SneakyThrows, but no Lombok dependency is
     visible in this fragment; confirm it is declared elsewhere in the POM. -->
【MyDemo6MqttCallback.java】
package com.chz.myMqttV3.demo6;
/**
 * MQTT callback for the demo: logs connection and message events, subscribes to the configured
 * topic filters every time a connection is established, and takes care of reconnecting when the
 * connect options do not ask the client library to do so.
 */
@Slf4j
public class MyDemo6MqttCallback implements MqttCallbackExtended {
    /** Pause between two manual reconnect attempts, in milliseconds. */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    private final MqttClient client;
    private final MqttConnectOptions options;
    private final String[] topics;

    /**
     * @param client  client this callback is registered on; used to reconnect and subscribe
     * @param options options passed to {@code client.connect} when reconnecting manually
     * @param topics  topic filters to subscribe to after each successful connect;
     *                {@code null} is treated as "no topics"
     */
    public MyDemo6MqttCallback(MqttClient client, MqttConnectOptions options, String[] topics)
    {
        this.client = client;
        this.options = options;
        // Defensive copy so later changes to the caller's array cannot alter the subscriptions.
        this.topics = topics == null ? new String[0] : topics.clone();
    }

    /**
     * Logs the failure and, unless automatic reconnect is enabled in the connect options,
     * keeps retrying {@code client.connect(options)} until the client reports it is connected.
     *
     * @param throwable cause of the connection loss
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("connectionLost", throwable);
        if (options != null && options.isAutomaticReconnect()) {
            // The client library already runs its own reconnect cycle in this mode; a second,
            // manual connect() would compete with it.
            log.info("automatic reconnect is enabled, leaving reconnection to the MQTT client");
            return;
        }
        while (!client.isConnected()) {
            log.info("emqx重新连接....................................................");
            try {
                client.connect(options);
            } catch (MqttException e) {
                // A failed attempt (e.g. broker still down) must not end the retry loop.
                log.warn("reconnect attempt failed, retrying in {} ms", RECONNECT_DELAY_MILLIS, e);
                try {
                    Thread.sleep(RECONNECT_DELAY_MILLIS);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt for the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Logs every message delivered through the client-wide callback.
     *
     * @param topic   topic the message was published on
     * @param message received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("messageArrived: topic={}, message={}", topic, payloadText(message));
    }

    /**
     * Logs the topics and payload (when still available on the token) of a completed delivery.
     *
     * @param token delivery token; may be {@code null}
     */
    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if (token == null) {
            log.info("deliveryComplete: null");
            return;
        }
        MqttMessage message = token.getMessage();
        String topic = token.getTopics() == null ? null : Arrays.asList(token.getTopics()).toString();
        String str = message == null ? null : payloadText(message);
        log.info("deliveryComplete: topic={}, message={}", topic, str);
    }

    /**
     * Subscribes to all configured topic filters at QoS 2 after a (re)connect, routing them to a
     * shared {@link MyDemo6MqttMessageListener}.
     *
     * @param reconnect whether this connection is the result of a reconnect
     * @param serverURI URI of the server the connection was made to
     */
    @SneakyThrows
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("connectComplete: reconnect={}, serverURI={}", reconnect, serverURI);
        if (topics.length == 0) {
            return;
        }
        int[] qosArr = new int[topics.length];
        Arrays.fill(qosArr, 2);
        MyDemo6MqttMessageListener[] listeners = new MyDemo6MqttMessageListener[topics.length];
        // One listener instance is shared by every topic filter.
        Arrays.fill(listeners, new MyDemo6MqttMessageListener());
        client.subscribe(topics, qosArr, listeners);
    }

    /** Decodes a message payload as UTF-8 text instead of relying on the platform charset. */
    private static String payloadText(MqttMessage message) {
        return new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
    }
}
【MyDemo6MqttMessageListener.java】
package com.chz.myMqttV3.demo6;
/**
 * Message listener that logs the topic and the payload of every message it receives.
 */
@Slf4j
public class MyDemo6MqttMessageListener implements IMqttMessageListener
{
    /**
     * Logs the received message.
     *
     * @param topic   topic the message was published on
     * @param message received message; its payload is decoded as UTF-8 text
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // Decode with an explicit charset so the output does not depend on the platform default.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("messageArrived: topic={}, message={}", topic, payload);
    }
}
【MyDemo6MqttClient1Test.java】
package com.chz.myMqttV3.demo6;
/**
 * Demo subscriber: connects to the broker at {@code tcp://192.168.44.228:1883} with client id
 * {@code MyDemo6MqttClient1Test} and registers a {@link MyDemo6MqttCallback} configured with the
 * topic filter {@code device/#}.
 */
public class MyDemo6MqttClient1Test
{
    /**
     * Entry point of the subscriber demo.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the connect fails
     */
    public static void main(String[] args) throws MqttException {
        final MqttConnectOptions connectOptions = newConnectOptions();
        final String[] topicFilters = {"device/#"};

        final MqttClient mqttClient = new MqttClient(
                "tcp://192.168.44.228:1883", "MyDemo6MqttClient1Test", new MemoryPersistence());
        mqttClient.setCallback(new MyDemo6MqttCallback(mqttClient, connectOptions, topicFilters));
        mqttClient.connect(connectOptions);
    }

    /** Builds the connect options used by this demo (credentials, session and timing settings). */
    private static MqttConnectOptions newConnectOptions() {
        final MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setUserName("admin");
        connectOptions.setPassword("public".toCharArray());
        connectOptions.setCleanSession(true);
        connectOptions.setAutomaticReconnect(true);
        connectOptions.setConnectionTimeout(20);
        connectOptions.setKeepAliveInterval(10);
        return connectOptions;
    }
}
【MyDemo6MqttSenderTest.java】
package com.chz.myMqttV3.demo6;
/**
 * Demo client that registers a last-will message on topic {@code device/1} and then connects to
 * the broker at {@code tcp://192.168.44.228:1883} with client id {@code MyDemo6MqttSenderTest}.
 */
public class MyDemo6MqttSenderTest
{
    /**
     * Entry point of the last-will demo.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the connect fails
     */
    public static void main(String[] args) throws UnknownHostException, MqttException, InterruptedException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);
        // Register the last-will message (QoS 1, not retained). The payload is encoded
        // explicitly as UTF-8 so it does not depend on the platform default charset.
        options.setWill(
                "device/1",
                "I am MyDemo6MqttSenderTest, I am dead!!!".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                1,
                false);
        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo6MqttSenderTest", new MemoryPersistence());
        // No topic filters: this client only connects and carries the will.
        client.setCallback(new MyDemo6MqttCallback(client, options, new String[]{}));
        client.connect(options);
    }
}
启动【MyDemo6MqttSenderTest】和【MyDemo6MqttClient1Test】,等两个进程都正常启动完之后,将【MyDemo6MqttSenderTest】进程杀掉(必须是异常终止;正常断开连接时服务器不会发布遗嘱消息)。由于遗嘱消息发布到主题【device/1】,而【MyDemo6MqttClient1Test】订阅了【device/#】,所以会发现【MyDemo6MqttClient1Test】自动收到消息【I am MyDemo6MqttSenderTest, I am dead!!!】。