首页 > 数据库 >redis学习

redis学习

时间:2023-05-13 20:24:04浏览次数:38  
标签:String redis id 学习 线程 服务器 节点

目录

1.redis常用的命令

a>String命令

b>Hash命令


c>list命令

D>Set命令

E>Sotedset命令


F>对value操作的命令(通用命令)

  • exists(key):确认一个key是否存在
  • del(key):删除一个key
  • type(key):返回值的类型
  • keys(pattern):返回满足给定pattern的所有key
  • randomkey:随机返回key空间的一个
  • keyrename(oldname, newname):重命名key
  • dbsize:返回当前数据库中key的数目
  • expire:设定一个key的活动时间(s)
  • ttl:获得一个key的活动时间
  • select(index):按索引查询
  • move(key, dbindex):移动当前数据库中的key到dbindex数据库
  • flushdb:删除当前选择数据库中的所有key
  • flushall:删除所有数据库中的所有key

2.redis在java中的连接

a>配置commons-pool依赖

(jieds和letuce底层都是采用的这个连接池)

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

b>配置yml文件连接redis

redis:
    host: 192.168.50.128
    port: 6379
    password: 123123
    lettuce:
      pool:
        max-active: 10
        max-idle: 10
        min-idle: 1
        time-between-eviction-runs: 10s

c>自定义序列化redis的键值

(redis默认键的序列化是objectOutputStream将Java对象序列化为字节保存到redis中(直观感受就是String字符串变成了一串字节),因此增大了存储空间,并且根据具体键查找查不到),下面这段代码设置key为String,值序列化成json形式.(springmvc里有jkson依赖)

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        // 创建redisTemplate队象
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        //设置连接工厂
        template.setConnectionFactory(connectionFactory);
        //创建Json序列化工具
        GenericJackson2JsonRedisSerializer generic = new GenericJackson2JsonRedisSerializer();
        //设置Key序列化
        template.setKeySerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());
        //设置Value序列化
        template.setValueSerializer(generic);
        template.setHashValueSerializer(generic);
        //返回
        return template;
    }
}

3.序列化器改进

由于上面的自定义jkson在存储值中有一个class路径用于自动反序列化成String,因此会消耗大量的内存。所以不要使用这种方法,统一使用S厅序列化器存储的key和value都是String类型,如果需要存储java对象时,使用json工具比如fastjson2手动完成对象的序列化和反序列化。(StringRedisTemplate满足了这个要求)

代码示列;

4.session改为Redis

a>session模式

每个tomcat请求都会携带一个sessionId(在cookie中),根据这个sessionId获取session中的值,但是如果将服务器拆成多个tomcat则那么session也会分成多个,sessionId就会不一样,无法正确获取session中的值。因此用redis数据库从服务器中获取值。(session有效期是30分钟),每个项目有多个controller接口的,因此每个接口中都需要对session进行判断,所以使用拦截器,将登录信息验证配置到拦截器中,拦截器获取到的用户信息需要给到controller层,由此引入threadlocal来存储用户信息。

用户每次访问tomcat都是一个独立的线程,用threadlocal可以起到隔离的作用,threadlocal可以开辟一个内存空间保存对应的用户信息(保存变量的副本),互不干扰。

session.setAttribute("user",code);
session.getAttribute("user");
UserServiceImpl类中实现;
    
@Override
public Result sendCode(String phone, HttpSession session) {
    //1.校验手机号:利用util下RegexUtils进行正则验证
    if(RegexUtils.isPhoneInvalid(phone)){
        return Result.fail("手机号格式不正确!");
    }
    //2.生成验证码:导入hutool依赖,内有RandomUtil
    String code = RandomUtil.randomNumbers(6);
    //3.保存验证码到session
    session.setAttribute("code",code);
    //4.发送验证码
    log.info("验证码为: " + code);
    log.debug("发送短信验证码成功!");

    return Result.ok();

}

login:
    
    @Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
    //1.校验手机号
    String phone = loginForm.getPhone();
    if(RegexUtils.isPhoneInvalid(phone)){
     return Result.fail("手机号格式错误!");
    }
    //2.校验验证码
    Object cacheCode = session.getAttribute("code");
    String code = loginForm.getCode();
    if(code==null||!cacheCode.toString().equals(code)){
        //3.不一致,报错
        return Result.fail("验证码错误!");
    }
     //4.一致,根据手机号查询用户(需要写对应的单表查询方法:select * from tb_user where phone = #{phone})
    User user = query().eq("phone", phone).one();
    if(user==null){
        //5.注册用户
        user.setPhone(phone);
        user.setNickName("user_"+RandomUtil.randomString(10));
        //保存用户
        save(user);
    }
    //6.存入session
    session.setAttribute("user",user);
    return Result.ok();
}

 登录验证功能
 
 //6.存入session
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));


拦截器

public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取session
        HttpSession session = request.getSession();
        //2.获取session中的用户
        Object user = session.getAttribute("user");
        //3.判断用户是否存在. 不存在:拦截;存在:放入ThreadLocal,放行(写了ThreadLocal的封装工具类UserHolder)
        if(user==null){
            response.setStatus(401);
            response.getWriter().write("用户未登录!");
            return false;
        }
        UserHolder.saveUser((UserDTO) user);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        //移除用户
        UserHolder.removeUser();
    }
}

添加拦截器

@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/hot",
                        "/shop/**",
                        "/shop-type/**",
                        "/upload/**",
                        "/voucher/**"
                );
    }
}

b>Redis实现短信登陆

redis实现session1

redis存储两个业务

手机号:手机号作为键,验证码作为值,(用于验证验证码是否正确)在业务层配置(别忘了把token返回给前端)

Token:token作为键,用户信息作为值,(用于登录令牌放行页面)在拦截器中配置

常量类:

public class RedisConstants {
    public static final String LOGIN_CODE_KEY = "login:code:";
    public static final Long LOGIN_CODE_TTL = 2L;
    public static final String LOGIN_USER_KEY = "login:token:";
    public static final Long LOGIN_USER_TTL = 36000L;

    public static final Long CACHE_NULL_TTL = 2L;

    public static final Long CACHE_SHOP_TTL = 30L;
    public static final String CACHE_SHOP_KEY = "cache:shop:";

    public static final String LOCK_SHOP_KEY = "lock:shop:";
    public static final Long LOCK_SHOP_TTL = 10L;

    public static final String SECKILL_STOCK_KEY = "seckill:stock:";
    public static final String BLOG_LIKED_KEY = "blog:liked:";
    public static final String FEED_KEY = "feed:";
    public static final String SHOP_GEO_KEY = "shop:geo:";
    public static final String USER_SIGN_KEY = "sign:";
}

Sendcode修改

//3.保存验证码到Redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY +phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);//有效期2mins

login配置

@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public Result sendCode(String phone, HttpSession session) {
        //1.校验手机号:利用util下RegexUtils进行正则验证
        if(RegexUtils.isPhoneInvalid(phone)){
            return Result.fail("手机号格式不正确!");
        }
        //2.生成验证码:导入hutool依赖,内有RandomUtil
        String code = RandomUtil.randomNumbers(6);
        //3.保存验证码到Redis
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY +phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);//有效期2mins
        //4.发送验证码
        log.info("验证码为: " + code);
        log.debug("发送短信验证码成功!");

        return Result.ok();

    }

    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        //1.校验手机号
        String phone = loginForm.getPhone();
        if(RegexUtils.isPhoneInvalid(phone)){
         return Result.fail("手机号格式错误!");
        }
        //2.从Redis中获取验证码
        String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);
        String code = loginForm.getCode();
        if(cacheCode==null||!cacheCode.equals(code)){
            //3.不一致,报错
            return Result.fail("验证码错误!");
        }
         //4.一致,根据手机号查询用户(需要写对应的单表查询方法:select * from tb_user where phone = #{phone})
        User user = query().eq("phone", phone).one();
        if(user==null){
            //5.注册用户
            User newUser = new User();
            newUser.setPhone(phone);
            newUser.setNickName("user_"+RandomUtil.randomString(10));
            save(newUser);
            user = newUser;
        }
        //6.保存用户到Redis
            //(1)生成token
            String token = UUID.randomUUID().toString(true);//hutools
            //(2)User转为HashMap存储
        UserDTO userDTO = BeanUtil.copyProperties(user,UserDTO.class);
        HashMap<Object, Object> userMap = new HashMap<>();
        userMap.put("id", userDTO.getId().toString());
        userMap.put("nickName", userDTO.getNickName());
        userMap.put("icon", userDTO.getIcon());
            //(3)存储到Redis
            String tokenKey = LOGIN_USER_KEY + token;
            stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);

            //(4) 设置有效期
            stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);

        return Result.ok(token);
    }
}

