首页 > 数据库 >Redis实战篇

Redis实战篇

时间:2024-09-27 20:21:17浏览次数:10  
标签:实战篇 缓存 return redis Redis stringRedisTemplate key id

Redis实战篇

涵盖内容如下:

1、导入黑马点评项目

  • 当前模型

1.1 导入SQL

在这里插入图片描述

1.2 导入后端项目

在这里插入图片描述

解压到工作台,用idea打开即可

注: 不要忘了修改application.yaml文件中的mysql、redis地址信息

  • 测试
    • 启动项目后,在浏览器访问:http://localhost:8081/shop-type/list ,如果可以看到数据则证明运行没有问题
    • 原项目使用的端口为8081,此处由于我的8081端口被java占用,将端口修改为8082,成功获取到如下信息

1.3 导入前端项目

自资料中提供了一个nginx压缩包,将其解压到任意目录,确保该目录不含中文、特殊字符和空格,此处我解压到D:\study\front

此处由于我后端的端口改为8082,前端nginx配置文件中也需要进行端口的修改,在D:\study\front\nginx-1.18.0\conf作如下修改:

1.4 运行前端项目

  • 在nginx所在目录下打开一个CMD窗口,输入命令:start .\nginx.exe
  • 打开浏览器并打开开发者工具,打开手机模式,访问: http://127.0.0.1:8080 ,即可看到页面

2、短信登陆

2.1 基于Session实现登录流程

2.2 session共享问题

多台参与轮巡的Tomcat并不共享session存储空间, 当请求切换到不同tomcat时导致数据丢失的问题.

  • 早期做法: session拷贝
    • 分析虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了
    • 存在的问题1、每台服务器中都有完整的一份session数据,服务器压力过大。2、session拷贝数据时,可能会出现延迟
  • 现在: redis替换session
    • 分析redis数据本身就是共享的,可以避免session

2.3 Redis代替session的业务流程

2.3.1 设计key的结构

**验证码的保存: **

  • 存储数据结构
    • 直接使用string就可以
  • key
    • 用户提交手机号, 后端发送验证码, 之后校验用户时客户端还需要带着这个key来取
    • 用户提交正是手机号和验证码因此, 可以使用手机号作为key

**用户信息保存: **

  • 存储数据的结构

    • 如果不是特别在意内存,其实使用String就可以, 从优化的角度讲, 推荐使用hash
  • key的选择每个用户都有自己的session,但是redis的key是共享的,为了防止重复, 不能使用code

    • key的设计需要满足两点1、key要具有唯一性2、客户端要方便携带

    • 如果采用手机号:

      • 手机号是一个敏感信息, 保存在客户端(浏览器)后, 会有泄漏的风险
    • 解决方案:

      • 在后台生成一个随机串token,然后让前端带来这个token就能完成我们的整体逻辑了
2.3.2 整体访问流程

在这里插入图片描述

2.3.3 基于Redis实现短信登录
  • UserServiceImpl代码
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 发送验证码
     * @param phone 前端传来的手机号
     * @param session 不再使用, 已被代替
     * @return 通用结果类
     */
    @Override
    public Result sendCode(String phone, HttpSession session) {
        // 1.校验手机号
        if (RegexUtils.isPhoneInvalid(phone)) {
            // 2. 如果不符合, 返回错误信息
            return Result.fail("手机号格式错误!");
        }
        // 3. 符合, 生成验证码
        String code = RandomUtil.randomNumbers(6);
        // 4. 保存验证码 到redis LOGIN_CODE_KEY = "login:code:" LOGIN_CODE_TTL = 2L
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone,code,LOGIN_CODE_TTL,TimeUnit.MINUTES);

        // 5. 发送验证码
        log.debug("发送短信验证码成功, 验证码: {}", code);
        // 返回ok
        return Result.ok();
    }

    /**
     * 登录
     * @param loginForm 保存前端传来的手机号和验证码
     
     * @param session 不再使用, 已被redis替代
     * @return 通用结果类
     */
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        // 1. 校验手机号
        String phone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(phone)) {
            // 2. 如果不符合, 返回错误信息
            return Result.fail("手机号格式错误!");
        }

        // 3. 从Redis获取验证码并校验
        String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
        String code = loginForm.getCode();
        if (cacheCode == null || !cacheCode.equals(code)) {
            // 不一致, 报错
            return Result.fail("验证码错误");// 使用反向编码可以防止代码嵌套层次过深
        }

        // 4. 一致, 根据手机号查询用户
        User user = query().eq("phone", phone).one();

        // 5. 判断用户是否存在
        if (user == null) {
            // 6. 不存在, 创建新用户并保存到数据库
            user = createUserWithPhone(phone);
        }

        //  7. 保存用户信息到redis
        //  7.1 随机生成token, 作为登录令牌
        String token = UUID.randomUUID().toString(true);
        //  7.2 将User对象转为HashMap存储
        // 使用UserDTO只保存不敏感的用户主要信息, 从而节省空间, 保证安全性
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
                CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue) -> fieldValue.toString()));// StringRedisTemplate要求字段值都是string
        //  7.3 存储
        String tokenKey = LOGIN_USER_KEY + token;
        stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
        // 7.4 设置token有效期
        stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
        //  8. 返回token
        return Result.ok(token);// 前端接收token, 并保存到浏览器上
    }

    /**
     * 使用手机号创建用户
     * @param phone 手机号
     * @return 创建后的用户, 携带手机号和随机字符串生成的默认名
     */
    private User createUserWithPhone(String phone) {
        // 1. 创建用户 LOGIN_USER_KEY = "login:token:" LOGIN_USER_TTL = 30L
        User user = new User();
        user.setPhone(phone);
        user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
        // 2. 保存用户 
        save(user);
        return user;
    }
}
2.3.4 登录拦截功能分析

  • 第一个拦截器
    • 对所有路径进行拦截, 实现保存用户信息 并 刷新token有效期的功能, 使若用户长时间不访问网站则自动清除token, 当用户在网站中活跃, 在该拦截器中刷新token的有效期
  • 第二个拦截器
    • 对指定路径实现拦截功能
2.4.5 拦截器代码具体实现
  • RefreshTokenInterceptor
public class RefreshTokenInterceptor implements HandlerInterceptor {

    private StringRedisTemplate stringRedisTemplate;// 由调用者MvcConfig经由自动注入传来

    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1. 获取请求头中的token
        String token = request.getHeader("authorization");// token被前端保存在浏览器每次请求的请求头中
        if (StrUtil.isBlank(token)) {
            return true;// 若token不存在, 放行
        }

