首页 > 其他分享 >RabbitMQ如果保证消息可靠性

RabbitMQ如果保证消息可靠性

时间:2023-09-15 11:48:31浏览次数:58  
标签:可靠性 String confirm RabbitMQ 交换机 保证 消息 correlationData public


这是RabbitMQ消息从生产者到消费者的流程。
从图中可以看出消息可能在以下几个地方丢失

  1. 生产者处丢失:消息没有正确到达RabbitMQ的交换机。 解决策略:confirm机制
  2. RabbitMQ本身将消息丢失:因为一些原因导致RabbitMQ重启,导致内存中的消息丢失。 解决策略:消息持久化
  3. 消费者处丢失:消费者拿到消息之后还没有消费完就宕机了,导致消息丢失。 解决策略:手动ack
    今天先说一个confirm机制和手动ack,持久化下次再说
    消息会经过三次传输
  4. 从生产者到交换机
  5. 从交换机到队列
  6. 从队列到消费者
    RabbitMQ也提供了对应的机制来保证这三次消息传输的可靠性,如下图
  7. 开启confirm机制后为每条消息分配一个唯一id,当生产者发送消息后RabbitMQ回调一个confirm方法,返回消息的id和是否成功发送的标识以及失败原因,来让我们消息是否成功发送,如果失败进行相应的处理
  8. 当消息从交换机没有正确到达队列时会回调一个return方法,返回消息、应答编码、失败原因、交换机以及路由键。回调后我们进行相应的处理。如果成功到达队列则不会回调。
  9. 默认是自动ack的,当消息到达消费者rabbitMQ就把消息删除了,这样是最高效的,但如果消费者因为某些原因没有正确消费消息,那么这条消息就丢失了。手动ack是指消费者正确处理完消息之后返回给rabbitMQ一个ack状态,告诉RabbitMQ可以把这条消息删除了,这样就保证了消息不会在消费者处丢失。

话不多说,上代码

创建工程,配置文件添加rabbitmq的配置

server:
  port: 8082

spring:
  rabbitmq:
    username: guest
    password: guest
    port: 5672
    host: localhost
    publisher-confirm-type: correlated  #设置发送者确认confirm机制
    publisher-returns: true #消息未从交换机正确到达队列时发送回调
    listener:
      simple:
        acknowledge-mode: manual  #指定消息确认模式为手动确认

RabbitMQ配置类

@Component
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "message_confirm_exchange";
    public static final String QUEUE_NAME = "message_confirm_queue";
    private static final String ROUTING_KEY = "user.#";

    @Bean
    private TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    private Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    private Binding bindingTopic(){
        return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTING_KEY);
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //设置mandatory为true才能让消息没有到达队列时回调return方法,如果不设置的话默认为false,当消息没有到达队列就直接丢弃了
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}

注册confirm和return回调

@Component
@Slf4j
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //依赖注入完成后执行此方法
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 生产者确认confirm机制。消息无论是否到达交换机都会回调这个方法
     * 20年12月之前的CorrelationData对象中只有一个id属性,我们需要自己绑定id和具体消息的关系来根据id找到消息
     * 但20年12月之后的CorrelationData对象中多了一个returnedMessage属性,生产者在设置CorrelationData对象的消息id时可以把同时把发送的消息设置进去,这样在confirm回调中可以直接获取消息了,不用自己做绑定
     * @param correlationData
     * @param ack
     * @param s
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        //将字节数组转化为消息对象
        MessageDTO messageDTO = ByteObjectConvert.byte2Object(correlationData.getReturnedMessage().getBody());
        if (ack){
            log.info("confirm,发送成功-----消息:{},id:{}",messageDTO,correlationData.getId());
        }else{
            log.info("confirm,发送失败-----消息:{},id:{},失败原因:{}",messageDTO,correlationData.getId(),s);
            //发送失败,将失败消息存入数据库。入库后可以使用定时任务去扫描发送失败的消息进行重发
            //获取要发送的交换机,路由键。在生产者发送消息时我自己定义了一个map用于存储消息要发送到的交换机和路由键,用于在confirm失败时获取然后进行相应处理
            String exchangeAndRoutingKey = DataContainer.getExchangeAndRoutingKey(correlationData.getId());
            String[] split = exchangeAndRoutingKey.split(",");
            String exchange = split[0];
            String routingKey = split[1];
            log.info("\n 交换机:{},路由键:{}",exchange,routingKey);
            //入库后删除map中的数据
            DataContainer.del(correlationData.getId());
        }
    }

    /**
     * 如果消息未从交换机正确到达队列会回调这个方法,正确到达不会回调这个方法
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        byte[] body = message.getBody();
        //将消息的字节数组转为对象
        MessageDTO o = ByteObjectConvert.byte2Object(body);
        //存入数据库
        log.info("returns-----:{}\n replyCode:{},replyText:{},exchange:{},routingKey:{}",o,replyCode,replyText,exchange,routingKey);
        //入库后删除map中的数据
        String msgID = message.getMessageProperties().getCorrelationId();
        DataContainer.del(msgID);
    }
}

定义一个用于存储消息要发送到的交换机和路由键的map

/*
 * @Description TODO (保存发送消息的交换机和路由键,为了在confirm为false时获取然后入库)
 * 创建人: 程长新
 * 创建时间:2023/9/14 15:08
 **/
