首页 > 其他分享 >20-SpringBoot整合RabbitMQ

20-SpringBoot整合RabbitMQ

时间:2022-10-04 23:46:35浏览次数:54  
标签:20 SpringBoot spring boot springframework RabbitMQ import test org

SpringBoot整合RabbitMQ

整合就直接使用单机版的了, 一直开着5个虚拟机, 我电脑不太行

新建SpringBoot工程

你已经是一个长大的IDEA了, 要学会自己新建工程, 然后IDEA自己创建了rabbitmq-consumer和rabbitmq-producer工程

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

POM.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.72</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

都是在创建工程的时候随便点的功能

启动类就不用我粘贴了吧

生产者代码实现

配置文件

spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    # 集群用
#    addresses: 192.168.247.142:5672
    username: root                         # 用户名
    password: 123456                       # 密码
    host: 192.168.247.142                  # IP地址
    port: 5672                             # 端口号
    virtual-host: /                        # 虚拟地址
    connection-timeout: 15000              # 连接超时时间
    # https://blog.csdn.net/yaomingyang/article/details/108410286 : 详解了publisher-confirm-type 与 publisher-confirms的关系
    publisher-confirm-type: correlated     # confirm回调模式
    publisher-returns: true                # 回调监听, 没有被路由到的消息, 配合下面的mandatory使用
    template:
      mandatory: true

server:
  port: 8001
  servlet:
    context-path: /

消息发送类

package com.dance.rabbitmqproducer.component;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

@Component
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * correlationData : 唯一标识
     * ack : 是否成功持久化
     * cause : 失败原因
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        System.out.println("消息ID: " + correlationData.getId());
        System.out.println("ack: " + ack);
        System.out.println("cause: " + cause);
    };

    /**
     * 发送消息
     *
     * @param messageBody 消息提
     * @param headers     参数
     * @throws Exception 异常
     */
    public void sendMessage(Object messageBody, Map<String, Object> headers) throws Exception {
        // 消息头
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<Object> message = MessageBuilder.createMessage(messageBody, messageHeaders);
        // 设置消息确认监听
        rabbitTemplate.setConfirmCallback(confirmCallback);
        /**
         * exchange : 交换机
         * routingKey : 路由键
         * message : 消息体
         * messagePostProcess : 消息后置处理器
         * correlation : 消息唯一ID
         */
        rabbitTemplate.convertAndSend("exchange-test",
                "test.order",
                message,
                postMessage -> {
                    System.out.println("post process message: " + postMessage);
                    return postMessage;
                },
                new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
    }

}

测试类

package com.dance.rabbitmqproducer;

import com.dance.rabbitmqproducer.component.RabbitMQSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;

@SpringBootTest
public class TestSendMsg {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @Test
    public void testSendMsg() throws Exception {
        String msg = "hello world!";
        rabbitMQSender.sendMessage(msg,new HashMap<>());
    }

}

消费者代码实现

配置文件

spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    # 集群用
    #    addresses: 192.168.247.142:5672
    username: root                         # 用户名
    password: 123456                       # 密码
    host: 192.168.247.142                  # IP地址
    port: 5672                             # 端口号
    virtual-host: /                        # 虚拟地址
    connection-timeout: 15000              # 连接超时时间
    listener:
      simple:
        acknowledge-mode: manual           # 手工ACK, 默认为auto, 自动ack
        concurrency: 5                                     # 默认通道数量
        max-concurrency: 10                                 # 最大通道数量
        prefetch: 1                        # 限制消费流量, ack 1个后再接收
      order:
        exchange:
          value: exchange-test
          type: topic
          durable: true
          ignoreDeclarationExceptions: true
        queue:
          value: test-queue
          durable: true
        bindings:
          key: test.#

server:
  port: 8002
  servlet:
    context-path: /

消息消费类

