首页 > 其他分享 >MQTT协议及其使用案例

MQTT协议及其使用案例

时间:2024-02-22 09:56:32浏览次数:27  
标签:协议 String mqtt topic 案例 MQTT client public log

MQTT 概述

MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。 可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。 使用MQTT协议,消息发送者与接收者不受时间和空间的限制。

Docker 部署 MQTT(采用docker-compose.yml)

version: "3" 
services:
    mqtt:
        image: eclipse-mosquitto
        container_name: mqtt
        privileged: true 
        ports: 
            - 1883:1883
            - 9001:9001
        volumes:
            - ./config:/mosquitto/config
            - ./data:/mosquitto/data
            - ./log:/mosquitto/log
  • 文件夹
    image

  • 创建 config/mosquitto.conf

persistence true
listener 1883
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log
 
# 关闭匿名模式
# allow_anonymous true
# 指定密码文件
password_file /mosquitto/config/pwfile.conf
  • docker部署执行:docker compose up -d
  • 设置访问权限(用户名:admin,密码:admin123)
docker exec -it mqtt sh
touch /mosquitto/config/pwfile.conf
chmod -R 755 /mosquitto/config/pwfile.conf
mosquitto_passwd -b /mosquitto/config/pwfile.conf admin admin123
  • 重启mqtt容器:docker compose restart

Springboot 整合

  • 依赖
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/>
    </parent>
    
    <dependencies>
    		<!--  spring mqtt协议  -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!--  lombok  -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--spring boot and web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <!--Http 请求 组件-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <!--测试组件-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
        </dependency>
    </dependencies>
  • 配置文件
mqtt.host=tcp://127.0.0.1:1883
mqtt.clientId=mqttx_a071ba88
mqtt.username=admin
mqtt.password=admin123
mqtt.topic=test_topic
mqtt.timeout=36000
mqtt.keepAlive=6000
  • 配置类
@Slf4j
@Configuration
public class MyMqttConfiguration {
    @Value("${mqtt.host}")
    String broker;
    @Value("${mqtt.clientId}")
    String clientId;
    @Value("${mqtt.username}")
    String username;
    @Value("${mqtt.password}")
    String password;
    @Value("${mqtt.timeout}")
    Integer timeout;
    @Value("${mqtt.keepAlive}")
    Integer keepAlive;
    @Value("${mqtt.topic}")
    String topic;
    @Autowired
    MyHandle myHandle;

