首页 > 其他分享 >ActiveMQ的使用

ActiveMQ的使用

时间:2022-10-31 21:48:05浏览次数:36  
标签:Queue Session 创建 Topic session 消息 使用 ActiveMQ

应用场景:异步处理、应用解耦、流量消峰

简介:Apache出品,是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现

JMS消息模型:

  1) P2P 点对点模型(Queue队列模型)

    特点:每个消息只有一个消费者(消息一旦被消费,消息就不再存在消息队列中)

          生产者和消费者之间在时间上没有依赖性,不需要同时处于运行状态

       消费者在成功消费之后,需要向队列应答成功

  2) Publish/Subscribe(Pub/Sub) 发布/订阅模式(Topic主题模型)

    特点:每个消息可以有多个消费者

          发布者和订阅者有时间依赖性(先订阅主题,再发送消息)

          订阅者必须保持运行状态,才能接收到消息

JMS编程API:

  1)ConnectionFactory 

    创建Connection对象的工厂,针对两种模型分别有QueueConnection和TopicConnectionFactory

  2)Destination

    对消费者来说是消息来源,某个队列(Queue)或某个主题(Topic),对生产者来说是某个队列(Queue)或某个主题(Topic)  

  3)Connection

    表示客户端与JMS系统之间建立的链接(对TCP/IP socket的包装) ,Connection可以产生一个或多个Session

  4) Session

    是对消息进行操作的接口,可以创建生产者、消费者、消息等。也提供了事务功能,如果需要可以将发送/接收消息的动作放到一个事务中

  5)Producter

    生产者:由Session创建,并用于将消息发送到Destination对象中。分两种类型 QueueSender和TopicPublisher,可调用生产者的send或publish方法发送消息

  6)Consumer

    消费者:由Session创建,用于接收Destination中的消息。分两种类型 QueueReceiver和TopicSubscriber,分别通过createReceiver(Queue)或createSubScriber(Topic)来创建。 也可以用createDurableSubscriber来创建持久化的订阅者。

  7)MessageListener

    消息监听器:如果注册了消息监听器,一旦消息到达,自动调用onMessage方法。EJB中的MDB(Message Driven Bean)就是一种MessageLinstener

  

 原生JMS-点对点

Producer:

public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.112.129:61616");

//2.创建连接
Connection connection = factory.createConnection();

//3.打开连接
connection.start();

//4.创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//5.创建目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Queue queue = session.createQueue("queue01");

//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);

//7.发送消息
TextMessage test_message = session.createTextMessage("test message");
producer.send(test_message);

//8.释放资源
session.close();
connection.close();
}

Consumer:
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.112.129:61616");

//2.创建连接
Connection connection = factory.createConnection();

//3.打开连接
connection.start();

//4.创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//5.指定目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Queue queue01 = session.createQueue("queue01");

//6.创建消息消费者
MessageConsumer consumer = session.createConsumer(queue01);

  consumer.setMessageListener(message -> {
  if(message instanceof TextMessage){
  TextMessage txt = (TextMessage)message;
  try {
   System.out.println("接收的消息(2): " + txt.getText());
  } catch (JMSException e) {
   e.printStackTrace();
   }
   }
  });
}

 原生JMS-发布订阅

Producer:

  //5.创建目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Topic topic = session.createTopic("topic01");

//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);

Consumer:

  //5.指定目标地址(Queue:点对点消息;Topic:发布/订阅消息)
  Topic topic = session.createTopic("topic01");

//6.创建消息消费者
MessageConsumer consumer = session.createConsumer(topic01);

springboot整合ActiveMQ
导入坐标
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Producer:
配置类application.yml
server:
port: 9001
spring:
application:
name: activemq-producer
# springboot 与 activemq的整合配置
# 连接工厂
activemq:
broker-url: tcp://192.168.112.129:61616
user: admin
password: admin
# 指定发送模式
jms:
pub-sub-domain: false
测试类 SpringbootProducer 
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringbootProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Test
public void ptpSender(){
jmsMessagingTemplate.convertAndSend("springboot_queue", "springboot test -- queue");
}

}
Consumer:
依赖同Producer,配置同Producer
编写Listener
@Component
public class MsgListener {
@JmsListener(destination = "springboot_queue")
public void receiveMsg(Message message) throws JMSException {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
System.out.println("接收的消息: " + txtMsg.getText());
}
}
}




标签:Queue,Session,创建,Topic,session,消息,使用,ActiveMQ
From: https://www.cnblogs.com/Fei-Gao/p/16827933.html

相关文章

  • [单片机][N76E003][MCP4017][MCP4018][MCP4019] 数字电位器 使用方法 例子 代码
    7位:电阻分辨率-127电阻器(128步)-->W/*-----------------------------------------宏定义-----------------------------------------*//*------------------------------......
  • Linux C语言 Makefile 的使用 函数
    创建三个.c文件终端输入:创建目录:mkdirMakefile进入目录:cdMakefile使用gedit:gedit第一个文件:main.c#include<stdio.h>#include"input.h"#include"calcu.h"intm......
  • JS正则RegExp.test()使用注意事项(不具有重复性)
    参考来源:https://www.jb51.net/article/101466.htm本文实例分析了JS正则RegExp.test()使用注意事项。分享给大家供大家参考,具体如下:先看下面这段代码://2012-12-121......
  • git的介绍和使用
    git介绍什么是gitgit是一种版本控制器-控制的对象是开发的项目代码什么是版本控制器完成协同开发项目,帮助程序员整合代码 i)帮助开发者合并开发的代码 ii)如果出现......
  • Qt+VLC简单的使用显示视频Demo
    先看看效果: vlc播放视频,要比QMediaPlayer实用的多,并且同时运行20个视频时不会出现卡顿。 这个Demo功能实现非常简单,简单的说一下vlc流程:1、创建并初始化一个libvlc实例LI......
  • 如何使用容联SDK,以及如何使用回调简单示例
    一、容联SDK如何使用A、收费标准(公有云走网络的都是免费的)     语音会议提供一个房间最大支持32方     视频会议提供一个房间最大支持30方(免费的弊......
  • segger_rtt使用帮助
    一、KEIL中添加[RTT_Syscalls_KEIL.c][SEGGER_RTT.c][SEGGER_RTT_printf.c],并将文件夹[segger_rtt]添加到引用目录中。二、在[SEGGER_RTT_printf.c]中顶部添加#defineNR......
  • [nrf51][nrf52][low power] 使用RTX时 如何进入低功耗
    1、由于RTX使用了RTC1中断,导致频繁唤醒,无法进入睡眠状态。2、思路:进入低功耗前,关闭RTC1电源,按键唤醒后重新配置RTC1。伪代码:进入低功耗前1.关闭外设GPIOnrf_gpio_......
  • GIT入门与Gitee的使用
    一:Git是什么?Git是目前世界上最先进的分布式版本控制系统。工作原理/流程: Workspace:工作区Index/Stage:暂存区Repository:仓库区(或本地仓库)Remote:远程仓库(比如Githu......
  • CentOS9上面使用rpm方式安装SQLServer2022的简单总结
    CentOS9上面使用rpm方式安装SQLServer2022的简单总结下载需要的资料下载CentOS9Stream的安装介质https://mirrors.bfsu.edu.cn/centos-stream/9-stream/BaseOS/x86_64......