首页 > 其他分享 >SpringBoot 整合 RabbitMQ

SpringBoot 整合 RabbitMQ

时间:2024-02-04 15:57:28浏览次数:21  
标签:SpringBoot org RabbitMQ springframework 整合 sysMessageEntity import new public

Docker 搭建 RabbitMQ

  1. 拉取 RabbitMQ 的镜像执行命令 docker pull rabbitmq:3.7-management
  2. 执行运行命令 docker run -d --hostname rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 rabbitmq:3-management
  3. 打开浏览器访问 http://localhost:15672 ,账号:admim, 密码: admin

RabbitMQ 消息类型

Topic

topic 既可以实现 direct,也可实现 Fanout,topic 通过 routing_key 进行路由,key 可以存在两种特殊字符“ _ ”与“#”,用于做模糊匹配,其中“_”用于匹配一个单词,“#”用于匹配多个单词(可以是零个) *代表全部的

SpringBoot 整合

通过编写基本配置类,实现交换器和路由器绑定,和实现配置手动确认等,需要实现 RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback 接口,实现接口的方法,还需设置配置文件。

基本配置类

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @ClassName RabbitConfig
 * @Author zhangjiahao
 * @Description RabbitMQ基本配置类
 * @Date $ $
 **/
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private CachingConnectionFactory connectionFactory;

    @PostConstruct
    public void init(){
        System.out.println("初始化rabbitmq");
        rabbitTemplate.setConfirmCallback(this);            // 指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             // 指定 ReturnCallback
    }

    /**
     * 如果消息到达 exchange, 则 confirm 回调, ack = true
     * 如果消息不到达 exchange, 则 confirm 回调, ack = false
     * 需要设置spring.rabbitmq.publisher-confirms=true
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        Message returnedMessage = correlationData.getReturnedMessage();
        if (ack){
            System.out.println("消息是否到达Exchange:{}" +  "消息成功到达Exchange");
        }else {
            System.out.println("消息是否到达Exchange:{}" + "消息到达Exchange失败");
        }

        if (!ack) {
            System.out.println("消息到达Exchange失败原因:{}"+ cause);
            // 根据业务逻辑实现消息补偿机制

        }
    }

    /**
     * exchange 到达 queue, 则 returnedMessage 不回调
     * exchange 到达 queue 失败, 则 returnedMessage 回调
     * 需要设置spring.rabbitmq.publisher-returns=true
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

        log.error("queuec处理失败");
        System.out.println("消息报文:{}"+ new String(message.getBody()));
        System.out.println("消息编号:{}"+ replyCode);
        System.out.println("描述:{}"+ replyText);
        System.out.println("交换机名称:{}"+exchange);
        System.out.println("路由名称:{}"+ routingKey);

        // 根据业务逻辑实现消息补偿机制
    }

    @Bean(name = "consumerlistenerContainer")
    public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(2);
        factory.setMaxConcurrentConsumers(3);
        factory.setPrefetchCount(5);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

}

实现 Direct 队列

Direct 配置类

import io.renren.modules.generator.constant.MQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zhangjiahao
 * @description DirectRabbitConfig
 * @date 2020-03-23 23:03:21
 */
@Configuration
public class DirectRabbitConfig {

    //队列 起名:TestDirectQueue
    @Bean
    public Queue directQueue() {
        return new Queue(MQConstant.MQQueueAndExchange.DIRECT_QUEUE, true);  //true 是否持久
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(MQConstant.MQQueueAndExchange.DIRECT_QUEUE_EXCHANGE);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(MQConstant.MQQueueAndExchange.DIRECT_QUEUE_ROUTING_KEY);
    }
}

实现 Fanout 队列

Fanout 配置类

import io.renren.modules.generator.constant.MQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName FanoutRabbitConfig
 * @Author zhangjiahao
 * @Description //TODO $
 * @Date $ $
 **/
@Configuration
public class FanoutRabbitConfig {

    @Bean
    public Queue fanoutQueue(){
        return new Queue(MQConstant.MQQueueAndExchange.FANOUT_QUEUE);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(MQConstant.MQQueueAndExchange.FANOUT_QUEUE_EXCHANGE);
    }