    @Bean
    public MyMqttClient myMqttClient(){
        MyMqttClient mqttClient = new MyMqttClient(broker, username, password, clientId, timeout, keepAlive,myHandle);
        for (int i = 0; i < 10; i++) {
            try {
                mqttClient.connect();
                mqttClient.subscribe(topic,0);
                return mqttClient;
            } catch (MqttException e) {
                log.error("MQTT connect exception,connect time = " + i);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return mqttClient;
    }

}
  • 客户端
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.util.ObjectUtils;

@Slf4j
public class MyMqttClient {
    private static MqttClient client;
    private String host;
    private String clientId;
    private String username;
    private String password;
    private Integer timeout;
    private Integer keepAlive;
    private MyHandle myHandle;

    public  MyMqttClient(){
        System.out.println("MyMqttClient空构造函数");
    }

    public MyMqttClient(String host, String username, String password, String clientId, Integer timeOut, Integer keepAlive,MyHandle myHandle) {
        System.out.println("MyMqttClient全参构造");
        this.host = host;
        this.username = username;
        this.password = password;
        this.clientId = clientId;
        this.timeout = timeOut;
        this.keepAlive = keepAlive;
        this.myHandle = myHandle;
    }

    public static MqttClient getClient() {
        return client;
    }

    public static void setClient(MqttClient client) {
        MyMqttClient.client = client;
    }

    /**
     * 设置mqtt连接参数
     */
     public MqttConnectOptions setMqttConnectOptions(String username,String password,Integer timeout, Integer keepAlive){
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(username);
         options.setPassword(password.toCharArray());
         options.setConnectionTimeout(timeout);
         options.setKeepAliveInterval(keepAlive);
         options.setCleanSession(true);
         options.setAutomaticReconnect(true);
         return options;
     }

    /**
     * 连接mqtt服务端
     */
    public void connect() throws MqttException {
        if(client == null){
            client = new MqttClient(host,clientId,new MemoryPersistence());
            client.setCallback(new MyMqttCallback(MyMqttClient.this,this.myHandle));
        }
        MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepAlive);
        if(!client.isConnected()){
            client.connect(mqttConnectOptions);
        }else{
            client.disconnect();
            client.connect(mqttConnectOptions);
        }
        log.info("MQTT connect success");
    }

    /**
     * 断开连接
     * @throws MqttException
     */
    public void disconnect()throws MqttException{
        if(null!=client && client.isConnected()){
            client.disconnect();;
        }
    }
    /**
     * 发布,qos默认为0,非持久化
     */
     public void publish(String pushMessage,String topic,int qos){
         publish(pushMessage, topic, qos, false);
     }

    /**
     * 发布消息
     *
     * @param pushMessage
     * @param topic
     * @param qos
     * @param retained:留存
     */
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(pushMessage.getBytes());
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
        if(ObjectUtils.isEmpty(mqttTopic)){
            log.error("主题不存在");
        }
        synchronized (this){
            try{
                MqttDeliveryToken mqttDeliveryToken = mqttTopic.publish(mqttMessage);
                mqttDeliveryToken.waitForCompletion(1000L);
            }catch (MqttException e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 订阅
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            MyMqttClient.getClient().subscribe(topic, qos);
            log.info("订阅主题:"+topic+"成功!");
        } catch (MqttException e) {
            log.error("订阅主题:"+topic+"失败!",e);
        }
    }
    /**
     * 取消订阅
     */
    public void cleanTopic(String topic){
        if(!ObjectUtils.isEmpty(client) && client.isConnected()){
            try{
                client.unsubscribe(topic);
            }catch (MqttException e){
                log.error("取消订阅失败!"+e);
            }
        }else{
            log.info("主题不存在或未连接!");
        }
    }
}
  • 回调类(消息发送和接收时响应)
@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {
    private MyMqttClient myMqttClient;
    private MyHandle myHandle;
    public MyMqttCallback(MyMqttClient myMqttClient,MyHandle myHandle) {
        this.myMqttClient = myMqttClient;
        this.myHandle = myHandle;
    }

    /**
     * 连接完成
     * @param reconnect
     * @param serverURI
     */
    @Override
    public void connectComplete(boolean reconnect,String serverURI) {
        log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
        //订阅主题(可以在这里订阅主题)
        try {
            MyMqttClient.getClient().subscribe("topic1");
        } catch (MqttException e) {
            log.error("主题订阅失败");
        }
    }

    /**
     * 连接丢失 进行重连操作
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.warn("mqtt connectionLost >>> 5S之后尝试重连: {}", throwable.getMessage());
        long reconnectTimes = 1;
        while (true){
            try{
                Thread.sleep(5000);
            }catch (InterruptedException ignored){}
            try{
                if(MyMqttClient.getClient().isConnected()){ // 已连接
                    return;
                }
                reconnectTimes+=1;
                log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
                MyMqttClient.getClient().reconnect();
            }catch (MqttException e){
                log.error("mqtt断链异常",e);
            }
        }
    }

    /**
     * 订阅者收到消息之后执行
     * @param topic
     * @param mqttMessage
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
        myHandle.handle(topic,mqttMessage);
    }

    /**
     * * 消息到达后
     * subscribe后,执行的回调函数
     * publish后,配送完成后回调的方法
     *
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用");
        log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
    }
}
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MyHandle {
    @Async
    public void handle(String topic, MqttMessage message) {
        log.info("处理消息主题:" + topic + " 信息:" + message);
    }
}

标签:协议,String,mqtt,topic,案例,MQTT,client,public,log
From: https://www.cnblogs.com/xietingwei/p/18026662

相关文章

  • Python练习案例_Pico Fermi Bagels猜数字游戏
    案例介绍--《Python编程快速上手2》在PicoFermiBagels这个逻辑推理游戏中,你要根据线索猜出一个三位数。游戏会根据你的猜测给出以下提示之一:如果你猜对一位数字但数字位置不对,则会提示“Pico”;如果你同时猜对了一位数字及其位置,则会提示“Fermi”;如果你猜测的数字及其位置......
  • PySide基础三大件案例_计算器
    描述制作一个简单的计算器,要求可以输入0-9的数字和四则运算,=则输出计算结果,reset则清空计算器展示代码fromPySide6.QtWidgetsimportQApplication,QWidgetfromCalculator_uiimportUi_FormclassClaculator(QWidget,Ui_Form):#Mark使用多重继承的特性进行调用d......
  • PySide基础三大件的案例_登录界面
    描述制作一个简单的登录界面,拥有两个输入框和对应的Label文字提示以及一个登录按钮用户输入账密后,如果是代码中编写的账密则将两个Label的文字替换展示操作步骤使用Designer软件制作一个UI文件使用VsCode插件的功能将其转成python文件新建一个python文件写具体代码代码f......
  • selenium自动登录cnblogs案例
    代码如下:importjsonimporttimefromseleniumimportwebdriverfromselenium.webdriver.common.byimportByfromselenium.webdriver.edge.serviceimportServicefromselenium.webdriver.chrome.optionsimportOptionsser=Service()ser.path=r'D:\驱动路径......
  • 前端必学-40个精选案例实战-案例8-仿京东导航条触碰下拉效果
    导航条触碰下拉效果(理解鼠标浮动伪类、链接激活伪类)<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metahttp-equiv="X-UA-Compatible"content="IE=edge"><metaname="viewpor......
  • 前端必学-40个精选案例实战-案例7-仿爱奇艺视频首页新片预告实战
    仿爱奇艺视频首页新片预告实战案例第一步:案例图片圆角制作、图片资源:代码:<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metahttp-equiv="X-UA-Compatible"content="IE=edge"><metaname......
  • 两个死锁案例分析
    背景 死锁发生后,可以直接执行showengineinnodbstatus;查看最近的死锁日志。案例一:索引唯一键冲突导致的死锁,解决办法,在进行事务操作前先对数据进行一个排序,降低互相锁冲突的概率。 =====================================2024-02-1815:36:000x7f2146991700INNODB......
  • Vector和deque小案例
    打分案例1.目的:5个学生,10个评委,10个评委的分数去掉最高和最低分,取平均分就是学生的分数2.思路:​ 1.抽象学生​ 2.使用vector容器存储学生​ 3.把分数放入deque容器,然后对deque容器进行排序,之后删除首尾元素3.流程:​ 1.创建学生​ 2.评委给学生打分​ 3.根据学生的分数排......
  • 安防监控/视频汇聚/监控摄像头EasyCVR平台如何通过RTMP协议进行推流?
    众所周知,安防视频汇聚平台EasyCVR不仅可支持的接入协议非常多(包括:国标GB28181、RTSP/Onvif、RTMP,以及厂家的私有协议与SDK,如:海康ehome、海康sdk、大华sdk、宇视sdk、华为sdk、萤石云sdk、乐橙sdk等),同时可分发的视频流格式也非常丰富,具体包括:RTMP、RTSP、HTTP-FLV、WebSocket-FLV、......
  • C 语言实现对 Stop-and-Wait 协议的模拟
    协议设计~事件动作发送方从应用层收到数据若处于等待上层数据状态,产生一个分组并发送,启动计时器;若处于等待ACK状态,将数据存入缓冲区发送方超时重传当前未确认的数据包发送方收到ACK若对应当前数据包的ACK,停止计时器,开始发送缓冲区中的下一个数据包;若ACK不......