首页 > 其他分享 >SpringBoot 整合 RabbitMQ

SpringBoot 整合 RabbitMQ

时间:2024-10-19 15:47:50浏览次数:1  
标签:SpringBoot springframework item 交换机 RabbitMQ 整合 import org public

简介

一般在开发过程中:

生产者工程:

  1. application.yml 文件配置 RabbitMQ 相关信息;

  2. 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定;

  3. 注入 RabbitTemplate 对象,通过 RabbitTemplate 对象发送消息到交换机。

消费者工程:

  1. application.yml 文件配置 RabbitMQ 相关信息;

  2. 创建消息处理类,用于接收队列中的消息并进行处理。

搭建生产者工程

创建工程

创建 SpringBoot 工程 springboot-producer:

image-20241019154044546

添加依赖

pom.xml 文件中添加 spring-boot-starter-amqp:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置 RabbitMQ

1)配置文件

创建 application.yml,内容如下:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /vhost
    username: admin
    password: 123456

2)绑定交换机和队列

创建 RabbitMQ 队列与交换机绑定的配置类 com.itheima.rabbitmq.config.RabbitMQConfig

package com.zhangmingge.springbootproducer.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    // 交换机名称
    public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
    // 队列名称
    public static final String ITEM_QUEUE = "item_queue";

    // 声明交换机
    @Bean("itemTopicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }

    // 声明队列
    @Bean("itemQueue")
    public Queue itemQueue() {
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }

    // 绑定队列和交换机
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }
}

搭建消费者工程

创建工程

创建 SpringBoot 工程 springboot-consumer:

image-20241019154009386

添加依赖

pom.xml 文件中添加 spring-boot-starter-amqp:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置 RabbitMQ

创建 application.yml,内容如下:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /vhost
    username: admin
    password: 123456

消息监听处理类

编写消息监听器 MyListener:


package com.zhangmingge.springbootconsumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyListener {

    /**
     * 监听某个队列的消息
     *
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "item_queue")
    public void myListener1(String message) {
        System.out.println("消费者接收到的消息为:" + message);
    }
}

测试

在生产者工程 springboot-producer 中创建测试类,发送消息:

package com.zhangmingge.springbootproducer;

import com.zhangmingge.springbootproducer.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.insert", "商品新增,routing key 为 item.insert");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update", "商品修改,routing key 为 item.update");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.delete", "商品删除,routing key 为 item.delete");
    }
}

先运行上述测试程序(交换机和队列才能先被声明和绑定),然后启动消费者,在消费者工程 springboot-consumer 中控制台查看是否接收到对应消息。

另外,也可以在 RabbitMQ 的管理控制台中查看到交换机与队列的绑定:

1556074827222

标签:SpringBoot,springframework,item,交换机,RabbitMQ,整合,import,org,public
From: https://www.cnblogs.com/Higurashi-kagome/p/18475985

相关文章