package com.dance.rabbitmqconsumer.component;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQRReceiver {

    /**
     * 消费消息
     *
     * @param messageBody 消息体
     * @param channel     通道
     * @throws Exception 异常
     * @RabbitListener 监听消息
     * @QueueBinding 绑定
     * @Queue 队列
     * @Exchange 交换机
     */
    // 直接写死[不建议]
//    @RabbitListener(
//            bindings = {
//                    @QueueBinding(
//                            value = @Queue(value = "test-queue", durable = "true"),
//                            exchange = @Exchange(
//                                    value = "exchange-test",
//                                    type = "topic",
//                                    durable = "true",
//                                    ignoreDeclarationExceptions = "true"
//                            ),
//                            key = "test.#"
//                    )
//            }
//    )
    // 采用配置文件的方式
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(
                                    value = "${spring.rabbitmq.listener.order.queue.value}",
                                    durable = "${spring.rabbitmq.listener.order.queue.durable}"
                            ),
                            exchange = @Exchange(
                                    value = "${spring.rabbitmq.listener.order.exchange.value}",
                                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                                    durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"
                            ),
                            key = "${spring.rabbitmq.listener.order.bindings.key}"
                    )
            }
    )
    @RabbitHandler
    public void onMessage(Message<Object> messageBody, Channel channel) throws Exception {
        System.out.println("消费消息 :" + messageBody.getPayload());
        MessageHeaders headers = messageBody.getHeaders();
        long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

测试

启动消费者

启动成功

查看控制台

多了exchange-test交换机

多了test-queue队列

并且存在绑定关系, 路由key是test.#

对了一个连接, 这个就是消费者

在channel中多了5个通道, 这个就是我们在配置文件中设置的初始值

点进去可以看到消费的是哪个队列

启动生产者测试类

可以看到, 在confirm监听中, 得到了消息ID, ack为true, 没有异常, 消息发送成功

查看消费者

消费成功, SpringBoot成功集成RabbitMQ

当然这只是一个Demo, 具体开发中使用, 该需要各位自行改造

标签:20,SpringBoot,spring,boot,springframework,RabbitMQ,import,test,org
From: https://www.cnblogs.com/flower-dance/p/16754853.html

相关文章

  • 21-RabbitMQ延迟队列插件
    RabbitMQ延迟队列插件下载官网https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases我用的是3.10.7的RabbitMQ,但是官网没有这么新版本的,只......
  • 22-FAQ登录RabbitMQ控制台提示不是私密连接
    登录RabbitMQ控制台提示不是私密连接将启动的时候的赋权操作再执行一遍rabbitmqctladd_userrootrabbitmqctlset_permissions-p/root".*"".*"".*"rabbitmqct......
  • 15-RabbitMQ高级特性-消费端限流
    消费端限流什么是消费端限流假设一个场景,首先,我们RabbitMQ服务器有上万条消息未处理的消息,我们随机打开一个消费者客户端,会出现下面情况巨量的消息瞬间全......
  • 16-RabbitMQ高级特性-消费端的消息ACK与重回队列
    消费端的消息ACK与重回队列消费端的手工ACK和NACKACK分为自动和手动消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿如果由于服务器宕......
  • 17-RabbitMQ高级特性-TTL队列/消息
    TTL队列/消息TTL:TimeToLive,生存时间RabbitMQ支持消息的过期时间,在消息发送时可以指定RabbitMQ支持队列的过期时间,从消息进入队列开始计算,只要超过了队列......
  • 18-RabbitMQ高级特性-死信队列
    死信队列死信队列:DLX,Dead-Letter-Exchange利用DLX,当消息在一个队列中变成死信(deadmessage)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLXDL......
  • 07-RabbitMQ核心API-Direct Exchange
    DirectExchange简介所有发送到directexchange的消息被转发到Routekey中指定的Queue注意:Direct模式可以使用RabbitMQ自带的Exchange(defaultexchange),所以不需......
  • 08-RabbitMQ核心API-Topic Exchange
    TopicExchange简介所有发送到TopicExchange的消息被转发到所有关心RouteKey中指定Topic的Queue上Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个T......
  • 09-RabbitMQ核心API-Fanout Exchange
    FanoutExchange简介不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上Fanout交换机转发消息是最快的......
  • 10-RabbitMQ核心API-其他[Binding, Queue, Message, Virtual host]
    Binding绑定关系Exchange和Exchange,Queue之间的连接关系Binding中可以包含RouteKey或者参数Queue消息队列,实际存储消息数据Durability:是否持久化,Durable......