        // 2. 基于token获取redis中的用户
        String key = RedisConstants.LOGIN_USER_KEY + token;
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);

        // 3. 判断用户是否存在
        if (userMap.isEmpty()) {
            return true;// 若用户不存在, 放行
        }
        //  5. 将查询到的Hash数据转为UserDTO对象
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);

        // 6. 存在, 用户信息保存到TheadLocal
        UserHolder.saveUser(userDTO);

        // 7. 刷新token有效期
        stringRedisTemplate.expire(key,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
        // 7. 放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 移除用户, 避免内存泄漏
        UserHolder.removeUser();
    }
}
// 在请求处理过程中,preHandle 方法会将用户信息加载到 ThreadLocal 中,并在整个请求周期内使用这些信息。请求处理完成后,afterCompletion 方法会清理这些信息,确保不会影响到其他请求。
  • LoginInterceptor
public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1. 是否需要拦截(ThreadLocal中是否有用户)
        if (UserHolder.getUser() == null) {
            // 没有, 需要拦截, 设置状态码
            response.setStatus(401);
            // 拦截
        }
        // 有用户, 则放行
        return true;
    }
}
  • MvcConfig
@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 登录拦截器
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/hot",
                        "/shop/**",
                        "/shop-type/**",
                        "/upload/**",
                        "/voucher/**"
                ).order(1);
        // token刷新的拦截器
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);// order的值越小, 越先执行
    }
}
- 配置的拦截器默认是按照添加顺序执行, 可以通过order来指定执行的优先级, 默认值都为0, order的值越小, 执行的越早

3、商户查询缓存

3.1 如何使用缓存

实际开发中,会构筑多级缓存来使系统运行速度进一步提升,例如:本地缓存与redis中的缓存并发使用

浏览器缓存:主要是存在于浏览器端的缓存

**应用层缓存:**可以分为tomcat本地缓存,比如之前提到的map,或者是使用redis作为缓存

**数据库缓存:**在数据库中有一片空间是 buffer pool,增改查数据都会先加载到mysql的缓存中

**CPU缓存:**当代计算机最大的问题是 cpu性能提升了,但内存读写速度没有跟上,所以为了适应当下的情况,增加了cpu的L1,L2,L3级的缓存

3.2 缓存更新策略

3.2.1 数据库缓存不一致解决方案

旁路缓存模式 读写穿透模式 异步缓存写入

Cache Aside Pattern √ 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案

Read/Write Through Pattern : 由系统本身完成,数据库与缓存的问题交由系统本身去处理

Write Behind Caching Pattern :调用者只操作缓存,其他线程去异步处理数据库,实现最终一致

方案一:

  • 删除缓存还是更新缓存?
    • 更新缓存:每次更新数据库都更新缓存,无效写操作较多
    • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
  • 如何保证缓存与数据库的操作的同时成功或失败?(原子性)
    • 单体系统,将缓存与数据库操作放在一个事务
    • 分布式系统,利用TCC等分布式事务方案
  • 先操作缓存还是先操作数据库?(线程安全问题)
    • 先删除缓存,再操作数据库
    • 先操作数据库,再删除缓存

  1. 更新数据库操作慢, 需要进行IO, 而线程2查询快, 写入redis也快, 因此发生几率高
  2. 发生的条件: 两个线程并行执行 + 缓存恰好到期 + 查完数据库后, 写完缓存前 更新好数据库, 发生几率低, 可以再加入超时时间

3.3 缓存穿透

缓存穿透 :缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

常见的解决方案有两种:

  • 缓存空对象√
    • 优点:实现简单,维护方便
    • 缺点:
      • 额外的内存消耗(null)
      • 可能造成短期的不一致(ttl内, 也可以插入时手动更新)
  • 布隆过滤
    • 优点:内存占用较少,没有多余key
    • 缺点:
      • 实现复杂
      • 存在误判可能(判断存在的时候不一定真的存在)

编码解决商品查询的缓存穿透问题:

  • 核心思路:

在原来的逻辑中,我们如果发现这个数据在mysql中不存在,直接就返回404了,这样是会存在缓存穿透问题的

现在的逻辑中:如果这个数据不存在,我们不会返回404 ,还是会把这个数据写入到Redis中,并且将value设置为空,欧当再次发起查询时,我们如果发现命中之后,判断这个value是否是null,如果是null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。

  • 核心代码:
public Shop queryWithPassTrough(Long id) {
    String key = CACHE_SHOP_KEY + id;
    // 1. 从redis查询商户缓存
    String shopJson = stringRedisTemplate.opsForValue().get(key);

    // 2. 判断是否存在
    if (StrUtil.isNotBlank(shopJson)) {// 有效值
        // 3. 存在, 直接返回
        return JSONUtil.toBean(shopJson, Shop.class);
    }

    // 判断命中的是否是空值
    if (shopJson != null) {// 此时 shopJson == ""
        // 返回一个错误信息
        return null;
    }

    // 4. 不存在, 根据id查询数据库
    Shop shop = getById(id);

    // 5. 不存在, 返回错误
    if (shop == null) {
        // 将空值写入redis
        stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
        return null;
    }

    // 6. 存在, 写入redis
    stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);

    // 7. 返回
    return shop;
}

小总结:

缓存穿透产生的原因是什么?

  • 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些?

  • 缓存null值
  • 布隆过滤
  • 增强id的复杂度,避免被猜测id规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流

3.4 缓存雪崩

缓存雪崩是指在同一时段**大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库**,带来巨大压力。

解决方案:

  • 给不同的Key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

3.5 缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

常见的解决方案有两种:

  • 互斥锁
  • 逻辑过期

逻辑分析:

假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大

  • 解决方案一: 锁 + double check

  • 解决方案二: 逻辑过期

3.5.1 利用互斥锁解决缓存击穿问题

思路分析:

利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false

我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程。

操作锁的代码:

/**
 * 尝试获取锁
 * @param key 锁在redis重点的键值
 * @return 是否获取成功
 */
private boolean tryLock(String key) {
    // 有效期一般为正常业务执行业务的10/20倍
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(flag);
}

/**
 * 释放锁
 * @param key 锁在redis重点的键值
 */
private void unlock(String key) {
    stringRedisTemplate.delete(key);
}

操作代码:

