EMQX 和 Mosquitto 都是广泛使用的 MQTT 消息代理,但它们在设计目标、功能和适用场景上有一些显著的区别。
EMQX 使用教程
添加依赖
<!-- Eclipse Paho MQTT v3 Java client; provides the org.eclipse.paho.client.mqttv3 classes used by the examples below. -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
添加配置
# Each property must be on its own line; on a single line everything after the
# first '=' would be read as the value of fork.mqtt.broker.

# Broker URI passed to the MqttClient constructor by SendMessages and Subscriber.
fork.mqtt.broker=tcp://127.0.0.1:1883
# Client id injected into both SendMessages and Subscriber.
fork.mqtt.clientId=fork.application.forklift.userInfo
# Topic SendMessages publishes to.
fork.mqtt.push.topic=application/2/device/fork/pushmsg
# Topic Subscriber subscribes to.
fork.mqtt.pull.topic=application/2/device/fork/pullmsg
推送消息
import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component @Slf4j public class SendMessages { @Value("${fork.mqtt.broker}") private String mqttUrl; @Value("${fork.mqtt.clientId}") private String clientId; @Getter @Setter private static MqttClient client; @Value("${fork.mqtt.push.topic}") private String sendTopic; public void publish(String json){ try{ MemoryPersistence persistence = new MemoryPersistence(); client = new MqttClient(mqttUrl, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(false); //自动重连 connOpts.setAutomaticReconnect(true); client.connect(connOpts); MqttMessage message = new MqttMessage(); message.setQos(1); message.setRetained(false); message.setPayload(json.getBytes()); log.info("推送消息内容:{}", message); client.publish(sendTopic, message); }catch(Exception e){ log.error("推送信息数据失败 >>> {}", e.getMessage(), e); } } }
接收消息
import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component @Slf4j public class Subscriber { private final Logger logger = LoggerFactory.getLogger(getClass()); @Value("${fork.mqtt.broker}") private String mqttUrl; @Value("${fork.mqtt.clientId}") private String clientId; @Value("${fork.mqtt.pull.topic}") private String topic; @Value("${fork.mqtt.push.topic}") private String sendTopic; @Getter @Setter private static MqttClient client; @Bean public void receiveMessage() { //MemoryPersistence设置clientid的保存形式,默认为以内存保存 MemoryPersistence persistence = new MemoryPersistence(); try { client = new MqttClient(mqttUrl, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(false); //自动重连 connOpts.setAutomaticReconnect(true); connOpts.setKeepAliveInterval(60); // 心跳间隔,单位为秒 connOpts.setConnectionTimeout(30); // 连接超时时间,单位为秒 client.connect(connOpts); client.setCallback(new MqttCallback() { //接收消息 @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 byte[] payloadBytes = message.getPayload(); String receivedData = new String(payloadBytes); // 对接收到的数据进行处理... try { /*CarMqttCallback mqttCallback = JSON.parseObject(receivedData, CarMqttCallback.class);*/ log.info("消息-----{}",receivedData); } catch (Exception e) { log.info("接收mqtt数据错误!!!!!!!!"); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete----------" + token.isComplete()); } @Override public void connectionLost(Throwable cause) { logger.info("[MQTT] 连接断开,30S之后尝试重连..." 
+ cause); // 连接断开 while (true) { try { Thread.sleep(10000); if (null != client) { // 重新连接 client.reconnect(); } break; } catch (Exception e) { e.printStackTrace(); continue; } } } }); client.subscribe(topic, 1); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
官方文档:EMQX 概览 | EMQX文档
标签:fork,String,队列,mqtt,联网,Mqtt,client,org,import From: https://blog.csdn.net/weixin_40228547/article/details/143645612