首页 > 其他分享 >RabbitMQ实现轮询形式消息最大发送失败次数,及详细解析

RabbitMQ实现轮询形式消息最大发送失败次数,及详细解析

时间:2024-11-02 16:44:25浏览次数:3  
标签:String 轮询 RabbitMQ 发送 消息 exchangeName message 解析 public

RabbitMQ设置消息最大发送失败次数,达到三次后不确认消息(此处根据业务需求可考虑使不确认的消息进入死信交换机)

配置文件:

spring:
  rabbitmq:
    host: 192.168.1.248
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-confirm-type: correlated # 生产者的发布确认模式为相关模式
    publisher-returns: true # 开启发布者的returns模式
    listener:
      simple:
        acknowledge-mode: manual # 开启监听者(消费者、接受者)的手动确认模式
    cache:
      channel:
        checkout-timeout: 10000

# 自定义属性
my:
  exchangeName: exchange.reliability
  queueName: queue.reliability

交换机和队列配置:

@Configuration
@Slf4j
public class RabbitConfig {

    @Value("${my.exchangeName}")
    private String exchangeName;

    @Value("${my.queueName}")
    private String queueaName;

    @Bean
    public DirectExchange directExchange() {
        //建造者模式交换机默认就是持久化
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    @Bean
    public Queue queue() {
        //建造者模式队列默认就是持久化
        return QueueBuilder.durable(queueaName).build();
    }

    @Bean
    public Binding binding(DirectExchange directExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("info");
    }

    /**
     * 配置一个消息转换器,json格式的
     *
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

生产者:

@Service
@Slf4j
public class MessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${my.exchangeName}")
    private String exchangeName;

    @PostConstruct
    public void init() {
        //设置确认回调接口(当消息成功到达交换机时,会调用此回调的confirm方法(confirm:Lambda表达式未简化时方法))
        rabbitTemplate.setConfirmCallback(
				// correlationData 相关数据(一般会存一个id属性)
				// ack 确认标志:true表示成功,false表示失败
				// cause 失败原因
                (correlationData, ack, cause) -> {
                    if (!ack) {
                        log.error("消息:{}没有到达交换机,原因为:{}", correlationData.getId(), cause);
                    }
                }
        );

        //设置模版的returnsCallback(当消息无法正确路由到队列时,会调用此回调的returnedMessage方法(returnedMessage:Lambda表达式未简化时方法))
        rabbitTemplate.setReturnsCallback(
                returned -> {
                    String errorMessage = String.format(
						"消息从交换机%s使用路由键%s没有正确的路由到队列,错误代码:%d,错误原因为:%s",  
						returned.getExchange(),  
						returned.getRoutingKey(),  
						returned.getReplyCode(),  
						returned.getReplyText()
					);  
					log.error(errorMessage);  
                }
        );
    }

	// 发送消息
    public void sendMsg() throws JsonProcessingException {
        Orders orders = Orders.builder()
                .orderId(99)
                .orderName("橙子")
                .orderMoney(new BigDecimal(100))
                .orderTime(new Date())
                .build();

		// 存入消息id
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("order:" + orders.getOrderId());
        
        //队列中的消息默认是持久化的
        rabbitTemplate.convertAndSend(exchangeName, "info", orders,
                //消息头中设置重新发送次数(消息后处理器,用于在消息发送前对其进行修改)
                message -> {
                    message.getMessageProperties().setHeader("x-retry-count", 1); //消息头部设置计数属性,表示第一次发送消息
                    return message;
                }
                , correlationData);
        log.info("消息发送完毕");
    }
}

消费者:

@Component
@Slf4j
public class MessageListener {

    @Value("${my.exchangeName}")
    private String exchangeName;

    @Value("${my.queueName}")
    private String queueName;

    @Resource
    private RabbitTemplate rabbitTemplate;

	// 声明一个消息监听器,当有消息到达指定的队列时,Spring会自动调用这个方法
    @RabbitListener(queues = {"${my.queueName}"})
	// orders 消息体
	// message RabbitMQ的消息对象,包含了消息的元数据(如消息ID、头信息、属性等)
    // channel RabbitMQ的通道对象,用于与RabbitMQ服务器进行通信,入确认消息、拒绝消息等
	public void receiveMsg(Orders orders, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag(); //获取唯一标识符
        int maxRetryCount = 3; // 最大重试次数,加上第一次发送,共三次
        Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count"); //获取本条消息是第几次发送
        retryCount = (retryCount == null) ? 1 : retryCount; //如果为空(属性不存在),默认设置为第一次发送
        try {
            log.info("插入数据库开始...{}", orders.toString());
            //模拟错误
            int a = 1 / 0;
            log.info("插入数据库完成");
            channel.basicAck(deliveryTag, false); //插入成功,手动确认消息,结束本条消息操作
        } catch (Exception e) {
            // 处理失败,检查重试次数
            if (retryCount < maxRetryCount) { //判断是否达到最大次数
                // 增加重试计数并重新入队
                message.getMessageProperties().getHeaders().put("x-retry-count", retryCount + 1);   //消息头部计数属性加1
                rabbitTemplate.convertAndSend(exchangeName, "info", message);   //重新发送消息
                channel.basicAck(deliveryTag, false); // 插入失败,手动确认消息(上一行代码已经重新发送了一条新的消息,所以,本条消息需要手动确认)
            } else {
				/**
				 * 此处业务逻辑为,仅需把消息发送给消费者,不关注后续操作,所以消息直接不确认
				 * 如果业务需求变更,可把消息发入死信队列等
				 */
                // 达到最大重试次数,执行其他处理逻辑
                log.error("插入数据库失败,原因为:{}", e.toString());
                channel.basicNack(deliveryTag, false, false); // 不确认消息并不重新入队
            }
        }
    }
}