public Shop queryWithMutex(Long id) {
    String key = CACHE_SHOP_KEY + id;
    // 1. 从redis查询商户缓存
    String shopJson = stringRedisTemplate.opsForValue().get(key);

    // 2. 判断是否存在
    if (StrUtil.isNotBlank(shopJson)) {// 有效值
        // 3. 存在, 直接返回
        return JSONUtil.toBean(shopJson, Shop.class);
    }

    // 判断命中的是否是空值(穿透)
    if (shopJson != null) {// 此时 shopJson == ""
        // 返回一个错误信息
        return null;
    }

    // 4. 实现缓存重建
    // 4.1 获取互斥锁
    Shop shop = null;
    String lockKey = "lock:shop:" + id;
    try {
        boolean isLock = tryLock(lockKey);
        // 4.2 判断是否获取成功
        if(!isLock) {
            // 4.3 失败, 则休眠并重试
            Thread.sleep(50);// 试一试
            return queryWithMutex(id);// 递归??
        }

        // 4.4 成功
        // 4.4.1 检测redis缓存是否存在, 做DoubleCheck
        shopJson = stringRedisTemplate.opsForValue().get(key);
        //  判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {// 有效值
            // 3. 存在, 直接返回
            return JSONUtil.toBean(shopJson, Shop.class);
        }

        // 判断命中的是否是空值(穿透)
        if (shopJson != null) {// 此时 shopJson == ""
            // 返回一个错误信息
            return null;
        }

        // 4.4.2 根据id查询数据库
        shop = getById(id);
        // 模拟重建的延迟
        Thread.sleep(200);

        // 5. 不存在, 返回错误
        if (shop == null) {
            // 将空值写入redis
            stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
            return null;
        }

        // 6. 存在, 写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        // 7. 释放互斥锁
        unlock(lockKey);
    }

    // 8. 返回
    return shop;
}
3.5.2 利用逻辑过期解决缓存击穿问题

需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题

思路分析:

当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据(此时数据库里一定没有, 逻辑过期会提前存入所有需要的数据, 并设置逻辑过期时间)

一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据; 如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。

  • 新建实体类
@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}
  • ShopServiceImpl 新增方法,利用单元测试进行缓存预热
public void saveShop2Redis(Long id,Long expireSeconds) throws InterruptedException {
    // 1. 查询店铺数据
    Shop shop = getById(id);
    Thread.sleep(200);// 模拟延迟
    // 2. 封装逻辑过期
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));

    // 3. 写入Redis
    stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(redisData));
}
@Test
public void testSaveShop() throws Exception{
    shopService.saveShop2Redis(1L,10L);
}
  • 正式代码
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

public Shop queryWithLogicalExpire(Long id) {
    String key = CACHE_SHOP_KEY + id;
    // 1. 从redis查询商户缓存
    String shopJson = stringRedisTemplate.opsForValue().get(key);

    // 2. 判断是否存在
    if (StrUtil.isBlank(shopJson)) {// 有效值
        // 3. 存在, 直接返回
        return null;
    }

    // 4. 命中, 需要先把json反序列化为对象
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    JSONObject data = (JSONObject) redisData.getData();
    Shop shop = JSONUtil.toBean(data, Shop.class);
    // 5. 判断是否过期
    LocalDateTime expireTime = redisData.getExpireTime();
    if (expireTime.isAfter(LocalDateTime.now())) {
        // 5.1 未过期, 直接返回店铺信息
        return shop;
    }
    // 5.2 已过期, 需要缓存重建
    // 6. 缓存重建
    // 6.1 获取互斥锁
    String lockKey = LOCK_SHOP_KEY + id;
    boolean isLock = tryLock(lockKey);
    // 6.2 判断是否获取锁成功
    if (isLock) {

        // 6.3.1 成功, doubleCheck
        // 1. 从redis查询商户缓存
        shopJson = stringRedisTemplate.opsForValue().get(key);

        // 2. 判断是否存在
        if (StrUtil.isBlank(shopJson)) {// 有效值
            // 3. 存在, 直接返回
            return null;
        }

        // 4. 命中, 需要先把json反序列化为对象
        redisData = JSONUtil.toBean(shopJson, RedisData.class);
        data = (JSONObject) redisData.getData();
        shop = JSONUtil.toBean(data, Shop.class);
        // 5. 判断是否过期
        expireTime = redisData.getExpireTime();
        if (expireTime.isAfter(LocalDateTime.now())) {
            // 5.1 未过期, 直接返回店铺信息
            return shop;
        }

        // 6.3.2 开启独立线程实现缓存重建
        CACHE_REBUILD_EXECUTOR.submit(() -> {
            try {
                // 重建缓存
                this.saveShop2Redis(id,20L);// 实际业务中应该设置30mins
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // 释放锁
                unlock(lockKey);
            }

        });

    }
    // 6.4 返回过期商铺信息
    return shop;
}

3.6 封装Redis工具类

基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

将逻辑进行封装

@Component
@Slf4j
public class CacheClient {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public void set(String key, Object value, Long time, TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
    }

    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
        // 设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        // 写入redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }

    public <R,ID> R queryWithPassTrough(
            String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;        // 使用者的查询方式, 查询逻辑不能确定, 需要从外界传递过来
        // 1. 从redis查询商户缓存
        String json = stringRedisTemplate.opsForValue().get(key);

        // 2. 判断是否存在
        if (StrUtil.isNotBlank(json)) {// 有效值
            // 3. 存在, 直接返回
            return JSONUtil.toBean(json, type);
        }

        // 判断命中的是否是空值
        if (json != null) {// 此时 shopJson == ""
            // 返回一个错误信息
            return null;
        }

        // 4. 不存在, 根据id查询数据库
        R r = dbFallback.apply(id);

        // 5. 不存在, 返回错误
        if (r == null) {
            // 将空值写入redis
            stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
            return null;
        }

        // 6. 存在, 写入redis
        this.set(key,r,time,unit);
        // 7. 返回
        return r;
    }

    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    public <R,ID> R queryWithLogicalExpire(
            String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;
        // 1. 从redis查询商户缓存
        String json = stringRedisTemplate.opsForValue().get(key);

        // 2. 判断是否存在
        if (StrUtil.isBlank(json)) {// 有效值
            // 3. 存在, 直接返回
            return null;
        }

        // 4. 命中, 需要先把json反序列化为对象
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        JSONObject data = (JSONObject) redisData.getData();
        R r = JSONUtil.toBean(data, type);

        // 5. 判断是否过期
        LocalDateTime expireTime = redisData.getExpireTime();
        if (expireTime.isAfter(LocalDateTime.now())) {
            // 5.1 未过期, 直接返回店铺信息
            return r;
        }
        // 5.2 已过期, 需要缓存重建
        // 6. 缓存重建
        // 6.1 获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        // 6.2 判断是否获取锁成功
        if (isLock) {
            // doubleCheck
            json = stringRedisTemplate.opsForValue().get(key);

            // 2. 判断是否存在
            if (StrUtil.isBlank(json)) {// 有效值
                // 3. 存在, 直接返回
                return null;
            }

            // 4. 命中, 需要先把json反序列化为对象
            redisData = JSONUtil.toBean(json, RedisData.class);
            data = (JSONObject) redisData.getData();
            r = JSONUtil.toBean(data, type);

            // 5. 判断是否过期
            expireTime = redisData.getExpireTime();
            if (expireTime.isAfter(LocalDateTime.now())) {
                // 5.1 未过期, 直接返回店铺信息
                return r;
            }


            // 6.3.2 开启独立线程实现缓存重建
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    // 查数据库
                    R r1 = dbFallback.apply(id);
                    // 写入缓存
                    setWithLogicalExpire(key,r1,time,unit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    // 释放锁
                    unlock(lockKey);
                }

            });

        }

        // 6.4 返回过期商铺信息
        return r;
    }

    /**
     * 尝试获取锁
     * @param key 锁在redis重点的键值
     * @return 是否获取成功
     */
    private boolean tryLock(String key) {
        // 有效期一般为正常业务执行业务的10/20倍
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    /**
     * 释放锁
     * @param key 锁在redis重点的键值
     */
    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }

}

