全局唯一ID
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量限制
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足以下列特性:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
ID的组成部分:
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
工具类编写代码实现
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
/**
* @author xc
* @date 2023/4/26 14:59
*/
@Component
public class RedisIdWorker {
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 左移位数,防止以后需要修改
*/
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long end = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = end - BEGIN_TIMESTAMP;
// 2.生成序列号
// 2.1获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
Long increment = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3.拼接返回
return timestamp << COUNT_BITS | increment;
}
}
实现优惠券秒杀下单
下单时需要判断两点:
- 秒杀是否开始或者结束,如果尚未开始或已经结束则无法下单
- 库存是否充足
流程:
根据流程实现具体业务
@Resource
private ISeckillVoucherService iSeckillVoucherService;
@Resource
private IVoucherService iVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
if (voucherId == null || voucherId < 0) {
return Result.fail("请求id错误");
}
Voucher voucher = iVoucherService.getById(voucherId);
if (voucher == null) {
return Result.fail("当前优惠券不存在");
}
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
if (seckillVoucher == null) {
return Result.fail("秒杀优惠券不存在");
}
LocalDateTime beginTime = seckillVoucher.getBeginTime();
if (LocalDateTime.now().isBefore(beginTime)) {
return Result.fail("秒杀未开始");
}
LocalDateTime endTime = seckillVoucher.getEndTime();
if (LocalDateTime.now().isAfter(endTime)) {
return Result.fail("秒杀已结束");
}
int leftStock = seckillVoucher.getStock();
if (leftStock <= 0) {
return Result.fail("优惠券已被抢空");
}
boolean update = iSeckillVoucherService
.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.update();
if (!update) {
return Result.fail("服务器内部错误");
}
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
voucherOrder.setPayType(voucher.getType());
boolean save = this.save(voucherOrder);
if (!save) {
return Result.fail("服务器内部错误");
}
return Result.ok(orderId);
}
超卖问题
乐观锁
- 版本号法
使用CAS方法:
int leftStock = seckillVoucher.getStock();
if (leftStock <= 0) {
return Result.fail("优惠券已被抢空");
}
boolean update = iSeckillVoucherService
.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
// 更新的时候判断当前剩余库存量是否跟开始查询的时候相等
.eq("stock",leftStock)
.update();
弊端:
成功率很低
库存改为大于0
int leftStock = seckillVoucher.getStock();
if (leftStock <= 0) {
return Result.fail("优惠券已被抢空");
}
boolean update = iSeckillVoucherService
.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
// 更新的时候判断当前剩余库存量是否大于0
.gt("stock",0)
.update();
一人一单
在抢购前判断数据库是否存在已经的订单
// 查询秒杀优惠券,该用户是否已经抢到
int count = query().eq("user_id", userId).eq("voucher_id",voucherId).count();
if (count > 0) {
return Result.fail("您已经抢过了");
}
boolean update = iSeckillVoucherService
.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.eq("stock",leftStock)
.update();
if (!update) {
return Result.fail("服务器内部错误");
}
问题:
在多线程上会出现都走到代码第6行,然后再一起执行更新操作,就会出现一人多单情况
解决:
对操作进行加锁
@Override
public Result seckillVoucher(Long voucherId) {
if (voucherId == null || voucherId < 0) {
return Result.fail("请求id错误");
}
Voucher voucher = iVoucherService.getById(voucherId);
if (voucher == null) {
return Result.fail("当前优惠券不存在");
}
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
if (seckillVoucher == null) {
return Result.fail("秒杀优惠券不存在");
}
LocalDateTime beginTime = seckillVoucher.getBeginTime();
if (LocalDateTime.now().isBefore(beginTime)) {
return Result.fail("秒杀未开始");
}
LocalDateTime endTime = seckillVoucher.getEndTime();
if (LocalDateTime.now().isAfter(endTime)) {
return Result.fail("秒杀已结束");
}
int leftStock = seckillVoucher.getStock();
if (leftStock <= 0) {
return Result.fail("优惠券已被抢空");
}
return getResult(voucherId, voucher, leftStock);
}
@Transactional
public Result getResult(Long voucherId, Voucher voucher, int leftStock) {
Long userId = UserHolder.getUser().getId();
// 查询秒杀优惠券,该用户是否已经抢到
long orderId;
// 对相同用户并发请求加锁 new
// String类型也可能出现不同对象 new
// intern()的作用是返回字符串常量池中对象的地址 new
synchronized (userId.toString().intern()) {
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("您已经抢过了");
}
boolean update = iSeckillVoucherService
.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.eq("stock", leftStock)
.update();
if (!update) {
return Result.fail("服务器内部错误");
}
VoucherOrder voucherOrder = new VoucherOrder();
orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setPayType(voucher.getType());
boolean save = this.save(voucherOrder);
if (!save) {
return Result.fail("服务器内部错误");
}
}
return Result.ok(orderId);
}
}
此时还会出现一个问题:
- 当一个线程释放锁后,但是事务还没提交,那么还是会出现一人多单的情况,所以需要对整个方法调用进行加锁
@Override
public Result seckillVoucher(Long voucherId) {
if (voucherId == null || voucherId < 0) {
return Result.fail("请求id错误");
}
Voucher voucher = iVoucherService.getById(voucherId);
if (voucher == null) {
return Result.fail("当前优惠券不存在");
}
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
if (seckillVoucher == null) {
return Result.fail("秒杀优惠券不存在");
}
LocalDateTime beginTime = seckillVoucher.getBeginTime();
if (LocalDateTime.now().isBefore(beginTime)) {
return Result.fail("秒杀未开始");
}
LocalDateTime endTime = seckillVoucher.getEndTime();
if (LocalDateTime.now().isAfter(endTime)) {
return Result.fail("秒杀已结束");
}
int leftStock = seckillVoucher.getStock();
if (leftStock <= 0) {
return Result.fail("优惠券已被抢空");
}
Long userId = UserHolder.getUser().getId();
// new
synchronized (userId.toString().intern()) {
return getResult(voucherId, voucher, leftStock);
}
}
@Transactional
public Result getResult(Long voucherId, Voucher voucher, int leftStock) {
Long userId = UserHolder.getUser().getId();
// 查询秒杀优惠券,该用户是否已经抢到
long orderId;
// 对相同用户并发请求加锁
// String类型也可能出现不同对象
// intern()的作用是返回字符串常量池中对象的地址
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("您已经抢过了");
}
boolean update = iSeckillVoucherService
.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.eq("stock", leftStock)
.update();
if (!update) {
return Result.fail("服务器内部错误");
}
VoucherOrder voucherOrder = new VoucherOrder();
orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setPayType(voucher.getType());
boolean save = this.save(voucherOrder);
if (!save) {
return Result.fail("服务器内部错误");
}
return Result.ok(orderId);
}
对于spring事务管理熟悉的话,在seckillVoucher方法中调用有事务的getResult这个方法,会出现事务失效。因为相当于this.getResult,用的不是代理类。
解决方法:
@Override
public Result seckillVoucher(Long voucherId) {
if (voucherId == null || voucherId < 0) {
return Result.fail("请求id错误");
}
Voucher voucher = iVoucherService.getById(voucherId);
if (voucher == null) {
return Result.fail("当前优惠券不存在");
}
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
if (seckillVoucher == null) {
return Result.fail("秒杀优惠券不存在");
}
LocalDateTime beginTime = seckillVoucher.getBeginTime();
if (LocalDateTime.now().isBefore(beginTime)) {
return Result.fail("秒杀未开始");
}
LocalDateTime endTime = seckillVoucher.getEndTime();
if (LocalDateTime.now().isAfter(endTime)) {
return Result.fail("秒杀已结束");
}
int leftStock = seckillVoucher.getStock();
if (leftStock <= 0) {
return Result.fail("优惠券已被抢空");
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 获取当前对象的代理对象 new
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.getResult(voucherId, voucher, leftStock);
}
}
/**
* xc
* @param voucherId
* @param voucher
* @param leftStock
* @return
*/
@Override
@Transactional
public Result getResult(Long voucherId, Voucher voucher, int leftStock) {
Long userId = UserHolder.getUser().getId();
// 查询秒杀优惠券,该用户是否已经抢到
long orderId;
// 对相同用户并发请求加锁
// String类型也可能出现不同对象
// intern()的作用是返回字符串常量池中对象的地址
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("您已经抢过了");
}
boolean update = iSeckillVoucherService
.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.eq("stock", leftStock)
.update();
if (!update) {
return Result.fail("服务器内部错误");
}
VoucherOrder voucherOrder = new VoucherOrder();
orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setPayType(voucher.getType());
boolean save = this.save(voucherOrder);
if (!save) {
return Result.fail("服务器内部错误");
}
return Result.ok(orderId);
}
需要导入aspectj的依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
然后在主启动类上暴露代理对象
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableTransactionManagement
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}
分布式锁
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了
模拟集群效果:
怎么添加idea的Serivces
在Service中添加SpringBoot项目,通过不同的端口启动
# 在VM options中添加此段代码,以指定端口启动
-Dserver.port=8082
前端nginx通过负载均衡访问后端接口
在集群模式下:
有多个JVM实例的存在,所以又会出现超卖问题
使用分布式锁解决:
流程:
基于Redis实现分布式锁初级版本:
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
public interface Ilock {
/**
* 尝试获取锁
* @param timeoutSec 过期时间
* @return 获取锁是否成功
*/
boolean tryLock(long timeoutSec);
void unlock();
}
简单锁实现类
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
/**
* @author: xc
* @date: 2023/4/27 20:46
*/
public class SimpleRedisLock implements ILock {
/**
* 锁的统一前缀
*/
private static final String KEY_PREFIX = "lock:";
/**
* 锁的名称
*/
private String name;
// 因为不是spring管理的bean所以需要构造方法
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// 当前线程id
Long threadId = Thread.currentThread().getId();
// setIfAbsent:如果不存在就设置
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId+"", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 删除key
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
实现初级redis分布式锁版本
Long userId = UserHolder.getUser().getId();
// 因为只对同一个用户加锁,所以用 order:+userId 作为锁的key
SimpleRedisLock lock = new SimpleRedisLock("order:"+userId, stringRedisTemplate);
// 获取锁
boolean tryLock = lock.tryLock(LOCK_TIMEOUT);
if (!tryLock) {
// 获取锁失败
return Result.fail("不允许重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.getResult(voucherId, voucher, leftStock);
} finally {
// 拿到锁的释放锁
lock.unlock();
}
会出现的问题:
会释放别人的锁
解决方案:释放锁的时候先看一下是不是自己的锁
流程:
改进Redis的分布式锁:
- 在获取锁时存入线程表示(可以用UUID表示)
- 在释放锁时获取线程ID,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
修改获取锁和释放锁的逻辑
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
import java.util.UUID;
/**
* @author: xc
* @date: 2023/4/27 20:46
*/
public class SimpleRedisLock implements ILock {
/**
* 锁的统一前缀
*/
private static final String KEY_PREFIX = "lock:";
/**
* 随机生成线程uuid的前缀
*/
private static final String ID_PREFIX = UUID.randomUUID() +"-";
/**
* 锁的名称
*/
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
long threadId = Thread.currentThread().getId();
// 标识位 ID_PREFIX+threadId
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, ID_PREFIX+threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
long threadId = Thread.currentThread().getId();
// 判断标识是否一致
if ((ID_PREFIX+threadId).equals(stringRedisTemplate.opsForValue().get(KEY_PREFIX + name))) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
出现问题:
- 线程1如果判断完是自己的锁后,出现gc阻塞线程知道锁过期,此时线程2过来获取到锁执行自己的业务,然后线程1又阻塞完毕回到删除锁,就会将线程2的锁删除。然而又有线程3来过来获取锁没获取到,就会出现线程2和线程3同时执行代码。
**解决办法:**保证判断锁和释放锁的原子性
使用Redis的Lua脚本:
关于redis的基本语法
执行Lua脚本
再次改进Redis的分布式锁
总结
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
基于Redis的分布式锁的优化:
基于setnx实现的分布式锁存在下面的问题:
Redisson入门
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端
/**
* @author xc
* @date 2023/4/28 9:16
*/
@Configuration
public class RedissonConfig {
public RedissonClient redissonClient() {
// 配置
Config config = new Config();
config.setTransportMode(TransportMode.EPOLL);
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}
使用Redisson的分布式锁
Redisson可重入锁的原理
流程图:
获取锁Lua脚本:
释放锁Lua脚本:
Redisson底层源码讲解(P66、P67)
Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制
- 超时续约:利用watchDog,每个一段时间(releaseTime/3),重置超时时间
解决主从一致(P68)
Redis优化秒杀
改进秒杀业务,提高并发性能
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
// 引入redis
@Resource
private StringRedisTemplate stringRedisTemplate;
// 在保存秒杀优惠券的时候,也将优惠券的id和库存保存到redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
- 基于Lua脚本,判断秒杀库存、一人一单、决定用户是否抢购成功
lua脚本
---
--- Generated by Luanalysis
--- Created by xc.
--- DateTime: 2023/4/28 11:48
---
local voucherId = ARGV[1]
local userId = ARGV[2]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey)) <= 0) then
return 1
end
if(redis.call('sismember',orderKey,userId) == 1) then
return 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0
java代码
private static final DefaultRedisScript<Long> SECKILL_SCIPT;
static {
SECKILL_SCIPT = new DefaultRedisScript<>();
ClassPathResource pathResource = new ClassPathResource("seckill.lua");
SECKILL_SCIPT.setLocation(pathResource);
SECKILL_SCIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
// 1.执行lua脚本
Long userId = UserHolder.getUser().getId();
long execute = stringRedisTemplate.execute(
SECKILL_SCIPT,
Collections.EMPTY_LIST,
voucherId.toString(), userId.toString());
// 2.判断结果是否为0
if (execute != 0) {
// 2.1 不为0,代表没有购买资格
// 为1时库存不足,2时重复下单
return Result.fail(execute == 1 ? "库存不足" : "重复下单");
}
// 2.2 为0 ,有购买资格,把下单信息保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
// new ArrayBlockingQueue<>()
// 3.返回订单id
return Result.ok(orderId);
}
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
// 阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
@Override
public Result seckillVoucher(Long voucherId) {
// 1.执行lua脚本
Long userId = UserHolder.getUser().getId();
long execute = stringRedisTemplate.execute(
SECKILL_SCIPT,
Collections.EMPTY_LIST,
voucherId.toString(), userId.toString());
// 2.判断结果是否为0
if (execute != 0) {
// 2.1 不为0,代表没有购买资格
// 为1时库存不足,2时重复下单
return Result.fail(execute == 1 ? "库存不足" : "重复下单");
}
VoucherOrder voucherOrder = new VoucherOrder();
// 2.2 为0 ,有购买资格,把下单信息保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 加入到阻塞队列
orderTasks.add(voucherOrder);
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 3.返回订单id
return Result.ok(orderId);
}
- 开启线程任务,不断从阻塞队列中回去信息,实现异步下单功能
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
/**
* 在类初始化完后会执行该方法
*/
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while (true){
// 获取队列中的队列信息
try {
VoucherOrder voucherOrder = orderTasks.take();
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常",e);
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 因为是全新开启一个线程,所以需要在订单中拿到用户id
Long userId = voucherOrder.getUserId();
// 因为只对同一个用户加锁,所以用 order:+userId 作为锁的key
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean tryLock = lock.tryLock();
if (!tryLock) {
// 获取锁失败
log.error("不允许重复下单");
return;
}
try {
proxy.getResult(voucherOrder);
} finally {
// 拿到锁的释放锁
lock.unlock();
}
}
}
总结
秒杀业务的优化思路是什么?
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题
- 数据安全问题
Redis消息队列实现异步秒杀
消息队列,字面意思就是存放消息队列。最简单的消息队列模型包括3个角色
- 消息队列:存储和管理消息,也被称为消息代理
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
基于List结构模拟消息队列
基于List的消息队列由哪些优缺点?
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
基于PubSub的消息队列由哪些优缺点?
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
基于STREAM的消息队列由哪些优缺点?
优点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
缺点:
- 有消息漏读的风险
基于Stream的消息队列-消费者组
创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建队列
其它常见命令:
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
Stream类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次