@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
                .excludePathPatterns(
                        "/user/code",
                        "/user/login",
                        "/blog/hot",
                        "/shop/**",
                        "/shop-type/**",
                        "/upload/**",
                        "/voucher/**"
                );
    }
}

loginInterceptor:

public class LoginInterceptor implements HandlerInterceptor{

    private final StringRedisTemplate stringRedisTemplate;

    public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取请求头中的token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)){
            //不存在,拦截 设置响应状态吗为401(未授权)
            response.setStatus(401);
            return false;
        }
        //2.基于token获取redis中用户
        String key=RedisConstants.LOGIN_USER_KEY + token;
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        //3.判断用户是否存在
        if (userMap.isEmpty()){
            //4.不存在则拦截,设置响应状态吗为401(未授权)
            response.setStatus(401);
            return false;
        }
        //5.将查询到的Hash数据转化为UserDTO对象
        UserDTO userDTO=new UserDTO();
        BeanUtil.fillBeanWithMap(userMap,userDTO, false);
        //6.保存用户信息到ThreadLocal
        UserHolder.saveUser(userDTO);
        //7.更新token的有效时间,只要用户还在访问我们就需要更新token的存活时间
        stringRedisTemplate.expire(key, RedisConstants.LOGIN_USER_TTL, TimeUnit.SECONDS);
        //8.放行
        return true;
    }


    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        //销毁,以免内存泄漏
        UserHolder.removeUser();
    }
}

利用随机生成的Token作为键,用户信息作为值,用户每次访问tomcat都会携带token从而去redis中获取数据,前提是我们需要将token返回给前端,前端用拦截器处理每次发送请求时都会携带一个token参数(思考:为什么不使用手机号作为token键?,因为手机号会随着前端返给浏览器产生安全隐患)

(1)登录拦截

配置一个拦截器在里面1.判断token有没有数据,没有则返回false,进行页面的拦截,String token=request.getHeader("authorization");2.根据token查询redis有没有用户信息,没有则进行拦截(意味着没有登录)。3.有就将用户信息保存到threadlocal中(既然用到redis那么controller为什么不直接从redis中获取信息,个人理解考虑到线程的原因以及减少对redis的一个交互)4.对redis中token进行有效期的更新(保证只要用户在操作token有效期就会跟新)

部分用法:

用String类型的set可以直接设置过期时间,用hash类型的putAll需要另起一行expire设置过期时间。

c>登录拦截器优化

上面的登录拦截刷新时间有bug

因为拦截器中部分页面不需要登陆信息永远保持放开,因此操作这部分页面的时候是没有经过拦截的因此就不会进行刷新时间。

登录拦截优化

解决方法:再配置一个拦截器优先级作为第二个专门用来拦截登陆页面的,更新时间的拦截器优先级设置为第一个,放开所有页面(返回true),不过还是先判断token是不是空,redis是不是有用户信息,但是判断内部都返回的true,从而结束此方法。如果不为空有值则将信息存储到threadlocal中并且跟新刷新时间,不要忘记在afterCompetion方法中将threadlocal移除,防止内存泄漏。

5.redis缓存更新策略

a>更新策略三种方式

缓存更新1

b>主动更新的三种方式

缓存更新2

01:自己写编码,可控性高

02:调用已经写好的服务来处理(市面上不多)

03:需要实时监控缓存中的数据更新,保证最终一致性,但是不能保证实时的一致性。

那么01编写编码的解决方案:

缓存更新3

缓存更新4

