首页 > 其他分享 >activemq - topic模式

activemq - topic模式

时间:2024-11-04 09:11:14浏览次数:1  
标签:String 模式 topic session static private message activemq final

特点

  1. queue 是点对点模式,一条消息对应一个消费者,topic 是一对多模式,一条消息可能有一个或多个消费者

  2. queue 模式消息再发送后消费者可以在之后的任意时间消费,topic 模式如果没有订阅者,消息就是废消息,会被丢弃。

  3. queue 模式生产者与消费者之间没有时间相关性,topic 模式下生产者和消费者之间有一定的时间相关性,消费者只能接收到订阅之后的生产者发送的消息。

消息模型:

  • 队列:Point-to-Point(P2P) --- 点对点(生产者发送一条消息到 queue,只有一个消费者能收到)
  • 主题:Publish/Subscribe(Pub/Sub)--- 发布订阅(发布者发送到 topic 的消息,只有订阅了topic 的订阅者才会收到消息)

topic 消费的几种情况

  • 0 个消费者,消息会被直接废止,
  • n 个消费者,一部分消费,
  • n 个消费者,全都消费了,

JMS 规范

JMS开发步骤;

  1. 创建连接工厂(ConnectionFactory)。

  2. 创建连接(Connection)。

  3. 创建会话(Session)。

  4. 创建 Topic(Destination)。

  5. 创建消息生产者(MessageProducer)。

  6. 创建消息消费者(MessageConsumer)。

  7. 发送消息。

  8. 接收消息。

  9. 关闭资源。

案例


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 测试消息队列
 *
 * @author Mr.css
 * @version 2024-10-12 11:09
 */
public class ActiveMQSender {

    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin";
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "MyQueue";

    public static void main(String[] args) throws Exception {
        // 连接池
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

        // 连接
        Connection connection = factory.createConnection();
        connection.start();

        // 会话 - 自动确认 ACK
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(TOPIC_NAME);

        // 消息生产者 - 非持久化消息
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        TextMessage message = session.createTextMessage("Hello ActiveMQ!");
        producer.send(message);

        System.out.println("Sent message: " + message.getText());

        session.close();
        connection.close();
    }
}




import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 测试消息队列
 *
 * @author Mr.css
 * @version 2024-10-12 11:09
 */
public class ActiveMQReceiver {

    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin";
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "MyQueue";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 连接
        Connection connection = factory.createConnection();
        connection.start();

        // 会话 - 自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(TOPIC_NAME);

        // 消息消费者
        MessageConsumer consumer = session.createConsumer(destination);

        while (true) {
            // receive() 函数如果不带参数,则表示持续侦听消息
            TextMessage message = (TextMessage) consumer.receive();
            if (message != null) {
                System.out.println("Received message: " + message.getText());
            } else {
                // 如果是持续接受消息,这里执行不到
                break;
            }
        }

        session.close();
        connection.close();
    }
}

消息侦听器的写法

前面的案例,可以让消息侦听,运行在指定的线程内部,编码的自由度更高写(案例中为主线程)。

我们一般不需要做到这种程度,侦听器的写法,可能更符合我们的编程习惯。

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 测试消息队列
 *
 * @author Mr.css
 * @version 2024-10-12 11:09
 */
public class ActiveMQReceiver {

    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin";
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "MyQueue";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 连接
        Connection connection = factory.createConnection();
        connection.start();

        // 会话 - 自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(TOPIC_NAME);

        // 消息消费者
        MessageConsumer consumer = session.createConsumer(destination);