ShopServiceImpl:

    @Resource
    private CacheClient cacheClient;

    @Override
    public Result queryById(Long id) {
        // 缓存穿透
        // Shop shop = cacheClient.queryWithPassTrough(CACHE_SHOP_KEY,id,Shop.class, this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);

        // 逻辑过期解决缓存击穿
        Shop shop = cacheClient
                .queryWithLogicalExpire(CACHE_SHOP_KEY, id,Shop.class,this::getById,20L,TimeUnit.SECONDS);

        if (shop == null) return Result.fail("店铺不存在!");
        // 7. 返回
        return Result.ok(shop);
    }

4、优惠券秒杀

4.1 全局唯一id

场景引入

当用户抢购时,会生成订单并保存到tb_voucher_order表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显
  • 受单表数据量的限制

全局ID生成器

是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

符号位:1bit,永远为0

时间戳:31bit,以秒为单位,可以使用69年

序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

4.2 Redis实现全局唯一Id

@Component
public class RedisIdWorker {
    // 开始时间戳
    private static final long BEGIN_TIMESTAMP = 1704067200;
    /**
     * 序列号的位数
     */
    private static final int COUNT_BITS = 32;

    @Resource
    StringRedisTemplate stringRedisTemplate;

    // 不同的业务有不同的key, 因此需要设置一个业务前缀
    public long nextId(String keyPrefix) { 
        // 1. 生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long newSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = newSecond - BEGIN_TIMESTAMP;

        // 2. 生成序列号
        // 2.1 获取当前日期, 精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

        // 3. 拼接并返回
        return timestamp << COUNT_BITS | count;
    }

    /* 获取指定时间的毫秒值
    public static void main(String[] args) {
        LocalDateTime time = LocalDateTime.of(2024, 1, 1, 0, 0, 0);
        long second = time.toEpochSecond(ZoneOffset.UTC);
        System.out.println("second = " + second);
    }*/
}

4.3 锁

悲观锁:

可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等

乐观锁:

会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,当然乐观锁还有一些变种的处理方式比如cas

乐观锁的典型代表:就是cas,利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换 内存值

常见的分布式锁有三种

Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见

Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述

4.4 Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html

4.5 redission

4.5.1 入门

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

快速入门:

  • 引入依赖
<dependency>
    <groupId>org.redisson</groupId>

    <artifactId>redisson</artifactId>

    <version>3.13.6</version>

</dependency>

  • 配置Redisson客户端
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.198.130:6379").setPassword("123456");
        // 创建RedissonConfig对象
        return Redisson.create(config);
    }

/*    @Bean
    public RedissonClient redissonClient2(){
        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.198.130:6380").setPassword("123456");
        // 创建RedissonConfig对象
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient3(){
        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.198.130:6381").setPassword("123456");
        // 创建RedissonConfig对象
        return Redisson.create(config);
    }*/
}
  • 如何使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;

@Test
void testRedisson() throws Exception{
    //获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("anyLock");
    //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
    //判断获取锁成功
    if(isLock){
        try{
            System.out.println("执行业务");          
        }finally{
            //释放锁
            lock.unlock();
        }
        
    }
    
    
    
}
4.5.2 分布式锁-redission可重入锁原理

在分布式锁中,他采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有,所以接下来我们一起分析一下当前的这个lua表达式

这个地方一共有3个参数

KEYS[1] : 锁名称

ARGV[1]: 锁失效时间

ARGV[2]: id + “:” + threadId; 锁的小key

去判断当前这个方法的返回值是否为null,如果是null,则对应则前两个if对应的条件,退出抢锁逻辑,如果返回的不是null,即走了第三个分支,在源码处会进行while(true)的自旋抢锁。

             "if (redis.call('exists', KEYS[1]) == 0) then " + // 当前这个key不存在
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // 创建
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置时间
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  // 存在
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 自增
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重新设置时间值
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);"

4.6 Redis消息队列

4.6.1 认识消息队列

最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

使用队列的好处在于 解耦

4.6.2 基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

  • 发送消息

eg:

  • 读取消息

eg:

使用XREAD读取第一个消息

XREAD阻塞方式,读取最新的消息

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险
4.6.3 基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列

删除指定的消费者组

XGROUP DESTORY key groupName

给指定的消费者组添加消费者

XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

“>”:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

4.7 异步秒杀下单代码实现

提前建立消息队列stream.orders

XGROUP create stream.orders g1 0 MKSTREAM

VoucherServiceImpl

  • 前端添加秒杀券的时候将库存写入redis
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
    // 保存秒杀的库存到redis当中
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString());
}

seckill.lua

判断是否可以下单, 若可以下单, 发送消息到消息队列中

-- 1. 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]


-- 2. 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId -- lua拼接字符串
-- 2.2 订单key 
local orderKey = 'seckill:order:' .. voucherId

-- 3. 脚本业务
-- 3.1 判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then
    -- 3.2 库存不足, 返回1
    return 1
end

-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3 存在, 说明是重复下单, 返回2
    return 2
end

-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)

-- 3.5 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)

-- 3.6 发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0

VoucherOrderServiceImpl

  • setkillVoucher 前端发送请求处理新的订单

前置准备

 private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// 用于在java中执行Lua脚本, Long为返回值类型

    static {// 提前lua脚本中的内容读取进来
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 用于数据库的写入操作, 不要求速度, 给一个线程即可

    @PostConstruct // 外部类初始化完毕就执行, 随时等待处理消息队列中的订单
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
 private IVoucherOrderService proxy;// 在外部获取代理类,用于异步处理订单

@Override
public Result setkillVoucher(Long voucherId) {
    // 获取用户
    Long userId = UserHolder.getUser().getId();
    // 获取订单id
    long orderId = redisIdWorker.nextId("order");
    // 1. 执行lua脚本
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(), userId.toString(),String.valueOf(orderId)
    );
    // 2. 判断结果是否为0
    int r = result.intValue();
    if (r != 0) {
        // 2.1 不为0, 代表没有购买资格
        return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
    }

    // 3. 获取代理对象
    // 获取代理对象(事务)
    proxy = (IVoucherOrderService) AopContext.currentProxy();// 目标对象的原始方法是由代理对象执行的, 此处由spring的API获取代理对象
    // 从而确保用户Id的值一样时, 锁就一样

    // 4. 返回订单id
    return Result.ok(orderId);
}
  • SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler())
  • 消息队列中接收到消息, 开始处理订单
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// 用于在java中执行Lua脚本