由上图发现先操作数据库然后写入redis缓存,如果在此期间有更新的业务就会先跟新数据库然后删除缓存(因为图1更新数据库很慢,中间时间很容易让别的进程有机可乘。图二写入缓存很快,发生中间别的进程插入的机会很少)(跟新数据和删除缓存需要添加一个事务

6.缓存穿透

概念:缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这些请求就会一直去数据库中拿数据。(如果访问请求过大会导致数据库崩溃)

a>解决方式

缓存空对象

注意:

null不指向任何对象,相当于没有任何值;而""代表一个长度为0的字符串
null不分配内存空间;而""会分配内存空间

解释:

(将数据库中不存在的数据缓在redis种存储空值,以后直接从redis中获取数据只不过获取的是""空字符串值)

优点:实现简单,维护方便

缺点:内存消耗,(可以设置有效期)可能造成的短期的不一致

思想:

缓存穿透1

image-20221112172025850

布隆过滤:

解释:

在客户端和redis之间配置一个布隆过滤器,不存在就直接拒绝访问,存在则放行(但是布隆显示存在的数据不一定在数据库中真的存在,因此有一定的缓存穿透的风险)。

原理:bit数组,把数据库中需要查询的数据基于hash算法计算出hash值再将这些hash值转换为二进制位保存到布隆过滤器。

优点:内存占用少,没有多余的key

缺点:实现复杂,存在误判的可能

加强用户权限校验

做好热点参数的限流(Spring cloud)

做好数据的基础格式校验

增强id的复杂度

7.缓存雪崩

缓存雪崩1

8.缓存击穿

缓存击穿1

a>解决方式

缓存击穿3

缓存击穿2

互斥锁:

只有获取到互斥锁的线程才能重建缓存数据,另一个线程获取锁失败发现锁被另一个线程拿到了就会重试不断循环,直到有线程完成缓存的构建。(所有的线程都会等待,直到完成构建,性能较差)

缓存击穿4

互斥锁的实现方式(setIfAbsent就是redis命令中的setnx方法)

自定义lock方法
Boolean flag=stringRedisTemplate.opsForValue().setIfAbsent(key,"1",50L,TimeUnit.MINUTES);

自定义unlock方法
    stringRedisTemplate.opsForValue().delete(key);

缓存击穿中的互斥锁方法可以结合缓存穿透一起用

代码:

/**
 * 获取互斥锁
 */
private boolean tryLock(String key) {
    Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", TTL_TEN, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(flag);
}

/**
 * 释放互斥锁
 */
private void unLock(String key) {
    redisTemplate.delete(key);
}

一次请求的过程

1、请求打进来,先去 Redis 中查,未命中;

2、获取互斥锁:将一个 Key 为 LOCK_SHOP_KEY + id 的数据写入 Redis 中,此时其他线程就无法拿到这个 Key,3、也就无法继续后续操作;

4、获取失败就进行休眠,休眠结束后通过递归再次请求;

5、获取成功,查询数据库、将需要查询的那个数据写入 Redis;

6、最后,删除通过 setnx 创建的那个 Key。

需求:修改根据id查询店铺的业务(互斥锁方式解决

/**互斥锁实现解决缓存击穿**/
public Shop queryWithMutex(Long id){
    //1.从Redis内查询商品缓存
    String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
    if(StrUtil.isNotBlank(shopJson)){
        //手动反序列化
        return JSONUtil.toBean(shopJson, Shop.class);
    }
    //如果上面的判断不对,那么就是我们设置的""(有缓存"",证明数据库内肯定是没有的)或者null(没有缓存)
    //判断命中的是否时空值
    if(shopJson!=null){//
        return null;
    }

    //a.实现缓存重建
    //a.1 获取互斥锁
    String lockKey = LOCK_SHOP_KEY + id;
    Shop shop = null;
    try {
        boolean hasLock = tryLock(lockKey);
        //a.2 判断是否获取到,获取到:根据id查数据库 获取不到:休眠
        if(!hasLock){
            Thread.sleep(50);
            return queryWithMutex(id);
        }

        //2.不存在就根据id查询数据库
        shop = getById(id);
        //模拟重建的延时
        Thread.sleep(200);
        if(shop==null){
            stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
            return null;
        }
        //3.数据库数据写入Redis
        //手动序列化
        String shopStr = JSONUtil.toJsonStr(shop);
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,shopStr,CACHE_SHOP_TTL, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        //释放互斥锁
        unlock(lockKey);
    }

    return shop;
}

@Override
public Result querygetById(Long id) {
    //缓存穿透
    //Shop shop = queryWithPassThrough(id);

    //互斥锁解决缓存击穿
    Shop shop = queryWithMutex(id);
    if(shop==null) return Result.fail("店铺不存在!");

    return Result.ok(shop);
}

逻辑过期:

之前产生缓存击穿的最大问题就是tt失效了,因此不设置ttl,存储数据的时候添加一个逻辑时间键值对,在业务代码中判断有没有过期。

某个线程查询缓存发现逻辑时间已经过期,于是获取互斥锁成功,这时候会开启一个新的线程来查询数据库重建缓存,新的线程重置逻辑过期时间,然后释放锁,在释放锁期间,原来的线程任然返回一个过期的数据,不会等待,其他线程也是,发现拿不到互斥锁会直接返回一个旧数据,不用等待。

缓存击穿5

根据逻辑过期时间来解决击穿是一开始就将高并发访问的信息放到redis中,并且设置一个过期时间。可以自定义一个类,此类中存放访问的数据信息,以及一个有效期限(数据信息可以采取Object的类型方式,也可以采用泛型类指定传入的类型,当然也可以继承原来的类),因此如果没有在redis中查询到信息,就直接返回空,不需要做穿透问题。不要忘记此处是手动开辟一个线程来重建缓存的。

相比互斥锁逻辑过期用的是新开辟了一个线程,异步执行,当前线程会返回一个原来的数据。不会等待。缺点,只能适用于实时性不高的业务。

疑问?为什么过期时间设置这么短?

1、保证解决并发问题的同时保证数据库一致性,数据库中可能已经更新了,redis中时间设置过长,导致长时间不能跟新新数据。

代码:

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);//开启10个线程

/**逻辑过期实现解决缓存击穿**/
public Shop queryWithLogical(Long id){
    //1.从Redis内查询商品缓存
    String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
    //2.判断是否存在
    if(StrUtil.isBlank(shopJson)){
        return null;
    }
    //3.命中,需要先把json反序列化为对象
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    JSONObject data = (JSONObject) redisData.getData();
    Shop shop = JSONUtil.toBean(data, Shop.class);

    //4.判断是否过期
    LocalDateTime expireTime = redisData.getExpireTime();
    if(expireTime.isAfter(LocalDateTime.now())){
        //未过期直接返回
        return shop;
    }
        //5.过期的话需要缓存重建
    //5.1 获取互斥锁
    String lockKey = LOCK_SHOP_KEY + id;
    boolean hasLock = tryLock(lockKey);
    //5.2判断是否获取到,获取到:根据id查数据库 获取不到:休眠
   if(hasLock){
       //成功就开启独立线程,实现缓存重建, 这里的话用线程池
       CACHE_REBUILD_EXECUTOR.submit(()->{
           try {
               //重建缓存
               this.saveShop2Redis(id,20L);
           } catch (Exception e) {
               throw new RuntimeException(e);
           }finally {
               //释放锁
               unlock(lockKey);
           }

       });

   }

    return shop;
}

/**缓存重建方法**/
public void saveShop2Redis(Long id,Long expireSeconds) throws InterruptedException {
    //1.查询店铺信息
    Shop shop = getById(id);
    Thread.sleep(200);
    //2.封装逻辑过期时间
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    //3.写入Redis
    stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}

9.redis实现全局Id

时间戳+序列号,符号位1位,时间戳为31位,序列号为32位

时间戳为当前时间的秒数减去一个给定值的秒数

序列号是redis实现的一个自增长(会自动创建这个key)

//格式化时间
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy:MM:dd");
//格式化当前时间
String date = LocalDateTime.now().format(dateTimeFormatter);
long count = stringRedisTemplate.opsForValue().increment("icr" + keyPrefix + date);
//拼接,count为当前时间的秒数
timestamp<<32|count

10.乐观锁实现超卖(多个线程之间高并发)(MySQL数据库中)

a>超卖现象:(跟新数据时)

解释:

超卖现象指的是超出自身设置的售出数。 当出现商品的销售量大于实际的库存量的现象,成为“超卖”现象 。

原因:(都是出现在货物还剩1的时候,多个线程同时访问,但是此时所有线程查询到的库存都是1都是大于0的,而此时前一个线程执行完毕,当前线程还停留在查询的数据是1于是继续将库存减一,此时订单数量就会超过库存,库存也会变成负值。)

1.不同用户在读请求的时候,发现商品库存足够,然后同时发起请求,进行秒杀操作,减库存,导致库存减为负数。

2.同一个用户在有库存的时候,连续发出多个请求,两个请求同时存在,于是生成多个订单。

乐观锁:

1.版本号

多添加一个版本号,当商品库存减一的时候版本号就加一,当实现更新的操作时将当前商品的id值,以及当前版本号和查寻到的版本号是否一致当作条件。

乐观锁1

2.cas(stock相当于版本号,因为他也是有数值1的变化)

直接用库存来当作查询条件,将商品的id值以及当前stock和查询到的stock值是否相等作为条件(会导致成功率很低)改进:直接将跟新的条件改为id值和stock是否大于0(因为如果判断的是否相等的话,此时如果有大量的线程都不相等则都会失败,但是当条件改为stock>0的时候,其他线程只有在不大于0的时候才会失效)

//实现操作单个字段的mybatisPlus语句setSql
LambdaUpdateWrapper<SeckillVoucher> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(SeckillVoucher::getVoucherId,VoucherId).setSql("stock=stock-1");
seckillVoucherService.update(queryWrapper);

乐观锁2

悲观锁:实现简单,性能差

乐观锁:性能好,存在成功率低的问题

11.实现一人一单(一个用户的多线程,高并发)(数据库)

跟新数据时可以使用乐观锁,添加数据时可以使用悲观锁,

a>业务逻辑

一人一单1

重点

1.创建订单和扣减库存是一个事务

2.上面的业务不加锁会存在同一个用户之间线程的并发从而发生产生多个订单

3.加锁要锁的是当前用户对象,不可以用synchronized锁方法。锁方法则所有的线程包括其他用户都会等待锁释放

4.用户对象是用userId.toString()转换为字符串,但是这里有坑,toString()方法底层是new String的形式,因此每次发送请求时当前用户包括其他用户,都会新建一个对象,因此每次都锁的是一个新对象所以就没锁住,因此需要调用userId.toString().intern();这时候当调用toString()方法时先去常量池中寻找有没有这个值,有就直接拿取。

5.应该在另一个方法中调用这个扣减库存和创建订单的方法,因为需要锁住整个方法,其他线程才没有可机之乘

6.继5之后直接调用这个方法会发生Spring事务的失效,需要用代理对象调用这个方法,(用aop的代理对象)

添加aspectJweaver这个依赖,在启动类中配置@EnableAspectAutoProxy(exposeProxy=true),扩号中需要暴露代理对象,接口中需要添加这个方法(代理模式)

12.分布式锁(集群下的线程并发)

一个tomcat被一个jvm管理,一个jvm有一个锁监视器且只能监视当前tomcat的锁,所以多集群下原来的锁会失效

一人一单2

分布式锁:

分布式锁1

a>redis直接实现分布式锁

可重入锁;一个线程连续两次获取这个锁。redis直接实现分布式锁不支持可重入锁,因为比如一个方法中获取到了这个锁,有另一个方法也在尝试获取这个锁,如果在第一个方法中调用这个方法,由于第一个已经获取到了锁,因此这个被调用的方法锁就会获取失败

分布式锁2

原理;有一个计数器记录重入的次数,每重入一次就加一,每释放一次锁就减一

分布式锁3

(1)初级,只是定义一个锁,没有考虑到并发问题

自定义锁

用setIfAbsent()判断是否能够获取锁封装在tryLock方法中。

分布式锁4

(2)多线程情况下的误删锁解决

当线程1的任务阻塞时间过长导致锁自动超时释放了,线程2此时可以获取锁进行任务,这时候线程1执行完毕将锁释放了,释放的锁却是线程2的锁。

解决:判断一下线程的标识,是当前线程的锁才能释放

用uuid拼接线程的id防止线程id重复

分布式锁4

(3)多线程情况下的误删锁解决(判断完标示之后发生任务阻塞导致锁超时释放产生上面相同的问题)

解决:将判断表示和释放锁操作放在lua脚本中保证其原子性(redsi事务太复杂)

lua脚本:(return 0表示失败,return1表示成功)
if(redis.call('get',KEYS[1])==ARGV[1])then
    --释放锁
    return redis.call('del',KEY[1])
    end
return 0;

定义lua脚本语句

//泛型是返回值的类型
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static{
    UNLOCK_SCRIPT=new DefaultRedisScript<>();
    //设置lua路径
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    UNLOCK_SCRIPT.setResultType(Long.class);
}

调用lua脚本

public void unLock(){
    //第一个参数是script脚本,第二个参数是key的集合需要将key转换为集合,第三个值是值
    StringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(key),Idcurrent);
}

