首页 > 其他分享 >RabbitMQ——死信队列介绍和应用

RabbitMQ——死信队列介绍和应用

时间:2024-07-18 13:30:42浏览次数:15  
标签:false String 队列 RabbitMQ 死信 public channel

死信和死信队列的概念

什么是死信?简单来说就是无法被消费和处理的消息。一般生产者将消息投递到broker或者queue,消费者直接从中取出消息进行消费。但有时因为某些原因导致消息不能被消费,导致消息积压在队列中,这样的消息如果没有后续的处理就会变成死信,那么专门存放死信的队列就是死信队列。

什么是死信交换机?

那么什么是死信交换机呢?死信交换机是指专门将死信路由到死信队列的交换机。

产生死信的原因

根据官方文档,我们发现一般有三种场景会产生死信。

 

消息超过TTL,即消息过期

2.消息被nack或reject,且不予重新入队

3.队列达到最大长度

死信队列实战和应用

死信队列的应用并不难,无非就是多定义了一个交换机、routingKey和队列罢了。在声明普通队列时传入Map参数,往Map中put死信队列名称、死信routingKey、消息TTL等参数即可完成死信自动投递到死信队列的流程。通过如下代码即可绑定普通队列和死信交换机了,而且还能设置routingKey和队列长度等参数,无需像传统的那样通过channel绑定。

Map<String, Object> arguments = new HashMap<>(); 
// 过期时间 
arguments.put("x-message-ttl", 10000);
// 正常队列设置死信交换机 
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); 
// 设置死信
routingKey arguments.put("x-dead-letter-routing-key", "lisi");
// 设置正常队列的长度限制 arguments.put("x-max-length", 10);

流程图:

生产者Producer:

public class Producer { 
    // 普通交换机名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtils.getChannel();
        //死信消息 设置TTL时间
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                    .builder().expiration("10000").build();

        // 延迟消息
        for (int i = 0;i < 10;i++) {
            String message = i + "info";
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,         
                    message.getBytes());
        }
    }
}          

普通队列消费者C1:

public class Consumer01 {
    // 普通交换机名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    // 普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";
    // 死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtils.getChannel();
        // 声明死信和普通交换机,类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 声明普通队列
        Map<String, Object> arguments = new HashMap<>();
        // 过期时间
        arguments.put("x-message-ttl", 10000);
        // 正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        // 设置正常队列的长度限制
        arguments.put("x-max-length", 10);

        // 声明普通队列
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        // 声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("consumer01等待接收消息");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            if (msg.equals("info5")) {
                System.out.println("consumer01接收的消息:" + new String(message.getBody()));
                System.out.println(msg + ":此消息是被拒绝的");
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false); //拒绝此消息并不放回普通队列
            } else {
                System.out.println("consumer01接收的消息:" + new String(message.getBody()));
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("C1取消消息");
        };
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
    }
}

死信队列消费者C2

public class Consumer02 {
    // 死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("consumer02等待接收消息");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("consumer02接收的消息:" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("C2取消消息");
        };
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
    }
}

依次启动生产者,和两个消费者,并停掉普通队列的消费者,我们发现生产者发送的消息被死信队列消费者C2给接收了。

在上面的代码中,我在普通队列中设置了消息的TTL为5s,但是我又在生产者设置发送的消息TTL为10s,那么RabbitMQ会以哪个为准呢?其实RabbitMQ会以较短的TTL为准

BI项目添加死信队列

声明交换机、队列和routingKey的配置类

@Configuration
public class TtlQueueConfig {
    private final String COMMON_EXCHANGE = "bi_common_exchange"; // 普通交换机名称
    private final String COMMON_QUEUE = "bi_common_queue"; // 普通队列名称
    private final String DEAD_LETTER_EXCHANGE = "bi_dead_letter_exchange"; // 死信交换机名称
    private final String DEAD_LETTER_QUEUE = "bi_dead_letter_queue"; // 死信队列名称
    private final String COMMON_ROUTINGKEY = "bi_common_routingKey"; // 普通routingKey
    private final String DEAD_LETTER_ROUTINGKEY = "bi_dead_letter_routingKey"; // 死信routingKey

    // 普通交换机
    @Bean("commonExchange")
    public DirectExchange commonExchange() {
        return new DirectExchange(COMMON_EXCHANGE);
    }