static {// 提前lua脚本中的内容读取进来
    SECKILL_SCRIPT = new DefaultRedisScript<>();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
}

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 用于数据库的写入操作, 不要求速度, 给一个线程即可

@PostConstruct // 外部类初始化完毕就执行, 随时等待处理消息队列中的订单
private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
    String queueName = "stream.orders";

    @Override
    public void run() {
        while (true) {
            try {
                // 1. 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );

                // 2. 判断消息获取是否成功
                if (list == null || list.isEmpty()) {
                    // 2.1 如果获取失败, 说明没有消息, 继续下一次循环
                    continue;
                }
                // 3. 解析消息中的订单消息
                MapRecord<String, Object, Object> record = list.get(0);// 
                Map<Object, Object> values = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                // 4. 如果获取成功, 可以下单
                handleVoucherOrder(voucherOrder);

                // 5. ACK确认 SACK stream.orders g1 id
                stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
            } catch (Exception e) {
                log.error("处理订单异常", e);
                handlePendingList();
            }

        }
    }

    private void handlePendingList() {
        while (true) {
            try {
                // 1. 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 2000 STREAMS stream.orders 0
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1),
                        StreamOffset.create(queueName, ReadOffset.from("0"))
                );

                // 2. 判断消息获取是否成功
                if (list == null || list.isEmpty()) {
                    // 2.1 如果获取失败, 说明pending-list没有消息, 结束循环
                    break;
                }
                // 3. 解析消息中的订单消息
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> values = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                // 4. 如果获取成功, 可以下单
                handleVoucherOrder(voucherOrder);

                // 5. ACK确认 SACK stream.orders g1 id
                stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
            } catch (Exception e) {
                log.error("处理pending-list订单异常", e);// 如果遇到异常, 继续循环即可
                try {
                    Thread.sleep(20);// 可以没有这一步, 防止太频繁
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }

        }
    }
}
  • handleVoucherOrder
  • 消息队列中的订单信息获取成功, 获取分布式锁, 准备开始下单
private void handleVoucherOrder(VoucherOrder voucherOrder) {
    // 1. 获取用户
    Long userId = voucherOrder.getUserId();
    // 2. 创建锁对象
    RLock lock = redissonClient.getLock("lock:order:" + userId);

    // 3. 获取锁
    boolean isLock = lock.tryLock();// 根据实际情况, 确保实际业务一定能完成即可
    // 4. 判断是否获取锁成功
    if (!isLock) {
        // 获取锁失败, 返回错误信息或重试
        log.error("不允许重复下单");
        return;
    }

    try {
        proxy.createVoucherOrder(voucherOrder);
    } finally {
        // 释放锁
        lock.unlock();
    }
}
  • 下单
@Transactional // 对两张表进行操作, 最好加上事务
public void createVoucherOrder(VoucherOrder voucherOrder) {
    // 5. 一人一单
    Long userId = voucherOrder.getUserId();

    // 5.1 查询订单
    int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
    // 5.2 判断是否存在
    if (count > 0) {
        // 用户已经购买过了
        log.error("用户已经购买过一次!");
        return;
    }

    // 6. 扣减库存
    boolean success = seckillVoucherService.update()
            .setSql("stock = stock - 1")
            .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
            .update();
    if (!success) {
        // 扣减失败
        log.error("库存不足!");
        return ;
    }

    // 7. 创建订单
    save(voucherOrder);
}

5、达人探店

5.1 发布探店笔记

BlogController

@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
    // 获取登录用户
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId()); // 保存的博文中需要含有发布者的信息
    // 保存探店博文
    blogService.save(blog);
    // 返回id
    return Result.ok(blog.getId());
}

5.2 查看探店笔记

BlogController

@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id) {
    return blogService.queryBlogById(id); 
}

BlogServiceImpl

@Override
public Result queryBlogById(Long id) {
    // 1. 查询blog
    Blog blog = getById(id);
    if (blog == null) {
        return Result.fail("笔记不存在!");
    }

    // 2. 查询blog有关的用户
    queryBlogUser(blog);
    // 3. 查询blog是否被点赞了
    isBlogLiked(blog);
    return Result.ok(blog);
}

相关功能:

private void queryBlogUser(Blog blog) {
    Long userId = blog.getUserId();
    User user = userService.getById(userId);
    blog.setName(user.getNickName());
    blog.setIcon(user.getIcon());
}
private void isBlogLiked(Blog blog) {
    // 1. 获取登录用户
    UserDTO user = UserHolder.getUser();
    if (user == null) { // 用户未登录, 无需查询是否点赞
        return;
    }
    Long userId = user.getId();
    // 2. 判断当前登录用户是否已经点赞
    String key = "blog:liked:" + blog.getId();
    Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
    blog.setIsLike(score != null);
}

5.3 点赞功能

需求:

  • 同一个用户只能点赞一次,再次点击则取消点赞
  • 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

实现步骤:

  • 给Blog类中添加一个isLike字段,标示是否被当前用户点赞
  • 修改点赞功能,利用Redis的SortedSet集合判断是否点赞过
  • 修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
  • 修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
  1. 在Blog添加一个字段
/**
 * 是否点赞过了
 */
@TableField(exist = false)
private Boolean isLike;
  1. 点赞功能 (首页和博客详情页)

BlogController

@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
    return blogService.likeBlog(id);
}
@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
    return blogService.queryHotBlog(current);
}

BlogServiceImpl

