首页 > 其他分享 >RabbitMQ目录发送方确认confirm:确认模式2.重复调用接口时,会提示错误所以改进,自己搞一个template,return退回模式:RabbitMQ的可靠性

RabbitMQ目录发送方确认confirm:确认模式2.重复调用接口时,会提示错误所以改进,自己搞一个template,return退回模式:RabbitMQ的可靠性

时间:2024-10-09 19:49:33浏览次数:14  
标签:RabbitTemplate rabbitTemplate confirm 确认 模式 RabbitMQ correlationData public 消息

目录

发送方确认

confirm:确认模式

2.重复调用接口时,会提示错误所以改进,自己搞一个template,

return退回模式:

RabbitMQ的可靠性/保证消息不丢失..

消息在交换机中无法路由到制定队列:return模式

重试机制


发送方确认

当消息的生产者发送消息以后,怎么知道是否到达服务器呢?

发送方确认:生产者到Broker的解决方案,

confirm:确认模式

在发送消息的时候,不管消息是否到达exchange,这个监听都会被执行,如果exchange的ack为true,如果为false,ACK为false;

return:退回模式

消息到达exchange之后,会根据路由规则匹配,把消息放入Queue中,Exchange到Queue的过程,如果一条消息无法被任何队列消费,可以选择把消息退回给发送者,消息退回给发送者时候,可以设置一个返回回调方法,对消息进行处理

不是互斥,可以单独使用,也可以结合使用

confirm:确认模式

package com.example.constant;

public class Constants {

    public static final String CONFIRM_QUEUE="confirm.queue";
    public static final String CONFIRM_EXCHANGE="confirm.exchange";

}
 //发送方确认
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();
    }
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();
    }
    @Bean("confirmBinding")
    public Binding confirmBinding(@Qualifier("confirmQueue")Queue queue,@Qualifier("confirmExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
    }
  @RequestMapping("/confirm")
    public String confirm(){
        //设置回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行了confirm方法");
                if(ack){
                    System.out.printf("接收到消息,消息的ID:%s \n",correlationData==null? null:correlationData.getId());
                }else{
                    System.out.printf("未接收到消息,消息ID:%s,cause:%s \n",correlationData==null?null:correlationData.getId(),cause);
                }
            }
        });
        CorrelationData correlationData=new CorrelationData("1");
        rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE+"1","confirm,","confirm test...",correlationData);
        return  "消息发送成功";

    }

存在两个问题:

这种方式设置confirmcallback影响所有使用RabbitTemplate的方法

package com.example.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTemplateConfig {
   
    @Bean
    public RabbitTemplate confirmrabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        //设置回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行了confirm方法");
                if(ack){
                    System.out.printf("接收到消息,消息的ID:%s \n",correlationData==null? null:correlationData.getId());
                }else{
                    System.out.printf("未接收到消息,消息ID:%s,cause:%s \n",correlationData==null?null:correlationData.getId(),cause);
                }
            }
        });
        return rabbitTemplate;
    }
}

但是又会一个问题,

当我们使用以上改动时候,发现执行上面的方法时候,根本没执行confirm方法

但是确报执行力confirm方法。(我们用的初始想法是用的是两个分别的rabbitMQtemplate,)你引入的template都是你自己声明的了

2.重复调用接口时,会提示错误所以改进,自己搞一个template,

假如发送到了queue,但是routingKey不正确,但是走的逻辑还是消费到了,

return退回模式:

消息到达Exchange之后,会根据路由规则匹配,把消息放入Queue中,Exchange到Queue中,Exchange到Queue的过程,如果一条消息无法被任何队列消息(即使没有队列与消息的路由匹配键匹配或者队列不存在等),可以选择把消息退回给发送者,消息退回给发送者,我们可以设置一个回调方法,对消息进行处理

回调函数有一个参数 ReturnedMessage

Message message:返回的消息对象,包含了消息体和消息属性

int replyCode:由Broker提供回复码,表示消息无法路由的原因,通常是一个数字代码,每个数字代表不同的含义。

String replyTest:一个文本串,提供了无法路由消息的额外信息或者错误描述

String exchange:消息被发送到交换机名称

String routingKey:消息的路由键,即发送消息时指定的键

