首页 > 其他分享 >SpringBoot2集成RabbitMQ(注解+回调)

SpringBoot2集成RabbitMQ(注解+回调)

时间:2023-09-20 15:56:51浏览次数:44  
标签:String RabbitConfig 队列 RabbitMQ 交换机 SpringBoot2 注解 public

一、概述

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。
整体上看其实就是一个生产者消费者模型。只是这个模型更加抽象及精细化了。
RabbitMQ大体可以分为三层,其中第二层又可以细分为两层:
  1.生产者
  2.Broker
    a.交换机(exchange)
    b.队列
  3.消费者
大体上如下图所示:

   其中P代表生产者、X代表交换机、红色的长条代表队列,C代表消费者。

  运行过程大致描述:

    P生产的消息先放入交换机,交换机通过路由键找到绑定的队列,这样交换机的数据就直接到队列中了。而C作为消费者会监听是否有消息(可以主动去拿,可以被动接受)

    生产消费过程:P->exchange->queue->c

 



 

二、代码示例

  1.在pom.xml中引入RabbitMQ

  <!--        集成rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  2.配置application.yml

  rabbitmq:
    host: rabbitmq主机ip
    port: 5672
    username: 用户名
    password: 密码
    publisher-returns: true #开启发送失败退回
    publisher-confirm-type: correlated

  3.创建配置文件(简单配置)RabbitConfig.java

public class RabbitConfig {
    /**
     * 交换机名称(自定义的,想起什么就起身名字)
     */
    public static final String EXCHANGE = "topic.exchange";
    /**
     * 队列名称(自定义的,想起什么就起身名字)
     */
    public static final String QUEUE_A = "topic_test_queue";

    /**
     * 路由键(自定义的,想起什么就起身名字)
     */
    public static final String ROUTINGKEY_A = "key.#";
}

  4.编写生产者

/**
 * 消息生产者
 *
 * @author Tony
 * @version 2023
 * @date 2023/9/20 11:40
 */

@Slf4j
@Component
public class RabbitMqProducer implements RabbitTemplate.ConfirmCallback {
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMqProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 发送消息
     *
     * @param content 消息内容
     */
    public void sendMessage(String content) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(
                RabbitConfig.EXCHANGE,//交换机
                RabbitConfig.ROUTINGKEY_A,//路由键
                content,
                correlationData
        );

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("接收到了RabbitMQ的回调id:{}", correlationData);
        if (ack) {
            log.info("消息成功消费");
        } else {
            log.info("消息消费失败:{}", cause);
        }
    }
}

  5.编写消费者

/**
 * 队列消费者
 *
 * @author Tony
 * @version 2023
 * @date 2023/9/20 11:41
 */
@Slf4j
@Component
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value =RabbitConfig.EXCHANGE,durable = "false",type = "topic"),//指明交换机及交换机的类型以及是否持久化
        value = @Queue(value = RabbitConfig.QUEUE_A,durable = "false"),//指明交换机绑定的队列
        key = RabbitConfig.ROUTINGKEY_A))//指明交换机和队列之间的桥梁路由键
public class RabbitMqConsumer {

    /**
     * 接收String类型的消息
     * @param message
     */
    @RabbitHandler
    public void onStringHandle(String message) {
        log.info("RabbitMQ=>这是String类型的消息:{}",message);
    }

    /**
     * 接收Byte数组类型的消息
     * @param message
     */
    @RabbitHandler
    public void onByteHandle(byte[] message) {
        log.info("RabbitMQ=>这是byte[]类型的消息:{}",message);
    }
}

  6.编写一个RabbitMQController.java进行发送消息的测试

@RestController
@RequestMapping("/api/v1/pub/mq/")
public class RabbitMqController {
    @Autowired
    RabbitMqProducer rabbitMqProducer;

    @GetMapping("send")
    public String sendMsg() {
        String msg = "第" + new Random().nextInt(1000) + "个消息," + UUID.randomUUID().toString();
        rabbitMqProducer.sendMessage(msg);
        return msg;

    }
}

  7.运行效果

 

 

 

标签:String,RabbitConfig,队列,RabbitMQ,交换机,SpringBoot2,注解,public
From: https://www.cnblogs.com/tony-yang-flutter/p/17717523.html

