特点
-
queue是点对点模式,一条消息对应一个消费者,topic是一对多模式,一条消息可能有一个或多个消费者
-
queue模式消息再发送后消费者可以在之后的任意时间消费,topic模式如果没有订阅者消息就是废消息,会被丢弃。
-
queue模式生产者与消费者之间没有时间相关性,topic模式下生产者和消费者之间有一定的时间相关性,消费者只能接收到订阅之后的生产者发送的消息。
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
-
测试消息队列
-
@author Mr.css
-
@version 2024-10-12 11:09
*/
public class ActiveMQSender {private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "MyQueue";public static void main(String[] args) throws Exception {
// 连接池
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);// 连接 Connection connection = factory.createConnection(); connection.start(); // 会话 - 自动确认 ACK Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAME); // 消息生产者 - 非持久化消息 MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage("Hello ActiveMQ!"); producer.send(message); System.out.println("Sent message: " + message.getText()); session.close(); connection.close();
}
}
/**
-
测试消息队列
-
@author Mr.css
-
@version 2024-10-12 11:09
*/
public class ActiveMQReceiver {private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "MyQueue";public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
// 连接
Connection connection = factory.createConnection();
connection.start();// 会话 - 自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAME); // 消息消费者 MessageConsumer consumer = session.createConsumer(destination); while (true) { // receive() 函数如果不带参数,则表示持续侦听消息 TextMessage message = (TextMessage) consumer.receive(1000); if (message != null) { System.out.println("Received message: " + message.getText()); } else { // 如果是持续接受消息,这里执行不到 break; } } session.close(); connection.close();
}
}
操作界面上的按钮
- Name:消息队列的名称。
- Number Of Pending Messages:等待消费者处理的消息数量。
- Number Of Consumers:当前连接到消息队列的消费者数量。
- Messages Enqueued:消息队列收到的消息总数。
- Messages Dequeued:消费者处理的消息总数。
Views:查看队列详细信息的链接,例如消息的详细信息、历史消息等。
- Browse:查看队列中的所有消息,可以按照不同的排序方式对消息进行排序,也可以对消息进行删除或重新发送等操作。
- Active Consumers:查看当前正在消费该队列的所有消费者信息,包括消费者的ID、连接信息、消费状态等。
- Active Producers:查看当前正在向该队列发送消息的所有生产者信息,包括生产者的ID、连接信息、发送状态等。
Operations:执行队列操作的链接,例如删除队列、重新启动队列等。
- Send To:可以向选定的队列发送消息。可以选择发送的消息的类型,例如文本或字节消息,并设置消息属性和有效期。
- Purge:可以清空选定队列的所有消息,这个操作是不可逆的,请慎重使用。
- Delete:可以删除选定的队列。删除队列时,所有该队列的消息将被删除,并且无法恢复。请确保在删除队列之前已经备份了所需的消息数据。
- Pause:可以暂停选定队列的消息传递。这个操作可以让你在不删除队列的情况下停止消费消息,等到问题解决后再继续消费。