@Override
public Result likeBlog(Long id) {
    // 1. 获取登录用户
    Long userId = UserHolder.getUser().getId();
    // 2. 判断当前登录用户是否已经点赞
    String key = BLOCK_LIKED_KEY + id; // BLOCK_LIKED_KEY = "blog:liked:";
    Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
    if (score == null) {
        // 3. 如果未点赞, 可以点赞
        // 3.1 数据库点赞数 + 1
        boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
        // 3.2 保存用户到redis sortedSet集合 zadd key value score
        if (isSuccess) {
            stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
        }
    } else {
        // 4. 如果已点赞, 取消点赞
        // 4.1 数据库点赞数 -1
        boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
        if (isSuccess) {
            // 4.2 把用户从redis的set集合移除
            stringRedisTemplate.opsForZSet().remove(key,userId.toString());
        }
    }
    return Result.ok();
@Override
public Result queryHotBlog(Integer current) {
    // 根据用户查询
    Page<Blog> page = query()
            .orderByDesc("liked")
            .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
    // 获取当前页数据
    List<Blog> records = page.getRecords();
    // 查询用户
    records.forEach(blog -> {
        this.queryBlogUser(blog);
        this.isBlogLiked(blog);
    });
    return Result.ok(records);
}

相关方法

private void queryBlogUser(Blog blog) {
    Long userId = blog.getUserId();
    User user = userService.getById(userId);
    blog.setName(user.getNickName());
    blog.setIcon(user.getIcon());
}
private void isBlogLiked(Blog blog) {
    // 1. 获取登录用户
    UserDTO user = UserHolder.getUser();
    if (user == null) { // 用户未登录, 无需查询是否点赞
        return;
    }
    Long userId = user.getId();
    // 2. 判断当前登录用户是否已经点赞
    String key = "blog:liked:" + blog.getId();
    Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
    blog.setIsLike(score != null);
}

5.3 点赞排行榜

在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:

Redis数据结构选择: SortedSet

  • 所有点赞的人唯一
  • 需要进行排序

点赞列表查询

BlogController

@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id) {
    return blogService.queryBlogLikes(id);
}

BlogServiceImpl

@Override
public Result queryBlogLikes(Long id) {
    String key = BLOCK_LIKED_KEY + id; // BLOG_LIKED_KEY
    // 1. 查询top5的点赞用户 zrange key 0 4
    Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    if (top5 == null || top5.isEmpty()) {
        return Result.ok(Collections.emptyList());
    }
    // 2. 解析出其中的用户id
    List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
    String idStr = StrUtil.join(",", ids);
    // 3. 根据用户id查询用户 WHERE id IN (5 , 1) ORDER BY FIELD(id,5,1)
    List<UserDTO> userDTOS = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+ idStr +")").list()
            .stream()
            .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
            .collect(Collectors.toList());
    // 4. 返回
    return Result.ok(userDTOS);
}

6、好友关注

6.1 关注和取关

需求:基于该表数据结构,实现两个接口:

  • 关注和取关接口
  • 判断是否关注的接口

关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来标示:

FollowServiceImpl

  • 关注和取关接口
@Override
public Result follow(Long followUserId, Boolean isFollow) {
    // 0. 获取登录用户
    Long userId = UserHolder.getUser().getId();
    String key = "follows:" + userId;
    // 1. 判断到底是关注还是取关
    if (isFollow){
        // 2. 关注, 新增数据
        Follow follow = new Follow();
        follow.setUserId(userId);
        follow.setFollowUserId(followUserId);
        boolean isSuccess = save(follow);
        if (isSuccess) {
            // 把关注用户的id, 放入redis的set集合 sadd userId followUserId
            stringRedisTemplate.opsForSet().add(key,followUserId.toString());
        }
    }else {
        // 3. 取关, 删除 delete from tb_follow where user_id = ? and follow_user_id = ?
        boolean isSuccess = remove(new QueryWrapper<Follow>()
                .eq("user_id", userId)
                .eq("follow_user_id", followUserId));
        // 把关注用户的id从Redis集合中移除
        if (isSuccess){
            stringRedisTemplate.opsForSet().remove(key,followUserId.toString());
        }
    }
    return Result.ok();
}
  • 判断是否关注的接口
@Override
public Result isFollow(Long followUserId) {
    // 1. 获取登录用户
    Long userId = UserHolder.getUser().getId();
    // 2. 查询是否关注
    Integer count = query().eq("user_id", userId)
            .eq("follow_user_id", followUserId).count();
    // 3. 判断
    return Result.ok(count > 0);
}

6.2 共同关注

FollowServiceImpl

@Override
public Result followCommons(Long id) {
    // 1. 获取当前用户
    Long userId = UserHolder.getUser().getId();
    String key = "follows:" + userId;
    // 2. 求交集
    String key2 = "follows:" + id;
    Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
    if (intersect == null || intersect.isEmpty()) {
        // 无交集
        return Result.ok(Collections.emptyList());
    }

    // 3. 解析出id
    List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
    // 4. 查询用户
    List<UserDTO> users = userService.listByIds(ids).stream()
            .map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
    return Result.ok(users);
}

6.3 Feed流实现方案

Feed流的实现有两种模式:

  • Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
    • 优点:信息全面,不会有缺失。并且实现也相对简单
    • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
  • 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
    • 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
    • 缺点:如果算法不精准,可能起到反作用
      本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:

采用Timeline的模式:

  • 拉模式
  • 推模式
  • 推拉结合

拉模式:也叫做读扩散

该模式的核心含义就是:当张三和李四和王五发了消息后,都会保存在自己的邮箱中,假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序

优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。

缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。

推模式:也叫做写扩散。

推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,假设此时李四再来读取,就不用再去临时拉取了

优点:时效快,不用临时拉取

缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去

推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。

推拉模式是一个折中的方案,站在发件人这一段,如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去,现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息。

6.4 推送到粉丝收件箱

需求:

  • 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
  • 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
  • 查询收件箱数据时,可以实现分页查询

Feed流的滚动分页

我们需要记录每次操作的最后一条,然后从这个位置开始去读取数据

BlogServiceImpl

@Override
public Result saveBlog(Blog blog) {
    // 1. 获取登录用户
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId());
    // 2. 保存探店笔记
    boolean isSuccess = save(blog);
    if (!isSuccess) {
        return Result.fail("新增笔记失败!");
    }
    // 3. 查询笔记作者的所有粉丝 select * from tb_follow where follow_user_id = ?
    List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
    // 4. 推送笔记id给所有粉丝
    for (Follow follow : follows) {
        // 4.1 获取粉丝id
        Long userId = follow.getUserId();
        // 4.2 推送
        String key = FEED_KEY + userId;
        stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
    }
    // 5. 返回id
    return Result.ok(blog.getId());
}

核心的意思:就是我们在保存完探店笔记后,获得到当前笔记的粉丝,然后把数据推送到粉丝的redis中去。

**滚动分页查询参数: **

ZREVRANGEBYSCORE key Max Min LIMIT offset count

namevalue
max当前时间戳
min0 (最小时间戳)
offset0
count根据与前端的约定, 此处定为2
  • offset: 表示从Max向后数offset个开始作为查询count中的第一个

6.5 实现分页查询收件箱

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:

具体操作如下:

1、每次查询完成后,我们要分析出查询出数据的最小时间戳,这个值会作为下一次查询的条件

2、我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据

综上:我们的请求参数中就需要携带 lastId:上一次查询的最小时间戳 和偏移量这两个参数。

这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台。

  1. 定义出具体的返回实体类
