首页 > 其他分享 >rabbitmq消费(失败)重试3次(多次)

rabbitmq消费(失败)重试3次(多次)

时间:2022-09-05 09:33:07浏览次数:59  
标签:QUEUE rabbitmq 重试 失败 springframework org import message public

方案一:spring提供的retry

配置文件:

server:
  port:8080
spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: xxxx
    password: xxx
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: manual #开启手动确认
        retry:
          enabled: true
          max-attempts: 5 #重试次数
          initial-interval: 5000  #重试间隔时间(单位毫秒)
          max-interval: 10000   #重试最大时间间隔(单位毫秒)
          stateless: true

错误的例子:

**开了失败重试后,在消失消费失败时,不能使用否定确认,会陷入死循环(重试机制就不生效了)**

消息消费失败了,消息回退到queue -> queue再次下发消息 -> 消费失败->消息回退到queue -> queue再次下发消息...

@RabbitListener(queues = HISTORY_QUEUE)
    public void receiveMsg1(Message message, Channel channel) {
        message.getMessageProperties();
        String msg = new String(message.getBody());
        LOGGER.info(HISTORY_QUEUE + " 接受到消息1:{}", msg);
        try {

            if ("test".equals(msg)) {
                throw new RuntimeException();
            }
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条

            LOGGER.info(HISTORY_QUEUE + " 消费成功1");
        } catch (Exception e) {
            LOGGER.info(HISTORY_QUEUE + " 消费失败1,param:{}", msg, e.getStackTrace());

            try {
//                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此消息
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);//失败,将消息放回队列
//                channel.basicRecover(true); //消息将被重新排队,并可能被传递给不同的消费者。
            } catch (IOException ioException) {
                LOGGER.error(HISTORY_QUEUE + " 消息反馈失败1,param:{}", msg, ioException.getStackTrace());
            }
        }
    }

正确的用法:

**只确认消息消费正确的消息,消费异常的不处理;**

为什么用了try,还要手动抛异常?因为我想拿日志

    @RabbitListener(queues = HISTORY_QUEUE)
    public void receiveMsg1(Message message, Channel channel) {
        message.getMessageProperties();
        String msg = new String(message.getBody());
        LOGGER.info(HISTORY_QUEUE + " 接受到消息1:{}", msg);
        try {

            if ("test".equals(msg)) {
                throw new RuntimeException();
            }
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条

            LOGGER.info(HISTORY_QUEUE + " 消费成功1");
        } catch (Exception e) {
            LOGGER.info(HISTORY_QUEUE + " 消费失败1,param:{}", msg,  e.getMessage());
            throw new RuntimeException();
//            try {
////                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此消息
//                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);//失败,将消息放回队列
//                channel.basicRecover(true); //消息将被重新排队,并可能被传递给不同的消费者。
//            } catch (IOException ioException) {
//                LOGGER.error(HISTORY_QUEUE + " 消息反馈失败1,param:{}", msg, ioException.getStackTrace());
//            }
        }
    }

方案2:死信交换机和消息头

需求:

发送消息推送广告,通过mq解耦;mq失败后重发三次,三次失败就丢弃消息;

配置:

server:
  port:8080
spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: xxxx
    password: xxx
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: manual #开启手动确认

 

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

import static com.example.demo.config.HistoryDirectRabbitConfig.DEAD_EXCHANGE;
import static com.example.demo.config.HistoryDirectRabbitConfig.DEAD_ROUTING_KEY;

@Configuration
public class AgainMagSedConfig {
    public static final String AGAIN_QUEUE = "again.queue";
    public static final String AGAIN_EXCHANGE = "again.exchange";
    public static final String AGAIN_ROUTING_KEY = "again.routing.key";

    @Bean
    public Queue againQueue() {
        Map<String, Object> map = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        return new Queue(AGAIN_QUEUE, true, false, false, map);
    }

    @Bean
    public Exchange againExchange() {
        return new DirectExchange(AGAIN_EXCHANGE, true, false);
    }

    @Bean
    public Binding againRoutingKey(Queue againQueue, Exchange againExchange) {
        return BindingBuilder.bind(againQueue).to(againExchange).with(AGAIN_ROUTING_KEY).noargs();
    }
    
    public static final String DEAD_QUEUE = "dead.queue";
    public static final String DEAD_EXCHANGE = "dead.exchange";
    public static final String DEAD_ROUTING_KEY = "dead.routing.key";

    @Bean
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE, true);
    }

    @Bean
    public Exchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }

    @Bean
    public Binding deadRoutingKey(Queue deadQueue, Exchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

}

回调方法:

package com.example.demo.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class HistoryCallBack implements RabbitTemplate.ConfirmCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(HistoryCallBack.class);

    /**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData CorrelationData 回调的信息对象
     * @param ack             交换机是否成功接收消息
     * @param cause           交换机没收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            LOGGER.info("exchange receive msg, id:{}", id);
        } else {
            LOGGER.info("exchange not received msg, id:{},cause:{}", id, cause);
        }
    }

}

消息生产者:

package com.example.demo.controller;

import com.example.demo.config.HistoryCallBack;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;

import static com.example.demo.config.AgainMagSedConfig.AGAIN_EXCHANGE;
import static com.example.demo.config.AgainMagSedConfig.AGAIN_ROUTING_KEY;

@RestController
@RequestMapping("/")
public class HelloController {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private HistoryCallBack historyCallBack;

    //依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(historyCallBack);
    }

    @GetMapping("/hello/{msg}")
    public Boolean sendMq(@PathVariable("msg") String msg) {

        //定义发布回调的id
        CorrelationData correlationData = new CorrelationData(msg);
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) {
                message.getMessageProperties().setHeader("num", 1);
                return message;
            }
        };

        rabbitTemplate.convertAndSend(AGAIN_EXCHANGE, AGAIN_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8), messagePostProcessor);
//        rabbitTemplate.convertAndSend(AGAIN_EXCHANGE, AGAIN_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8), messagePostProcessor, correlationData);

        return true;
    }
}

消费者:

package com.example.demo.mqlistener;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

import static com.example.demo.config.AgainMagSedConfig.*;
import static com.example.demo.config.HistoryDirectRabbitConfig.DEAD_QUEUE;

@Component
public class AgainListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(AgainListener.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 消费队列
     * @param channel
     * @param message
     * @throws IOException
     */
    @RabbitListener(queues = AGAIN_QUEUE)
    public void acceptAgainQueue(Channel channel, Message message) throws IOException {
        String s = new String(message.getBody());
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        LOGGER.info("队列:{},接受的消息是:{},请求头的num是:{}", AGAIN_QUEUE, s, headers.get("num"));
        if (s.equals("test")) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    /**
     * 死信队列
     * @param channel
     * @param message
     * @throws IOException
     */
    @RabbitListener(queues = DEAD_QUEUE)
    public void acceptDeadQueue(Channel channel, Message message) throws IOException {
        String s = new String(message.getBody());
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        LOGGER.info("死信队列:{},接受的消息是:{},请求头的num是:{}", DEAD_QUEUE, s, headers.get("num"));
        Integer num = (Integer) headers.get("num");
        final Integer num1 = ++num;
        if (num <= 3) {
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) {
                    message.getMessageProperties().setHeader("num", num1);
                    return message;
                }
            };
            rabbitTemplate.convertAndSend(AGAIN_EXCHANGE, AGAIN_ROUTING_KEY, s, messagePostProcessor);
        }

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

效果:

 

标签:QUEUE,rabbitmq,重试,失败,springframework,org,import,message,public
From: https://www.cnblogs.com/lzghyh/p/16656949.html

相关文章

  • .Net Core&RabbitMQ优先级队列
    优先级队列消息除了有生命周期长短,也有紧急与非紧急之分,承载了具有优先级消息的队列则为优先级队列。队列优先级设置为消息设置优先级前,队列需要先具备优先级的能力,队......
  • Navicat连接Oracle时报错 “ORA-28547:连接服务器失败,可能是oracle net 管理错误“ 或
    Navicat连接Oracle时报错“ORA-28547:连接服务器失败,可能是oraclenet管理错误“ 或者“ORA-03135:ConnectionLostContact”使用Navicat连接oracle数据库时报ORA-2......
  • 考试失败原因
     首先,最重要的一点,我暑假确实没有好好学习JAVA,只是粗略地在网上找了视频看了几次,于是许多基础的JAVA语法我没有掌握,每次课程也只是一味地看,课后习题从来没做过,只在专门的......
  • libssh2_sftp_rename失败问题
    最近遇到一个问题,使用libssh2_sftp上传文件时,传输的文件命名文xxx.xx.temp,上传完成后需要把temp后缀去掉。libssh2提供了一个接口libssh2_sftp_rename用于重命名远端文件......
  • IOS 上传ipa文件失败
     Nosuitableapplicationrecordswerefound.Verifyyourbundleidentifier'com.***'iscorrect. iTunesConnect,BundleID/BundleIDSuffixerror修改后成......
  • 深入RabbitMQ消息可靠性
    mandatory——处理不可路由消息在使用Basic.Publish发送一条消息并携带参数mandatory=True时,当消息是不可路由的时,RabbitMQ会发回一个Basic.Return方法帧。不可路由消息......
  • SpringCloud Alibaba 打包后在启动从Nacos读取配置文件失败
     SpringCloudAlibaba引用Nacos配置中心,读取数据源配置,在调试运行都正常,但是打包后在启动运行则会报错,提示读取配置失败巴拉巴拉。执行运行命令java-jar-Dserver.por......
  • EasyCVR级联时传输协议选择TCP,上级平台播放失败的原因排查与解决办法
    EasyCVR视频融合平台部署轻快、功能灵活,可支持多协议、多类型设备接入,在视频能力上,可实现视频直播、录像、回放、检索、云存储、告警上报、集群以及平台级联等。其中平台级......
  • Docker安装RabbitMQ详细步骤
    Docker安装RabbitMQ详细步骤前提:1、在服务器的安全组和防火墙中放通相对应的端口,操作系统:centos7.6,需要放通5672和15672端口2、登录自己的Linux系统服务器3、关闭服务......
  • SSM项目启动Tomcat失败
    1、Tomcat无法运行起来我的情况是配置好Tomcat后tomcat没有闪退但是tomcat启动不起来,当时用的是Tomcat10解决方法是把下面这段代码注释掉就能启动了<!--乱码过滤-->......