OpenAPI 接口幂等实现
1、幂等性是啥?
进行一次接口调用与进行多次相同的接口调用都能得到与预期相符的结果。
通俗的讲,创建资源或更新资源的操作在多次调用后只生效一次。
2、什么情况会需要保证幂等性
比如,购物时的下单操作,如前端提交按钮未做并发、抖动控制,那么用户点击一次。可能因为某些原因导致 Http 请求了多次,这就会导致用户生成多个相同订单。
再有,在我们的分布式项目中,为了提高通行的可靠性,通信框架/MQ 可能会向数据服务推送多条相同的消息,如果不做幂等性控制,消息会被多次消费。
等等。。。
上述说了需要保证幂等性的场景,但我们实现幂等还要考虑下述条件:
- 如果服务接受了多个请求,且
幂等 token
和请求参数
完全一样,服务应该保证幂等直接返回相似数据。 - 如果服务接受了多个请求,且
幂等 token
和请求参数
不完全一样,服务应该拒绝幂等。
即:幂等 token 不一致直接拒绝幂等直接走正常逻辑;
幂等 token 一致但请求参数却不一致,我们返回 token 异常,也可以拒绝幂等。 - 不同用户之间的请求不能相互影响。
- 不同接口之间的请求不能相互影响。
即:不同接口不能被相同 token 影响。 - 更新接口不能使用缓存数据,需要特殊处理。
比如:客户端带了幂等 token
请求了会员续费接口,此时响应了新的会员过期时间,然后客户端又未携带了幂等 token
请求了会员续费接口,此时用户会员到期时间得到了更新,用户再次携带了幂等 token
进行请求,响应的缓存的相似数据就明显不对了。 - 这里为啥说更新不能缓存,而创建未提呢?因为大多数更新需要考虑缓存一致性问题,而创建本身就是从无到有的过程,一般无需考虑,但也要根据实际业务来进行判断,这里后续实现方案为:创建直接走缓存,更新为重新查库。
3、如何保证幂等性
这里提供一种无侵入的幂等处理方案,构建幂等表
。
![image-幂等实现流程](/Users/yijun.wen/Library/Application Support/typora-user-images/image-20221024155920677.png)
流程解析:
-
客户端请求时,为相关接口(所有创建资源的接口、部分更新接口)添加一个请求头参数:clientToken ,
clientToken 是一个由客户端生成的唯一的、大小写敏感、不超过64个 ASCII 字符的字符串。例如,clientToken=123e4567-e89b-12d3-a456-426655440000
clientToken 可以由服务端提供单独的接口生成。生成方式很多这里不做讨论。 -
服务端对相关接口做 AOP 切入,处理进行幂等判断、幂等记录、数据缓存。
-
依据 Redis 中 clientToken 的状态信息返回相似信息
4、具体实现
4.1 创建切入点注解类
/**
* 幂等注解
*
* @author Eajur.wen
* @version 1.0
* @date 2022-10-19 11:25:09
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface IdempotentAnnotation {
/**
* 是否缓存获取
*
* @return
*/
boolean cache() default true;
/**
* 需要特殊处理的接口标识
*
* @return
*/
String name() default "";
}
cache 默认为 true ,会缓存第一次响应数据,后续幂等的请求直接走缓存响应数据
name 为需要特殊处理接口的标识,在 cache 为 false 时,根据此标识做特殊处理
4.2 幂等 AOP 实现
/**
* 幂等 AOP 实现
*
* @author Eajur.wen
* @version 1.0
* @date 2022-10-19 11:26:45
*/
@Component
@Aspect
@Slf4j
public class IdempotentAspect2 {
public static final String CLIENT_TOKEN = "clientToken";
public static final String RENEWAL_NO_CACHE = "renewal";
public static final int CLIENT_TOKEN_MAX_LENGTH = 64;
public static final String CLIENT_TOKEN_KEY_PRE = "client:token:";
public static final String CLIENT_TOKEN_DATA_KEY_PRE = "client:token:data:";
public static final String CLIENT_TOKEN_DATA_ID_KEY_PRE = "client:token:data:id:";
public static final String CLIENT_TOKEN_DATA_ABSTRACT_KEY_PRE = "client:token:data:abstract:";
public static final long CLIENT_TOKEN_TIMEOUT_MINUTES = 5;
/**
* 请求中 处理中
*/
public static final int CLIENT_TOKEN_REQUEST_STATUS = 1;
public static final int CLIENT_TOKEN_SUCCESS_STATUS = 2;
@Autowired
private HttpServletRequest request;
@Autowired
private RedisTemplate redisTemplate;
@Pointcut("@annotation(com.eajur.idempotent.annotation.IdempotentAnnotation)")
public void pt() {
}
@Around("pt()")
public Object idempotent(ProceedingJoinPoint joinPoint) throws Throwable {
// 没有注解直接放行
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
IdempotentAnnotation annotation = method.getAnnotation(IdempotentAnnotation.class);
if (annotation == null) {
return joinPoint.proceed();
}
boolean cache = annotation.cache();
String clientToken = request.getHeader(CLIENT_TOKEN);
// 没有请求头直接放行
if (!StringUtils.hasText(clientToken)) {
return joinPoint.proceed();
}
// clientToken 不能过长
if (clientToken.length() > CLIENT_TOKEN_MAX_LENGTH) {
return new ViewData(ErrorCodeEnum.REPEATED_REQUEST_ERROR);
}
// 未登录接口暂不做幂等
Long memberId = SubjectUtil.getMemberId();
if (memberId == null) {
return joinPoint.proceed();
}
//获取参数名称和值
Map<String, Object> nameAndArgs = CommonUtil.getNameAndValue(joinPoint);
String jsonStr = JSONUtil.toJsonStr(nameAndArgs);
String abstractData = SmUtil.sm3(jsonStr);
// 记录请求 clientToken
String methodName = method.getName();
String baseKey = memberId + ":" + methodName + ":" + clientToken;
String key = CLIENT_TOKEN_KEY_PRE + baseKey;
String dataKey = CLIENT_TOKEN_DATA_KEY_PRE + baseKey;
String abstractKey = CLIENT_TOKEN_DATA_ABSTRACT_KEY_PRE + baseKey;
ValueOperations ops = redisTemplate.opsForValue();
Object flag = ops.getAndExpire(key, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (flag == null) {
ops.set(key, CLIENT_TOKEN_REQUEST_STATUS, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
Object proceed;
try {
proceed = joinPoint.proceed();
} catch (Throwable throwable) {
// 请求失败清除幂等信息
redisTemplate.delete(key);
throw throwable;
}
ops.set(key, CLIENT_TOKEN_SUCCESS_STATUS, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
ops.set(abstractKey, abstractData, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (cache) {
ops.set(dataKey, proceed, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
}
return proceed;
}
// 请求参数不一致不做幂等
Object oldAbstractData = ops.getAndExpire(abstractKey, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!abstractData.equals(oldAbstractData)) {
Object proceed;
try {
proceed = joinPoint.proceed();
} catch (Throwable throwable) {
// 请求失败清除幂等信息
redisTemplate.delete(key);
redisTemplate.delete(dataKey);
redisTemplate.delete(abstractKey);
throw throwable;
}
ops.set(key, CLIENT_TOKEN_SUCCESS_STATUS, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
ops.set(abstractKey, abstractData, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (cache) {
ops.set(dataKey, proceed, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
}
return proceed;
}
// 上次请求未完成
if (flag.equals(CLIENT_TOKEN_REQUEST_STATUS)) {
return new ViewData().error(ErrorCodeEnum.REPEATED_REQUEST_ERROR);
}
// 响应相似数据并刷新过期时间
if (flag.equals(CLIENT_TOKEN_SUCCESS_STATUS) && cache) {
Object data = ops.getAndExpire(dataKey, CLIENT_TOKEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
return data;
} else {
String name = annotation.name();
switch (name) {
case RENEWAL_NO_CACHE:
// 特殊处理 我在这的处理是直接查库获取最新数据返回
// 可以通过 CLIENT_TOKEN_DATA_ID_KEY_PRE 缓存主键信息,也可以根据上面的 nameAndArgs 做处理
return new ViewData();
default:
return joinPoint.proceed();
}
}
}
}