@Slf4j
@NoArgsConstructor
public class DataContainer {
    private static ConcurrentHashMap<String,String> map = new ConcurrentHashMap<>();

    public static void saveExchangeAndRoutingKey(String key, String value){
        String put = map.put(key, value);
    }

    public static void del(String key){
        String remove = map.remove(key);
    }

    public static String getExchangeAndRoutingKey(String key){
        return map.get(key);
    }
}

随便定义一个发送消息的实体

/**
 * 因为要作为消息进行发送,所以这个实体必须是可序列化的。所以实现Serializable接口
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {
    private int id;
    private String msg;
}

自己定义一个byte数组与Object转换的工具类

/**
 * byte数组与对象转换的工具类
 */
public class ByteObjectConvert {

    public static byte[] object2Bytes(MessageDTO messageDTO) {
        byte[] messageDTOBytes;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = null;
        try {
            objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeObject(messageDTO);
            messageDTOBytes = byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } finally {
            try {
                byteArrayOutputStream.close();
                objectOutputStream.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return messageDTOBytes;
    }

    public static MessageDTO byte2Object(byte [] body) {
//        byte[] body = correlationData.getReturnedMessage().getBody();
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
        ObjectInputStream objectInputStream = null;
        MessageDTO messageDTO = null;
        try {
            objectInputStream = new ObjectInputStream(byteArrayInputStream);
            messageDTO = (MessageDTO) objectInputStream.readObject();

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                byteArrayInputStream.close();
                objectInputStream.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return messageDTO;
    }
}

生产者发送消息


@Slf4j
@RestController
public class Producer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send")
//    @ResponseBody
    public String sendMessage(){
        for (int i = 0; i < 3; i++) {
            String correlationId = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(correlationId);
            String s = "message" + i;
            MessageDTO messageDTO = new MessageDTO(i, s);
            log.info("消息{}的id{}",s,correlationData.getId());
            //将要发送的消息转为字节数组用于构建Message对象
            byte[] messageDTOBytes = ByteObjectConvert.object2Bytes(messageDTO);
            //构建Message对象时同时将唯一id correlationId设置进去
            Message build = MessageBuilder.withBody(messageDTOBytes).setCorrelationId(correlationId).build();
            //将发送的消息设置到CorrelationData对象中,可以直接在confirm回调方法中获取Message消息
            correlationData.setReturnedMessage(build);
            if (i == 1){
                //为了保证消息confirm为false时还能拿到发送到的交换机和路由键,将消息id作为键,交换机和路由键作为值存入map
                DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),"abc" + "," + "user.add");
//                rabbitTemplate.convertAndSend("abc","user.add",messageDTOBytes,correlationData);//如果设置了MessageProperties,那么发送的消息就要构造成Message对象,否则设置的MessageProperties不起作用,因为如果不是Message对象,底层发送的时候还是会将对象构造为Message对象并创建一个空的MessageProperties对象,所以自己设置的会不起作用
                rabbitTemplate.convertAndSend("abc","user.add",build,correlationData);//错误的交换机名称,用于模拟消息没有正确到达交换机的情况
            }else if (i == 2){
                DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),RabbitMQConfig.EXCHANGE_NAME + "," + "abc");
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "abc", build, correlationData);//写一个匹配不到的路由键,用于模拟消息没有从交换机到达队列的情况
            }else {
                DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),RabbitMQConfig.EXCHANGE_NAME + "," + "user.add");
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "user.add", build, correlationData);
            }
            log.info("生产者发送消息:{}",build);
        }
        return "发送成功";
    }
}

消费者监听队列处理消息--开启手动ack


@Component
@Slf4j
public class Consumer01 {

