首页 > 其他分享 >SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复

时间:2022-12-23 18:04:29浏览次数:37  
标签:SpringBoot -- import springframework org 消息传递 public jms springboot


SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现消息传递

作者:一一哥

我在上一章节中,给大家介绍了ActiveMQ,本节中我会介绍Spring Boot中如何整合ActiveMQ,实现消息的创建和消费。

一. Spring Boot中整合ActiveMQ

1. 创建web项目

我们按照之前的经验,创建一个web程序,并将之改造成Spring Boot项目,具体过程略。

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_apache


2. 添加依赖包

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

3. 创建application.yml配置文件

#配置activemq
spring:
activemq:
#activemq的url
broker-url: tcp://127.0.0.1:61616
#用户名
user: admin
#密码
password: admin
pool:
enabled: false #是否使用线程池
max-connections: 100 #最大连接数
#是否信任所有包
packages:
trust-all: true
#默认情况下,activemq使用的是queue模式,如果要使用topic模式,必须设置为true
jms:
pub-sub-domain: true

4. 创建ActiveMQ的配置类

在这个类中创建连接工厂,消息队列等。

package com.yyg.boot.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

/**
* @Author 一一哥Sun
* @Date Created in 2020/4/14
* @Description Description
* //@EnableJms启用jms功能
*/
@Configuration
@EnableJms
public class ActivemqConfig {

@Autowired
private Environment env;

@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
connectionFactory.setUserName(env.getProperty("spring.activemq.user"));
connectionFactory.setPassword(env.getProperty("spring.activemq.password"));
return connectionFactory;
}

/**
* 实现监听queue
*/
@Bean("jmsQueueListenerContainerFactory")
public JmsListenerContainerFactory<?> queueContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//开启接收topic类型的消息
factory.setPubSubDomain(false);
return factory;
}

/**
* 实现监听topic
*/
@Bean("jmsTopicListenerContainerFactory")
public JmsListenerContainerFactory<?> topicContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}

/**
* 队列名称
*/
@Bean("springboot.queue")
public Queue queue() {
return new ActiveMQQueue("springboot.queue") ;
}

/**
* Topic名称
*/
@Bean("springboot.topic")
public Topic topic() {
return new ActiveMQTopic("springboot.topic") ;
}

}

5. 创建消息生产者的工具类

在这个Producer类中,创建几个发送消息的的方法。

package com.yyg.boot.jms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;

/**
* @Author 一一哥Sun
* @Date Created in 2020/4/15
* @Description 消息生产者
*/
@Slf4j
@Component
public class Producer {

@Resource(name = "springboot.queue")
private Queue queue;

@Resource(name = "springboot.topic")
private Topic topic;

@Resource(name = "springboot.replyQueue")
private Queue replyQueue;

@Autowired
private JmsMessagingTemplate jmsTemplate;

/**
* 发送消息,destination是发送到的目标队列,message是待发送的消息内容;
*/
public void sendMessage(Destination destination, final String message) {
jmsTemplate.convertAndSend(destination, message);
}

/**
* 发送队列消息
*/
public void sendQueueMessage(final String message) {
sendMessage(queue, message);
}

/**
* 发送Topic消息
*/
public void sendTopicMessage(final String message) {
sendMessage(topic, message);
}

}

6. 定义消费消息的Consumer类

package com.yyg.boot.jms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
* @Author 一一哥Sun
* @Date Created in 2020/4/15
* @Description 消息的消费者
*/
@Slf4j
@Component
public class Consumer {

/**
* 监听Queue队列,queue类型
*/
@JmsListener(destination="springboot.queue",
containerFactory = "jmsQueueListenerContainerFactory")
public void receiveQueue(String text){
log.warn(this.getClass().getName()+ "-->收到的报文为:"+text);
}

/**
* 监听Topic队列,topic类型,这里containerFactory要配置为jmsTopicListenerContainerFactory
*/
@JmsListener(destination = "springboot.topic",
containerFactory = "jmsTopicListenerContainerFactory")
public void receiveTopic(String text) {
log.warn(this.getClass().getName()+"-->收到的报文为:"+text);
}

}

7. 创建Controller,发布消息

package com.yyg.boot.web;

import com.yyg.boot.domain.User;
import com.yyg.boot.jms.Consumer;
import com.yyg.boot.jms.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @Author 一一哥Sun
* @Date Created in 2020/4/15
* @Description Description
*/
@RestController
public class MsgController {

@Autowired
private Producer producer;

@Autowired
private Consumer consumer;

@GetMapping("/sendQueue")
public String sendQueueMsg() {
User user = new User();
user.setId(1L);
user.setUsername("一一哥Queue");
user.setPassword("123");
producer.sendQueueMessage(user.toString());
return "发送成功!";
}

@GetMapping("/sendTopic")
public String sendTopicMsg() {
User user = new User();
user.setId(2L);
user.setUsername("一一哥Topic");
user.setPassword("123456");
producer.sendTopicMessage(user.toString());
return "发送成功!";
}

}