b>基于redis实现的分布式工具Redisson(以后使用这个不需要自定义锁)

Redisson提供了一系列的分布式java对象,还提供了许多分布式服务

(1)redission入门

Redission依赖:

<dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.17.6</version>
</dependency>

Redisson客户端配置类

分布式锁5

注入Redisson依赖,使用Redisson锁

分布式锁6

(2)Redisson实现可重入锁

原来不用Redisson时存储的就是key值和线程id,此时用可重入锁,就需要多添加一个字段用来记录可重入的次数所以应该用hash结构来存储原理:有一个计数器记录重入的次数,每重入一次就加一,每释放一次锁就减一可重入锁1

用lua脚本来实现获取锁和释放锁的逻辑,保证原子性。

hexists: HEXISTS key field //用于hash

exists: EXISTS key [key ...] //用于通用

Redisson内部已经实现了lua脚本,下面是lua脚本的原理,直接调用Redisson相关Api即可。

可重入锁2

可重入锁3

Redisson可重入锁测试代码

 @Resource
    private RedissonClient redissonClient;
    private RLock lock;

    @BeforeEach
        // 创建 Lock 实例(可重入)
    void setUp() {
        lock = redissonClient.getLock("order");
    }

    @Test
    void methodOne() throws InterruptedException {
        boolean isLocked = lock.tryLock();
        log.info(lock.getName());
        if (!isLocked) {
            log.error("Fail To Get Lock~1");
            return;
        }
        try {
            log.info("Get Lock Successfully~1");
            methodTwo();
        } finally {
            log.info("Release Lock~1");
            lock.unlock();
        }
    }

    @Test
    void methodTwo() throws InterruptedException {
        boolean isLocked = lock.tryLock();
        if (!isLocked) {
            log.error("Fail To Get Lock!~2");
            return;
        }
        try {
            log.info("Get Lock Successfully!~2");
        } finally {
            log.info("Release Lock!~2");
            lock.unlock();
        }
    }

Redisson分布式锁底层源码实现流程

WatchDog默认30s释放时间

可重入锁5

(3)Redisson的multiLock连锁

用来解决分布式的主从一致性问题,判断所有节点中是不是都能获取到锁

主从一致1

主从一致2

主从一致3

13秒杀业务优化(异步完成下单)

优化秒杀1

优化秒杀2

优化秒杀3

a>新增优惠券的同时将优惠券信息保存到redis

image-20221118091749789

b>redis的lua脚本

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2用户id
local userId = ARGV[2]

-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:'..voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:'..voucherId

-- 3.脚本业务
-- 3.1判断库存是否充足
if (tonumber(redis.call('get', stockKey)) <= 0)
    then return 1
end
-- 3.2判断用户是否下单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId)==1)
    then return 2
end
-- 3.3扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.4下单 sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0

513lua

c>优化后的完整秒杀代码

 	private static final DefaultRedisScript<Long> SECKI_SCRIPT;

    static{//写成静态代码块,类加载就可以完成初始定义,就不用每次释放锁都去加载这个,性能提高咯
        SECKI_SCRIPT = new DefaultRedisScript<>();
        SECKI_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));//设置脚本位置
        SECKI_SCRIPT.setResultType(Long.class);
    }

    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);//创建阻塞队列

    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//创建线程池

//将proxy设置为全局变量
Private IvocherOrderService proxy = proxy;

// 判断库存和进行一人一单判断后将信息放入阻塞队列
public Result seckillVoucher(Long voucherId) {
    //1.查询优惠卷
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        //2.判断秒杀是否开始,是否结束
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            return Result.fail("秒杀尚未开始!");
        }
        if(voucher.getEndTime().isBefore(LocalDateTime.now())){
            return Result.fail("秒杀已结束!");
        }
        //3.判断库存是否充足
        if(voucher.getStock()<=0){
            return Result.fail("优惠券库存不足!");
        }
    //获取当前用户
    Long userId = UserHolder.getUser().getId();
    //1.执行Lua脚本
    Long result = stringRedisTemplate.execute(
            SECKI_SCRIPT,
            Collections.emptyList(),//空List
            voucherId.toString(), userId.toString()
    );
    //2.判断结果是否0 是0就是成功,可下单,下单信息保存到阻塞队列
    if(result!=0){
        return Result.fail(result==1?"库存不足!":"不能重复下单!");
    }
    //生成订单id
    long orderId = redisIdWorker.nextId("order");
    //创建订单数据
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setUserId(userId);
    voucherOrder.setId(orderId);
    voucherOrder.setVoucherId(voucherId);
    //放入阻塞队列,当线程获取不到元素时会一直阻塞,直到获取到元素继续执行
    orderTasks.add(voucherOrder);
    //获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象
    //3.返回订单id
    return Result.ok(orderId);

}

// 类加载后就持续从阻塞队列取出订单信息

@PostConstruct
    private void init(){
        //开辟一个线程来下单
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable{

        @Override
        public void run() {
            while(true){
                try {
                    //1.获取订单中的队列消息
                    VoucherOrder voucherOrder = orderTasks.take();
                    //根据订单消息下单
                    handleVoucherOrder(voucherOrder);
                    //2.创建订单
                } catch (Exception e) {
                    log.error("处理订单异常:",e);
                }
            }
        }
    }

//异步下单,新开起的线程来下单创建订单(主要就是利用代理对象调用下面的创建订单方法),注意代理对象
private void handleVoucherOrder(VoucherOrder voucherOrder) {
    //理论上不需要加锁,在在lua脚本中判断用户是否下单时已经做了并发判断了,所以理论上不需要再加锁判断但是可以增加健壮性
        Long userId = voucherOrder.getUserId();//由于是子线程线程,所以不能直接去ThreadLocal取
        //创建锁对象
        //SimpleRedisLock lock = new SimpleRedisLock("order:" + userId,stringRedisTemplate);
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        //获取锁
        boolean hasLock = lock.tryLock( );
        if(!hasLock){
            //获取锁失败
            log.error("不允许重复下单!");
            return;
        }

        try {
            //代理对象改成全局变量
            // IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象,如果写在这里,proxy也是用的threadlocal会隔离线程,因此获取proxy应该放在主线程中实现
            proxy.createVoucherOrder(voucherOrder);//默认是this,我们要实现事务需要proxy
        } catch (IllegalStateException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }

    }

    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder){
        //查询订单看是否存在
        Long userId = UserHolder.getUser().getId();

        if (query().eq("user_id",userId).eq("voucher_id", voucherOrder.getUserId()).count()>0) {
            log.error("用户已经购买过一次!");
            return;
        }

        //4.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock -1")
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0)//where id = ? and stock >0 添加了乐观锁
                .update();

        if(!success){
            log.error("优惠券库存不足!");
            return;
        }

        //7.订单写入数据库
        save(voucherOrder);
    }