    @Bean
    public Binding fanoutBinding(){
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }
}

实现 Topic 队列

Topic 配置类

import io.renren.modules.generator.constant.MQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName TopicRabbitConfig
 * @Author zhangjiahao
 * @Description //TODO $
 * @Date $ $
 **/
@Configuration
public class TopicRabbitConfig {

    @Bean
    public Queue topicQueue(){
        return new Queue(MQConstant.MQQueueAndExchange.TOPIC_QUEUE);
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MQConstant.MQQueueAndExchange.TOPIC_QUEUE_EXCHANGE);
    }

    @Bean
    public Binding topicBinding(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(MQConstant.MQQueueAndExchange.TOPIC_QUEUE_ROUTING_KEY);
    }
}

配置文件 yml

spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 127.0.0.1
    port: 5672
    ## 手动确认
    publisher-confirms: true
    publisher-returns: true

编写消费者

消费端的代码基本一致,所以写个 Topic 的代码模板。

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.rabbitmq.client.Channel;
import io.renren.modules.generator.constant.MQConstant;
import io.renren.modules.generator.entity.BrokerMessageLogEntity;
import io.renren.modules.generator.entity.SysMessageEntity;
import io.renren.modules.generator.service.BrokerMessageLogService;
import io.renren.modules.generator.service.SysMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;
import java.util.Map;

/**
 * @ClassName TopicConsumer
 * @Author zhangjiahao
 * @Description //TODO $
 * @Date $ $
 **/
@Slf4j
@Component
public class TopicConsumer {

    @Autowired
    private SysMessageService sysMessageService;
    @Autowired
    private BrokerMessageLogService brokerMessageLogService;

    @RabbitHandler
    @RabbitListener(queues = "topic_queue",containerFactory = "consumerlistenerContainer")
    public void topicConsumer(SysMessageEntity msg, Channel channel, Message message) throws IOException {
        System.out.println(msg.toString());
        BrokerMessageLogEntity one =
                brokerMessageLogService.getOne(new QueryWrapper<BrokerMessageLogEntity>().lambda().eq(BrokerMessageLogEntity::getMessageId, msg.getMessageId()).eq(BrokerMessageLogEntity::getStatus, 0));
        if (one == null) {
            one = new BrokerMessageLogEntity();
            one.setCreateTime(msg.getGmtCreate());
            one.setUpdateTime(msg.getGmtModified());
            one.setStatus("0");
            one.setMessageId(msg.getMessageId());
            one.setMessage(JSON.toJSONString(msg));
            one.setTryCount(MQConstant.MQ_TRY_COUNT);
            one.setNextRetry(new Date(System.currentTimeMillis() + MQConstant.MQ_TIME_SPACE));
        }
        try {
            log.info("正常处理数据,{}", msg);
            //处理完成 消息设置为已处理
            msg.setState(1);
            sysMessageService.updateById(msg);
            // MQ日志状态已处理
            one.setStatus("4");
            brokerMessageLogService.saveOrUpdate(one);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            if(one.getTryCount()<1){
                log.error("已超过尝试次数,在此失败{}",msg);
                msg.setState(-1);
                sysMessageService.updateById(msg);
                one.setStatus("-1");
                brokerMessageLogService.updateById(one);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            }
            // 消息重新放回队列
            try {
                one.setTryCount(one.getTryCount() - 1);
                one.setNextRetry(new Date(System.currentTimeMillis() + MQConstant.MQ_TIME_SPACE));
                brokerMessageLogService.updateById(one);
                System.out.println("消息[{}]处理失败,重新放回队列" + msg);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                e.printStackTrace();
            } catch (IOException ex) {
                log.error("消费失败,{}", msg);
                ex.printStackTrace();
            }

        }
    }
}

这里其他的 service 是用于记录 RabbitMQ 的日志,用于处理投递失败和消费失败的情况。

编写测试的生产者

