首页 > 其他分享 >异步MQ:后发先至

异步MQ:后发先至

时间:2024-10-20 10:51:12浏览次数:8  
标签:异步 sequenceNumber 处理 先至 MQ 消息 message 序列号

目录

一、消息排序与识别

1、消息添加时间戳或序列号

2、识别重复消息

二、状态管理与补偿机制

1、维护处理状态

2、建立补偿机制

三、监控与告警

1、实时监控消息处理顺序

2、告警与通知


在异步处理过程中,当出现消息“后发先至”的情况时,消费者可以采取以下措施来处理:

一、消息排序与识别

1、消息添加时间戳或序列号

  • 消息生产者在发送消息时,可以为消息添加一个精确的时间戳或唯一的序列号。这样消费者在接收消息时,可以根据时间戳或序列号来判断消息的顺序。例如,使用当前的时间戳精确到毫秒级别,或者使用一个自增的序列号,确保每个消息都有一个明确的顺序标识。
  • 消费者在处理消息之前,可以先对消息进行排序。可以使用数据结构如优先队列(Priority Queue),按照时间戳或序列号的顺序对消息进行存储和处理。这样可以确保先发送的消息先被处理,避免“后发先至”导致的顺序混乱  
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Message {
    private int sequenceNumber;
    private long timestamp;
    private String content;

    public Message(int sequenceNumber, long timestamp, String content) {
        this.sequenceNumber = sequenceNumber;
        this.timestamp = timestamp;
        this.content = content;
    }

    public int getSequenceNumber() {
        return sequenceNumber;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public String getContent() {
        return content;
    }
}

public class AsynchronousProcessingExample {
    private static List<Message> messageQueue = new ArrayList<>();

    public static void main(String[] avg) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; i++) {
            int finalI = i;
            executorService.submit(() -> {
                // 模拟随机延迟
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long timestamp = System.currentTimeMillis();
                int sequenceNumber = finalI + 1;
                Message message = new Message(sequenceNumber, timestamp, "Message " + sequenceNumber);
                messageQueue.add(message);
                System.out.println("Sent message: " + message.getContent() + ", Sequence Number: " + message.getSequenceNumber() + ", Timestamp: " + message.getTimestamp());
            });
        }

        executorService.shutdown();

        while (!executorService.isTerminated()) {
            // 等待所有任务完成
        }

        // 处理消息队列,按照时间戳或序列号排序
        messageQueue.sort((m1, m2) -> {
            if (m1.getTimestamp() < m2.getTimestamp()) {
                return -1;
            } else if (m1.getTimestamp() > m2.getTimestamp()) {
                return 1;
            } else {
                return Integer.compare(m1.getSequenceNumber(), m2.getSequenceNumber());
            }
        });

        System.out.println("\nProcessed messages in order:");
        for (Message message : messageQueue) {
            System.out.println("Processed message: " + message.getContent() + ", Sequence Number: " + message.getSequenceNumber() + ", Timestamp: " + message.getTimestamp());
        }
    }
}

上图举例,创建了一个Message类来表示消息,包含序列号、时间戳和内容。在主方法中,使用多线程模拟异步发送消息的过程,为每个消息分配一个序列号和时间戳。最后,对消息队列按照时间戳或序列号进行排序,确保消息按照正确的顺序处理。

2、识别重复消息

  • 由于“后发先至”可能导致重复处理消息,消费者需要能够识别重复消息并避免重复处理。可以使用消息的唯一标识(如消息 ID)来判断消息是否已经被处理过。例如,将已处理的消息 ID 存储在一个数据库表或缓存中,在处理新消息时,先检查该消息 ID 是否已经存在。如果存在,则说明该消息已经被处理过,可以直接忽略;如果不存在,则进行正常处理。

二、状态管理与补偿机制

1、维护处理状态

  • 消费者在处理消息的过程中,应该维护消息的处理状态。可以使用数据库或分布式存储系统来记录每个消息的处理进度和状态。例如,当消费者开始处理一个消息时,可以将该消息的状态设置为“处理中”;当处理完成后,将状态更新为“已处理”。这样,即使出现“后发先至”的情况,消费者也可以根据消息的状态来决定是否需要重新处理该消息。
  • 对于长时间处理的消息,可以设置超时机制。如果一个消息的处理时间超过了一定的阈值,可以将其状态标记为“超时”,并进行相应的处理,如重新处理或通知管理员。

2、建立补偿机制

  • 如果消费者发现由于“后发先至”导致消息处理顺序错误,可以建立补偿机制来纠正错误。例如,如果一个消息的处理依赖于前面的消息,但是前面的消息还没有被处理,消费者可以暂停处理当前消息,等待前面的消息处理完成后再继续。或者,如果一个消息已经被处理,但是后面的消息导致该消息的处理结果需要修改,消费者可以进行补偿处理,如回滚前面的操作并重新处理。