d>遗留问题

1.用的jdk阻塞队列来保存订单信息,使用的是jvm内存,如果在高并发的场景下,可能会导致内存溢出,和内存上限

2.内存保存的信息,如果服务器宕机,所有的订单信息都会丢失,后台没有订单数据。如果取出队列中的信息后发生异常没有执行,那么这个任务就丢失了。

因该用消息队列。

13.SotedSet使用

点赞需要排名以及唯一(适用)

image-20221120110940588

image-20221120111044374

513点赞1

513点赞2

代码;

@Override
public Result queryBlogLikes(Long id) {
    //1.查询top5的点赞用户   zrange key 0 4
    String key = BLOG_LIKED_KEY + id;
    Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    if(top5==null||top5.isEmpty()){
        return Result.ok(Collections.emptyList());
    }
    //2.解析出useId,然后根据UserId查询到user,再转化为UserDto
    List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
    List<User> users = userService.listByIds(ids);
    List<UserDTO> userDTOS  =new ArrayList<>();
    for (User user : users) {
        UserDTO userDTO = new UserDTO();
        BeanUtils.copyProperties(user,userDTO);
        userDTOS.add(userDTO);
    }
    return Result.ok(userDTOS);
}

以上代码有个小bug:listByids(ids)在数据库中是where in ids(5,1),是不会按照给定的值的顺序进行查询的查询结果还是1在前5在后,因此需要在最后添加order by field (id,1,5)

上面代码不可以直接调用listByIds需要改进;

String idStr = StrUtil.join(",",ids);

userService.query().in("id",ids).last("order by field(id,"+idStr+")");

14.滚动分页实现查询收件箱(redis中存的字段是笔记的ID值score是时间戳 )

传统分页查询是根据角标查询的,不能准确的实现当前页所需要的数据(如果新增一条数据,那么查询下一页的时候会将新增的数据算进去导致查询会出现重复)

513分页查询1

根据分数的大小按从大到小取

513分页查询2

如果出现重复score还是会出现重复查询

513分页查询3

解决:

把limit后面的数字改为 上一次查询的最小值的重复数字的个数(偏移量)

案列:每一次查询都会携带lastid和offset(第一次查询时,前端会获取当前时间作为最大值传给后端,偏移量可以自定义为默认值1),我们要将offset以及lastId作为结果返回给前端

img

代码:

@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset){
    return blogService.queryBloyOfFollow(max,offset);
}

public Result queryBloyOfFollow(Long max, Integer offset) {
    //1.获取当前用户
    Long userId = UserHolder.getUser().getId();
    //2.查询当前用户收件箱 zrevrangebyscore key max min limit offset count
    String feedKey = FEED_KEY + userId;
    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(feedKey, 0, max, offset, 2);
    if(typedTuples==null||typedTuples.isEmpty()){
        return Result.ok();
    }
    //3.解析出收件箱中的blogId,score(时间戳),offset
    List<Long> ids = new ArrayList<>(typedTuples.size());
    long minTime = 0;
    int count = 1;//最小时间的相同个数
    for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
        //3.1 获取id
        ids.add(Long.valueOf(typedTuple.getValue()));//blog的id
        //3.2 获取分数(时间戳)
        long time = typedTuple.getScore().longValue();
        if(time == minTime){
            count++;
        }else{
            minTime = time;
            count=1;
        }
    }
    //4.根据blogId查找blog
    String idStr = StrUtil.join(",",ids);
    List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id, " + idStr + ")").list();

    for (Blog blog : blogs) {
        //4.1 查询blog有关的用户
        queryBlogUser(blog);
        //4.2 查询blog是否被点过赞
        isLikeBlog(blog);
    }

    //5.封装并返回
    ScrollResult r = new ScrollResult();
    r.setList(blogs);
    r.setOffset(count);
    r.setMinTime(minTime);
    return Result.ok(r);
}

15.分布式缓存

Redis持久化两种RDB以及AOF

a>RDB持久化

1、介绍
RDB,Redis数据备份文件,也叫Redis数据快照。就是把内存中的所有数据都记录到磁盘中,当redis实例故障重启后,从磁盘读取快照文件,恢复数据。

快照文件称RDB文件,默认保存在当前运行目录

2、命令
Redis 提供了两个命令来生成 RDB 文件,分别是 save 和 bgsave,他们的区别就在于是否在「主线程」里执行:

执行了 save 命令,就会在主线程生成 RDB 文件,由于和执行操作命令在同一个线程,所以如果写入 RDB 文件的时间太长,会阻塞主线程;
执行了 bgsave 命令,会创建一个子进程来生成 RDB 文件,这样可以避免主线程的阻塞;
Redis停机时会执行一次RDB

RDB 文件的加载工作是在服务器启动时自动执行的,Redis 并没有提供专门用于加载 RDB 文件的命令。

3、配置
Redis 还可以通过配置文件的选项来实现每隔一段时间自动执行一次 bgsave 命令,默认会提供以下配置:

save 900 1
save 300 10
save 60 10000
别看选项名叫 save,实际上执行的是 bgsave 命令,也就是会创建子进程来生成 RDB 快照文件。

bgsave的fork(复制页表)底层原理

bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据。完成fork后读取内存数据并写入RDB文件,fork采用的是copy-on-wirte技术

操作redis时不是直接操作的物理内存,而是建立一个虚拟内存,虚拟内存存储的是和物理内存的映射关系表称作页表,执行 bgsave 命令的时候,会通过 fork() 创建子进程,此时子进程和父进程是共享同一片内存数据的,因为创建子进程的时候,会复制父进程的页表,而不需要复制内存的数据,但是页表指向的物理内存还是同一个。

当主进程执行读操作,访问共享内存,fork会将共享内存标记为read only,只能读不能写。

当主进程执行写操作,则会拷贝一份数据,执行写操作,主进程的页表读操作也转移到拷贝的副本上,只有在发生修改内存数据的情况时,物理内存才会被复制一份。

513fork底层原理1

513fork底层原理2

但是,如果主线程(父进程)要修改共享数据里的某一块数据(比如键值对 A)时,就会发生写时复制,于是这块数据的物理内存就会被复制一份(键值对 A'),然后主线程在这个数据副本(键值对 A')进行修改操作。与此同时,bgsave 子进程可以继续把原来的数据(键值对 A)写入到 RDB 文件。

发生了写时复制后,RDB 快照保存的是原本的内存数据

注意:

在 Redis 执行 RDB 持久化期间,刚 fork 时,主进程和子进程共享同一物理内存,但是途中主进程处理了写操作,修改了共享内存,于是当前被修改的数据的物理内存就会被复制一份。

那么极端情况下,如果所有的共享内存都被修改,则此时的内存占用是原先的 2 倍。

所以,针对写操作多的场景,我们要留意下快照过程中内存的变化,防止内存被占满了。

总结

