首页 > 其他分享 >四、activemq 消息持久化

四、activemq 消息持久化

时间:2022-09-29 15:08:45浏览次数:36  
标签:持久 jms topic session 消息 import activemq javax conn

activemq 通过设置 DeliveryMode来控制消息是否持久化:

  • DeliveryMode.NON_PERSISTENT:不持久化;
  • DeliveryMode.PERSISTENT:持久化

queue消息持久化

 queue默认是持久化的;当启动生产者,生产1000条消息;此时关闭mq服务,再开启mq服务,此时后台还是显示1000条消息待处理;

queue设置为非持久化;代码如下;当启动生产者,生产1条消息;此时关闭mq服务,再开启mq服务,此时后台显示没有消息待处理;如下图所示:

1 package com.mock.utils;
2
3 import javax.jms.Connection;
4 import javax.jms.DeliveryMode;
5 import javax.jms.JMSException;
6 import javax.jms.MessageProducer;
7 import javax.jms.ObjectMessage;
8 import javax.jms.Queue;
9 import javax.jms.Session;
10
11 import org.apache.activemq.ActiveMQConnectionFactory;
12
13 public class TestActiveMqProducerCanDelete {
14 private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15 private static final String QUEUE_NAME = "queue_01";
16 private static final String QUEUE_NAME_2 = "queue_02";
17
18 public static void main(String[] args) throws JMSException {
19 // 创建连接工厂,按照给定的URL,采用默认用户名密码
20 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
21 // 通过连接工厂 获取connection 并启动访问
22 Connection conn = activeMQConnectionFactory.createConnection();
23 conn.start();
24 // 创建session会话
25 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
26 // 创建目的地 (具体是队列还是主题topic)
27 Queue queue = session.createQueue(QUEUE_NAME);
28 // 创建消息的生产者
29 MessageProducer messageProducer = session.createProducer(queue);
30 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
31 // Byte类型的数据
32 ObjectMessage message = session.createObjectMessage();
33 User user = new User();
34 user.setAddress("嘉兴");
35 user.setName("Joy");
36 message.setObject(user);
37 message.setStringProperty("StringProperty", "我是 属性xxxxxxx");
38 messageProducer.send(message);
39
40 messageProducer.close();
41 session.close();
42 conn.close();
43 System.out.println("发送消息成功");
44 }
45
46

View Code

 

 

四、activemq 消息持久化_用户名

四、activemq 消息持久化_java_02

 

 

topic消息持久化:

topic 默认是不持久化的。或者说topic的持久化需要修改代码;在hello world中的实现方式,消息是不会被持久化的;

  • topic消息持久化,就好比关注了微信公众号,之后如果退出了微信,仍然可以接收到公众号的在客户端退出之后的一些推送;
  • topic消息非持久化:就好比关注了微信公众号,之后如果退出了微信,无法接收到公众号在退出这段时间内的推送;如果此时设备又上线了,那么离线这段时间内的推送就没有了;此时如果又在线了,只能接收到在线这个时间内的推送;

消费者代码:创建一个持久化的订阅者,clientId为 xx;监听消息;

1 package com.mock.utils;
2
3 import java.io.IOException;
4
5 import javax.jms.Connection;
6 import javax.jms.JMSException;
7 import javax.jms.Message;
8 import javax.jms.MessageListener;
9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 import javax.jms.Topic;
12 import javax.jms.TopicSubscriber;
13
14 import org.apache.activemq.ActiveMQConnectionFactory;
15
16 public class TestActiveMqTopicConsumer {
17 private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
18 private static final String TOPIC_NAME = "TOPIC_NAME_1";
19
20 public static void main(String[] args) throws JMSException, IOException {
21 System.out.println("****我是1号消费者******");
22 // 创建连接工厂,按照给定的URL,采用默认用户名密码
23 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
24 // 通过连接工厂 获取connection 并启动访问
25 Connection conn = activeMQConnectionFactory.createConnection();
26 conn.setClientID("xx");
27 // 创建session会话
28 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
29 // 创建目的地 (具体是队列还是主题topic)
30 Topic topic = session.createTopic(TOPIC_NAME);
31 TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "xx_xx");
32 conn.start();
33 topicSubscriber.setMessageListener(new MessageListener() {
34 @Override
35 public void onMessage(Message message) {
36 if (message != null && message instanceof TextMessage) {
37 TextMessage textMessage = (TextMessage) message;
38 try {
39 System.out.println("收到消息:" + textMessage.getText());
40 } catch (JMSException e) {
41 // TODO Auto-generated catch block
42 e.printStackTrace();
43 }
44 }
45 }
46
47 });
48 System.in.read();
49
50 topicSubscriber.close();
51 session.close();
52 conn.close();
53
54 }
55
56

View Code

生产者代码:

1 package com.mock.utils;
2
3 import javax.jms.Connection;
4 import javax.jms.DeliveryMode;
5 import javax.jms.JMSException;
6 import javax.jms.MessageProducer;
7 import javax.jms.Session;
8 import javax.jms.TextMessage;
9 import javax.jms.Topic;
10
11 import org.apache.activemq.ActiveMQConnectionFactory;
12
13 public class TestActiveMqTopicProducer {
14 private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15 private static final String TOPIC_NAME = "TOPIC_NAME_1";
16
17 public static void main(String[] args) throws JMSException {
18 // 创建连接工厂,按照给定的URL,采用默认用户名密码
19 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
20 // 通过连接工厂 获取connection 并启动访问
21 Connection conn = activeMQConnectionFactory.createConnection();
22
23 // 创建session会话
24 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
25 // 创建目的地 (具体是队列还是主题topic)
26 Topic topic = session.createTopic(TOPIC_NAME);
27
28 // 创建消息的生产者
29 MessageProducer messageProducer = session.createProducer(topic);
30 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
31 conn.start();
32 for (int i = 0; i < 3; i++) {
33 // 创建消息;可以理解为学生按照要求写好问题
34 TextMessage textMessage = session.createTextMessage("mession-------" + i);
35 // 通过messageProducer 发送给mq
36 messageProducer.send(textMessage);
37 }
38 messageProducer.close();
39 session.close();
40 conn.close();
41 System.out.println("发送消息成功");
42 }
43
44

View Code由于​JMSDeliveryMod​​​e默认是持久化的,所以,生产者代码也可以这么写:​

1 public static void main(String[] args) throws JMSException {
2 // 创建连接工厂,按照给定的URL,采用默认用户名密码
3 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
4 // 通过连接工厂 获取connection 并启动访问
5 Connection conn = activeMQConnectionFactory.createConnection();
6 conn.start();
7 // 创建session会话
8 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
9 // 创建目的地 (具体是队列还是主题topic)
10 Topic topic = session.createTopic(TOPIC_NAME);
11
12 // 创建消息的生产者
13 MessageProducer messageProducer = session.createProducer(topic);
14
15 for (int i = 0; i < 3; i++) {
16 // 创建消息;可以理解为学生按照要求写好问题
17 TextMessage textMessage = session.createTextMessage("mession-------" + i);
18 // 通过messageProducer 发送给mq
19 messageProducer.send(textMessage);
20 }
21 messageProducer.close();
22 session.close();
23 conn.close();
24 System.out.println("发送消息成功");
25

View Code

 

activemq 管理后台显示如下图,启动消费者代码,显示如下图;

四、activemq 消息持久化_mq_03

启动消费者,显示如下图,即在线的拥有TOPIC 订阅者;

四、activemq 消息持久化_用户名_04

如果此时关闭消费者,那么后台显示该同永久Topic订阅者离线;

四、activemq 消息持久化_java_05

在永久Topic订阅者离线情况下,生产者生产了3条消息,那么后台管理显示 永久Topic订阅者有3条待处理的消息;

四、activemq 消息持久化_mq_06

 

我从来不相信什么懒洋洋的自由。我向往的自由是通过勤奋和努力实现的更广阔的人生。 我要做一个自由又自律的人,靠势必实现的决心认真地活着。



标签:持久,jms,topic,session,消息,import,activemq,javax,conn
From: https://blog.51cto.com/u_10632206/5722935

相关文章

  • Redis Stream实现全部节点机器推送消息
    背景有时候,在微服务时代,我们需要对全部的机器节点进行通知。在常规情况下,一个请求经过负载均衡只有一个机器可以收到。那么,如何能让全部的机器都收到同样的请求呢?需要借助......
  • ActiveMQ的最简单应用-队列消息
    有一段时间不使用JMS了。现在的项目又有可能需要应用JMS,来提高服务质量和提高系统资源的利用率。提高服务质量,主要是保证不间断的服务。用JMS服务器接收任务,排成队列。应用......
  • [答疑]这个消息名是写发送数据还是接收数据
    ​​软件方法(下)分析和设计第8章连载[20210723更新]>>​​睡鱼(61***11)16:08:29 睡鱼(61***11)16:08:58他们说这个图有问题UML菜鸟(1***22)16:10:55有点暗睡鱼(61***11)......
  • [答疑]想表示消息返回值为Customer集合
    道奈特(240***10)14:34:55EA中序列图。我想表示消息返回值为Customer集合。目前只有一个Customer实体类,我需要另外新建一个CustomerList类吗?潘加宇(3504847)17:01:26不......
  • Linux系统编程——进程间通信:消息队列
    概述消息队列提供了一种在两个不相关的进程之间传递数据的简单高效的方法,其特点如下:1)消息队列可以实现消息的随机查询。消息不一定要以先进先出的次序读取,编程时可以按消息......
  • 【模板】可持久化Trie树
    理解了所谓⌈可持久化⌋的含义之后也就没多难了。\(\textrm{luoguP4735最大异或和}\)#include<iostream>constintN=3e5+10;structTRIE{ intsid[2]; intcn......
  • protobuf入门教程(二):消息类型
    操作流程1)在.proto文件中定义消息格式2)使用protobuf编译器生成C++类3)使用C++API来读写消息C++编程指导:​​https://developers.google.com/protocol-buffers/docs/c......
  • 使用Python的Win32api接口实现后台的键鼠模拟的消息模拟
    importtimeimportwin32apiimportwin32conimportwin32guiclassVirtual_Keyboard(object):def__init__(self,hwnd):self.hwnd=hwnds......
  • AcWing 算法提高课 可持久化
    可持久化的前提:数据结构本身的拓扑结构不变trie、线段树、树状数组、堆等都可持久化平衡树(一般)需要左旋和右旋,不可持久化  可持久化希望将数据结构的全部修改记录下......
  • Pinia持久化
    Pinia是Vue的存储库安装1、vue3npminstallpinia--save2、vue2npminstallpinia@vue/composition-api--save引入1、vue3再main.js中import{createPinia......