    // 死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 普通队列
    @Bean("commonQueue")
    public Queue commonQueue() {
        Map<String, Object> map = new HashMap<>(3);
        map.put("x-message-ttl", 20000);
        map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTINGKEY);
        return QueueBuilder.durable(COMMON_QUEUE).withArguments(map).build();
    }

    // 死信队列
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    @Bean
    public Binding commonQueueBindingCommonExchange(@Qualifier("commonQueue") Queue commonQueue,
                                                    @Qualifier("commonExchange") DirectExchange commonExchange) {
        return BindingBuilder.bind(commonQueue).to(commonExchange).with(COMMON_ROUTINGKEY);
    }

    @Bean
    public Binding deadQueueBindingDeadExchange(@Qualifier("deadLetterQueue") Queue deadLetterQueue,
                                                @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTINGKEY);
    }
}

普通消费者(负责异步生成图表信息)

@Configuration
            // 如果失败,消息拒绝
            channel.basicNack(deliveryTag, false, false);
            log.info("消息为空拒绝接收");
            log.info("此消息正在被转发到死信队列中");
        }

        long chartId = Long.parseLong(message);
        Chart chart = chartService.getById(chartId);
        if (chart == null) {
            channel.basicNack(deliveryTag, false, false);
            log.info("图标为空拒绝接收");
            throw new BusinessException(ErrorCode.NOT_FOUND_ERROR, "图表为空");
        }

        // 先修改图表任务状态为“执行中”。等执行成功后,修改为“已完成”、保存执行结果;执行失败后,状态修改为“失败”,记录任务失败信息。
        Chart updateChart = new Chart();
        updateChart.setId(chart.getId());
        updateChart.setStatus("running");
        boolean b = chartService.updateById(updateChart);
        if (!b) {
            channel.basicNack(deliveryTag, false, false);
            handlerChartUpdateError(chart.getId(), "更新图表执行状态失败");
            return;
        }
        // 调用AI
        String result = aiManager.doChat(CommonConstant.BI_MODEL_ID, buildUserInput(chart));
        String[] splits = result.split("【【【【【");
        if (splits.length < 3) {
            channel.basicNack(deliveryTag, false, false);
            handlerChartUpdateError(chart.getId(), "AI生成错误");
            return;
        }
        String genChart = splits[1].trim();
        String genResult = splits[2].trim();
        Chart updateChartResult = new Chart();
        updateChartResult.setId(chart.getId());
        updateChartResult.setGenChart(genChart);
        updateChartResult.setGenResult(genResult);
        updateChartResult.setStatus("succeed");
        boolean updateResult = chartService.updateById(updateChartResult);
        if (!updateResult) {
            channel.basicNack(deliveryTag, false, false);
            handlerChartUpdateError(chart.getId(), "更新图表成功状态失败");
        }
        Long userId = chartService.queryUserIdByChartId(chartId);
        String myChartId = String.format("lingxibi:chart:list:%s", userId);
        redisTemplate.delete(myChartId);

        // 如果任务执行成功,手动执行ack
        channel.basicAck(deliveryTag, false);
    }


    private void handlerChartUpdateError(long chartId, String execMessage) {
        Chart updateChartResult = new Chart();
        updateChartResult.setId(chartId);
        updateChartResult.setStatus("failed");
        updateChartResult.setExecMessage(execMessage);
        boolean updateResult = chartService.updateById(updateChartResult);
        if (!updateResult) {
            log.error("更新图表失败状态失败" + chartId + "," + execMessage);
        }
    }

    /**
     * 构建用户输入
     * @param chart
     * @return
     */
    private String buildUserInput(Chart chart) {
        String goal = chart.getGoal();
        String chartType = chart.getChartType();
        String csvData = chart.getChartData();

        // 构造用户输入
        StringBuilder userInput = new StringBuilder();
        userInput.append("分析需求:").append("\n");
        // 拼接分析目标
        String userGoal = goal;
        if (StringUtils.isNotBlank(chartType)) {
            userGoal += ",请使用" + chartType;
        }
        userInput.append(userGoal).append("\n");
        userInput.append("原始数据:").append("\n");
        // 压缩后的数据

        userInput.append(csvData).append("\n");
        return userInput.toString();
    }
}

死信队列消费者(负责处理死信)

 收到死信后我是直接确认了,这种方式可能不好,你也可以换成其他方式比如重新入队,或者写入数据库并打上日志等等。
@Component
@Slf4j
public class TtlQueueConsumer {
    @Resource
    BIMessageProducer biMessageProducer;

    @SneakyThrows
    @RabbitListener(queues = "bi_dead_letter_queue", ackMode = "MANUAL")
    public void doTTLMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        log.info("已经接受到死信消息:{}", message);
        biMessageProducer.sendMessage(message);
        channel.basicAck(deliveryTag, false);
    }
}