@Data
public class ScrollResult {
    private List<?> list;
    private Long minTime;
    private Integer offset;
}
  1. BlogController
@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset) {
    return blogService.queryBlogOfFollow(max,offset);
}
  1. BlogServiceImpl
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
    // 1. 获取当前用户
    Long userId = UserHolder.getUser().getId();

    // 2. 查询收件箱 ZREVRANGEBYSCORE key MAX MIN LIMIT offset count
    String key = FEED_KEY + userId;
    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
            .reverseRangeByScoreWithScores(key, 0, max, offset, 2);

    // 3. 非空判断
    if (typedTuples == null || typedTuples.isEmpty()) {
        return Result.ok();
    }

    // 4. 解析数据: blogId minTime(时间戳) offset
    List<Long> ids = new ArrayList<>(typedTuples.size());
    long minTime = 0;
    int os = 1; // 偏移量, 每一个时间戳一定有一个数值
    for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
        // 4.1 获取id
        ids.add(Long.valueOf(tuple.getValue()));
        // 4.2 获取分数(时间戳)
        long time = tuple.getScore().longValue();
        if (time == minTime) {
            os++;
        }else {
            minTime = time;
            os = 1; // 重置时间戳
        }
    }

    // 5. 根据id查询blog
    String idStr = StrUtil.join(",", ids);
    List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
    for (Blog blog : blogs) { // 每一个博客都要做的事
        // 5.1 查询blog有关的用户
        queryBlogUser(blog);
        // 5.2 查询blog是否被点赞了
        isBlogLiked(blog);
    }

    // 6. 封装并返回
    ScrollResult r = new ScrollResult();
    r.setList(blogs);
    r.setOffset(os);
    r.setMinTime(minTime);
    return Result.ok(r);
}

7、附近商户

7.1 GEO数据结构的基本用法

GEO就是Geolocation的简写形式,代表地理坐标。

  • GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
  • GEODIST:计算指定的两个点之间的距离并返回
  • GEOHASH:将指定member的坐标转为hash字符串形式并返回
  • GEOPOS:返回指定member的坐标
  • GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
  • GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
  • GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能

7.2 导入店铺数据到GEO

具体场景说明:

我们要做的事情是:将数据库表中的数据导入到redis中去,redis中的GEO,GEO在redis中就一个member和一个经纬度,我们把x和y轴传入到redis做的经纬度位置去,把id作为member

代码:

HmDianPingApplicationTests

@Test
public void loadShopData() throws Exception{
    // 1.查询店铺信息
    List<Shop> list = shopService.list();
    // 2. 把店铺分组,按照typeId分组, id一致的放到一个集合
    Map<Long,List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
    // 3. 分批完成写入Redis
    for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
        // 3.1 获取类型id
        Long typeId = entry.getKey();
        String key = "shop:geo:" + typeId;

        // 3.2 获取同类型的店铺的集合
        List<Shop> value = entry.getValue();
        List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
        // 3.3 写入redis GEOADD key 经度 纬度 member
        for (Shop shop : value) {
            // stringRedisTemplate.opsForGeo().add(key,new Point(shop.getX(),shop.getY()), shop.getId().toString());
            locations.add(new RedisGeoCommands.GeoLocation<>(
                    shop.getId().toString(),new Point(shop.getX(),shop.getY()) // 第一个就是member
            ));
        }

        stringRedisTemplate.opsForGeo().add(key,locations);
    }
}

效果:

7.3 实现附近商户功能

SpringDataRedis的2.3.9版本并不支持Redis 6.2提供的GEOSEARCH命令,因此我们需要提示其版本,修改自己的POM

  1. 修改pom
<dependency>
    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

    <exclusions>
        <exclusion>
            <artifactId>spring-data-redis</artifactId>

            <groupId>org.springframework.data</groupId>

        </exclusion>

        <exclusion>
            <artifactId>lettuce-core</artifactId>

            <groupId>io.lettuce</groupId>

        </exclusion>

    </exclusions>

</dependency>


<dependency>
    <groupId>org.springframework.data</groupId>

    <artifactId>spring-data-redis</artifactId>

    <version>2.6.2</version>

</dependency>


<dependency>
    <groupId>io.lettuce</groupId>

    <artifactId>lettuce-core</artifactId>

    <version>6.1.6.RELEASE</version>

</dependency>

  1. ShopController
/**
 * 根据商铺类型分页查询商铺信息
 * @param typeId 商铺类型
 * @param current 页码
 * @return 商铺列表
 */
@GetMapping("/of/type")
public Result queryShopByType(
        @RequestParam("typeId") Integer typeId,
        @RequestParam(value = "current", defaultValue = "1") Integer current,
        @RequestParam(value = "x",required = false) Double x,
        @RequestParam(value = "y",required = false) Double y
) {
   return shopService.queryShopByType(typeId,current,x,y);
}
  1. ShopServiceImpl
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
    // 1. 判断是否需要根据坐标查询
    if (x == null || y == null) {
        // 不需要坐标查询, 按数据库查询
        Page<Shop> pages = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
    }

    // 2. 计算分页参数
    int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
    int end = current * SystemConstants.DEFAULT_PAGE_SIZE;

    // 3. 查询redis, 按照距离排序, 分页, 结果: shopId , distance
    String key = SHOP_GEO_KEY + typeId;
    GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDIATANCE
            .search(
                    key,
                    GeoReference.fromCoordinate(x, y),
                    new Distance(5000), // 5000m以内, 默认m, 可以指定其他单位
                    RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) // 包含距离, 查到end
            );

    // 4. 解析出id
    if (results == null) {
        return Result.ok(Collections.emptyList());
    }
    List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
    if (list.size() <= from) {
        // 没有下一页, 结束
        return Result.ok(Collections.emptyList());
    }


    // 4.1 截取from - end的部分
    List<Long> ids = new ArrayList<>(list.size()); // 收集id, 用于查询shops
    Map<String,Distance> distanceMap = new HashMap<>(list.size()); // 收集距离

    list.stream().skip(from).forEach(result -> {
        // 4.2 获取店铺id
        String shopIdStr = result.getContent().getName();

        ids.add(Long.valueOf(shopIdStr));// 收集id
        // 4.3 获取距离
        Distance distance = result.getDistance();

        distanceMap.put(shopIdStr,distance);// 收集距离
    });

    // 5. 根据id查询shop
    String idStr = StrUtil.join(",", ids);
    List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
    // 5.1 将距离封装进shop
    for (Shop shop : shops) {
        shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
    }

    // 6. 返回
    return Result.ok(shops);
}

8、用户签到

8.1 BitMap功能演示

我们按月来统计用户签到信息,签到记录为1,未签到则记录为0.

把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)。这样我们就用极小的空间,来实现了大量数据的表示

Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位。