    @RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME})
    public void receiveMessage01(MessageDTO msg, Message message, Channel channel){
//    public void receiveMessage01(Message message, Channel channel){
//        MessageDTO msg = ByteObjectConvert.byte2Object(message.getBody());
        try {
            /*if ("message1".equals(msg)){
                int i = 1/0;
            }*/
            log.info("消费者01成功接收到消息:{}",msg);
            //rabbitmq只有收到的消费者发送的ack消息,才会将消息删除,否则不会删除
            //第一个参数表示该消息的index,第二个参数表示是否批量确认,批量的话小于这个tag的都会ack掉
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            log.error("消费者01处理消息:{},发生异常:{}",msg,e.getMessage());
            try {
                //如果消息没有被拒绝过就将消息重新入队,如果被拒绝过一次一直接丢弃
                if (message.getMessageProperties().getRedelivered()){
                    //basicReject与basicNack一样,只不过basicNack可以批量处理,basicReject只能处理单个
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                    log.info("消费者01丢弃消息:{}",msg);
                }else {
                    //参数2为批量标识,如果设为true的话小于这个index的消息都将被确认
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                    log.info("消费者01拒绝消息:{}",msg);
                }
            } catch (IOException ex) {
                log.error("拒绝消息时发生异常:{}",ex.getMessage());
                e.printStackTrace();
            }
            e.printStackTrace();
        }
    }
}

标签:可靠性,String,confirm,RabbitMQ,交换机,保证,消息,correlationData,public
From: https://www.cnblogs.com/ccx-lly/p/17704094.html

相关文章

  • RabbitMq
     如何保证消息的可靠性Rabbit消息传输路径是生产者到路由到队列到消费者消费。而Rabbitmq丢消息有以下几种情况1生产者发送消息到RabbitMQ服务器过程中,RabbitMQ服务器如果宕机停止服务,消息会丢失。RabbitMQ是支持消息持久化的,消息持久化需要设置:Exchange为持久化和Queu......
  • MQ消息可靠性等
    RabbitMQ如何保证消息可靠性?首先RabbirMQ是一个开源的支持多协议的性能优秀的消息中间件,他的消息可靠性,消息延迟以及可用性比较高,但是单机消息吞吐量比较一般。消息的可靠性是指,消息准确无误的到达消费者手中,不能出现消息的丢失等问题,消息丢失又分为:①生产者发送消息为到达交换......
  • 保证接口数据安全的10种方案
    前言我们日常开发中,如何保证接口数据的安全性呢?个人觉得,接口数据安全的保证过程,主要体现在这几个方面:一个就是数据传输过程中的安全,还有就是数据到达服务端,如何识别数据,最后一点就是数据存储的安全性。今天跟大家聊聊保证接口数据安全的10个方案。1.数据加密,防止报文明文传输。......
  • RabbitMQ、RocketMQ和Kafka的不同之处
    RabbitMQ、RocketMQ和Kafka是三种常见的消息队列系统,它们在设计和使用方面有一些不同之处:架构设计:RabbitMQ:RabbitMQ是一个基于AMQP(高级消息队列协议)的开源消息队列系统,采用的是传统的Broker架构模式,其中包括生产者、消费者和中间件(Broker)。RocketMQ:RocketMQ是一个基于分布式......
  • 消息队列 RabbitMQ
    发布者:生产者,消息的发送方。连接:网络连接。Channel:信道,多路复用连接中的一条独立的双向数据流通道。Exchange:交换器(路由器),负责消息的路由到相应队列。类型:direct、fanout、topicBinding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息......
  • RabbitMQ - Exception (504) Reason: "channel id space exhausted"
    使用go的第三方包:github.com/rabbitmq/amqp091-go出现报错:getmqchannelerror{"error":"Exception(504)Reason:channelidspaceexhausted"}ctx:=context.Background()results,err:=global.Redis.LRange(ctx,abListName,0,-1).Result()......
  • rabbitmq详细实例
    1.概述RabbitMQ是由LShift提供的一个AdvancedMessageQueuingProtocol(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成,因此也是继承了这些优点。FROM《维基百科——RabbitMQ》Rabbit科技有限公司开发了RabbitMQ,并提供对其的支持。起初,Rabbit......
  • 消息可靠性-生产者确认原理
        ......
  • 【7.0】基于RabbitMQ实现RPC
    【一】RPC介绍【1】介绍RPC(RemoteProcedureCall)是一种远程过程调用的协议,它允许一个计算机程序通过网络请求调用远程服务器上的一个子程序或函数。基于RabbitMQ实现的RPC可以更加可靠地实现远程过程调用。【2】分布式的系统中使用微服务之间的调用resful的接口rpc调......
  • 【RabbitMQ总结】
    【RabbitMQ总结】【一】消息队列引入什么是消息队列消息队列解决的问题常见的消息队列比较【二】RabbitMQ安装什么是RabbitMQ服务器原生安装RabbitMQ客户端安装RabbitMQWindows安装RabbitMQRabbitMQ设置用户名和密码RabbitMQ界面说明【三】Ra......