前言
大流量情况下的库存是老生常谈的问题了,在这里我整理一下mysql和redis应对扣除库存的方案,采用jmeter进行压测。
JMETER设置
库存初始值50,线程数量1000个,1秒以内启动全部,一个线程循环2次,共2000个请求
MySQL方案
初始方案
<update id="decreaseStock">
UPDATE stock
SET stock_num = stock_num - 1
WHERE id = #{id}
</update>
这种情况下,在并发条件肯定会出现超卖的
进行修改:
<update id="decreaseStock">
UPDATE stock
SET stock_num = stock_num - 1
WHERE id = #{id} AND stock_num >= 1
</update>
增加AND stock_num >= 1条件,即可避免超卖。
相关代码:
@PostMapping(value = "/decreaseStock/{id}")
public ResponseEntity<Object> decreaseStock(@PathVariable("id") Integer id) {
int result = stockService.decreaseStock(id);
return result == 1 ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
压测情况:
根据Throught可知一秒可以处理200个事务(TPS)
如果说系统的并发量不高,则可以以这种方案进行防止库存超卖,但要注意,在可重复读隔离级别情况下,如果where的条件字段没有索引的话,进行update语句会使整个表被锁住,如果这里使用的where条件不是主键id而是product_name,那么需要给这个字段加索引。
在RR可重复读隔离级别下,如果where条件没有命中索引,那么会基于next-key lock(记录锁和间隙锁的组合)对整个表的所有记录加上这个锁,进行全表扫描,这个时候其他记录想要更新就会被阻塞。
但是不一定是有了索引就不会锁住整个表,这是由优化器决定的,可以使用Explain语句来查看当前语句是走的索引还是全表扫描,如果优化器走的还是全标扫描,可以使用 force index([index_name])
强制使用某个索引。
改进
在MySQL情况下还能有其他方案来提升性能吗,在不借助Redis的情况(曾经面试招银网络被问了这道题)
我当时给出的回答是,把单个商品的库存比如50个库存,拆分成好几份,一份10个,5份库存,由于秒杀情况下流量很大,可以把这五份库存分别放到五个数据库里面,这样性能至少是原先方案的5倍,那么还会出现新的问题,就是有些问题,负载均衡上的问题,可能会出现某些库里还存在库存,但是请求却没有打进这个数据库,而是打到库存已经没有的数据库里面。我当时的想法是再搞个库存表,这个库存表采集各个商品的总库存以及商品在各个分库里面的库存数量,然后再写个服务,包含负载均衡的算法,将用户的请求平均打到各个分库去,当某个分库的库存达到0的时候,去通知该服务,服务将这个库剔除,使新的请求不会转发过去。实际这种情况也是存在问题的,高并发下库存为0的库来不及被剔除,也会导致请求被打到库存0的库。
Redis方案
将库存暂时放到Redis,然后从Redis进行库存扣减,能大大提升性能
压测结果:
可见性能几乎是MySQL的10倍了,但是这样子在Redis里面会导致超卖
要确保Redis不超买,需要先查询当前的数量,如果大于0则进行扣减,并且查询和扣减需要为原子性,这里就需要借助lua脚本,将这两次操作写到一起。
加了Lua脚本的代码:
private static final String LUA_DECRESE_STOCK_PATH = "lua/decreseStock.lua";
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") Integer id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 执行Lua脚本
Long result = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
// 返回结果判断
return (result != null && result == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
lua脚本放在resource/lua/decreseStock.lua
local key = KEYS[1]
-- 检查键是否存在
local exists = redis.call('EXISTS', key)
if exists == 1 then
-- 键存在,获取值
local value = redis.call('GET', key)
if tonumber(value) > 0 then
-- 如果值大于0,则递减
redis.call('DECR', key)
return 1 -- 表示递减成功
else
return 0 -- 表示递减失败,值不大于0
end
else
return -1 -- 表示递减失败,键不存在
end
Redis同步库存到MySQL
但是在Redis扣减了库存,总需要同步到MySQL里面
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") Integer id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 执行Lua脚本
Long redisResult = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
int dataBaselResult = 0;
if (redisResult == 1) {
dataBaselResult = stockService.decreaseStock(id);
}
// 返回结果判断
return (dataBaselResult == 1 && redisResult == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
直接按照上述代码来写,删Redis后同时将库存同步到MySQL,相当于使用了Redis性能又没有提升。
其实选择了Redis来进行库存扣减,那么MySQL的库存并不需要去实时进行更新,只需要库存达到最终一致性即可,即先对Redis的库存进行更新,然后再异步同步到MySQL的库存。
如果使用spring的异步线程来解决,会不会出现同步MySQL失败导致数据最终不一致呢,在流量很多的情况下,系统本身就处于压力大的情况,再使用异步线程会占用额外的资源,最好的方法是引入MQ,把库存的同步信息交给MQ,MQ再交到消费系统,进行减库存的操作,由MQ保证消息被消费,实现最终一致性。
部分代码如下,由MQ product发出,再由consumer进行消费:
private final DecreaseStockProduce decreaseStockProduce;
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") String id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 执行Lua脚本
Long redisResult = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
if (redisResult == 1) {
// 发送消息
try {
DecreaseStockEvent decreaseStockEvent = DecreaseStockEvent.builder()
.id(id)
.build();
SendResult sendResult = decreaseStockProduce.sendMessage(decreaseStockEvent);
if (!Objects.equals(sendResult.getSendStatus(), SendStatus.SEND_OK)) {
log.error("消息发送错误,请求参数:{}", id);
}
} catch (Exception e) {
log.error("消息发送错误,请求参数:{}", id, e);
}
}
// 返回结果判断
return (redisResult == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
MQ [TIMEOUT_CLEAN_QUEUE] broker busy问题
这里直接压测会报下面的错误,并且这个时候查看redis库存已经减到0,到是MySQL只减到了37
针对MQ [TIMEOUT_CLEAN_QUEUE] broker busy问题,需要去修改MQ的broker.conf文件
针对TIMEOUT_CLEAN_QUEUE broker busy问题,需要去修改MQ的broker.conf文件,上述的201ms超时了,我这里将等待时间改为400,并且将线程数设置为64,这个线程数可以根据实际压测情况进行调整。
# 发消息线程池数量
sendMessageThreadPoolNums=64
# 拉消息线程池数量
pullMessageThreadPoolNums=64
waitTimeMillsInSendQueue=400
现在再进行压测,发现tps能跑到1000,相比直接入库mysql的200已经是提升很大了。
虽然性能提高,也实现库存的同步,但这个性能下还是会存在一些问题:
比如MQ消息发送失败、或者MySQL库存扣减失败,并且实际情况还有订单的生成和库存之间的一致性也要考虑。
对于上述这些问题,可以查看我的另外一篇博客:
RocketMQ事务消息在订单创建和库存扣减的使用 - Scotyzh - 博客园 (cnblogs.com)
标签:库存,扣减,redis,redisScript,decreaseStock,ResponseEntity,mysql,new,id From: https://www.cnblogs.com/scottyzh/p/17972214