@GetMapping("direct")
public R direct() {
    for (int i = 0; i < 11; i++) {
        SysMessageEntity sysMessageEntity = new SysMessageEntity();
        sysMessageEntity.setCreator("1");
        sysMessageEntity.setGmtCreate(new Date());
        sysMessageEntity.setGmtModified(new Date());
        sysMessageEntity.setMessageId(snowFlake.nextId() + "");
        sysMessageEntity.setModifier("1");
        sysMessageEntity.setName("测试消费direct-" + i);
        sysMessageEntity.setType(0);
        sysMessageEntity.setState(0);
        CommonCorrelationData commonCorrelationData = new CommonCorrelationData();
        commonCorrelationData.setId(sysMessageEntity.getMessageId());
        commonCorrelationData.setData(sysMessageEntity);
        sysMessageService.save(sysMessageEntity);
        rabbitTemplate.convertAndSend(MQConstant.MQQueueAndExchange.DIRECT_QUEUE_EXCHANGE,
                MQConstant.MQQueueAndExchange.DIRECT_QUEUE_ROUTING_KEY, sysMessageEntity,commonCorrelationData);

    }
    return R.ok();
}

@GetMapping("/fanout")
public R fanout() {
    for (int i = 0; i < 11; i++) {
        SysMessageEntity sysMessageEntity = new SysMessageEntity();
        sysMessageEntity.setCreator("1");
        sysMessageEntity.setGmtCreate(new Date());
        sysMessageEntity.setGmtModified(new Date());
        sysMessageEntity.setMessageId(snowFlake.nextId() + "");
        sysMessageEntity.setModifier("1");
        sysMessageEntity.setName("测试消费fanout-" + i);
        sysMessageEntity.setType(0);
        sysMessageEntity.setState(0);
        sysMessageService.save(sysMessageEntity);
        rabbitTemplate.convertAndSend(MQConstant.MQQueueAndExchange.FANOUT_QUEUE_EXCHANGE,
                MQConstant.MQQueueAndExchange.FANOUT_QUEUE_ROUTING_KEY, sysMessageEntity);
    }
    return R.ok();
}

@GetMapping("topic")
public R topic() {
    for (int i = 0; i < 11; i++) {
        SysMessageEntity sysMessageEntity = new SysMessageEntity();
        sysMessageEntity.setCreator("1");
        sysMessageEntity.setGmtCreate(new Date());
        sysMessageEntity.setGmtModified(new Date());
        sysMessageEntity.setMessageId(snowFlake.nextId() + "");
        sysMessageEntity.setModifier("1");
        sysMessageEntity.setName("测试消费topic-" + i);
        sysMessageEntity.setType(0);
        sysMessageEntity.setState(0);
        sysMessageService.save(sysMessageEntity);
        rabbitTemplate.convertAndSend(MQConstant.MQQueueAndExchange.TOPIC_QUEUE_EXCHANGE,
                MQConstant.MQQueueAndExchange.TOPIC_QUEUE_ROUTING_KEY, sysMessageEntity);

    }
    return R.ok();
}

关于 RabbitMQ 的 UI 界面设置

自定义 CorrelationData

因为 MQ 完全监控的原因,可以通过手动确认来处理投递失败的。

 /**

      * 如果消息到达 exchange, 则 confirm 回调, ack = true
      * 如果消息不到达 exchange, 则 confirm 回调, ack = false
      * 需要设置spring.rabbitmq.publisher-confirms=true
      * @param correlationData
      * @param ack
      * @param cause

      */
     @Override
     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
         Message returnedMessage = correlationData.getReturnedMessage();
         if (ack){
             System.out.println("消息是否到达Exchange:{}" +  "消息成功到达Exchange");
         }else {
             System.out.println("消息是否到达Exchange:{}" + "消息到达Exchange失败");
         }

         if (!ack) {
             System.out.println("消息到达Exchange失败原因:{}"+ cause);
             // 根据业务逻辑实现消息补偿机制

         }
     }