如果我的文章对你有帮助的话,不妨给我点个赞呗,我会持续带来不一样的内容。如果对Java相关知识感兴趣的话,可以关注我,带你走进Java的世界。

 

标签:false,String,队列,RabbitMQ,死信,public,channel
From: https://www.cnblogs.com/qimoxuan/p/18309338

相关文章

  • 一个故事理解消息队列-上
    前段时间,知识星球里一位同学给我分享了他对消息队列的理解,并且用一个故事形象的表述了消息队列的作用。看完他的表述,我觉得用故事来描述技术组件作用的方式很有意思,也更容易让人理解。这篇文章,借用他的故事,为大家简单介绍一下消息队列。 消息队列的故事假设现在有一本技术......
  • 用数组模拟环形队列——2
    在上一篇的文章中我们介绍了队列的使用,但是普通的队列存在资源浪费的问题,为了解决这个问题,又提出了环形队列,今天,我们就详细介绍一下环形队列的使用方法。环形队列的特点:1.队列的存储结构是一个环形结构,即队列的头尾相连,形成一个环形。2.队列有固定的大小,通常用数组来实现。......
  • RabbitMQ安装
    一、下载erlang语言包https://www.erlang.org/downloads安装otp_win64_20.2.exe右键以管理员身份运行,一直下一步直到安装完成结束二、下载RabbitMQ安装或解压文件https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.41.第一步,傻瓜式安装,一直下一步直到安装结......
  • redis学习-12(实现分布式锁、消息队列、缓存一致性问题、单线程快的原因、跳跃表)
    引用以下内容:redis实现分布式锁:Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案)Redis实现分布式锁的7种方案,及正确使用姿势!redis实现消息队列Redis的学习教程(十)之使用Redis实现消息队列缓存一致性问题想要保证数据库和Redis缓存一致性,推荐采用先更新数......
  • 利用Redis Stream实现一个可靠的消息队列
    RedisStreams简介RedisStreams是Redis5.0引入的一种新数据结构,专门用于处理日志和消息流数据。它结合了多种数据结构的优点,提供了高效的消息存储和消费机制。RedisStreams可以用于实时数据处理、事件驱动的系统、日志聚合和消息队列等场景。主要特点持久化:redis的p......
  • RabbitMQ 添加新用户
    比如:在windows系统中安装的RabbitMQ。它的默认用户名 guest密码是 guest如果要新加一个用户 ,用户名 admin密码是 admin新建用户需要授权 添加用户   或   创建成功之后,点进新创建的用户 用户授权这样就可以用新创建的用户连接了从 黄色的......
  • 异步任务队列
    #周朱张孙宋刘陈"胡王周朱谢周朱刘庄谢.黄"#周朱张孙宋刘陈"./宋周_胡王周朱谢周朱刘庄谢.黄"//#周朱张孙宋刘陈"孙周李袁王郭宋董陈朱.黄"#周朱张孙宋刘陈"赵陈罗曾庄朱罗.黄"#周朱张孙宋刘陈"欧陈朱刘陈郭姜邓曾.黄"#周朱张孙宋刘陈"邓周杨杨蒋胡赵.黄"#周朱张孙宋......
  • 说说RabbitMQ延迟队列实现原理?
    使用RabbitMQ和RocketMQ的人是幸运的,因为这两个MQ自身提供了延迟队列的实现,不像用Kafka的同学那么苦逼,还要自己实现延迟队列。当然,这都是题外话,今天咱们重点来聊聊RabbitMQ延迟队列的实现原理,以及RabbitMQ实现延迟队列的优缺点有哪些?很多人知道使用RabbitMQ是可......
  • 队列
    队列创建宏定义:configSUPPORT_STATIC_ALLOCATION//静态创建QueueHandle_txQueueCreateStatic(UBaseType_tuxQueueLength,UBaseType_tuxItemSize,uint8_t*pucQueueStorageBuffer,StaticQueue_t*pxQueueBuffer);/*Thequeueistobecreatedtoholdamaxi......
  • 使用RocketMQ 实现基于标签过滤的消息队列生产和消费
    在分布式系统中,消息队列(MessageQueue,MQ)是一种常见的通信方式,它能够解耦系统组件,提供异步通信,提升系统的伸缩性和可靠性。ApacheRocketMQ是一款开源的分布式消息中间件,具有高性能、低延迟、高可靠性和高可用性等特点。本文将介绍如何使用ApacheRocketMQ实现基于标签过......