这里提一点,Redis 的快照是全量快照,也就是说每次执行快照,都是把内存中的「所有数据」都记录到磁盘中。

所以可以认为,执行快照是一个比较重的操作,如果频率太频繁,可能会对 Redis 性能产生影响。如果频率太低,服务器故障时,丢失的数据会更多。

通常可能设置至少 5 分钟才保存一次快照,这时如果 Redis 出现宕机等情况,则意味着最多可能丢失 5 分钟数据。

这就是 RDB 快照的缺点,在服务器发生故障时,丢失的数据会比 AOF 持久化的方式更多,因为 RDB 快照是全量快照的方式,因此执行的频率不能太频繁,否则会影响 Redis 性能,而 AOF 日志可以以秒级的方式记录操作命令,所以丢失的数据就相对更少。

b>AOF持久化

1、介绍

AOF称为追加文件,redis处理的每个写命令都会记录在AOF文件,可以看做是命令日志文件。

注意只会记录写操作命令,读操作命令是不会被记录的

2、开启

默认是开启RDB快照方式,我们要在redis.conf配置文件中手动开启AOF

513AOF1

3、三种写回策略

513AOF2

4、AOF重写机制
因为是记录命令,AOF文件会比RDB文件大很多,而且AOF会记录对同一个key的多次修改,最后在删除,全部都是白记录,通过执行bgrewriteaof命令,可以让AOF文件执行重写功能,用最少的命令达到相同的效果。

Redis也可以在触发阈值时自动重写AOF文件,阈值在redis.conf中配置
513AOF3

5、混合持久化

将 RDB 和 AOF 合体使用,这个方法是在 Redis 4.0 提出的,该方法叫混合使用 AOF 日志和内存快照,也叫混合持久化。

如果想要开启混合持久化功能,可以在 Redis 配置文件将下面这个配置项设置成 yes:

aof-use-rdb-preamble yes

混合持久化工作在 AOF 日志重写过程。

当开启了混合持久化时,在 AOF 重写日志时,fork 出来的重写子进程会先将与主线程共享的内存数据以 RDB 方式写入到 AOF 文件,然后主线程处理的操作命令会被记录在重写缓冲区里,重写缓冲区里的增量命令会以 AOF 方式写入到 AOF 文件,写入完成后通知主进程将新的含有 RDB 格式和 AOF 格式的 AOF 文件替换旧的的 AOF 文件。

也就是说,使用了混合持久化,AOF 文件的前半部分是 RDB 格式的全量数据,后半部分是 AOF 格式的增量数据。

好处:

这样的好处在于,重启 Redis 加载数据的时候,由于前半部分是 RDB 内容,这样加载的时候速度会很快

加载完 RDB 的内容后,才会加载后半部分的 AOF 内容,这里的内容是 Redis 后台子进程重写 AOF 期间,主线程处理的操作命令,可以使得数据更少的丢失

6、总结(他们各有优缺点,如果对数据库安全性要求较高,在实际开发中往往两者*结合*使用)

513Aof4

c>Redis主从

(一).主从架构搭建

1、准备实例和配置
(1)创建目录

我们创建三个文件夹,名字分别叫7001、7002、7003:

进入/tmp目录

cd /tmp

创建目录

mkdir 7001 7002 7003

(2)拷贝配置文件到每个实例目录

然后将redis-6.2.4/redis.conf文件拷贝到三个目录中(在/tmp目录执行下列命令):

方式一:逐个拷贝

cp redis-6.2.4/redis.conf 7001
cp redis-6.2.4/redis.conf 7002
cp redis-6.2.4/redis.conf 7003

方式二:管道组合命令,一键拷贝

echo 7001 7002 7003 | xargs -t -n 1 cp redis-6.2.4/redis.conf

(3)修改每个实例的端口、工作目录

修改每个文件夹内的配置文件,将端口分别修改为7001、7002、7003,将rdb文件保存位置都修改为自己所在目录(在/tmp目录执行下列命令)

sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/7001\//g' 7001/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf

2、启动
为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

第1个

redis-server 7001/redis.conf

第2个

redis-server 7002/redis.conf

第3个

redis-server 7003/redis.conf

3、开启主从关系
现在三个实例还没有任何关系,要配置主从可以使用replicaof 或者slaveof(5.0以前)命令。

有临时和永久两种模式:

修改配置文件(永久生效)

在redis.conf中添加一行配置:slaveof

使用redis-cli客户端连接到redis服务,执行slaveof命令(重启后失效):

slaveof
这里我们为了演示方便,使用方式二。

通过redis-cli命令连接7002,执行下面命令:

连接 7002

redis-cli -p 7002

执行slaveof

slaveof 192.168.150.101 7001

通过redis-cli命令连接7003,执行下面命令:

连接 7003

redis-cli -p 7003

执行slaveof

slaveof 192.168.150.101 7001

然后连接 7001节点,查看集群状态:

连接 7001

redis-cli -p 7001

查看状态

info replication

(二).数据同步原理

1、全量同步

513全量同步1

全量同步只有第一次的时候才会发送

master如何判断slave是不是第一次来同步数据呢?

判断replid数据集的标记,id一致则说明是同一数据集,每一个master都有唯一的replid,slave则会继承master节点的replid,第一次来主会把id同步给从节点

offset偏移量,随着记录在repl——baklog缓存中的数据增多,slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新

因此slave做数据同步,必须向master声明自己的replication id和offset,master才可以判断到底需要同步哪些数据。

详细步骤:

第一阶段:建立链接、协商同步

执行了 replicaof 命令后,从服务器就会给主服务器发送 psync 命令,表示要进行数据同步。

psync 命令包含两个参数,分别是主服务器的 runID 和复制进度 offset。

runID,每个 Redis 服务器在启动时都会自动生产一个随机的 ID 来唯一标识自己。当从服务器和主服务器第一次同步时,因为不知道主服务器的 run ID,所以将其设置为 "?"。
offset,表示复制的进度,第一次同步时,其值为 -1。
主服务器收到 psync 命令后,会用 FULLRESYNC 作为响应命令返回给对方。

并且这个响应命令会带上两个参数:主服务器的 runID 和主服务器目前的复制进度 offset。从服务器收到响应后,会记录这两个值。

FULLRESYNC 响应命令的意图是采用全量复制的方式,也就是主服务器会把所有的数据都同步给从服务器。

所以,第一阶段的工作时为了全量复制做准备。

那具体怎么全量同步呀呢?我们可以往下看第二阶段。

第二阶段:主服务器同步数据给从服务器

接着,主服务器会执行 bgsave 命令来生成 RDB 文件,然后把文件发送给从服务器。

从服务器收到 RDB 文件后,会先清空当前的数据,然后载入 RDB 文件。

这里有一点要注意,主服务器生成 RDB 这个过程是不会阻塞主线程的,因为 bgsave 命令是产生了一个子进程来做生成 RDB 文件的工作,是异步工作的,这样 Redis 依然可以正常处理命令。

但是,这期间的写操作命令并没有记录到刚刚生成的 RDB 文件中,这时主从服务器间的数据就不一致了。

那么为了保证主从服务器的数据一致性,主服务器在下面这三个时间间隙中将收到的写操作命令,写入到 replication buffer 缓冲区里:

主服务器生成 RDB 文件期间;
主服务器发送 RDB 文件给从服务器期间;
「从服务器」加载 RDB 文件期间;
第三阶段:主服务器发送新写操作命令给从服务器

在主服务器生成的 RDB 文件发送完,从服务器收到 RDB 文件后,丢弃所有旧数据,将 RDB 数据载入到内存。完成 RDB 的载入后,会回复一个确认消息给主服务器。

接着,主服务器将 replication buffer 缓冲区里所记录的写操作命令发送给从服务器,从服务器执行来自主服务器 replication buffer 缓冲区里发来的命令,这时主从服务器的数据就一致了。

至此,主从服务器的第一次同步的工作就完成了。