通过重写 CorrelationData 来实现完全的监控。

 public class CorrelationData implements Correlation {
     private final SettableListenableFuture<CorrelationData.Confirm> future = new SettableListenableFuture();
     @Nullable
     private volatile String id;
     private volatile Message returnedMessage;

     public CorrelationData() {
     }

     public CorrelationData(String id) {
         this.id = id;
     }

     @Nullable
     public String getId() {
         return this.id;
     }

     public void setId(String id) {
         this.id = id;
     }

     public SettableListenableFuture<CorrelationData.Confirm> getFuture() {
         return this.future;
     }

     @Nullable
     public Message getReturnedMessage() {
         return this.returnedMessage;
     }

     public void setReturnedMessage(Message returnedMessage) {
         this.returnedMessage = returnedMessage;
     }

     public String toString() {
         return "CorrelationData [id=" + this.id + "]";
     }

     public static class Confirm {
         private final boolean ack;
         private final String reason;

         public Confirm(boolean ack, @Nullable String reason) {
             this.ack = ack;
             this.reason = reason;
         }

         public boolean isAck() {
             return this.ack;
         }

         public String getReason() {
             return this.reason;
         }

         public String toString() {
             return "Confirm [ack=" + this.ack + (this.reason != null ? ", reason=" + this.reason : "") + "]";
         }
     }
 }

参考

RabbitMQ 消息路由机制流程总结

标签:SpringBoot,org,RabbitMQ,springframework,整合,sysMessageEntity,import,new,public
From: https://www.cnblogs.com/jiuxialb/p/18006395/springboot-zheng-he-rabbitmq

相关文章

  • RabbitMQ快速入门
    MQ的基本概念MQ概述MQ全称MessageQueue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。MQ,消息队列,存储消息的中间件分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信发送方称为生产者,接收方称为消费者MQ的优势和劣势优......
  • Springboot项目发布war到tomcat
    springboot项目有些日子没有开发了,新做一个minspringboot项目,复习下项目开发及发布流程。1.新建项目: 2.新建一个业务controllercontroller名称及方法,名称随意,项目结构如下: testcontroller代码文件的内容如下:packagecom.*****.Controller;importorg.springframewor......
  • SpringBoot-热部署插件添加
      在开发中修改代码避免反复重启编译   <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId></dependency> 使用idea为2023.2.3 ......
  • MyBatis整合第三方缓存EHCache
    EHCache缓存针对于MyBatis的二级缓存。MyBatis默认二级缓存是SqlSessionFactory级别的。添加依赖<!--MyBatis-EHCache整合包--><dependency> <groupId>org.mybatis.caches</groupId> <artifactId>mybatis-ehcache</artifactId> <version>1.2.1</vers......
  • SpringBoot的约定是什么?
    springboot项目中必须在src/main/resources中放入application.yml(yaml,properties)配置文件,名字为applicationspringboot项目中必须在src/main/java中只能有一个启动类......
  • SpringBoot简单集成JWT
    1.JWT入门1.1JWT概念官方网站:https://jwt.io/introduction/JSONWebToken(JWT)是一个定义在RFC7519开放标准下的技术,提供了一种紧凑且自包含的方式用于在各方之间安全地传输信息。JWT使用JSON对象作为载体,同时通过数字签名来验证和确保信息的可信度。数字签名可以通过......
  • SpringBoot + LiteFlow:轻松应对复杂业务逻辑,简直不要太香!
    LiteFlow简介LiteFlow是什么?LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑。通过支持热加载规则配置,开发者能够即时调整流程步骤,将复杂的业务如价格计算、下单流程等拆分为独立且可复用的组件,从而实现系统的高度......
  • C# TEKLA 二次开发,将exe整合到TEKLA界面中
    类似这种,避免每次去找exe文件. 流程,1#准备csmacro文件和图标文件下面的是宏的内容usingSystem.Windows.Forms;usingSystem.IO;usingSystem.Diagnostics;usingTekla.Structures.ModelInternal;usingTekla.Structures.Internal;usingSystem;namespaceTekla.T......
  • RabbitMQ 学习笔记 - 1
    RabbitMQ的基础概念生产者产生数据发送消息的程序是生产者交换机交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切的知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢......
  • Spring-xml(+注解)方式整合第三方的框架-mybatis
    1)不需要自定义命名空间:MyBatisSpring整合Mybatis的步骤如下://原始配置<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependenc......