实现消费券秒杀的优化,在加入限时抢购的优惠券时,自动的将消费券的库存stock信息也加入到redis中(可设为抢购结束后过期)
抢购之前在redis中进行库存是否充足(stock)、用户是否已经抢购(set)的判断
如果条件都满足,则将订单信息加入到消息队列中
另开启一个线程将消息队列中订单信息异步地同步到数据库中,这样就缓解了直接写数据库的压力,新开启的线程可以根据数据库适应的速度进行写操作
异步秒杀业务流程
说明:
Lua脚本保证一些操作在Redis执行的原子性。直接由redis中的eva "lua script" [keys] [args]
命令调用
在一开始直接将优惠券的信息直接读到redis中,然后利用lua脚本直接在redis中进行用户抢购资格判断(避免了每次抢购都要直接读数据库进行判断)
根据lua脚本的返回值判断用户是否符合资格,如果符合资格则加入到消息队列(或阻塞队列)中
三种redis实现消息队列的思路
基于list实现消息队列
redis中的list数据结构是一个双向链表,可以利用LPUSH和RPOP、RPUSH和LPOP组合实现消息队列
队列中没有消息再pop会直接返回null,可以使用BLPOP、BRPOP来达到阻塞等待pop的效果
优点:基于redis存储不受限于jvm内存上限、基于redis持久化机制数据安全性有保证、可满足消息有序性
缺点:无法避免消息丢失(一个消费者获取消息后出现异常)、只适用于单消费者模式(只能pop一次)
基于PubSub实现消息队列
常用的命令
subscribe channel [channel] #消费者订阅频道
publish [channel] [msg] #生产者向频道发送消息
psubscribe pattern[pattern] #订阅和pattern格式相同的频道 通配符?、*、[]、\
优点:支持多生产者、多消费者模式
缺点:不支持持久化、无法避免消息丢失(无人订阅时消息一发送就丢失)、消息堆积有上限超出时消息丢失
基于Stream消费组实现消息队列
redis5.0引入的一种数据类型(可持久化),可以实现一个功能非常完善的消息队列
单消费者模式
优点:消息可回溯、一个消息可被多个消费者获取、可阻塞读取
缺点:任然存在消息漏读的风险
消费者组模式
xgroup create [key] [groupname] [id] [mkstream]
#key代表队列名称、id为消息索引$为最后一个消息,0为第一个消息、mkstream自动创建队列(可选)
xgroup destroy [key] [groupname]
#删除指定的消费者组
xgroup createconsumer [key] [groupname] [consumername]
#给key消息队列的groupname组中添加消费者consumername
xgroup delconsumer [key] [groupname] [consumername]
#将key消息队列的groupname组中消费者consumername删除
是redis中最为完善的消息队列:消息可回溯、可以多消费者争抢消息,加快消费速度、可以阻塞读取、没有漏读风险、消息确认机制,每个消息至少被消费
基于Stream消息队列异步秒杀的代码实现
新增优惠券信息时将库存信息写入redis中{“voucher:stock:voucherId” : “stockValue”}
创建一个Stream类型的消息队列,名为“stream.orders”。redis创建命令:XGROUP CREATE stream.orders g1 0 MKSTREAM
修改秒杀脚本,用户在redis中获取到抢购资格后直接在redis中将订单信息加入到消息队列中
lua脚本seckill.lua
判断用户是否有购买资格,如果有则将订单信息加入到消息队列中
-- 1、参数列表
-- 1.1、优惠券id
local voucherId = ARGV[1]
-- 1.2、用户id
local userId = ARGV[2]
-- 1.3、订单id
local orderId = ARGV[3]
-- 2、数据key
-- 1.1、库存key
local stockKey = "seckill:stock:" .. voucherId
-- 1.2、订单key
local orderKey = "seckill:order:" .. voucherId
-- 3、脚本业务
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.1、判断库存是否充足,如果不足返回1
return 1
end
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.2、判断用户是否下过单,如果下过单返回2
return 2
end
-- 4、扣减库存
redis.call('incrby', stockKey, -1)
-- 5、将userId存入订单的set集合,返回0
redis.call('sadd',orderKey, userId)
-- 6、发送消息到消息队列中 xadd stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
静态加载lua脚本
//静态加载lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
//resource/seckill.lua
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
//lua脚本执行后的返回类型
SECKILL_SCRIPT.setResultType(Long.class);
}
秒杀的主业务流程
SeckillServiceImpl.seckillVoucher2()
public Result seckillVoucher2(Long voucherId){
UserDTO user = UserHolder.getUser();
long orderId = redisIdWorker.nextId("order");
//1、执行lua脚本
Long returnValue = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
user.getId().toString(),
String.valueOf(orderId)
);
//2、判断结果是否为0
//3、不为0,没有资格购买
int result = returnValue.intValue();
if(result != 0){
return Result.fail(result == 1 ? "库存不足" : "仅限抢购一单");
}
//4、如果有购买资格的话在lua脚本中就将订单加入到消息队列到中了
return Result.ok(orderId);
}
在项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,异步的将消息队列中未处理的订单信息写入到数据库
创建一个专门处理处理消息队列的内部类SeckillServiceImpl.VoucherOrderHandler.java
private static final String STREAM_ORDER_MQ = "stream.orders";
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct //在项目启动时,开启一个线程任务
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//专门处理将订单信息写入数据库的线程
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try {
//1、获取消息队列中的信息
//xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().block(Duration.ofSeconds(2)).count(1),
StreamOffset.create(STREAM_ORDER_MQ, ReadOffset.lastConsumed())
);
//2判断获取消息是否成功
if(records == null || records.isEmpty()){
//3、如果失败,说明没有消息,继续循环
continue;
}
//String 为消息的id, <Object, Object>存的是消息队列中的键值对
MapRecord<String, Object, Object> record = records.get(0);
Map<Object, Object> map = record.getValue();
//4、如果获取成功进行下单
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
handleCreateOrder(voucherOrder);
//5、在消息队列中对此条消息进行确认
stringRedisTemplate.opsForStream().acknowledge(STREAM_ORDER_MQ, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理错误", e);
//当出现异常时,从pendingList获取未处理的消息继续处理
handlePendingList();
}
}
}
//发生异常时进行处理
private void handlePendingList(){
while(true){
try {
//1、获取pending-list中的异常消息
//xreadgroup group g1 c1 count 1 block 2000 streams stream.orders 0
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(STREAM_ORDER_MQ, ReadOffset.from("0"))
);
//2判断是否有异常消息
if(records == null || records.isEmpty()){
//3、如果没有跳出循环,执行正常消息
break;
}
//String 为消息的id, <Object, Object>存的是消息队列中的键值对
MapRecord<String, Object, Object> record = records.get(0);
Map<Object, Object> map = record.getValue();
//4、如果获取成功进行下单
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
handleCreateOrder(voucherOrder);
//5、在消息队列中对此条消息进行确认
stringRedisTemplate.opsForStream().acknowledge(STREAM_ORDER_MQ, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理错误", e);
//如果又出现异常,进入下一轮循环
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
private void handleCreateOrder(VoucherOrder voucherOrder) {
voucherOrderService.save(voucherOrder);
}
}
标签:异步,队列,redis,lua,消息,key,--
From: https://www.cnblogs.com/Gw-CodingWorld/p/16816205.html