        // 这里会启用子线程进行侦听,程序会继续向下执行
        consumer.setMessageListener(message -> {
            if(message instanceof TextMessage){
                try {
                    System.out.println("Received message: " + ((TextMessage) message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 这里需要阻塞主线程,不然主线程跑完,消息侦听线程就跟着结束了,
        // 想要关闭程序,控制台随便输入点什么,程序就会继续执行了,
        // 以 springboot 项目为例,代码写到这就够了,后面的 close() 放到 容器销毁侦听事件中。
        System.in.read();

        consumer.close();
        session.close();
        connection.close();
    }
}

标签:String,模式,topic,session,static,private,message,activemq,final
From: https://www.cnblogs.com/chenss15060100790/p/18524412

相关文章

  • activemq - 断线恢复
    什么是消息持久化?业务需求:如果有人重启队列,队列里的东西要留着,不能给清空了。在activemq中,通过设置DeliveryMode来控制消息是否持久化。DeliveryMode.NON_PERSISTENT:不持久化;DeliveryMode.PERSISTENT:持久化;queue默认是持久化的;topic默认是不持久化的;topic模式下,如......
  • activemq - ack机制
    疑问:在写demo的时候,如果client被强制中断,消息来不及处理,这时候消息又出队列了,这样不是会产生严重的问题嘛?一个会话中,可以同时处理一批数据,如果一条失败了,之前的也要求回滚的话,要怎么处理?获取一个消息之后,发现程序无法处理这条消息,想要退还回去,该怎么办?方案:实际上,active......
  • activemq - jms规范
    什么是JMS?‌ActiveMQJMS是JavaMessageService的缩写。‌JMS是Java平台上的一个标准API,用于实现应用程序之间的消息传递和通信。它是JavaEE规范的一部分,旨在提供一个与厂商无关的API,以便访问不同的消息中间件系统‌。JMS的组成结构和特点很多内容之前已经提到......
  • activemq - mqttv3
    相比于mqtt-client,mqttv3使用的人相对多些,如果出现问题,好排查一些。activemq部署MQTT服务查看文件:conf\activemq.xml,如果包含下面内容,activemq本身已经包含MQTT服务,不需要任何其它配置。activemq不局限于下面这些,还可以继续扩展,比如:NIO、SSL。前往官网查看:https://a......
  • 实验9:桥接模式
    [实验任务一]:两个维度的桥接模式用桥接模式实现在路上开车这个问题,其中,车可以是car或bus,路可以是水泥路或沥青路。 1.类图   1. 源代码1.Car.javapackagetest9; publicclassCarimplementsVehicle{    @Override    publicvoiddrive(){   ......
  • 桥接模式
    1. 类图:  2.源代码 //车接口interfaceVehicle{    voiddrive();} //小汽车类classCarimplementsVehicle{    privateStringname;     publicCar(Stringname){        this.name=name;    }     @Override......
  • 适配器模式——双向适配器
    一、问题描述  设计和实现一个双向适配器实例,使得猫Cat可以学狗Dog叫Cry(),狗可以学猫抓老鼠CatchMouse(),设计类图并编程实现。1.类图2.源代码1.Cat接口: publicinterfaceCat{  voidcry();  voidcatchMouse();}2.实体Cat类(另一个类似): publicclassConc......
  • 08.装饰者模式设计思想
    08.装饰者模式设计思想目录介绍01.装饰者模式基础1.1装饰者模式由来1.2装饰者模式定义1.3装饰者模式场景1.4装饰者模式思考02.装饰者模式实现2.1罗列一个场景2.2装饰者结构2.3装饰者基本实现03.装饰者实例演示3.1需求分析3.2案例基础实现3.3演变......
  • 微服务设计模式:节流模式(Throttling Pattern)
    微服务设计模式:节流模式(ThrottlingPattern)定义节流模式(ThrottlingPattern)是一种控制资源使用速率的设计模式,广泛应用于云计算和微服务架构中,以防止服务过载和资源耗尽。它通过限制客户端请求的数量,保证系统稳定性和可用性。结构节流模式的核心组件包括:请求过滤器:拦......
  • 2024-11-03:得到更多分数的最少关卡数目。用go语言,Alice 和 Bob 正在进行一个有 n 个关
    2024-11-03:得到更多分数的最少关卡数目。用go语言,Alice和Bob正在进行一个有n个关卡的游戏,其中每个关卡要么是困难模式(possible[i]==0),要么是简单模式(possible[i]==1)。玩家在游戏中获得分数的规则如下:通过简单模式的关卡可得1分,而遇到困难模式的关卡将扣除1分。Alice从......