2、命令传播

主从服务器在完成第一次同步后,双方之间就会维护一个 TCP 连接。

513tcp连接

后续主服务器可以通过这个连接继续将写操作命令传播给从服务器,然后从服务器执行该命令,使得与主服务器的数据库状态相同。

而且这个连接是长连接的,目的是避免频繁的 TCP 连接和断开带来的性能开销。

上面的这个过程被称为基于长连接的命令传播,通过这种方式来保证第一次同步后的主从服务器的数据一致性

3、增量同步

一般从节点重启之后会做增量同步,从节点突然断开了一段时间又不可能重新全量同步性能太低

513增量同步1

repl_baklog大小是有上限的,写满后会覆盖最早的数据,如果slave断开太久,导致未备份的数据被覆盖了,则无法基于log增量同步,只能再次全量同步

优化Redis主从:

在master中配置repl-diskless-sync yes启动无磁盘赋值,避免全量同步时的磁盘IO(全量同步写入RDB文件时候是写入磁盘的效率太低了,我们配置写入网络然后直接发给从)提高全量同步性能

Redis单节点上的内存占用不要太大,减少RDB导致的过多磁盘IO(不用写太多)提高全量同步性能角度

适当提高repl_baklog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步

限制一个master的slave节点数量,如果实在太多slave,则可以采用主从从链式结构,减少master压力

增量详细步骤

从服务器在恢复网络后,会发送 psync 命令给主服务器,此时的 psync 命令里的 offset 参数不是 -1;

主服务器收到该命令后,然后用 CONTINUE 响应命令告诉从服务器接下来采用增量复制的方式同步数据;
然后主服务将主从服务器断线期间,所执行的写命令发送给从服务器,然后从服务器执行这些命令。
那么关键的问题来了,主服务器怎么知道要将哪些增量数据发送给从服务器呢?

答案藏在这两个东西里:

repl_backlog_buffer,是一个「环形」缓冲区,用于主从服务器断连后,从中找到差异的数据;
replication offset,标记上面那个缓冲区的同步进度,主从服务器都有各自的偏移量,主服务器使用 master_repl_offset 来记录自己「写」到的位置,从服务器使用 slave_repl_offset 来记录自己「读」到的位置。
那 repl_backlog_buffer 缓冲区是什么时候写入的呢?

在主服务器进行命令传播时,不仅会将写命令发送给从服务器,还会将写命令写入到 repl_backlog_buffer 缓冲区里,因此 这个缓冲区里会保存着最近传播的写命令。

网络断开后,当从服务器重新连上主服务器时,从服务器会通过 psync 命令将自己的复制偏移量 slave_repl_offset 发送给主服务器,主服务器根据自己的 master_repl_offset 和 slave_repl_offset 之间的差距,然后来决定对从服务器执行哪种同步操作:

如果判断出从服务器要读取的数据还在 repl_backlog_buffer 缓冲区里,那么主服务器将采用增量同步的方式;
相反,如果判断出从服务器要读取的数据已经不存在 repl_backlog_buffer 缓冲区里,那么主服务器将采用全量同步的方式。
当主服务器在 repl_backlog_buffer 中找到主从服务器差异(增量)的数据后,就会将增量的数据写入到 replication buffer 缓冲区,这个缓冲区我们前面也提到过,它是缓存将要传播给从服务器的命令。

repl_backlog_buffer 缓行缓冲区的默认大小是 1M,并且由于它是一个环形缓冲区,所以当缓冲区写满后,主服务器继续写入的话,就会覆盖之前的数据。因此,当主服务器的写入速度远超于从服务器的读取速度,缓冲区的数据一下就会被覆盖。

那么在网络恢复时,如果从服务器想读的数据已经被覆盖了,主服务器就会采用全量同步,这个方式比增量同步的性能损耗要大很多。

因此,为了避免在网络恢复时,主服务器频繁地使用全量同步的方式,我们应该调整下 repl_backlog_buffer 缓冲区大小,尽可能的大一些,减少出现从服务器要读取的数据被覆盖的概率,从而使得主服务器采用增量同步的方式。

主从复制中两个 Buffer(replication buffer 、repl backlog buffer)有什么区别?

replication buffer 、repl backlog buffer 区别如下:

出现的阶段不一样:
repl backlog buffer 是在增量复制阶段出现,一个主节点只分配一个 repl backlog buffer;
replication buffer 是在全量复制阶段和增量复制阶段都会出现,主节点会给每个新连接的从节点,分配一个 replication buffer;
这两个 Buffer 都有大小限制的,当缓冲区满了之后,发生的事情不一样:
当 repl backlog buffer 满了,因为是环形结构,会直接覆盖起始位置数据;
当 replication buffer 满了,会导致连接断开,删除缓存,从节点重新连接,重新开始全量复制。

d>哨兵模式

1、哨兵的作用
Redis提供了哨兵机制来实现主从集群的自动故障恢复

监控:sentinel会不断检查master和slave是否按照预期工作

自动故障恢复:如果master故障,sentinel会将一个slave变为master,当故障实例恢复后也以新的master为主

通知:sentinel充当redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给redis客户端

513服务状态监控

2、服务状态监控
sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个实例发送ping命令:

主观下线:如果sentinel节点发现某实例未在规定时间响应,则认为改实例主观下线
客观下线:若超过指定数量(quorum)的sentinel都认为该实例主管下线,则该实例客观下线。quorum值最好超过sentinel实例数量的一半

513主观下线

之所以针对「主节点」设计「主观下线」和「客观下线」两个状态,是因为有可能「主节点」其实并没有故障,可能只是因为主节点的系统压力比较大或者网络发送了拥塞,导致主节点没有在规定时间内响应哨兵的 PING 命令。

所以,为了减少误判的情况,哨兵在部署的时候不会只部署一个节点,而是用多个节点部署成哨兵集群(最少需要三台机器来部署哨兵集群),通过多个哨兵节点一起判断,就可以避免单个哨兵因为自身网络状况不好,而误判主节点下线的情况

513主节点下线

当这个哨兵的赞同票数达到哨兵配置文件中的 quorum 配置项设定的值后,这时主节点就会被该哨兵标记为「客观下线」。

例如,现在有 3 个哨兵,quorum 配置的是 2,那么一个哨兵需要 2 张赞成票,就可以标记主节点为“客观下线”了。这 2 张赞成票包括哨兵自己的一张赞成票和另外两个哨兵的赞成票。

PS:quorum 的值一般设置为哨兵个数的二分之一加1,例如 3 个哨兵就设置 2。

哨兵判断完主节点客观下线后,哨兵就要开始在多个「从节点」中,选出一个从节点来做新主节点

选举新的master
一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:

(1)首先会判断slave节点与master节点断开时间长短,如果超过指定值(down-after-milliseconds* 10) 则会排除该slave节点
(2)然后判断slave节点的slave-priority值,越小优先级越高,如果是0则永不参与选举
如果slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高
(3)最后是判断slave节点的运行id大小,越小优先级越高。
Redis 有个叫 down-after-milliseconds * 10 配置项,其down-after-milliseconds 是主从节点断连的最大连接超时时间。如果在 down-after-milliseconds 毫秒内,主从节点都没有通过网络联系上,我们就可以认为主从节点断连了。如果发生断连的次数超过了 10 次,就说明这个从节点的网络状况不好,不适合作为新主节点。

至此,我们就把网络状态不好的从节点过滤掉了,接下来要对所有从节点进行三轮考察:优先级、复制进度、ID 号。在进行每一轮考察的时候,哪个从节点优先胜出,就选择其作为新主节点。

第一轮考察:哨兵首先会根据从节点的优先级来进行排序,优先级越小排名越靠前,
第二轮考察:如果优先级相同,则查看复制的下标,哪个从「主节点」接收的复制数据多,哪个就靠前。
第三轮考察:如果优先级和下标都相同,就选择从节点 ID 较小的那个。