标签:String,轮询,RabbitMQ,发送,消息,exchangeName,message,解析,public
From: https://blog.csdn.net/weixin_71992340/article/details/143368275

相关文章

  • rabbitmq
    1.rabbitmq架构 Exchange:消息交换机,指定消息按照什么规则,路由到什么队列Queue:消息的载体,每个消息会被投递到一个或者多个队列Binding:绑定,将exchange和queue按照路由规则绑定起来RoutingKey:路由关键字,exchange根据这个关键字进行消息投递Poducer:消息生产者Consumer:消费者C......
  • 【C++】布隆过滤器的概念与特点解析
    目录00.引入01.布隆过滤器的概念特点1:极低的内存消耗特点2:快速查询特点3:假阳性误判(禁止删除)02.布隆过滤器的底层实现00.引入上一篇博客介绍了位图这一数据结构,它可以在大量整数中快速查找某一数据是否存在,并且内存占用率很低(例如,查找40亿个整数只需0.5G空间)。博客链......
  • 从数据提取到管理:合合信息的智能文档处理全方位解析【合合信息智能文档处理百宝箱】
    文章目录一、引言二、智能文档处理“百宝箱”概述三、可视化文档解析前端TextInParseX3.1TextInParseX简介3.2技术特点3.3TextInParseX功能四、向量化acge-embedding模型4.1向量化模型acge-embedding技术亮点总结4.2Embedding嵌入/向量化4.3向量化模型效果......
  • 华为大模型面试通关秘籍:50道高频面试题及答案解析
    觉得中大厂面试太难的,完全就是自己没准备充分,技术不到位,没准备的面试完全是浪费时间,更是对自己的不负责!.今天我给大家分享一下我整理的**《精选50个大模型高频面试题》**大模型面试专题和答案,其中大部分都是面试常问的面试题,可以对照这查漏补缺奥!祝大家早日上岸呀!1.简......
  • 绑定域名时域名解析状态显示解析失败或不正确的解决方法
    检查DNS记录登录到您的域名注册商的管理后台。进入域名解析管理页面,检查DNS记录是否正确。确保A记录或CNAME记录指向了正确的IP地址或主机名。验证解析记录使用命令行工具(如 ping 或 nslookup)验证域名解析是否正确。例如,在Windows命令提示符下运行:  ping......
  • 信息学科平台系统构建:Spring Boot框架深度解析
    4系统概要设计4.1概述本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式,是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示:图4-1系统工作原理图4.2系统结构本系统......
  • 信息学科平台系统设计与实现:Spring Boot技术全解析
    4系统概要设计4.1概述本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式,是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示:图4-1系统工作原理图4.2系统结构本系统......
  • 操作符史上最全解析没有之一!!!!!!! (上)
    文章目录1.算术操作符2.移位操作符3.位操作符&按位与|按位或^按位异或例题例题:4.赋值操作符-=+=*=/=%=<<=<<=&=|=5.单目操作符!+--&sizeof~++*类型(强制类型转换)6.关系操作符<<=!=\>\>===7.逻辑操作符8.条件操作符9.逗号表达式1.算术操作......
  • 深入学习指针!指针史上最全解析!!(1)
    文章目录1.内存和地址1.1内存1.2究竟该如何理解编址2.指针变量和地址2.1取地址操作符(&)2.2指针变量和解引用操作符(*)2.2.1指针变量2.2.2如何拆解指针类型2.2.3解引用操作符2.3指针变量的大小3.指针变量类型的意义3.1指针的解引用3.2指针+-整数3.3void*指针4.指针运......
  • 10种数据预处理中的数据泄露模式解析:识别与避免策略
    在机器学习教学实践中,我们常会遇到这样一个问题:"模型表现非常出色,准确率超过90%!但当将其提交到隐藏数据集进行测试时,效果却大打折扣。问题出在哪里?"这种情况几乎总是与数据泄露有关。当测试数据在数据准备阶段无意中泄露(渗透)到训练数据时,就会发生数据泄露。这种情况经常......