三、监控与告警

1、实时监控消息处理顺序

  • 建立监控系统,实时监控消息的处理顺序。可以使用日志分析工具或专门的监控软件来跟踪消息的处理过程,并检测是否出现“后发先至”的情况。例如,通过分析消费者的日志,查找处理时间异常的消息,或者使用监控工具设置特定的规则来检测消息处理顺序的异常。
  • 监控系统可以生成实时报表和可视化图表,以便管理员能够直观地了解消息处理的情况。如果发现“后发先至”的情况频繁出现,管理员可以及时采取措施进行调整和优化。

2、告警与通知

  • 当监控系统检测到“后发先至”的情况时,应该及时发出告警通知相关人员。告警方式可以包括邮件、短信、即时通讯工具等。例如,当出现严重的消息处理顺序错误时,发送紧急邮件通知管理员,并在监控界面上显示醒目的告警信息。
  • 告警通知应该包含详细的错误信息,如消息 ID、处理时间、错误原因等,以便相关人员能够快速定位和解决问题。同时,应该建立相应的处理流程,确保告警得到及时响应和处理。

标签:异步,sequenceNumber,处理,先至,MQ,消息,message,序列号
From: https://blog.csdn.net/m0_37658981/article/details/143089444

相关文章

  • SpringBoot 整合 RabbitMQ
    简介一般在开发过程中:生产者工程:application.yml文件配置RabbitMQ相关信息;在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定;注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机。消费者工程:application.yml文件配置RabbitMQ相关信......
  • RabbitMQ 通配符(Topic)模式示例
    总结自:BV15k4y1k7Ep模式说明Topic类型与Direct相比,都是可以根据Routingkey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routingkey的时候使用通配符!Topic类型的Routingkey一般都是由一个或多个单词组成,多个单词之间以.分隔,例如:item.insert通配符规......
  • RabbitMQ 路由(Routing)模式示例
    总结自:BV15k4y1k7Ep模式说明和消费订阅模式相比,路由模式特点:交换机的类型为Direct。队列与交换机绑定时,要指定一个Routingkey(路由key)。消息的发送方在向Exchange发送消息时,也必须指定消息的Routingkey。Exchange不再把消息交给每一个绑定的队列,而是根据消息的Rout......
  • C#异步计数器的使用
    1、CancellationTokenSourcecancellationToken=newCancellationTokenSource();publicvoidInitData(){cancellationToken.Token.Register(()=>{Name="被取消了";});intcount=0;Task.Run(()=>{whi......
  • RabbitMQ 发布订阅(Publish Subscribe)模式示例
    总结自:BV15k4y1k7Ep交换机订阅模式示例图:在简单模式和工作队列模式中,只有3个角色:P:生产者,也就是要发送消息的程序。C:消费者,消息的接受者,会一直等待消息到来。Queue:消息队列,图中红色部分。而在订阅模型中,多了一个Exchange角色,而且工作过程略有变化:P:生产者,也就是要......
  • RabbitMQ 普通模式
    RabbitMQ普通模式一、普通模式示意图二、普通模式介绍RabbitMQ普通模式也称为点对点模式,它是消息队列的一种基本实现方式。在这种模式下,生产者将消息发送到队列中,消费者从队列中接收并处理消息。每条消息只会被一个消费者接收,其他消费者无法重复消费。特点:单一消费......
  • 记录Redis+MQ延迟双删保证缓存一致性
    场景描述在博客系统中,用户可以给博客点赞或者评论,这些操作需要更新数据库中的数据,同时要保证缓存中的博客信息与数据库保持一致。为了提高性能,博客数据会存放在Redis缓存中。但当有大量用户同事点赞或是评论时,缓存和数据库中的数据可能出现不一致。何谓延迟双删?延迟双删......
  • RabbitMQ系列学习笔记(八)--发布订阅模式
    文章目录一、发布订阅模式原理二、发布订阅模式实战1、消费者代码2、生产者代码3、查看运行结果本文参考:尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmqRabbitMQ详解Centos7环境安装Erlang、RabbitMQ详细过程(配图)一、发布订阅模式原理在开发过程中,有一......
  • RabbitMQ系列学习笔记(十)--通配符模式
    文章目录一、通配符模式原理二、通配符模式实战1、消费者代码2、生产者代码3、查看运行结果本文参考:尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmqRabbitMQ详解Centos7环境安装Erlang、RabbitMQ详细过程(配图)一、通配符模式原理通配符模式(Topics)是在路......
  • 【深度知识】5.Prometheus-PromQL查询监控数据和语法说明
    成就你的写作梦想立即下载 【深度知识】5.Prometheus-PromQL查询监控数据和语法说明笔名辉哥 简书优秀创作者0.8682021-03-2822:20IP属地:上海打开App  1.摘要PromQL(PrometheusQueryLanguage)是Prometheus自己开发的数据查询DSL......