第一轮考察:优先级最高的从节点胜出

Redis 有个叫 slave-priority 配置项,可以给从节点设置优先级。

每一台从节点的服务器配置不一定是相同的,我们可以根据服务器性能配置来设置从节点的优先级。

比如,如果 「 A 从节点」的物理内存是所有从节点中最大的, 那么我们可以把「 A 从节点」的优先级设置成最高。这样当哨兵进行第一轮考虑的时候,优先级最高的 A 从节点就会优先胜出,于是就会成为新主节点。

第二轮考察:复制进度最靠前的从节点胜出

如果在第一轮考察中,发现优先级最高的从节点有两个,那么就会进行第二轮考察,比较两个从节点哪个复制进度。

什么是复制进度?主从架构中,主节点会将写操作同步给从节点,在这个过程中,主节点会用 master_repl_offset 记录当前的最新写操作在 repl_backlog_buffer 中的位置(如下图中的「主服务器已经写入的数据」的位置),而从节点会用 slave_repl_offset 这个值记录当前的复制进度(如下图中的「从服务器要读的位置」的位置)。

513哨兵模式

如果某个从节点的 slave_repl_offset 最接近 master_repl_offset,说明它的复制进度是最靠前的,于是就可以将它选为新主节点。

第三轮考察:ID 号小的从节点胜出

如果在第二轮考察中,发现有两个从节点优先级和复制进度都是一样的,那么就会进行第三轮考察,比较两个从节点的 ID 号,ID 号小的从节点胜出。

什么是 ID 号?每个从节点都有一个编号,这个编号就是 ID 号,是用来唯一标识从节点的。

当选中了其中一个slave为新的master后 (例如slave1),故障的转移的步骤如下

(1)sentinel给备选的slave1节点发送slaveof no one命令,让该节点成为master
(2)sentinel给所有其它slave发送slaveof192.168.150.101 7002命令,让这些slave成为新master的从节点,开始从新的master上同步数据。
(3)最后,sentinel将故障节点标记为slave,当故障节点恢复后会自动成为新的master的slave节点

513redis最后

所以切换主节点的全过程是:

1、第一轮投票:判断主节点下线

当哨兵集群中的某个哨兵判定主节点下线(主观下线)后,就会向其他哨兵发起命令,其他哨兵收到这个命令后,就会根据自身和主节点的网络状况,做出赞成投票或者拒绝投票的响应。

当这个哨兵的赞同票数达到哨兵配置文件中的 quorum 配置项设定的值后,这时主节点就会被该哨兵标记为「客观下线」。

2、第二轮投票:选出哨兵leader

某个哨兵判定主节点客观下线后,该哨兵就会发起投票,告诉其他哨兵,它想成为 leader,想成为 leader 的哨兵节点,要满足两个条件:

第一,拿到半数以上的赞成票;
第二,拿到的票数同时还需要大于等于哨兵配置文件中的 quorum 值。
3、由哨兵 leader 进行主从故障转移

选举出了哨兵 leader 后,就可以进行主从故障转移的过程了。该操作包含以下四个步骤:

第一步:在已下线主节点(旧主节点)属下的所有「从节点」里面,挑选出一个从节点,并将其转换为主节点,选择的规则:
过滤掉已经离线的从节点;
过滤掉历史网络连接状态不好的从节点;
将剩下的从节点,进行三轮考察:优先级、复制进度、ID 号。在每一轮考察过程中,如果找到了一个胜出的从节点,就将其作为新主节点。
第二步:让已下线主节点属下的所有「从节点」修改复制目标,修改为复制「新主节点」;
第三步:将新主节点的 IP 地址和信息,通过「发布者/订阅者机制」通知给客户端;
第四步:继续监视旧主节点,当这个旧主节点重新上线时,将它设置为新主节点的从节点;

标签:String,redis,id,学习,线程,服务器,节点
From: https://www.cnblogs.com/liu-jin/p/17398103.html

相关文章

  • 三菱PLC分拣程序基于三菱FX系列的分拣程序,可用于学习
    三菱PLC分拣程序基于三菱FX系列的分拣程序,可用于学习ID:7620637759674572......
  • MySQL学习日志五,外键与DML语言
    外键准备一个表,gradeid作为外键CREATETABLE`grade`( `gradeid`INT(10)NOTNULLAUTO_INCREMENTCOMMENT'年级id', `gradename`VARCHAR(50)NOTNULLCOMMENT'年级名称', PRIMARYKEY(`gradeid`))ENGINE=INNODBDEFAULTCHARSET=utf8创建生成的表方法一:在......
  • 恒压,PID控制,模拟量转换学习成品恒压程序案例 1.pid控制; 2.模拟量
    恒压,PID控制,模拟量转换学习成品恒压程序案例1.pid控制;2.模拟量采集,转换;3.昆仑通态触摸屏;4.西门子S7-200PLC,自带模拟量输入,输出模块;5.采用PID闭环控制,模拟量转换已经建立专门的库,采集数据转换一步到位,方便,简洁,自动闭环控制等;6.无论是学习,还是实际应用,都是不错的程序。;(每......
  • gstearmer学习
    极限性能echoperformance|tee$(find/sys|grepgovernor$)#打开性能模式export KMSSINK_DISABLE_VSYNC=1#关闭vsyncecho400000000>sys/kernel/debug/clk/aclk_rkvdec/clk_rate#提高解码频率echo0x100>/sys/module/rk_vcodec/parameters/mpp_dev_debug#打......
  • PMP 学习笔记(一)
       最近在准备PMP考试,正在看《PMBOK第六版》,虽然八百多页的书看着都慌,但其实先掌握大纲,然后粗读,做一波题,再细读有针对性的读,结合平时项目经验,目前来说,还不算特别难理解。  (当然,现在才刚刚开始,不知道后面会不会被打脸......
  • 学习日记——CSS高级选择器
    1.层次选择器①后代选择器EF:选择匹配的F元素,且匹配的F元素被包含在匹配的E元素内②子选择器E>F:选择匹配的F元素,且匹配的F元素是匹配的E元素的子元素③相邻兄弟选择器E+F:选择匹配的F元素,且匹配的F元素紧位于匹配的E元素后面④通用兄弟选择器E~F:选择匹配的F元素,且位于匹配......
  • [学习笔记+做题记录] 扫描线
    一、扫描线扫描线一般用于图形类的计算,用数据结构辅助在图形上扫来扫去,比如计算矩形面积并,周长并,二位数点等问题。二、Atlantis问题/矩形面积并https://www.luogu.com.cn/problem/P5490先挂张图(明显是OI-wiki的):算法原理很简单,就是扫描一下每一个纵坐标\(y\)(矩阵的边界......
  • 野火Linux uboot编译/烧录/移植学习
    首先,要说野火的linux驱动的pdf做得不是很好,代码内容匆匆略过。后来才发现野火有专门的网页,这是驱动部分的章节:https://doc.embedfire.com/lubancat/build_and_deploy/zh/latest/index.html代码都可以下载!!!预备:添加编译器相关①学习:立即生效添加交叉工具链,需要修改/etc/profi......
  • Python学习之五_字符串处理生成查询SQL
    Python学习之五_字符串处理生成查询SQL前言昨天想给同事讲解一下获取查询部分表核心列信息的SQL方法也写好了一个简单文档.但是感觉不是很优雅.最近两三天晚上一直在学习Python.想将昨天的文档处理成一个工具的方式.将查询SQL展示出来.然后再由同事手工检查确认.增加时......
  • 近期学习总结
    近期学习总结学习中遇到的问题1写代码的速度太慢2写的代码不够简洁3写的代码太少,缺乏经验未来的学习计划1学好C语言,练好代码能力2能够学好嵌入式,大三能完成一个嵌入式项目3打好高数基础个人未来规划在校期间,程序设计,电路设计要有一定......