首页 > 其他分享 >activemq - mqttv3

activemq - mqttv3

时间:2024-11-04 09:09:34浏览次数:1  
标签:String mqttv3 client org message activemq options

相比于 mqtt-client,mqttv3 使用的人相对多些,如果出现问题,好排查一些。

activemq 部署 MQTT 服务

查看文件:conf\activemq.xml,

如果包含下面内容,activemq 本身已经包含 MQTT 服务,不需要任何其它配置。

activemq 不局限于下面这些,还可以继续扩展,比如:NIO、SSL。

前往官网查看:https://activemq.apache.org/components/classic/documentation/auto

<transportConnectors>
	<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
	<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

Maven 依赖

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.5</version>
</dependency>

关键参数

CleanSession 清除会话

设置为 false 之后,在 client 断线恢复之后,能接收到期间错过的消息。

注意磁盘空间和会话保持时间,避免磁盘空间不足,或者超时自动清除。

Qos 服务质量

  • QoS 0:最多一次传输,消息可能会丢失。
  • QoS 1:至少一次传输,消息可能会重复。
  • QoS 2:仅一次传输,确保消息不会重复也不会丢失。

消息保留 retain

服务器会保留最新的一条 retain 值为 true 的消息(只有最后一条);

‌这个机制,可以确保当客户端连接到 MQTT 服务器时,可以立即获取队列最新状态。‌

代码样例

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 发送 MQTT 消息
 *
 * @author Mr.css
 * @version 2024-10-15 9:16
 */
public class MqttPublish {

    public static void main(String[] args) throws Exception {
        try {
            String topic = "mqtt";
            String broker = "tcp://localhost:1883";
            String clientId = "MqttPublish";

            MqttClient client = new MqttClient(broker, clientId);

            // 使用自定义参数连接
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName("dev");
            options.setPassword("dev".toCharArray());
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);
			
			// 是否清理会话,这个配置非常重要,置为 false,client 可以获取到断线期间的消息
            options.setCleanSession(false);
            client.connect(options);

            String content = "Hello MQTT";
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(2);
            message.setRetained(false);

            client.publish(topic, message);
            System.out.println("send: " + message);

            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}


import org.eclipse.paho.client.mqttv3.*;

/**
 * 接收 MQTT 消息
 *
 * @author Mr.css
 * @version 2024-10-15 9:16
 */
public class MQTTReceiver implements MqttCallback {
    public static void main(String[] args) {
        try {
            String topic = "mqtt";
            String broker = "tcp://localhost:1883";
            String clientId = "MQTTReceiver";

            MqttClient client = new MqttClient(broker, clientId);
            MqttCallback callback = new MQTTReceiver();

            client.setCallback(callback);


            // 使用自定义参数连接
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName("dev");
            options.setPassword("dev".toCharArray());
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);
            client.connect(options);

            int qos = 2;
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        // 当连接丢失时调用
        System.out.println("Connection lost");
        cause.printStackTrace();
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // 当接收到消息时调用
        System.out.println("Message arrived: " + new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // 当消息发送成功时调用
        System.out.println("Delivery complete");
    }
}

标签:String,mqttv3,client,org,message,activemq,options
From: https://www.cnblogs.com/chenss15060100790/p/18524419

相关文章

  • ActiveMQ消息模式Queue和Topic机制讲解
    Docker安装ActiveMQ镜像以及通过Java生产消费activemq示例_dockeractivemq-CSDN博客背景周末由于服务器异常宕机,导致业务系统重启后出现ActiveMQ中的数据没有被正常消费,运维认为是消息积压,便联系博主排查。最终发现并不存在消息积压,是因为采用ActiveMQTopic模式生产消费......
  • kettle从入门到精通 第八十九课 ETL之kettle kettle jms activemq使用教程
     场景:群里有小伙伴求助jmsactivemq如何使用kettle进行消费数据,之前连接过kafka,rabbtimq,想着activemq应该也没啥难度,结果低估了activemq。盘他!!!插曲:ActiveMq有两个版本:ActiveMQ Classic和ActiveMQArtemis两个版本,kettle中的jms插件连接activemq只支持ActiveMQArtemis,结果......
  • ActiveMQ配置warpper.log的大小和数量[转载]
    ActiveMQ服务器data目录下wrapper.log文件,默认产生的日志是不覆盖的,文件的大小逐渐增大,经过查询资料,和阅读Activemq的官方文档,找到了解决方案:<1>找到wrapper.conf文件: <2>修改wrapper.conf文件:设置文件大小修改maxsize的值,默认是0,也就是无限制,设置10m表示最大为10m,可以重新......
  • docker-compose 安装activemq、rocketmq
    目录结构创建目录#activemq目录mkdir-p/docker/activemq/datamkdir-p/docker/activemq/conf#rocket目录mkdir-p/docker/rocketmq/broker1/confmkdir-p/docker/rocketmq/broker1/logsmkdir-p/docker/rocketmq/broker1/storemkdir-p/docker/rocketmq/names......
  • ActiveMQ 的网络连接及消息回流机制
    1、ActiveMQ的网络连接activeMQ如果要实现扩展性和高可用性的要求的话,就需要用用到网络连接模式。NetworkConnector:主要用来配置broker与broker之间的通信连接如上图所示,MQ服务器1和MQ服务器2通过NewworkConnector相连,则生产者1和生产者2发送消息,消费者3和消费者4都可......
  • CVE-2015-5254(ActiveMQ-反序列化漏洞)
    漏洞描述编号:CVE-2015-5254影响版本:ApacheActiveMQ5.13.0之前5.x版本CVE地址:CVE-2015-5254漏洞原理:该漏洞源于程序没有限制可在代理中序列化的对象。远程攻击者可借助特制的序列化的JavaMessageService(JMS)ObjectMessage对象利用该漏洞执行任意代码复现环境windows,doc......
  • 1、消息队列框架:ActiveMQ - 开源项目研究文章
    ActiveMQ是Apache软件基金会下的一个开源消息队列服务,遵循JMS1.1规范(JavaMessageService),是一种面向消息中间件(MOM)的实现。它提供高可用性、出色的性能、可扩展性、稳定性和安全性的消息传递服务。ActiveMQ的架构ActiveMQ的架构包括生产者(Producer)、消费者......
  • C#连接使用ActiveMQ消息队列
      安装部署好集群环境:192.168.209.133:61616,192.168.209.134:61616,192.168.209.135:61616因为ActiveMQ的集群模式是一种master-slave模式,master节点对外提供服务,slave节点只做数据同步备份,当master节点挂了,slave就会成为master从而继续对外提供服务,以此实现高可用。......
  • PHP用stomp对ActiveMQ连接
    在PHP中使用STOMP协议连接ActiveMQ,你可以使用Stomp-PHP库,这是一个实现了STOMP1.0和STOMP1.1协议的PHP客户端。以下是使用Stomp-PHP库连接到ActiveMQ的基本步骤:安装Stomp-PHP库:使用Composer来安装Stomp-PHP库。在你的项目目录中运行以下命令: composerrequirestomp......
  • Spring Boot中集成ActiveMQ(九)
    SpringBoot中集成ActiveMQ:全面指南......