首页 > 编程语言 >java连接mqtt总是自动断开的问题排查及解决

java连接mqtt总是自动断开的问题排查及解决

时间:2023-07-11 16:25:02浏览次数:36  
标签:java 断开 mqtt client 消息 org import log

问题描述

最近在做一个视频监控平台,要同步下级平台的摄像头信息数据,是通过其他同事写的c++服务往mqtt里推数据,我这边通过java连接mqtt监听主题获取摄像头信息。

刚开始写完都还好,但是测试过一段时间,发现java client连接总是会自动断开,并且还会有丢失消息的情况。

一开始怀疑是网络不通畅,后来把mqtt服务和java client放到一台机子上还是会自动断开,而且非常频繁。

后来通过分析控制台的指标数据,发现可能是发送方推消息太快了,我这儿接收方消费根本来不及(因为我消费到消息要入库,入库之前还要拿流水号以及校验数据唯一性,这比较耗时),导致mqtt消息堆积,一些消息就被丢弃了,

 

一次有61252条数据,这么多消息一股脑推过来,我特地测试了下,我这儿拿到消息即使什么都不处理,只是打印下日志,都要接近1个小时才能全部消费完。

问题总结

总结一下,主要的问题是消息要入库,那必然消费不可能快,即使加了线程池,那也只是缓冲,消费一旦快不了,mqtt服务器就会堆积消息,消息一堆积就会导致部分消息丢失和client端断开,如此恶性循环。

下面这张截图是别人遇到的和我类似的问题:

 

 

 

通过分析下图红框里的指标,也可以得到这一结果,可以参看《mqtt常见分析指标释义

 

 解决方法:

1、断开重连后离线消息要能继续消费到

设置cleanSession=false + QOS=2即可。

把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session。

QoS 级别, 0最多发送一次,1至少会发送一次(默认)  2只发送一次

2、消费速度不及生产速度导致的消息堆积,mqtt频繁断开

我的改造主要思路就是以空间换时间,在拿到mqtt消息后先放到LinkedBlockingQueue里,然后再开多线程从queue里消费消息,这样mqtt就不会堆积消息了,但只要数据入库还慢,消息还是会堆在queue里,不过对于我来说问题不大,因为LinkedBlockingQueue可以是无界队列,最大可以是Integer.MAX_VALUE,这对于我完全够用了。

主要代码

<dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.3.2.RELEASE</version>
        </dependency>

 

MqttClient client = new MqttClient("tcp://" + videoServerProperties.getMqttAddr(), clientId);
                // MQTT配置对象
                MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
                // 设置自动重连, 其它具体参数可以查看MqttConnectOptions
                mqttConnectOptions.setAutomaticReconnect(true);
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                mqttConnectOptions.setCleanSession(false);
                // 设置超时时间 单位为秒
                mqttConnectOptions.setConnectionTimeout(30);
                mqttConnectOptions.setUserName(videoServerProperties.getMqttUser());
                mqttConnectOptions.setPassword(videoServerProperties.getMqttPwd().toCharArray());
                // mqttConnectOptions.setServerURIs(new String[]{url});
                // 设置会话心跳时间 单位为秒
                mqttConnectOptions.setKeepAliveInterval(60);
                mqttConnectOptions.setMaxReconnectDelay(128000);
                // 允许的最大传输中消息
                mqttConnectOptions.setMaxInflight(100);
                // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
                // mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);

                if (!client.isConnected()) {
                    IMqttToken iMqttToken = client.connectWithResult(mqttConnectOptions);
                    iMqttToken.waitForCompletion();
                }