相关文章

  • rabbitmq Broker not available; cannot force queue declarations during start: jav
    一、概述使用SpringBoot集成RabbitMQ遇到的问题。2023-09-2014:19:39.655INFO10256---[restartedMain]o.s.b.w.embedded.tomcat.TomcatWebServer:Tomcatstartedonport(s):80(http)withcontextpath''2023-09-2014:19:39.656INFO10256---[rest......
  • ubuntu安装RabbitMQ
    一、概述本地需要做RabbitMQ测试,想着安装包本地不利于来回换机子测试。就想着把其安装到云服务上,云服务的系统是ubuntu系统。这样通过远程连接,就不需要担心换机器的时候RabbitMQ用不了的问题了。可谓一次安装到处使用二、示例,备注:这里直接安装最新版的1.安装erlan......
  • 注解解析的BeanDefinition
    配置文件解析完后,剩下的工作就是注册了,是processBeanDefinition函数中的BeanDefinitionReaderUtils.registerBeanDefinition(bdHodler,getReaderContext().getRegistry())代码的解析了。publicstaticvoidregistryBeanDefinition(BeanDefinitionHolderdefinitionHolder,BeanDefi......
  • Mybatis 05 注解实现增删改查
    UserMapperpublicinterfaceUserMapper{/*注解实现增删改查*/@Select("select*fromtb_user")List<User>selectAll2();@Insert("insertintotb_uservalues(null,#{username},#{password},#{gender},#{addr})")voidinsert......
  • Spring注解工具类
    前言在看Spring源码的时候,经常会有处理注解的时候,比如从方法上获取注解,类上获取注解,注解属性别名。JDK中自带的获取注解API有点简单,不会从父类方法或者接口上的方法去查找,不能为属性定义别名等,因此Spring封装了一个便利的工具类,更加方便的去获取注解信息。JDK自带方法Annotated......
  • 详解Spring缓存注解@Cacheable、@CachePut和@CacheEvict
    详解Spring缓存注解@Cacheable、@CachePut和@CacheEvict的使用简介在大型的应用程序中,缓存是一项关键技术,用于提高系统的性能和响应速度。Spring框架提供了强大的缓存功能,通过使用缓存注解可以轻松地集成缓存机制到应用程序中。本文将详细介绍Spring框架中的@Cacheable、@CachePu......
  • 主动写入流对@ResponseBody注解的影响
    问题回溯2023年Q2某日运营反馈一个问题,商品系统商家中心某批量工具模板无法下载,导致功能无法使用(因为模板是动态变化的)商家中心报错(JSON串):{"code":-1,"msg":"失败"}负责的同事看到失败后立即与我展开讨论(因为不是关键业务,所以不需要回滚,修复即可),我们发现新功能模板下载的代码......
  • 主动写入流对@ResponseBody注解的影响 | 京东云技术团队
    问题回溯2023年Q2某日运营反馈一个问题,商品系统商家中心某批量工具模板无法下载,导致功能无法使用(因为模板是动态变化的)商家中心报错(JSON串):{"code":-1,"msg":"失败"}负责的同事看到失败后立即与我展开讨论(因为不是关键业务,所以不需要回滚,修复即可),我们发现新功能模板下载的代码与之前......
  • 支持SpEL表达式的自定义日志注解@SysLog介绍
    目录序言预期思路过程结果序言之前封装过一个日志注解,打印方法执行信息,功能较为单一不够灵活,近来兴趣来了,想重构下,使其支持表达式语法,以应对灵活的日志打印需求。该注解是方法层面的日志打印,如需更细的粒度,还请手撸log.xxx()。预期通过自定义注解,灵活的语法表达式,拦......
  • 消息队列 - RabbitMQ
    RabbitMQ简介RabbitMQ是一个广泛使用的开源消息队列系统,它实现了高级消息队列协议(AMQP)标准,为分布式应用程序提供了强大的消息传递功能。RabbitMQ是Erlang语言编写的,具有高度的可扩展性和可靠性,因此被广泛用于构建分布式、异步的消息通信系统。以下是关于RabbitMQ的详细介......