1、问题:
在使用mqtt进行数据接收并将其转发到kafka的过程中,出现了个问题,mqtt总是自动断开并尝试重连,但是很快就又断开尝试重连,如此反复。
在代码中通过实现 MqttCallbackExtended 接口来进行mqtt的主题订阅,重连,消息接收等功能;
1)、clientID也使用了时间戳来定义,保证不会出现重复的情况
2)、连接的账号也使用的最大权限的账号
但是还是不停的出现断开并自动重连的情况,导致接收到的消息无法转发到kafka中,让下游服务进行消费。
2、解决:
使用debug模式进行一步步定位,发现在转发消息的时候抛出了个异常,而这个异常会导致mqtt连接断开,并丢失当前的消息,所以在接收消息的方法上加个try...catch...就解决了。
@Override public void messageArrived(String topic, MqttMessage mqttMessage) { try { String message = new String(mqttMessage.getPayload()); if (mqttMessage.getPayload() != null) { publisher.publishEvent(new MessageArrivedEvent(topic, message)); } } catch (Exception e) { log.error("消息消费失败", e); } }
标签:接收,断开,mqttMessage,mqtt,重连,String From: https://www.cnblogs.com/Silentness/p/18183829