activemq 通过设置 DeliveryMode来控制消息是否持久化:
- DeliveryMode.NON_PERSISTENT:不持久化;
- DeliveryMode.PERSISTENT:持久化
queue消息持久化
queue默认是持久化的;当启动生产者,生产1000条消息;此时关闭mq服务,再开启mq服务,此时后台还是显示1000条消息待处理;
queue设置为非持久化;代码如下;当启动生产者,生产1条消息;此时关闭mq服务,再开启mq服务,此时后台显示没有消息待处理;如下图所示:
1 package com.mock.utils;
2
3 import javax.jms.Connection;
4 import javax.jms.DeliveryMode;
5 import javax.jms.JMSException;
6 import javax.jms.MessageProducer;
7 import javax.jms.ObjectMessage;
8 import javax.jms.Queue;
9 import javax.jms.Session;
10
11 import org.apache.activemq.ActiveMQConnectionFactory;
12
13 public class TestActiveMqProducerCanDelete {
14 private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15 private static final String QUEUE_NAME = "queue_01";
16 private static final String QUEUE_NAME_2 = "queue_02";
17
18 public static void main(String[] args) throws JMSException {
19 // 创建连接工厂,按照给定的URL,采用默认用户名密码
20 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
21 // 通过连接工厂 获取connection 并启动访问
22 Connection conn = activeMQConnectionFactory.createConnection();
23 conn.start();
24 // 创建session会话
25 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
26 // 创建目的地 (具体是队列还是主题topic)
27 Queue queue = session.createQueue(QUEUE_NAME);
28 // 创建消息的生产者
29 MessageProducer messageProducer = session.createProducer(queue);
30 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
31 // Byte类型的数据
32 ObjectMessage message = session.createObjectMessage();
33 User user = new User();
34 user.setAddress("嘉兴");
35 user.setName("Joy");
36 message.setObject(user);
37 message.setStringProperty("StringProperty", "我是 属性xxxxxxx");
38 messageProducer.send(message);
39
40 messageProducer.close();
41 session.close();
42 conn.close();
43 System.out.println("发送消息成功");
44 }
45
46
View Code
topic消息持久化:
topic 默认是不持久化的。或者说topic的持久化需要修改代码;在hello world中的实现方式,消息是不会被持久化的;
- topic消息持久化,就好比关注了微信公众号,之后如果退出了微信,仍然可以接收到公众号的在客户端退出之后的一些推送;
- topic消息非持久化:就好比关注了微信公众号,之后如果退出了微信,无法接收到公众号在退出这段时间内的推送;如果此时设备又上线了,那么离线这段时间内的推送就没有了;此时如果又在线了,只能接收到在线这个时间内的推送;
消费者代码:创建一个持久化的订阅者,clientId为 xx;监听消息;
1 package com.mock.utils;
2
3 import java.io.IOException;
4
5 import javax.jms.Connection;
6 import javax.jms.JMSException;
7 import javax.jms.Message;
8 import javax.jms.MessageListener;
9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 import javax.jms.Topic;
12 import javax.jms.TopicSubscriber;
13
14 import org.apache.activemq.ActiveMQConnectionFactory;
15
16 public class TestActiveMqTopicConsumer {
17 private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
18 private static final String TOPIC_NAME = "TOPIC_NAME_1";
19
20 public static void main(String[] args) throws JMSException, IOException {
21 System.out.println("****我是1号消费者******");
22 // 创建连接工厂,按照给定的URL,采用默认用户名密码
23 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
24 // 通过连接工厂 获取connection 并启动访问
25 Connection conn = activeMQConnectionFactory.createConnection();
26 conn.setClientID("xx");
27 // 创建session会话
28 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
29 // 创建目的地 (具体是队列还是主题topic)
30 Topic topic = session.createTopic(TOPIC_NAME);
31 TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "xx_xx");
32 conn.start();
33 topicSubscriber.setMessageListener(new MessageListener() {
34 @Override
35 public void onMessage(Message message) {
36 if (message != null && message instanceof TextMessage) {
37 TextMessage textMessage = (TextMessage) message;
38 try {
39 System.out.println("收到消息:" + textMessage.getText());
40 } catch (JMSException e) {
41 // TODO Auto-generated catch block
42 e.printStackTrace();
43 }
44 }
45 }
46
47 });
48 System.in.read();
49
50 topicSubscriber.close();
51 session.close();
52 conn.close();
53
54 }
55
56
View Code
生产者代码:
1 package com.mock.utils;
2
3 import javax.jms.Connection;
4 import javax.jms.DeliveryMode;
5 import javax.jms.JMSException;
6 import javax.jms.MessageProducer;
7 import javax.jms.Session;
8 import javax.jms.TextMessage;
9 import javax.jms.Topic;
10
11 import org.apache.activemq.ActiveMQConnectionFactory;
12
13 public class TestActiveMqTopicProducer {
14 private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15 private static final String TOPIC_NAME = "TOPIC_NAME_1";
16
17 public static void main(String[] args) throws JMSException {
18 // 创建连接工厂,按照给定的URL,采用默认用户名密码
19 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
20 // 通过连接工厂 获取connection 并启动访问
21 Connection conn = activeMQConnectionFactory.createConnection();
22
23 // 创建session会话
24 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
25 // 创建目的地 (具体是队列还是主题topic)
26 Topic topic = session.createTopic(TOPIC_NAME);
27
28 // 创建消息的生产者
29 MessageProducer messageProducer = session.createProducer(topic);
30 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
31 conn.start();
32 for (int i = 0; i < 3; i++) {
33 // 创建消息;可以理解为学生按照要求写好问题
34 TextMessage textMessage = session.createTextMessage("mession-------" + i);
35 // 通过messageProducer 发送给mq
36 messageProducer.send(textMessage);
37 }
38 messageProducer.close();
39 session.close();
40 conn.close();
41 System.out.println("发送消息成功");
42 }
43
44
View Code由于JMSDeliveryMod
e默认是持久化的,所以,生产者代码也可以这么写:
1 public static void main(String[] args) throws JMSException {
2 // 创建连接工厂,按照给定的URL,采用默认用户名密码
3 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
4 // 通过连接工厂 获取connection 并启动访问
5 Connection conn = activeMQConnectionFactory.createConnection();
6 conn.start();
7 // 创建session会话
8 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
9 // 创建目的地 (具体是队列还是主题topic)
10 Topic topic = session.createTopic(TOPIC_NAME);
11
12 // 创建消息的生产者
13 MessageProducer messageProducer = session.createProducer(topic);
14
15 for (int i = 0; i < 3; i++) {
16 // 创建消息;可以理解为学生按照要求写好问题
17 TextMessage textMessage = session.createTextMessage("mession-------" + i);
18 // 通过messageProducer 发送给mq
19 messageProducer.send(textMessage);
20 }
21 messageProducer.close();
22 session.close();
23 conn.close();
24 System.out.println("发送消息成功");
25
View Code
activemq 管理后台显示如下图,启动消费者代码,显示如下图;
启动消费者,显示如下图,即在线的拥有TOPIC 订阅者;
如果此时关闭消费者,那么后台显示该同永久Topic订阅者离线;
在永久Topic订阅者离线情况下,生产者生产了3条消息,那么后台管理显示 永久Topic订阅者有3条待处理的消息;
我从来不相信什么懒洋洋的自由。我向往的自由是通过勤奋和努力实现的更广阔的人生。 我要做一个自由又自律的人,靠势必实现的决心认真地活着。