问题描述
最近在做一个视频监控平台,要同步下级平台的摄像头信息数据,是通过其他同事写的c++服务往mqtt里推数据,我这边通过java连接mqtt监听主题获取摄像头信息。
刚开始写完都还好,但是测试过一段时间,发现java client连接总是会自动断开,并且还会有丢失消息的情况。
一开始怀疑是网络不通畅,后来把mqtt服务和java client放到一台机子上还是会自动断开,而且非常频繁。
后来通过分析控制台的指标数据,发现可能是发送方推消息太快了,我这儿接收方消费根本来不及(因为我消费到消息要入库,入库之前还要拿流水号以及校验数据唯一性,这比较耗时),导致mqtt消息堆积,一些消息就被丢弃了,
一次有61252条数据,这么多消息一股脑推过来,我特地测试了下,我这儿拿到消息即使什么都不处理,只是打印下日志,都要接近1个小时才能全部消费完。
问题总结
总结一下,主要的问题是消息要入库,那必然消费不可能快,即使加了线程池,那也只是缓冲,消费一旦快不了,mqtt服务器就会堆积消息,消息一堆积就会导致部分消息丢失和client端断开,如此恶性循环。
下面这张截图是别人遇到的和我类似的问题:
通过分析下图红框里的指标,也可以得到这一结果,可以参看《mqtt常见分析指标释义》
解决方法:
1、断开重连后离线消息要能继续消费到
设置cleanSession=false + QOS=2即可。
把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session。
QoS 级别, 0最多发送一次,1至少会发送一次(默认) 2只发送一次
2、消费速度不及生产速度导致的消息堆积,mqtt频繁断开
我的改造主要思路就是以空间换时间,在拿到mqtt消息后先放到LinkedBlockingQueue里,然后再开多线程从queue里消费消息,这样mqtt就不会堆积消息了,但只要数据入库还慢,消息还是会堆在queue里,不过对于我来说问题不大,因为LinkedBlockingQueue可以是无界队列,最大可以是Integer.MAX_VALUE,这对于我完全够用了。
主要代码
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.3.2.RELEASE</version> </dependency>
MqttClient client = new MqttClient("tcp://" + videoServerProperties.getMqttAddr(), clientId); // MQTT配置对象 MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 设置自动重连, 其它具体参数可以查看MqttConnectOptions mqttConnectOptions.setAutomaticReconnect(true); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 mqttConnectOptions.setCleanSession(false); // 设置超时时间 单位为秒 mqttConnectOptions.setConnectionTimeout(30); mqttConnectOptions.setUserName(videoServerProperties.getMqttUser()); mqttConnectOptions.setPassword(videoServerProperties.getMqttPwd().toCharArray()); // mqttConnectOptions.setServerURIs(new String[]{url}); // 设置会话心跳时间 单位为秒 mqttConnectOptions.setKeepAliveInterval(60); mqttConnectOptions.setMaxReconnectDelay(128000); // 允许的最大传输中消息 mqttConnectOptions.setMaxInflight(100); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 // mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false); if (!client.isConnected()) { IMqttToken iMqttToken = client.connectWithResult(mqttConnectOptions); iMqttToken.waitForCompletion(); }
@PostConstruct public void subscribe() { new Thread(() -> { while (true) { log.info("CMD-addPlatform-MQTT-ASYNC"); try { cameraHandler.listener(TopicTypeEnum.WEB.getClientIdPrefix() + appId, TopicTypeEnum.WEB.getTopicPrefix() + appId); } catch (Exception e) { log.error("log.info(\"CMD-addPlatform-MQTT-error.\n{}", Throwables.getStackTraceAsString(e)); } try { cameraHandler.send(TopicTypeEnum.WEB.getClientIdPrefix() + appId, TopicTypeEnum.PROTOCOL.getTopicPrefix() + "88488848884888488848", new CmdRequest()); } catch (Exception e) { e.printStackTrace(); } try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }
import com.alibaba.fastjson.JSONObject; import com.google.common.base.Throwables; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; /** * @Author: * @Date: 2022/8/12 15:47 */ @Slf4j @Component public class MqttTemplate { @Autowired private MqttFactory mqttFactory; @Value("${video.sys.serverId}") private String appId; private static final int QOS_LEVEL = 2; // 设置为所需的 QoS 级别, 0最多发送一次,1至少会发送一次(默认) 2只发送一次 /** * 发送消息 * * @param topic 主题 * @param data 消息内容 */ public void send(String clientId, String topic, Object data) { // 获取客户端实例 MqttClient client = mqttFactory.getInstance(clientId); try { // 转换消息为json字符串 String json = JSONObject.toJSONString(data); log.info("MQTT主题[{}]发送消息...\r\n{}", topic, json); client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8))); } catch (MqttException e) { log.error("MQTT主题[{}]发送消息失败,{}", topic, Throwables.getStackTraceAsString(e)); } } /** * 订阅主题 * * @param topic 主题 * @param listener 消息监听处理器 */ public void subscribe(String clientId, String topic, IMqttMessageListener listener) { MqttClient client = mqttFactory.getInstance(clientId); try { log.info("MQTT订阅主题[{}]...", topic); client.subscribe(topic, QOS_LEVEL, listener); } catch (MqttException e) { log.error("MQTT订阅主题[{}]失败,{}", topic, Throwables.getStackTraceAsString(e)); } } }
private AtomicInteger atomicInteger = new AtomicInteger(0); private LinkedBlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>(); @PostConstruct public void takeMsg() { for (int i = 1; i <= 3; i++) { // 开三个线程是为了加快处理速度 int finalI = i; taskExecutor.execute(() -> { while (true) { try { receiveCamera(blockingQueue.take()); int n = atomicInteger.incrementAndGet(); log.info("拿到的消费消息计数:{}", n); if (blockingQueue.isEmpty()) { TimeUnit.SECONDS.sleep(1L); log.info("第{}个消费线程,blockingQueue无消息", finalI); } else { log.info("第{}个消费线程,计算当前blockingQueue的size:{}", finalI, blockingQueue.size()); } } catch (InterruptedException e) { e.printStackTrace(); } } }); } } /** * 对消息进行处理 * * @param msg */ @Override // @Transactional public void onApplicationEvent(CameraEvent msg) { taskExecutor.execute(() -> { try { log.info("准备对接收到的mqtt消息进行处理:{}", JSONUtil.toJsonStr(msg)); JSONObject message = msg.getMessage(); Integer cmd = message.getInteger("cmd"); log.info("拿到的cmd:{}", cmd); if (cmd == 10001) { // 设备接收刷新 blockingQueue.put(message); } else if (cmd == 10002) { // 历史录像 historyVideo(message); } } catch (Exception e) { log.error("对接收到的mqtt消息进行处理出错.\n{}", Throwables.getStackTraceAsString(e)); } }); }
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @Configuration public class TaskExecutorConfig { /** * * @return */ @Bean public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); pool.setCorePoolSize(10);//核心线程数 pool.setMaxPoolSize(20);//最大线程数 pool.setQueueCapacity(100);//线程队列 pool.setKeepAliveSeconds(60);//线程池线程空闲时间(秒) pool.setThreadNamePrefix("LCDP-TaskExecutor-"); pool.initialize();//线程初始化 return pool; } }
标签:java,断开,mqtt,client,消息,org,import,log From: https://www.cnblogs.com/shamo89/p/17545052.html