首页 > 其他分享 >记录一下怎么保证MQ消费消息去重,消息重试

记录一下怎么保证MQ消费消息去重,消息重试

时间:2024-04-27 11:35:13浏览次数:27  
标签:MAP String tableName value 重试 MQ 消息 key

先说 背景,有消息生产,有很多SQL表名称,对应去统计不同表的数据,更新数量,但是这些消息会重复,可能有很多逻辑都要重复执行,可能会速度慢

生产:

这是SQL解析,重要的是这段 ,

tableName是枚举里面固定的,图片中有显示
  RabbitMQSender.sendMessage(MQConfig.FIRST_PAGE_SQL_ROUTINGKEY, tableName, MessageType.COMMON, uuid);
///伪代码     
  StatementHandler statementHandler = PluginUtils.realTarget(invocation.getTarget()); MetaObject metaObject = SystemMetaObject.forObject(statementHandler); MappedStatement mappedStatement = (MappedStatement)metaObject.getValue("delegate.mappedStatement"); if(!SqlCommandType.SELECT.equals(mappedStatement.getSqlCommandType())){ Object proceed = invocation.proceed(); firstPageSqlParse(metaObject); return proceed;
      }


private void firstPageSqlParse(MetaObject metaObject) {
BoundSql boundSql = (BoundSql) metaObject.getValue("delegate.boundSql");
String sql = boundSql.getSql();
CompletableFuture.runAsync(() -> {
// 语句提取表名:
String lowSql = sql.toLowerCase();
String tableName = extractTableName(lowSql);
FirstPageCountEnum[] values = FirstPageCountEnum.values();
Set<String> collect = Arrays.stream(values).map(FirstPageCountEnum::getTableName).collect(Collectors.toSet());
if (collect.contains(tableName)) {
String uuid = IdUtil.randomUUID();
RabbitMQSender.sendMessage(MQConfig.FIRST_PAGE_SQL_ROUTINGKEY, tableName, MessageType.COMMON, uuid);
}
});
}

private static String extractTableName(String sql) {
String regex = "(insert\\s+into\\s+|update\\s+|delete\\s+from\\s+)(\\w+)";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(sql);
if (matcher.find()) {
String tableName = matcher.group(2);
System.out.println(tableName);
return tableName;
}
return "";
}

 

 

消费端:

这就是普通接受部分

 /**
     * 监听单个队列
     * concurrency:并发处理消息数
     */
    @RabbitListener(queues = MQConfig.FIRST_PAGE_SQL)
    @RabbitHandler
    public void notificationQueueReceiver(Message message, Channel channel) throws IOException {
        messageHandler(message, channel);
    }

    private void messageHandler(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.ACCEPT;
        try {
            MessageBody messageBody = MessageBody.getMessageBody(message);
          
            String sql = MessageBody.getMessageBody(message).getData().toString();
            // 锁, 
            Object o = CACHE_MQ_MAP_LOCK.get(sql);
            if (o != null) {
                if (CACHE_MQ_MAP.getOrDefault(sql, 0) == 0) {
                    synchronized (o) {
                        if (CACHE_MQ_MAP.getOrDefault(sql, 0) == 0) {
                            CACHE_MQ_MAP.put(sql, 1);
                        }
                        o.notifyAll();
                    }
                }
            }
        } catch (Exception e) {
            log.error("MQ处理消息出错", e);
        } finally {
            // 通过 finally 块来保证 Ack/Nack 会且只会执行一次
            if (action == Action.ACCEPT) {
                // false 只确认当前 consumer 一个消息收到,true 确认所有 consumer 获得的消息。
                channel.basicAck(deliveryTag, false);
            } else {
                // 第二个 boolean 为 false 表示不会重试,为 true 会重新放回队列
                channel.basicReject(deliveryTag, false);
            }
        }
    }

