首页 > 其他分享 >一文讲透消息队列RocketMQ实现消费幂等

一文讲透消息队列RocketMQ实现消费幂等

时间:2023-12-18 13:16:38浏览次数:25  
标签:orderPO version 队列 订单 ConsumeConcurrentlyStatus 消息 讲透 id RocketMQ

这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等

1 基础概念

消费幂等是指:当出现 RocketMQ 消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。

例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100元。

如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费100元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

2 适用场景

RocketMQ 消息重复的场景如下:

  • 发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。

    如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同但 Message ID 不同的消息。

  • 投递时消息重复

    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,Broker 服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及消费者应用重启)

    Broker 端或客户端重启、扩容或缩容时,会触发 Rebalance ,此时消费者可能会收到少量重复消息。

3 业务唯一标识

因为不同的 Message ID 对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。

最好的方式是以业务唯一标识作为幂等处理的关键依据,消息必须携带业务唯一标识

消息携带业务唯一标识一般来讲有两种方式:

  1. 消息 Key 存放业务唯一标识
Message msg = new Message(TOPIC /* Topic */,
             TAG /* Tag */,
               ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
             );
message.setKey("ORDERID_100"); // 订单编号
SendResult sendResult = producer.send(message);      
  1. 消息 body 存放业务唯一标识