@PostConstruct
    public void subscribe() {
        new Thread(() -> {
            while (true) {
                log.info("CMD-addPlatform-MQTT-ASYNC");
                try {
                    cameraHandler.listener(TopicTypeEnum.WEB.getClientIdPrefix() + appId,
                            TopicTypeEnum.WEB.getTopicPrefix() + appId);
                } catch (Exception e) {
                    log.error("log.info(\"CMD-addPlatform-MQTT-error.\n{}", Throwables.getStackTraceAsString(e));
                }

                try {
                    cameraHandler.send(TopicTypeEnum.WEB.getClientIdPrefix() + appId,
                            TopicTypeEnum.PROTOCOL.getTopicPrefix() + "88488848884888488848",
                            new CmdRequest());
                } catch (Exception e) {
                    e.printStackTrace();
                }

                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/**
 * @Author: 
 * @Date: 2022/8/12 15:47
 */
@Slf4j
@Component
public class MqttTemplate {

    @Autowired
    private MqttFactory mqttFactory;

    @Value("${video.sys.serverId}")
    private String appId;

    private static final int QOS_LEVEL = 2; // 设置为所需的 QoS 级别, 0最多发送一次,1至少会发送一次(默认)  2只发送一次


    /**
     * 发送消息
     *
     * @param topic 主题
     * @param data  消息内容
     */
    public void send(String clientId, String topic, Object data) {
        // 获取客户端实例
        MqttClient client = mqttFactory.getInstance(clientId);
        try {
            // 转换消息为json字符串
            String json = JSONObject.toJSONString(data);
            log.info("MQTT主题[{}]发送消息...\r\n{}", topic, json);
            client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));
        } catch (MqttException e) {
            log.error("MQTT主题[{}]发送消息失败,{}", topic, Throwables.getStackTraceAsString(e));
        }
    }

    /**
     * 订阅主题
     *
     * @param topic    主题
     * @param listener 消息监听处理器
     */
    public void subscribe(String clientId, String topic, IMqttMessageListener listener) {
        MqttClient client = mqttFactory.getInstance(clientId);
        try {
            log.info("MQTT订阅主题[{}]...", topic);
            client.subscribe(topic, QOS_LEVEL, listener);
        } catch (MqttException e) {
            log.error("MQTT订阅主题[{}]失败,{}", topic, Throwables.getStackTraceAsString(e));
        }
    }

}
private AtomicInteger atomicInteger = new AtomicInteger(0);
    private LinkedBlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();

    @PostConstruct
    public void takeMsg() {
        for (int i = 1; i <= 3; i++) {
            // 开三个线程是为了加快处理速度
            int finalI = i;
            taskExecutor.execute(() -> {
                while (true) {
                    try {
                        receiveCamera(blockingQueue.take());
                        int n = atomicInteger.incrementAndGet();
                        log.info("拿到的消费消息计数:{}", n);
                        if (blockingQueue.isEmpty()) {
                            TimeUnit.SECONDS.sleep(1L);
                            log.info("第{}个消费线程,blockingQueue无消息", finalI);
                        } else {
                            log.info("第{}个消费线程,计算当前blockingQueue的size:{}", finalI, blockingQueue.size());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    /**
     * 对消息进行处理
     *
     * @param msg
     */
    @Override
//    @Transactional
    public void onApplicationEvent(CameraEvent msg) {
        taskExecutor.execute(() -> {
            try {
                log.info("准备对接收到的mqtt消息进行处理:{}", JSONUtil.toJsonStr(msg));
                JSONObject message = msg.getMessage();
                Integer cmd = message.getInteger("cmd");
                log.info("拿到的cmd:{}", cmd);
                if (cmd == 10001) {
                    // 设备接收刷新
                    blockingQueue.put(message);
                } else if (cmd == 10002) {
                    // 历史录像
                    historyVideo(message);
                }
            } catch (Exception e) {
                log.error("对接收到的mqtt消息进行处理出错.\n{}", Throwables.getStackTraceAsString(e));
            }
        });
    }
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class TaskExecutorConfig {

    /**
     *
     * @return
     */
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(10);//核心线程数
        pool.setMaxPoolSize(20);//最大线程数
        pool.setQueueCapacity(100);//线程队列
        pool.setKeepAliveSeconds(60);//线程池线程空闲时间(秒)
        pool.setThreadNamePrefix("LCDP-TaskExecutor-");
        pool.initialize();//线程初始化
        return pool;
    }

}

 

标签:java,断开,mqtt,client,消息,org,import,log
From: https://www.cnblogs.com/shamo89/p/17545052.html

相关文章

  • java Fegin x-www-form-urlencoded 类型请求
    spring发送content-type=application/x-www-form-urlencoded和普通请求不太一样。@FeignClient(name="ocr-api",url="${orc.idcard-url}",fallbackFactory=OcrClientFallbackFactory.class)publicinterfaceOcrClient{@PostMappi......
  • finshell 连接不到服务器,报Session.connect: java.net.SocketException: Connection r
    用finshell一段时间了,非常不错,但是有段时间突然连接不上服务器,各种重装,重启服务器都不行,在网上搜方法也没有好的办法。在我一次实在烦的不得了的时候,让我发现一个好的解决方案。先上图: 是不是出现这个问题,那么我的解决方案是啥呢?看我的手速,就是点击红色的闪电图标,一般连续点击......
  • Java中Queue的实现方式有哪些?
    一、队列的概念Queue用于模拟队列这种数据结构,队列通常是指“先进先出”(FIFO=firstinfirstout)的容器。新元素插入(offer)到队列的尾部,访问元素(poll)操作会返回队列头部的元素。通常,队列不允许随机访问队列中的元素。这种结构就相当于我们排队上车,先到的站在前面,先上车,后到的得......
  • JAVA static静态变量依赖spring实例化变量,可能导致初始化出错
    在Java中,静态变量是在类加载时初始化的,而实例变量是在对象实例化时初始化的。如果静态变量依赖于Spring实例化的变量,可能会导致初始化出错的问题。这是因为Spring的实例化过程是在运行时进行的,而类加载和静态变量初始化是在编译时进行的。当静态变量依赖于Spring实例化的变量时,如果......
  • java 阿里云直播配置及推拉流地址获取
    原文地址:https://blog.csdn.net/zhanglei5415/article/details/131551685?spm=1001.2014.3001.5501一、开通阿里云直播首先进入阿里云直播产品主页:https://www.aliyun.com/product/live。点击下方的“立即开通”。如果是还未注册的用户请按照页面提示进行完成注册并实名认证......
  • JAVA 和python 多网卡情况下获取正确的IP地址
    要获取内网地址,可以尝试连接到10.255.255.255:1。如果连接成功,获取本地套接字的地址信息就是当前的内网IP。python实现:importsocketdefextract_ip():st=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)try:st.connect(('10.255.255.255',1))......
  • java如何调用python.py文件并传参
    注意:java调用python.py文件并传参,在windows和linux中使用是不一样的我在windows操作系统中,java调用python文件并传参,是这样写的:完全没问题try{IntegertotalTestCaseCount=0;//传入python文件的参数:StringxmindFilePath,StringtestCaseKeyWo......
  • Java网络编程
    1.ip和端口ip地址InetAddress//因为没有构造方法,所以不能通过new来生成对象,但是可以通过类名来调用类的静态方法InetAddressinetAddress1=InetAddress.getByName("localhost");System.out.println(inetAddress1);InetAddressinetAddress2=I......
  • 520要通过这种方式告白 html+css+javascript canvas桃心代码 可修改 【附完整代码】
    ......
  • 【JAVA开发环境配置】 我也可以让JDK版本来去自由的切换了! 哈哈哈哈 舒服!
    ......