初始化两个MAP 锁,以及去除重复的数据Map

 @PostConstruct
    public void init() {
        CompletableFuture.runAsync(() -> {
            for (FirstPageCountEnum value : FirstPageCountEnum.values()) {
                CACHE_MQ_MAP_LOCK.put(value.getTableName(), new Object());
                try {
//  为了初始化可以数据更新 String uuid = IdUtil.randomUUID(); RabbitMQSender.sendMessage(MQConfig.FIRST_PAGE_SQL_ROUTINGKEY, value.getTableName(), MessageType.COMMON, uuid); } catch (Exception e) { e.printStackTrace(); } } // 创建消费者线程,处理逻辑在下面,这里有点问题,创建的线程多了,因为表名称重复了。 for (FirstPageCountEnum value : FirstPageCountEnum.values()) { Thread consumerThread = new Thread(new Consumer(CACHE_MQ_MAP, value.getTableName(), userFirstPageCountService)); consumerThread.start(); } }); }   // 保存消息的去重复的MAP private final ConcurrentHashMap<String, Integer> CACHE_MQ_MAP = new ConcurrentHashMap<>(32);
  // 保存锁的 private static final ConcurrentHashMap<String, Object> CACHE_MQ_MAP_LOCK = new ConcurrentHashMap<>(32);

 

创建的消费线程处理的逻辑

 static class Consumer implements Runnable {
        private final ConcurrentHashMap<String, Integer> dataMap;
        private final String tableName;
        private final UserFirstPageCountService userFirstPageCountService;

        public Consumer(ConcurrentHashMap<String, Integer> dataMap, String tableName, UserFirstPageCountService userFirstPageCountService) {
            this.userFirstPageCountService = userFirstPageCountService;
            this.tableName = tableName;
            this.dataMap = dataMap;
        }

        @Override
        public void run() {
            // 消费数据
            while (true) {
                for (String key : dataMap.keySet()) {
                    if (!key.equals(tableName)) {
                        continue;
                    }

                    Object lock = CACHE_MQ_MAP_LOCK.get(key);
                    // 获取数据
                    try {
                        Thread.sleep(2000);
                        int value = dataMap.getOrDefault(key, 0);
                        log.info("Consumed: {},  value:{}", key, value);
                        if (value == 1) {
                            dataMap.put(key, 0);
                //这里业务处理逻辑,里面加了重试。 userFirstPageCountService.sqlParse(tableName); } synchronized (lock) { if (dataMap.getOrDefault(key, 0) == 0) { // 设置对应的key为0 log.info("wait {},{}", key, 0); lock.wait(); log.info("notify {},{}", key, 0); } } } catch (Exception e) { log.error("Consumed error ", e); } } } } }

 

这是重试,初始化逻辑可以保证不丢失。 以及这里出错后调用不成功可以不断重试

 

标签:MAP,String,tableName,value,重试,MQ,消息,key
From: https://www.cnblogs.com/liran123/p/18161866

相关文章

  • rabbitmq系列03---发布确认
    一、发布确认逻辑生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的......
  • 好久没关注uCOS系统的消息了,全家桶免费后,竟一直以全新的名字Flexible Safety RTOS登场
    【视频版】https://www.bilibili.com/video/BV1Kb421Y7v9【前言】2020年初,uCOS全家桶宣布免费后,其Github上uCOS-III更新过两个小版本,uCOS-II仅更新了一次,后面就一直没有更新。uCOS-II的最后一次更新定格在2021年:uCOS-III的最后一次更新定格在2022年末  【现状】开源......
  • Rabbitmq系列02---Exchange
    个人理解:交换机的类型划分个人理解是能过routingkey来划分的,一是否按routingkey找队列;fanout就是不按routingkey找队列,Direct和Topicr按routingkey找队列,只是一个模糊找,一个精准找,而headers不按routingkey是按消头中的内容找队列。一、Fanout(订阅模式|广播模式)  Fanout......
  • RabbitMQ处理消费者过载的策略
    RabbitMQ的消费者过载指的是在RabbitMQ消息队列系统中,消费者(即处理消息的应用程序或进程)无法及时处理从队列中接收到的消息,导致消息在队列中积压,进而可能引发系统性能下降、延迟增加或甚至系统崩溃等问题。引起消费者过载的原因:高负载产生的流量:当生产者向RabbitMQ发送大量消息......
  • RabbitMQ工作原理详解
    RabbitMQ的工作原理主要涉及生产者、消费者、交换机、队列和绑定等组件的交互。以下是其工作原理的详细解释:1、生产者(Producer):生产者负责创建消息并将其发送到RabbitMQ服务器。这些消息可以包含任何类型的数据,如JSON、XML等。生产者首先与RabbitMQ服务器建立连接,并创建一个通......
  • 用于日期转换的消息转换器
    正常来讲,在项目中用到消息日期这类信息的时候,如果要按要求格式化,一般有两种方法1.@JsonFormat(pattern="yyyy-MM-ddHH:mm:ss")在springboot的通常配置拦截器的WebMvcConfiguration中扩展SpringMVC的消息转换器,统一对日期类型进行格式化处理(推荐)第一步:设置相......
  • Apache RocketMQ ACL 2.0 全新升级
    作者:徒钟引言RocketMQ作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ现有的AC......
  • 【网络通信】一文读懂网络应用层常见协议的区别(HTTP 、HTTPS、MQTT、FTP、RTSP、RTMP)
        应用层协议是计算机网络中至关重要的组成部分,它们定义了应用程序如何与网络进行交互,实现数据的传输、接收和处理。本文将重点介绍几种常见的应用层协议:HTTP、HTTPS、MQTT、FTP、RTSP和RTMP,分析它们的特点、区别、工作原理以及应用场景。一、HTTP协议      ......
  • RocketMQLog:WARN No appenders could be found for logger (io.netty.channel.nio.Ni
    springBoot集成rocketMq启动的时候报RocketMQLog:WARNNoappenderscouldbefoundforlogger(io.netty.channel.nio.NioEventLoop). RocketMQLog:WARNPleaseinitializetheloggersystemproperly. 原因是pom中的rocket的依赖版本太高了。<dependency><groupI......
  • Windows下RocketMQ的启动
    下载地址:下载|RocketMQ 解压后   一、修改runbroker.cmd修改 bin目录下的runbroker.cmdset"JAVA_OPT=%JAVA_OPT%-server-Xms2g-Xmx2g"set"JAVA_OPT=%JAVA_OPT%-XX:MaxDirectMemorySize=15g"set"JAVA_OPT=%JAVA_OPT%-cp%CLASSPATH%"分别改为 s......