Message msg = new Message(TOPIC /* Topic */,
             TAG /* Tag */,
               (JSON.toJSONString(orderDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
             );
message.setKey("ORDERID_100"); // 订单编号
SendResult sendResult = producer.send(message);      

消费者收到消息时,从消息中获取订单号来实现消息幂等 :

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt message : msgs) {
            // 方法1: 根据业务唯一标识的Key做幂等处理
            String orderId = message.getKeys();
            // 方法2: 从消息body体重解析出订单号
            String orderJSON = new String(messageExt.getBody(), "UTF-8");
            OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
            String orderId = orderPO.getId();
            // TODO 业务处理逻辑
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

4 幂等策略

1 业务状态机判断

为了保证幂等,一定要做业务逻辑判断,笔者认为这是保证幂等的首要条件

笔者曾经服务于神州专车,乘客在用户端点击立即叫车,订单服务创建订单,首先保存到数据库后,然后将订单信息同步保存到缓存中。

在订单的载客生命周期里,订单的修改操作先修改缓存,然后发送消息到 MetaQ ,订单落盘服务消费消息,并判断订单信息是否正常(比如有无乱序),若订单数据无误,则存储到数据库中。

订单状态机按顺序分别是:创建已分配司机司机已出发司机已到达司机已接到乘客已到达

这种设计是为了快速提升系统性能,由于网络问题有非常小的概率,消费者会收到乱序的消息。

当订单状态是司机已到达时,消费者可能会收到司机已出发的消息,也就是先发的消息因为网络原因被延迟消费了。

此时,消费者需要判断当前的专车订单状态机,保存最合理的订单数据,就可以忽略旧的消息,打印相关日志即可。

2 全局处理标识

1 数据库去重表

数据库去重表有两个要点 :

  1. 操作之前先从去重表中通过唯一业务标识查询记录是否存在,若不存在,则进行后续消费流程 ;
  2. 为了避免并发场景,去重表需要包含业务唯一键 uniqueKey , 这样就算并发插入也不可能插入多条,插入失败后,抛异常。

举一个电商场景的例子:用户购物车结算时,系统会创建支付订单。用户支付成功后支付订单的状态会由未支付修改为支付成功,然后系统给用户增加积分。

我们可以使用 RocketMQ 事务消息的方案,该方案能够发挥 MQ 的优势:异步解耦,以及事务的最终一致性的特性。

在消费监听器逻辑里,幂等非常重要 。积分表 SQL 如下:

CREATE TABLE `t_points` (
  `id` bigint(20) NOT NULL COMMENT '主键',
  `user_id` bigint(20) NOT NULL COMMENT '用户id',
  `order_id` bigint(20) NOT NULL COMMENT '订单编号',
  `points` int(4) NOT NULL COMMENT '积分',
  `remarks` varchar(128) COLLATE utf8mb4_bin NOT NULL COMMENT '备注',
  `create_time` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `unique_order_Id` (`order_id`) USING BTREE COMMENT '订单唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

当收到订单信息后,首先判断该订单是否有积分记录,若没有记录,才插入积分记录。

就算出现极端并发场景下,订单编号也是唯一键,数据库中也必然不会存在相同订单的多条积分记录。

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
            String orderJSON = new String(messageExt.getBody(), "UTF-8");
            logger.info("orderJSON:" + orderJSON);
            OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
            // 首先查询是否处理完成
            PointsPO pointsPO = pointsMapper.getByOrderId(orderPO.getId());
            if (pointsPO == null) {
                Long id = SnowFlakeIdGenerator.getUniqueId(1023, 0);
                pointsPO = new PointsPO();
                pointsPO.setId(id);
                pointsPO.setOrderId(orderPO.getId());
                pointsPO.setUserId(orderPO.getUserId());
                // 添加积分数 30
                pointsPO.setPoints(30);
                pointsPO.setCreateTime(new Date());
                pointsPO.setRemarks("添加积分数 30");
                pointsMapper.insert(pointsPO);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

2 Redis处理标志位

在消费者接收到消息后,首先判断 Redis 中是否存在该业务主键的标志位,若存在标志位,则认为消费成功,否则,则执行业务逻辑,执行完成后,在缓存中添加标志位。

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String bizKey = messageExt.getKeys(); // 唯一业务主键
           //1. 判断是否存在标志
           if(redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {
         			continue;
       		 }
         	 //2. 执行业务逻辑
           //TODO do business
           //3. 设置标志位
           redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

3 分布式锁

仅仅有业务逻辑判断是不够的,为了应对并发场景,我们可以使用分布式锁

分布式锁一般有三种方案:

  • 数据库乐观锁
  • 数据库悲观锁
  • Redis 锁

1 数据库乐观锁

数据乐观锁假设认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。

由于乐观锁没有了锁等待,提高了吞吐量,所以乐观锁适合读多写少的场景。

实现乐观锁:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数,当数据被修改时,version 值会加一。

当线程 A 要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的 version 值为当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功。

步骤 1 : 查询出条目数据

select version from my_table where id = #{id}

步骤 2 :修改条目数据,传递版本参数

update  my_table set n = n + 1, version = version + 1 where id=#{id} and version = #{version};

从乐观锁的实现角度来讲,乐观锁非常容易实现,但它有两个缺点:

  • 对业务的侵入性,添加版本字段;
  • 高并发场景下,只有一个线程可以修改成功,那么就会存在大量的失败

消费端演示代码如下:

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String orderJSON = new String(messageExt.getBody(), "UTF-8");
           OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
           Long version = orderMapper.selectVersionByOrderId(orderPO.getId()); //版本
           orderPO.setVersion(version);
           // 对应 SQL:update t_order t set version = version + 1 , status = #{status} where id = #{id} 
           // and version = #{version}
           int affectedCount = orderMapper.updateOrder(orderPO);
           if(affectedCount == 0) {
              return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

2 数据库悲观锁

当我们要对一个数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。

这种借助数据库锁机制在修改数据之前先锁定,再修改的方式被称之为悲观并发控制(又名“悲观锁”,Pessimistic Concurrency Control,缩写“PCC”)。

之所以叫做悲观锁,是因为这是一种对数据的修改抱有悲观态度的并发控制方式。我们一般认为数据被并发修改的概率比较大,所以需要在修改之前先加锁。

悲观并发控制实际上是“先取锁再访问”的保守策略为数据处理的安全提供了保证

MySQL 悲观锁的使用方法如下:

begin;

-- 读取数据并加锁
select ... for update;

-- 修改数据
update ...;

commit;

例如,以下代码将读取 t_order 表中 id 为 1 的记录,并将该记录的 status 字段修改为 3

begin;

select * from t_order where id = 1 for update;

update t_order set status = '3' where id = 1;

commit;

如果 t_order 表中 id 为 1 的记录正在被其他事务修改,则上述代码会等待该记录被释放锁后才能继续执行。

消费端演示代码如下:

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String orderJSON = new String(messageExt.getBody(), "UTF-8");
           OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
           Long orderId = orderPo.getId();
           //调用service的修改订单信息,该方法事务加锁, 当修改订单记录时,该其他线程会等待该记录被释放才能继续执行
           orderService.updateOrderForUpdate(orderId ,orderPO);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

3 Redis锁

使用数据库锁是非常重的一个操作,我们可以使用更轻量级的 Redis 锁来替换,因为 Redis 性能高,同时有非常丰富的生态(类库)支持不同类型的分布式锁。

我们选择 Redisson 框架提供的分布式锁功能,简化的示例代码如下:

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String orderJSON = new String(messageExt.getBody(), "UTF-8");
           OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
           Long orderId = orderPo.getId();
           RLock lock = redissonClient.getLock("order-lock-" + orderId);
           rLock.lock(10, TimeUnit.SECONDS);
           // TODO 业务逻辑
           rLock.unlock();
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

5 总结

这篇文章,我们详细剖析了如何实现 RocketMQ 消费幂等。

1、消费幂等:当出现 RocketMQ 消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。

2、适用场景:发送时消息重复、投递时消息重复、负载均衡时消息重复

3、业务唯一标识:以业务唯一标识作为幂等处理的关键依据,消息必须携带业务唯一标识。

4、幂等策略:业务逻辑代码中需要判断业务状态机,同时根据实际条件选择全局处理标识分布式锁两种方式处理。


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

标签:orderPO,version,队列,订单,ConsumeConcurrentlyStatus,消息,讲透,id,RocketMQ
From: https://www.cnblogs.com/makemylife/p/17910944.html

相关文章

  • 解锁RocketMQ秘籍:如何保障消息顺序性?
    嗨,小伙伴们!小米在这里啦!今天我们要聊的话题是社招面试中一个经典而又百思不得其解的问题——“RocketMQ如何保证顺序性?”不用担心,小米来给你揭秘RocketMQ的秘密武器,让你轻松过关面试大关!引言:为什么要谈顺序性?首先,我们得明白为什么在消息队列中要讲究消息的顺序性。假设你正在开发一......
  • 消息队列和事件循环
    每个渲染进程都有一个主线程,并且主线程非常繁忙,既要处理DOM,又要计算样式,还要处理布局,同时还需要处理JavaScript任务以及各种输入事件。要让这么多不同类型的任务在主线程中有条不紊地执行,这就需要一个系统来统筹调度这些任务,这个统筹调度系统就是消息队列和事件循环系统。但并不......
  • 宝藏知识!一篇文讲透关于站点SEO的事
    在当今数字化时代,拥有一个强大的网站对于企业和个人而言至关重要。然而,仅仅拥有一个漂亮的网站还不够,它还需要被搜索引擎充分认可和展示。这就是为什么站点SEO(搜索引擎优化)变得如此重要的原因。站点SEO是一系列技术和策略,旨在提升一个网站在搜索引擎结果页面(SERPs)中的排名。通过优......
  • 【合并排序链表】分治/优先队列
    合并两个排序链表模拟维护一个合并链表,每次添加两个排序链表中较小val的节点即可模拟代码publicListNodemergeTwo(ListNodea,ListNodeb){if(a==null)returnb;if(b==null)returna;ListNodeans=newListNode(0);Lis......
  • 记录rabbitMQ的广播队列的错误使用导致未能正确广播的问题
    背景说明:有3个服务S1、S2、S3现在服务S1需要发布消息到广播交换机E,并建立了两个普通队列Q1,Q2,将其绑定到广播交换机E上服务S2和服务S3同时监听队列Q1,Q2本意是,服务S1通过广播交换机E把消息同时推送给服务S2和S3后面测试时,同事发现,服务S2和服务S3都只接收到了部分消息,而不是全......
  • 清空ActiveMQ中的Scheduled延时队列
    要清空ActiveMQ中的Scheduled延时队列,可以执行以下步骤:停止ActiveMQ服务器。在ActiveMQ数据存储目录中找到存储延时消息的目录。该目录的默认位置是<activemq_home>/data/localhost/Scheduled.删除该目录下的所有文件,这将清空延时队列中的消息。启动ActiveMQ服务器。请注意......
  • Java中的消息队列(MQ)应用实践
    摘要:本文将介绍Java中消息队列(MQ)的概念、应用场景以及如何使用Java中的消息队列进行实践。我们将探讨如何使用Java消息队列实现异步通信、解耦和流量削峰等常见需求,并通过实际案例展示其应用。一、引言在分布式系统中,消息队列(MQ)是一种常见的中间件技术,用于实现异步通信和解耦。通过......
  • 阻塞队列linkedBlockQuene和syncroBlockQuene的区别?
    在Java中,LinkedBlockingQueue和SynchronousQueue是两种不同类型的阻塞队列,它们有一些关键的区别:实现机制:LinkedBlockingQueue使用一个链表实现的有界或无界队列。有界队列的容量是固定的,而无界队列的容量理论上是无限的。SynchronousQueue是一个特殊的阻塞队列,它在内部......
  • 【一个队列实现栈】Java队列——Queue接口-LinkedList实现类
    leetcode225.用队列实现栈题意:用一个队列实现栈题解:(1)弹栈:将队头开始的前size()-1个元素全部出队然后重新入队,使队尾元素循环到队头,然后弹出(2)获取栈顶元素:先将队头开始的前size()-1个元素全部出队然后重新入队,使队尾元素循环到队头,此时队头元素即为栈顶元素;然后再重新循环siz......
  • 【双栈实现队列】Java——Stack类
    leetcode232.用栈实现队列题意:双栈实现队列;要求每个入队、出队操作均摊O(1)复杂度题解:用一个栈in维护入队元素,另一个栈out维护出队元素出队或取队头元素:首先判断栈out是否为空,如果为空,将栈in中的元素pop()到栈out中,那么栈out栈顶元素即为原队列队头元素。(米奇妙妙屋啊~)判断......