8. 创建入口类

package com.yyg.boot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ActiveMQApplication {

public static void main(String[] args) {
SpringApplication.run(ActiveMQApplication.class, args);
}

}

9. 完整项目结构

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_apache_02


10. 启动项目进行测试

测试发送点对点类型的消息

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_apache_03

队列中可以看到成功的收到了消息。

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_java_04


在ActiveMQ中也可以看到出现了springboot.queue队列,并且队列中的消息已被消费掉。

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_apache_05

测试发送发布者订阅者类型的消息

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_java_06

Topic中可以看到成功的收到了消息。

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_apache_07

在ActiveMQ中也可以看到出现了springboot.topic队列,并且队列中的消息已被消费掉。

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_spring_08


二. 回复消息的实现

我们在上面的基础之上,进一步实现发送消息后,进行消息的回复。

1. 改造ActiveMQ类

在该类中添加一个用来接收回复消息的队列。

/**
* 回复队列名称
*/
@Bean("springboot.replyQueue")
public Queue queueReply() {
return new ActiveMQQueue("springboot.replyQueue") ;
}

完整的ActivemqConfig代码:

package com.yyg.boot.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

/**
* @Author 一一哥Sun
* @Date Created in 2020/4/14
* @Description Description
* //@EnableJms启用jms功能
*/
@Configuration
@EnableJms
public class ActivemqConfig {

@Autowired
private Environment env;

@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
connectionFactory.setUserName(env.getProperty("spring.activemq.user"));
connectionFactory.setPassword(env.getProperty("spring.activemq.password"));
return connectionFactory;
}

/**
* 实现监听queue
*/
@Bean("jmsQueueListenerContainerFactory")
public JmsListenerContainerFactory<?> queueContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//开启接收topic类型的消息
factory.setPubSubDomain(false);
return factory;
}

/**
* 实现监听topic
*/
@Bean("jmsTopicListenerContainerFactory")
public JmsListenerContainerFactory<?> topicContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}

/**
* 队列名称
*/
@Bean("springboot.queue")
public Queue queue() {
return new ActiveMQQueue("springboot.queue") ;
}

/**
* Topic名称
*/
@Bean("springboot.topic")
public Topic topic() {
return new ActiveMQTopic("springboot.topic") ;
}

/**
* 回复队列名称
*/
@Bean("springboot.replyQueue")
public Queue queueReply() {
return new ActiveMQQueue("springboot.replyQueue") ;
}

}

2. 改造Producer类

在Producer类中定义一个新的Queue类,并定义发送消息和消费消息的方法。

@Resource(name = "springboot.replyQueue")
private Queue replyQueue;

/**
* 发送队列的回复消息
*/
public void sendQueueMessageReply(String message) {
sendMessage(replyQueue, message);
}

/**
* 生产者监听消费者的应答信息
*/
@JmsListener(destination = "replyTo.queue",containerFactory = "jmsQueueListenerContainerFactory")
public void consumerMessage(final String text) {
log.warn("从replyTo.queue队列中收到的应答报文为:" + text);
}

完整的Producer类代码:

package com.yyg.boot.jms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;

/**
* @Author 一一哥Sun
* @Date Created in 2020/4/15
* @Description 消息生产者
*/
@Slf4j
@Component
public class Producer {

@Resource(name = "springboot.queue")
private Queue queue;

@Resource(name = "springboot.topic")
private Topic topic;

@Resource(name = "springboot.replyQueue")
private Queue replyQueue;

@Autowired
private JmsMessagingTemplate jmsTemplate;

/**
* 发送消息,destination是发送到的目标队列,message是待发送的消息内容;
*/
public void sendMessage(Destination destination, final String message) {
jmsTemplate.convertAndSend(destination, message);
}

/**
* 发送队列消息
*/
public void sendQueueMessage(final String message) {
sendMessage(queue, message);
}

/**
* 发送Topic消息
*/
public void sendTopicMessage(final String message) {
sendMessage(topic, message);
}

/**
* 发送队列的回复消息
*/
public void sendQueueMessageReply(String message) {
sendMessage(replyQueue, message);
}

/**
* 生产者监听消费者的应答信息
*/
@JmsListener(destination = "replyTo.queue",containerFactory = "jmsQueueListenerContainerFactory")
public void consumerMessage(final String text) {
log.warn("从replyTo.queue队列中收到的应答报文为:" + text);
}

}

3. 改造Consumer类

在该类中添加接收消息,并且设置回复消息的方法。