BitMap的操作命令有:

  • SETBIT:向指定位置(offset)存入一个0或1
  • GETBIT :获取指定位置(offset)的bit值
  • BITCOUNT :统计BitMap中值为1的bit位的数量
  • BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
  • BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
  • BITOP :将多个BitMap的结果做位运算(与 、或、异或)
  • BITPOS :查找bit数组中指定范围内第一个0或1出现的位置

8.2 实现签到功能

需求:实现签到接口,将当前用户当天签到信息保存到Redis中

思路:我们可以把年和月作为bitMap的key,然后保存到一个bitMap中,每次签到就到对应的位上把数字从0变成1,只要对应是1,就表明说明这一天已经签到了,反之则没有签到。

代码

UserController

@PostMapping("/sign")
public Result sign(){
    return userService.sign();
}

UserServiceImpl

@Override
public Result sign() {
    // 1. 获取当前登录用户
    Long userId = UserHolder.getUser().getId();
    // 2. 获取日期
    LocalDateTime now = LocalDateTime.now();
    String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
    // 3. 生成key
    String key = USER_SIGN_KEY + userId + keySuffix;
    // 4. 获取当天是本月的第几天
    int dayOfMonth = now.getDayOfMonth();
    // 5. 写入Redis
    stringRedisTemplate.opsForValue().setBit(key,dayOfMonth - 1,true);
    return Result.ok();
}

8.3 统计连续签到天数

**连续签到天数: **

从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。

得到本月到今天为止的所有签到数据:

BITFIELD key GET u[dayOfMonth] 0 ( dayOfMonth: 总共拿的签到天数, 0: 起始角标 )

假设今天是10号,那么我们就可以从当前月的第一天开始,获得到当前这一天的位数,是10号,那么就是10位,去拿这段时间的数据,就能拿到所有的数据了,那么这10天里边签到了多少次呢?统计有多少个1即可

从后向前遍历每个bit位:

让得到的10进制数字和1做与运算,每与一次,就把签到结果向右移动一位,依次类推,就能完成逐个遍历的效果了。

9、UV统计

9.1 HyperLogLog

  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

9.2 测试百万数据的统计

测试思路:我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何

@Test
public void testHyperLogLog() throws Exception{
    String[] values = new String[1000];
    int j = 0;
    for (int i = 0; i < 1000000; i++) {
        j = i % 1000; // 0 ~ 999
        values[j] = "user_" + i;
        if (j == 999) {
            // 发送到Redis
            stringRedisTemplate.opsForHyperLogLog().add("hl2",values);
            // 分批多次写入,每次写1000条数据
        }
    }
    // 统计数量
    Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
    System.out.println("count = " + count);
}

经过测试:我们会发生他的误差是在允许范围内,并且内存占用极小

初始:

used_memory:2162472
used_memory_human:2.06M
...
测试后:

used_memory:2176688
used_memory_human:2.08M
...

标签:实战篇,缓存,return,redis,Redis,stringRedisTemplate,key,id
From: https://blog.csdn.net/kiddkid/article/details/142601655

相关文章

  • 【洛谷】AT_abc178_d [ABC178D] Redistribution 的题解
    【洛谷】AT_abc178_d[ABC178D]Redistribution的题解洛谷传送门AT传送门题解一个水水的动态规划,阿巴巴巴。题目大概是这样:给定一个正整数SSS,问有多少个数满足以......
  • Redis
    1.名词解释惰性删除:redis中设置了缓存过期时间,通过算法进行随机删除键值,如果某些键值过期后没有被删除,但是遇到查询请求,则会立即删除的这种方式叫做惰性删除(这种方式是被动式触发的,不查询就不会发生。)缓存穿透:查询的数据不存在时,请求会直接请求数据库,不会通过redis的缓存,此现象......
  • redis集群增加减少节点
    redis集群故障切换方案步骤:1、增加临时节点并加入集群2、将故障节点槽位移动到新节点3、剔除故障节点4、备份故障节点配置下架更换5、更换后按照1-3步骤将临时节点剔除,将原故障节点重新加入集群#测试环境为3主3从,有5个keyredis-cli-c-p8102-h172.17.0.89-a123456d......
  • Redis常见面试题
    过期删除策略删除达到过期时间的key。1)定时删除对于每一个设置了过期时间的key都会创建一个定时器,一旦到达过期时间就立即删除。该策略可以立即清除过期的数据,对内存较友好,但是缺点是占用了大量的CPU资源去处理过期的数据,会影响Redis的吞吐量和响应时间。2)惰性删除当......
  • 【实战篇】为什么临时表可以重名?
    背景在上一篇文章中,我们在优化join查询的时候使用到了临时表。当时,我们是这么用的:createtemporarytabletemp_tliket1;altertabletemp_taddindex(b);insertintotemp_tselect*fromt2whereb>=1andb<=2000;select*fromt1jointemp_ton(t1.b=tem......
  • redis有序集合多字段排序
    首先,redis有序集合本身是不支持多字段排序的例如ZADDusers25AliceZADDusers25BobZADDusers10Carol只能通过前面的分数这一个维度来实现,如果现在引入了另一个字段,可以在分数值(利用阿拉伯数字)上做手脚例如,时间维度2023-01-012023-01-022023-01-03这......
  • 使用 Redis 记录用户连续登录天数的方法及代码分享
    目录标题:使用Redis记录用户连续登录天数的方法及代码分享一、为什么不适合放在数据库中二、Redis的bitmap介绍三、存储方式及统计方法(一)以每天维度存储(二)以用户维度存储在本文中,我们将探讨如何使用Redis记录用户连续登录天数的问题。这是一个在面试中可能会遇到......
  • Redis缓存过期淘汰策略
    先来看下我们遇到的问题生产上redis内存需要设置多少才合适如何配置、修改redis的内存大小,具体怎么操作如果内存满了Redis怎么办redis清理内存的方式选择那种:定期删除和惰性删怎么选择redis缓存淘汰策略1.redis内存1.1查看Redis内存配置文件查看Redis最大占用内......
  • redis自身查询很慢 排查redis-benchmark
    redis-benchmark 是一个用于测试Redis性能的基准测试工具,可以帮助开发人员评估和比较Redis在不同配置或负载情况下的吞吐量和延迟。通过 redis-benchmark 的测试结果,你可以获得qps、平均延迟、错误率等性能指标,从而根据需要进行调优和优化,确保Redis在实际生产环境中具有良......
  • Docker容器启动Redis设置密码并持久化
    启动命令dockerrun--namewh-redis-p6379:6379-v/root/RedisData:/data-d--restartunless-stoppedredis--appendonlyyes--requirepass'Your-password'dockerrun:启动一个新的Docker容器。--namewh-redis:给容器指定一个名称,容器名为wh-redis。指定名......