package com.example.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate();
        return  rabbitTemplate;
    }
    @Bean
    public RabbitTemplate confirmrabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        //设置回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行了confirm方法");
                if(ack){
                    System.out.printf("接收到消息,消息的ID:%s \n",correlationData==null? null:correlationData.getId());
                }else{
                    System.out.printf("未接收到消息,消息ID:%s,cause:%s \n",correlationData==null?null:correlationData.getId(),cause);
                }
            }
        });
        //消息被退回的时候,回调方法,不设置就不会执行这个回调方法
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("消息退回:"+returnedMessage);
            }
        });
        return rabbitTemplate;
    }
}

RabbitMQ的可靠性/保证消息不丢失..

1.对应的生产者发送消息到RabbitMQ失败,解决方法,confirm确认模式

消息在交换机中无法路由到制定队列:return模式

消息队列自身数据丢失

可能原因:消息到达RabbitMQ之后,RabbitMQ宕机导致数据消失,

解决方法:持久性,开启持久化,消息写入之后,会持久化到磁盘,如果RabbitMQ挂了,恢复之后就会自动读取之前的数据(极端情况,没持久化的时候宕机,集群,提高可靠性)

4.消费者异常,导致消息丢失:

可能原因:消息到达消费者,还没来得及消费,消费者宕机,消费者逻辑有问题

解决方法:[消息确认]RabbitMQ提供了消费者应答机制,来使RabbitMQ能够感知到消费者是否消费成功之后,才会删除消息,从而避免消息丢失,也可以重试机制

重试机制

RabbitMQ的SDK提供方式:

自动确认:消息到达消费者,消息就去删除了

手动确认:消息处理成功后,需要进行ACK.

这个里面的AUTO时消费者处理消息时,抛出异常,会自动进行重试.会一直进行重试

这样就会重试五次啦。但是他的deliverTag不会变化.

手动确认 和那个auto就差不多了,但是也是因为我们选择的是重新入队deliver Tag是不断变化的,因为是不断重新入队,手动的话,我们配置重试次数啥的是无效的,我们的重试机制是在自动确认下才有效

​​​​​​​

标签:RabbitTemplate,rabbitTemplate,confirm,确认,模式,RabbitMQ,correlationData,public,消息
From: https://blog.csdn.net/weixin_72953218/article/details/141966923

相关文章

  • 设计模式——观察者模式
    哈喽,各位盆友们!我是你们亲爱的学徒小z,今天给大家分享的文章是设计模式的——观察者模式。定义定义对象间一种一对多的依赖关系,使得每当一个对象改变状态,则所有依赖于它的对象都会得到通知并被自动更新。通用类图1.具体结构Subject被观察者定义被观察者必须实现的职......
  • 设计模式——门面模式 | 外观模式
    哈喽,各位盆友们!我是你们亲爱的学徒小z,今天给大家分享的文章是设计模式的——门面模式。文章目录定义通用类图1.通用结构2.优点3.缺点使用场景注意事项1.一个子系统可以有多个门面2.门面不参与子系统内的业务逻辑定义定义:要求一个子系统的外部与其内部的通信必须......
  • 【网络配置】聚合链路eNSP--静态 LACP 模式
            聚合链路(LinkAggregation),也称为链路捆绑、链路聚合组(LAG,LinkAggregationGroup)或以太网链路聚合,是一种将多个物理网络接口(例如以太网端口)组合成一个逻辑链路的技术。聚合链路的配置通常涉及以下两种模式:手工模式(StaticLAG):在这种模式下,网络管理员手动配置......
  • padding模式
    缘起在遇到AES和DES等分组加密算法时,需要明文满足一定的长度要求(分组的倍数),但是大多数情况下明文没法满足长度的苛刻要求,于是就要进行padding使传入的内容满足长度要求。Nopadding就是不填充,明文满足分组算法的长度要求,不需要再进行填充。PKCS5/PKCS7填充数据为填充字节的长......
  • 网络接入的镜像模式和串接模式
    网络接入的镜像模式和串接模式主要有以下特点: 一、镜像模式1. 工作原理 -镜像模式也称为端口镜像,是将网络中指定端口的数据流量复制一份到另一个监测端口,以便进行网络分析、故障排查和安全监控等。例如,将连接重要服务器的交换机端口流量镜像到一个用于网络监测的设备......