/**
* 回复给生产者的应答信息
*/
@JmsListener(destination="springboot.replyQueue",containerFactory = "jmsQueueListenerContainerFactory")
@SendTo("replyTo.queue") //消费者应答后通知生产者
public String receiveQueueReply(String text){
log.warn(this.getClass().getName()+ "-->收到的报文为:"+text);
return "回复的信息为-->"+text;
}

完整的Consumer类代码:

package com.yyg.boot.jms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
* @Author 一一哥Sun
* @Date Created in 2020/4/15
* @Description 消息的消费者
*/
@Slf4j
@Component
public class Consumer {

/**
* 监听Queue队列,queue类型
*/
@JmsListener(destination="springboot.queue",
containerFactory = "jmsQueueListenerContainerFactory")
public void receiveQueue(String text){
log.warn(this.getClass().getName()+ "-->收到的报文为:"+text);
}

/**
* 监听Topic队列,topic类型,这里containerFactory要配置为jmsTopicListenerContainerFactory
*/
@JmsListener(destination = "springboot.topic",
containerFactory = "jmsTopicListenerContainerFactory")
public void receiveTopic(String text) {
log.warn(this.getClass().getName()+"-->收到的报文为:"+text);
}

/**
* 回复给生产者的应答信息
*/
@JmsListener(destination="springboot.replyQueue",containerFactory = "jmsQueueListenerContainerFactory")
@SendTo("replyTo.queue") //消费者应答后通知生产者
public String receiveQueueReply(String text){
log.warn(this.getClass().getName()+ "-->收到的报文为:"+text);
return "回复的信息为-->"+text;
}

}

4. 重新运行,测试消息的回复功能

调用如下接口,测试消息回复功能。

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_spring_09

此时可以看到控制台输出如下信息,说明消息回复成功。

SpringBoot2.x系列教程61--SpringBoot整合消息队列之ActiveMQ代码实现异步消息传递及回复_spring_10


至此,我们实现了Spring boot中如何整合ActiveMQ。
 

标签:SpringBoot,--,import,springframework,org,消息传递,public,jms,springboot
From: https://blog.51cto.com/u_7044146/5966089

相关文章

  • SpringBoot2.x系列教程57--SpringBoot中默认缓存实现方案
    SpringBoot2.x系列教程57--SpringBoot中默认缓存实现方案作者:一一哥在上一节中,我带大家学习了在SpringBoot中对缓存的实现方案,尤其是结合SpringCache的注解的实现方案,接下......
  • SpringBoot2.x系列教程56--SpringBoot中的缓存实现方案介绍
    SpringBoot2.x系列教程56--SpringBoot中的缓存实现方案介绍作者:一一哥一.Spring中对缓存的支持1.SpringCache简介从Spring3.1开始,Spring中引入了对Cache的支持。而在Spri......
  • SpringBoot2.x系列教程36--整合SpringMVC之CORS跨域访问处理(上)
    SpringBoot2.x系列教程36--整合SpringMVC之CORS跨域访问处理(上)作者:一一哥一.跨域问题及解决1.什么是跨域访问?JavaScript出于安全方面的考虑,做了一个同源策略的限制,也就......
  • 解决表单action属性传参时值为null的问题
    一.异常重现最近壹哥有个学生在学习Servlet进行Web开发时,尝试着使用表单中的action传递参数,结果他发现在Servlet中无法接收到前端传过来的参数值。我们先来看看他的代码,具......
  • 关于jsjiami.v6加密和解密
    JavaScript解密是指在JavaScript代码被加密之后,使用特定的工具或方法来恢复其原有的可读性。这种技术通常用于对JavaScript代码进行保护,以防止代码被未经授权的人窃取......
  • 冒泡排序
    冒泡排序只会操作相邻的两个数据。每次冒泡操作都会对相邻的两个元素进行比较,看是否满足大小关系要求。如果不满足就让它俩互换。一次冒泡会让至少一个元素移动到它应......
  • 如何理解动态规划
    一、动态规划三板斧状态转移公式循环或递归性能优化二、WHY1、状态转移公式动态规划与分治不一样,分治的问题是相互独立的,而动态规划的各个状态是有关联关系......
  • 初学java懵了,这个异常是怎么产生的?
    一.异常现象最近壹哥的老表开始学Java啦,结果学了还不到两天,就遇到了他解决不了的问题,然后就跑来问我了。不知有没有其他初学java的小伙伴,大家可以过来围观一下,看看下面的问......
  • 对Integer进行等值比较时踩到的一个坑
    一.引言小伙伴们应该都知道,只要我们写代码,必然就会有BUG的存在。所以解决BUG的过程会伴随程序员的一生,这就是一个无解的常态。在平时的学习和工作过程中,我们需要通过不断地实......
  • BMP格式详解
    BMP文件格式详解(BMPfileformat)BMP文件格式,又称为Bitmap(位图)或是DIB(Device-IndependentDevice,设备无关位图),是Windows系统中广泛使用的图像文件格式。由于它可以不作......