上一章:《RocketMq详解:四、RocketMq消息的重试》
文章目录
在学习本章之前,我们先简单谈一谈为什么要解决幂等以及什么是幂等性
1 什么是幂等
在数学计算或者计算机科学中,幂等性(idempotence)是指相同操作或资源在一次或多次请求中具有同样效果的作用。
幂等性在分布式系统设计中具有十分重要的地位。
HTTP 幂等方法,是指无论调用多少次都不会有不同结果的 HTTP 方法。不管你调用一次,还是调用一百次,一千次,结果都是相同的。
什么意思呢?
举个最简单的例子,在我们的接口开发时,如果我们使用get请求获取用户信息,不管我们调用多少次接口,其结果都不会改变,所以是幂等,但是我们调用post或者update请求更新用户信息时,每一次调用都会带来不一样的结果,此时就需要做幂等性的控制
可能这些场景太过于抽象,我举一个具体的例子:
支付场景下,消费者消费扣款消息,对一笔订单进行扣款操作,该扣款操作需要扣除10元。
这个扣款操作重复多次与执行一次的效果相同,只进行一次真实扣款,用户的扣款记录中对应该笔订单的只有一条扣款流水。不会多扣。那么我们就说这个扣款操作是符合要求的,这个消费过程是消息幂等的。
读到此处,大家可能知道了为什么要控制幂等,那么什么场景下需要进行幂等处理
2 需要进行消息幂等的场景
需要进行幂等的场景,本质就是为了解决rocketMq至少保证消费一次的策略,也就是消息的重复,这里的重复主要包含三个方面:
1.发送时重复:
生产者发送消息时,消息成功投递到broker,但此时发生网络闪断或者生产者down掉,导致broker发送ACK失败。
此时生产者由于未能收到消息发送响应,认为发送失败,因此尝试重新发送消息到broker。当消息发送成功后,在broker中就会存在两条相同内容的消息,最终消费者会拉取到两条内容一样并且Message ID也相同的消息。因此造成了消息的重复。
2.消费时重复:
消费消息时同样会出现重复消费的情况。
当消费者在处理业务完成返回消费状态给broker时,由于网络闪断等异常情况导致未能将消费完成的CONSUME_SUCCESS状态返回给broker。broker为了保证消息被至少消费一次的语义,会在网络环境恢复之后再次投递该条被处理的消息,最终造成消费者多次收到内容一样并且Message ID也相同的消息,造成了消息的重复。
可以看到,无论是发送时重复还是消费时重复,最终的效果均为消费者消费时收到了重复的消息,那么我们就知道:只需要在消费者端统一进行幂等处理就能够实现消息幂等。
3.负载均衡时消息重复:
包括但不限于网络抖动、Broker 重启以及订阅方应用重启
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
3.如何才能实现消息幂等呢
首先我们要定义消息幂等的两要素:
- 幂等令牌
- 处理唯一性的确保
我们必须保证存在幂等令牌的情况下保证业务处理结果的唯一性,才认为幂等实现是成功的。
接下来分别解释这两个要素
1.幂等令牌
幂等令牌是生产者和消费者两者中的既定协议,在业务中通常是具备唯一业务标识的字符串,如:下单场景使用订单号、支付场景使用支付流水号等。且一般由生产者端生成并传递给消费者端。
之所以不简单的采用MESSAGE ID作为幂等令牌,是因为Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理 ,不建议以 Message ID 作为处理依据。
最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置。
2.处理唯一性的确保
即服务端应当采用一定的策略保证同一个业务逻辑一定不会重复执行成功多次。如:使用支付宝进行支付,买一个产品支付多次只会成功一笔。
较为常用的方式是采用缓存去重并且通过对业务标识添加数据库的唯一索引实现幂等。
具体的思路为:如支付场景下,支付的发起端生成了一个支付流水号,服务端处理该支付请求成功后,数据持久化成功。由于表中对支付流水添加了唯一索引,因此当重复支付时会因为唯一索引的存在报错 duplicate entry,服务端的业务逻辑捕获该异常并返回调用侧“重复支付”提示。这样就不会重复扣款。
在上面场景的基础上,我们还可以引入Redis等缓存组件实现去重:当支付请求打到服务端,首先去缓存进行判断,根据key=“支付流水号”去get存储的值,如果返回为空,表明是首次进行支付操作同时将当前的支付流水号作为key、value可以为任意字符串通过set(key,value,expireTime)存储在redis中。
当重复的支付请求到来时,尝试进行get(支付流水号)操作,这个操作会命中缓存,因此我们可以认为该请求是重复的支付请求,服务端业务将重复支付的业务提示返回给请求方。
由于我们一般都会在缓存使用过程中设置过期时间,缓存可能会失效从而导致请求穿透到持久化存储中(如:MySQL)。因此不能因为引入缓存而放弃使用唯一索引,将二者结合在一起是一个比较好的方案。
4.RocketMQ场景下如何处理消息幂等
了解了两个要素及典型案例之后,我们回到消息消费的场景。
作为一款高性能的消息中间件,RocketMQ能够保证消息不丢失但不保证消息不重复。
如果在RocketMQ中实现消息去重实际也是可以的,但是考虑到高可用以及高性能的需求,如果做了服务端的消息去重,RocketMQ就需要对消息做额外的rehash、排序等操作,这会花费较大的时间和空间等资源代价,收益并不明显。
RocketMQ考虑到正常情况下出现重复消息的概率其实是很小的,因此RocketMQ将消息幂等操作交给了业务方处理。
实际上上述问题的本质在于:网络调用本身存在不确定性,也就是既不成功也不失败的第三种状态,即所谓的 处理中 状态,因此会有重复的情况发生。
这个问题是很多其他的MQ产品同样会遇到的,通常的方法就是要求消费方在消费消息时进行去重,也就是本文我们说的消费幂等性。
对RocketMQ有一定使用经验的读者可能注意到,每条消息都有一个MessageID,那么我们能否使用该ID作为去重依据,也就是上面提到的幂等令牌呢?
答案是否定的,因为MessageID可能出现冲突的情况,因此不建议通过MessageID作为处理依据而应当使用业务唯一标识如:订单号、流水号等作为幂等处理的关键依据。
上面也提到了,幂等依据应当由消息生产者生成,在发送消息时候,我们能够通过消息的key设置该id
,当消息消费者收到该消息时,根据该消息的key做幂等处理。
PS:如果你觉得每次都需要在生产者侧setkey,在消费者侧getkey,有点繁琐。也可以将该幂等依据设置在消息协议中,消费者接收到消息后解析该id进行幂等操作也是可以的。只需要消息的生产者和消费者约定好如何解析id的协议即可。
4.1 消费端常见的幂等操作
1.使用唯一标识符
为每条消息生成一个唯一的标识符,例如 UUID 或者基于业务逻辑的唯一键。
在消费端,接收到消息后,首先根据这个唯一标识符检查是否已经处理过该消息。如果已经处理过,则直接忽略,否则进行处理并将标识符记录下来(可以存储在数据库、缓存等)。
例如,假设消息中包含一个订单号作为唯一标识符,消费者在处理消息前先查询数据库中是否存在该订单号的处理记录,如果存在则跳过处理。
举一个电商场景的例子:用户购物车结算时,系统会创建支付订单。用户支付成功后支付订单的状态会由未支付修改为支付成功,然后系统给用户增加积分。
我们可以使用 RocketMQ 事务消息的方案,该方案能够发挥 MQ 的优势:异步和解耦,以及事务的最终一致性的特性。
在消费监听器逻辑里,幂等非常重要 。
积分表 SQL 如下:
CREATE TABLE `t_points` ( `id` bigint(20) NOT NULL COMMENT '主键', `user_id` bigint(20) NOT NULL COMMENT '用户id', `order_id` bigint(20) NOT NULL COMMENT '订单编号', `points` int(4) NOT NULL COMMENT '积分', `remarks` varchar(128) COLLATE utf8mb4_bin NOT NULL COMMENT '备注', `create_time` datetime NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `unique_order_Id` (`order_id`) USING BTREE COMMENT '订单唯一') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
当收到订单信息后,首先判断该订单是否有积分记录,若没有记录,才插入积分记录。
就算出现极端并发场景下,订单编号也是唯一键,数据库中也必然不会存在相同订单的多条积分记录。
2.Redis处理标志位
在消费者接收到消息后,首先判断 Redis 中是否存在该业务主键的标志位,若存在标志位,则认为消费成功,否则,则执行业务逻辑,执行完成后,在缓存中添加标志位。
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String bizKey = messageExt.getKeys(); // 唯一业务主键
// 1. 判断是否存在标志
if (redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {
continue;
}
//2. 执行业务逻辑
// TODO do business
// 3. 设置标志位
redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
3.分布式锁
仅仅有业务逻辑判断是不够的,为了应对并发场景,我们可以使用分布式锁。
分布式锁一般有三种方案:
- 数据库乐观锁
- 数据库悲观锁
- Redis 锁
3.1 数据库乐观锁
数据乐观锁假设认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。
由于乐观锁没有了锁等待,提高了吞吐量,所以乐观锁适合读多写少的场景。
实现乐观锁:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数,当数据被修改时,version 值会加一。
当线程 A 要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的 version 值为当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功。
步骤 1 : 查询出条目数据
select version from my_table where id = #{id}
步骤 2 :修改条目数据,传递版本参数
update my_table set n = n + 1, version = version + 1 where id=#{id} and version = #{version};
从乐观锁的实现角度来讲,乐观锁非常容易实现,但它有两个缺点:
- 对业务的侵入性,添加版本字段;
- 高并发场景下,只有一个线程可以修改成功,那么就会存在大量的失败 。
消费端演示代码如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long version = orderMapper.selectVersionByOrderId(orderPO.getId());//版本
orderPO.setVersion(version);
// 对应 SQL:update t_order t set version = version + 1 , status = #{status} where id = #{id} and version = #{version}
int affectedCount = orderMapper.updateOrder(orderPO);
if (affectedCount == 0) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
3.2 数据库悲观锁
当我们要对一个数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。
这种借助数据库锁机制在修改数据之前先锁定,再修改的方式被称之为悲观并发控制(又名“悲观锁”,Pessimistic Concurrency Control,缩写“PCC”)。
之所以叫做悲观锁,是因为这是一种对数据的修改抱有悲观态度的并发控制方式。我们一般认为数据被并发修改的概率比较大,所以需要在修改之前先加锁。
悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。
MySQL 悲观锁的使用方法如下:
begin;
-- 读取数据并加锁
select ... for update;
-- 修改数据
update ...;commit;
例如,以下代码将读取 t_order 表中 id 为 1 的记录,并将该记录的 status 字段修改为 3:
begin;
select * from t_order where id = 1 for update;
update t_order set status = '3' where id = 1;
commit;
如果 t_order 表中 id 为 1 的记录正在被其他事务修改,则上述代码会等待该记录被释放锁后才能继续执行。
消费端演示代码如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long orderId = orderPo.getId();
//调用service的修改订单信息,该方法事务加锁, 当修改订单记录时,该其他线程会等待该记录被释放才能继续执行
orderService.updateOrderForUpdate(orderId, orderPO);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
3.3 Redis锁
使用数据库锁是非常重的一个操作,我们可以使用更轻量级的 Redis 锁来替换,因为 Redis 性能高,同时有非常丰富的生态(类库)支持不同类型的分布式锁。
我们选择 Redisson 框架提供的分布式锁功能,简化的示例代码如下:
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long orderId = orderPo.getId();
RLock lock = redissonClient.getLock("order-lock-" + orderId);
rLock.lock(10, TimeUnit.SECONDS);
// TODO 业务逻辑
rLock.unlock();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch(Exception e){
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
上面我们提供了多个幂等的实现方案,下面我们使用SpringBoot结合Aop+Redis锁实现一个通用、主流的消息的幂等解决方案
4.2 SpringBoot结合Aop+Redis锁实现消息的幂等
在实现后续方案之前,请先跟随《RocketMq通用生产和消费方法改造》完成基础的消息发送和消费
1.新增自定义注解用于Aop拦截
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Idempotent {
}
2.新增切面相关信息
2.1 添加maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
2.2 在启动类添加注解
@SpringBootApplication
@EnableAspectJAutoProxy(proxyTargetClass = false, exposeProxy = true)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
注意:我们需要通过CGLIB代理(默认)方式进行实现,因为:
JDK动态代理需要我们代理的方法实现接口,但我们通用消费者的改造是通过继承抽象类实现
2.3 Aop拦截实现
@Aspect
@Component
public class IdempotentAspect {
@Resource
MessageService messageService;
@Around("@annotation(idempotent)")
public Object checkIdempotency(ProceedingJoinPoint joinPoint, Idempotent idempotent) throws Throwable {
MessageDTO messageData = this.getMessageData(joinPoint);
if (messageData == null) {
// 继续执行被拦截的方法
return joinPoint.proceed();
}
String idempotentKey = messageData.getTopic() + "-" + messageData.getKey();
if (messageService.isMessageProcessed(idempotentKey)) {
// 继续执行被拦截的方法
MsgRetryStatus rst = (MsgRetryStatus) joinPoint.proceed(joinPoint.getArgs());
if (MsgRetryStatus.SUCCEED.equals(rst)) {
messageService.markMessageAsProcessed(idempotentKey);
}
return rst;
} else {
return MsgRetryStatus.SUCCEED;
}
}
private MessageDTO getMessageData(ProceedingJoinPoint joinPoint) {
Object[] parameterValues = joinPoint.getArgs();
if (parameterValues.length == 0) {
return null;
}
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
if (method == null || StringUtils.isBlank(method.getName())) {
return null;
}
MessageDTO messageData = (MessageDTO) parameterValues[0];
if (messageData == null
|| StringUtils.isBlank(messageData.getTopic())
|| StringUtils.isBlank(messageData.getKey())) {
return null;
}
return messageData;
}
}
- messageService
@Service
public class MessageService {
/**
* 消息是否消费过
*
* @param idempotentKey
* @return
*/
public boolean isMessageProcessed(String idempotentKey) {
//添加过期时间防止死锁
return RedisLockUtil.lock(idempotentKey, TimeUnit.MINUTES.toSeconds(60));
}
/**
* 标记消息已经完成消息
*
* @param idempotentKey
*/
public void markMessageAsProcessed(String idempotentKey) {
RedisLockUtil.unLock(idempotentKey);
}
/**
* 获取幂等key
* @param messageDTO
* @return
*/
public String getIdempotentKey(MessageDTO messageDTO) {
if (messageDTO == null) {
return null;
}
return messageDTO.getTopic() + "-" + messageDTO.getKey();
}
}
- RedisLock工具类
//redis分布式锁
public final class RedisLockUtil {
private static final int defaultExpire = 60;//默认过期时间
private static final String REMOTEURL = "192.168.1.2";//redis远程连接
private static Jedis redis = new Jedis(REMOTEURL);//设置远程连接
private RedisLockUtil() {
//
}
/**
* 加锁
*
* @param key redis key
* @param expire 过期时间,单位秒
* @return true:加锁成功,false,加锁失败
*/
public static boolean lock(String key, long expire) {
long status = redis.setnx(key, "1");
if (status == 1) {
redis.expire(key, expire);
return true;
}
return false;
}
/**
* 解锁
*
* @param key
*/
public static void unLock(String key) {
redis.del(key);
}
}
3.通用消费逻辑改造
@Slf4j
@Service
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
@Resource
MessageProduct messageProduct;
public void onMessage(MessageDTO message) {
try {
// 处理消息的逻辑
log.info("收到延迟消息成功,消息体:{}", message);
// 使用AopContext.currentProxy()来获取当前代理对象
CommonConsumer proxy = (CommonConsumer) AopContext.currentProxy();
MsgRetryStatus msgRetryStatus = proxy.doConsumerProcess(message);
if (MsgRetryStatus.RETRY.equals(msgRetryStatus)
|| MsgRetryStatus.FAILURE.equals(msgRetryStatus)) {
this.doConsumerProcess(message);
}
} catch (Exception e) {
// 记录错误日志
log.error("消费异常,messageInfo:{}", JSON.toJSONString(message),e);
// 可以选择将失败消息发送到指定Topic
this.doRetryConsumerProcess(message);
}
}
@Idempotent
public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);
/**
* 消息重试
*
* @param messageDTO
*/
private void doRetryConsumerProcess(MessageDTO messageDTO) {
//此处的过期时间和最大重试次数可以放在配置中心(nacos or Apollo)
long retryTimes = RedisUtil.incr(getCacheKey(messageDTO), 60);
if (retryTimes > 3) {
log.warn("消息重试次数超过阈值,已重试:{}次,messageInfo:{}", retryTimes - 1, JSON.toJSONString(messageDTO));
return;
}
messageProduct.SendMessage(messageDTO, MQConsumer.BOOT_RETRY_MQ_CONSUMER_TOPIC, ThreadLocalRandom.current().nextInt(0, 3));
}
public String getCacheKey(MessageDTO messageDTO) {
if (messageDTO == null) {
return null;
}
return messageDTO.getTopic() + "-" + messageDTO.getKey();
}
}
注意:我们调用doConsumerProcess方法是采用的是:使用AopContext.currentProxy()来获取当前代理对象,通过代理对象的方式去调用内部方法,其原因是:CGLIB代理只有外部调用的方法才会被拦截。如果是内部方法调用(如一个bean中的方法调用另一个方法),则不会触发AOP拦截。
但是我们这样直接通过AopContext.currentProxy去获取当前代理对象,仍然无效,会产生以下类型的错误:
原因是:RocketMQ的消息消费是异步的,并且通常在一个单独的线程池中运行,这种情况下可能不会自动继承Spring AOP的上下文。
这意味着即使您启用了exposeProxy,这些线程也可能没有Spring AOP的上下文,解决这个问题的方案有很多,比如我们可以通过自定义线程池来实现,当然也可以使用ApplicationContext来获取当前代理对象,这样可以确保即使在不同的线程中,也可以正确地获取到代理对象并应用AOP切面,这种方式实现起来比较简单,所以我们使用此方案
@Slf4j
@Service
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
@Resource
MessageProduct messageProduct;
@Resource
ApplicationContext applicationContext;
public void onMessage(MessageDTO message) {
try {
// 处理消息的逻辑
log.info("收到延迟消息成功,消息体:{}", message);
// 使用AopContext.currentProxy()来获取当前代理对象
CommonConsumer proxy = applicationContext.getBean(this.getClass());
MsgRetryStatus msgRetryStatus = proxy.doConsumerProcess(message);
if (MsgRetryStatus.RETRY.equals(msgRetryStatus)
|| MsgRetryStatus.FAILURE.equals(msgRetryStatus)) {
this.doConsumerProcess(message);
}
} catch (Exception e) {
// 记录错误日志
log.error("消费异常,messageInfo:{}", JSON.toJSONString(message), e);
// 可以选择将失败消息发送到指定Topic
this.doRetryConsumerProcess(message);
}
}
public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);
/**
* 消息重试
*
* @param messageDTO
*/
private void doRetryConsumerProcess(MessageDTO messageDTO) {
//此处的过期时间和最大重试次数可以放在配置中心(nacos or Apollo)
long retryTimes = RedisUtil.incr(getCacheKey(messageDTO), 60);
if (retryTimes > 3) {
log.warn("消息重试次数超过阈值,已重试:{}次,messageInfo:{}", retryTimes - 1, JSON.toJSONString(messageDTO));
return;
}
messageProduct.SendMessage(messageDTO, MQConsumer.BOOT_RETRY_MQ_CONSUMER_TOPIC, ThreadLocalRandom.current().nextInt(0, 3));
}
public String getCacheKey(MessageDTO messageDTO) {
if (messageDTO == null) {
return null;
}
return messageDTO.getTopic() + "-" + messageDTO.getKey();
}
}
在我们的实现类上加入注解即可
@Service
@RocketMQMessageListener(topic = "boot-mq-topic", consumerGroup = "boot_group_1")
@Slf4j
public class BootMqConsumer extends CommonConsumer {
@Resource
MessageService messageService;
@Override
@Idempotent
public MsgRetryStatus doConsumerProcess(MessageDTO messageDTO) {
log.info("执行消费逻辑,topic:{}", messageDTO.getTopic());
return MsgRetryStatus.SUCCEED;
}
}
至此我们就完整的实现了消息的幂等
4.测试
@GetMapping("/send/msg4")
public String sendMsg4() {
try {
// 构建消息主体,此处可以用对象代替,为了方便演示,使用map
User user = User.builder()
.id(1)
.name("ninesun")
.build();
MessageDTO<User> messageDTO = MessageDTO.<User>builder()
.data(user)
.delayTime(3)
.topic("boot-mq-topic")
.key(String.valueOf(UUID.randomUUID()))
.build();
messageProduct.SendMessage(messageDTO);
} catch (Exception e) {
e.printStackTrace();
}
return "OK";
}
代码地址:https://gitee.com/ninesuntec/rocket-mq-learn-demo/tree/aop-Idempotent