首页 > 数据库 >分布式系统——基于Redis的分布式锁的实现

分布式系统——基于Redis的分布式锁的实现

时间:2023-04-03 20:56:16浏览次数:40  
标签:String int lock Redis private DEFAULT 分布式系统 public 分布式

分布式锁的介绍

分布式锁是分布式系统中用于协调多个进程或线程之间并发访问共享资源的一种机制。在分布式系统中,由于各个节点之间的通信存在延迟、故障等问题,可能会导致数据的不一致性。为了保证数据的一致性,需要使用分布式锁来协调各个节点的并发访问。

在分布式系统中,多个节点同时访问共享资源可能会导致以下问题:

  • 竞态条件:当多个节点同时读写共享资源时,由于操作执行的顺序不确定,可能会导致数据的不一致性。
  • 脏数据:当多个节点同时修改共享资源时,由于操作的执行顺序不确定,可能会导致某些操作被覆盖或丢失,导致数据的不一致性。
  • 死锁:由于各个节点之间的通信存在延迟,可能会出现不同节点都持有某个资源的锁,从而导致死锁的发生。

为了解决这些问题,可以使用分布式锁来保证共享资源的互斥访问。分布式锁的实现方式可以有多种,常见的实现方式包括:

  • 基于数据库的实现方式:通过数据库中的行级锁或者悲观锁来实现分布式锁。
  • 基于Redis的实现方式:通过Redis的原子性操作和过期时间特性,将锁作为一个键值对存储在Redis中。
  • 基于ZooKeeper的实现方式:通过ZooKeeper中的临时顺序节点来实现分布式锁。

基于Redis的分布式锁实现

整体思路:

pom准备

新建一个SpringBoot项目并且引入相关依赖

   <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.1.7.RELEASE</spring-boot.version>
        <redission.version>3.10.0</redission.version>
        <jedis.version>2.9.0</jedis.version>
        <spring.data.redis.version>2.1.3.RELEASE</spring.data.redis.version>
        <springdata.keyvalue>2.1.3.RELEASE</springdata.keyvalue>
        <aspect.version>1.8.5</aspect.version>
    </properties>
<dependencys>

<dependencies>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>${redission.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>${spring.data.redis.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.data</groupId>
                    <artifactId>spring-data-keyvalue</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-keyvalue</artifactId>
            <version>${spring.data.redis.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.data</groupId>
                    <artifactId>spring-data-commons</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
            <version>${spring.data.redis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjtools</artifactId>
            <version>${aspect.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
  </dependencies>

我们这里通过Redisson来操作redis

枚举类

先定义一个枚举类来表示Redis类型

public enum RedisTypeEnum {
    /**
     * 单机
     */
    REDIS,
    /**
     * sentinel集群
     */
    SENTINEL,
    /**
     * cluster集群
     */
    CLUSTER;

    private RedisTypeEnum(){
    }

}

通过配置类来封装redis的连接信息

会将配置文件中以spring.redis开头的配置设置到这个类中

@Data
@ConfigurationProperties(prefix = "spring.redis")
public class JedisProperties {
    /**
     *  默认连接超时时间
     */
    public static final int DEFAULT_CONNECT_TIME_OUT = 10000;

    /**
     * 默认超时时间
     */
    public static final int DEFAULT_TIME_OUT = 10000;

    /**
     * 最小连接超时时间
     */
    public static final int MIN_CONNECT_TIME_OUT = 100;

    /**
     * 默认最大空闲连接数
     */
    public static final int DEFAULT_MAX_IDLE = 1000;

    /**
     * 默认最小空闲连接数
     */
    public static final int DEFAULT_MIN_IDLE = 0;

    /**
     * 默认最大活跃连接数
     */
    public static final int DEFAULT_MAX_ACTIVE = 1400;

    /**
     * 默认最大等待时间(毫秒)
     */
    public static final int DEFAULT_MAX_WAIT = 1500;

    /**
     * 当空闲连接数大于最小空闲连接,且该连接大于时间未适用则关闭,默认未10000毫秒
     */
    public static final int DEFAULT_IDLE_CONN_TIME_OUT = 10000;

    /**
     * 默认重连次数
     */
    public static final int RETRY_ATTEMPTS = 3;

    /**
     * 默认ping时间间隔
     */
    public static final int PING_INTERVAL = 1000;

    /**
     * 多台用 ip用`,`隔开
     */
    private String severs;
    private String type = RedisTypeEnum.REDIS.name();
    private String password;
    private int connectionTimeOut = DEFAULT_CONNECT_TIME_OUT;
    private int timeOut = DEFAULT_TIME_OUT;
    private int maxIdle = DEFAULT_MAX_IDLE;
    private int minIdle = DEFAULT_MIN_IDLE;
    private int maxActive = DEFAULT_MAX_ACTIVE;
    private int maxWait = DEFAULT_MAX_WAIT;
    private int retryAttempts = 3;
    private int idleConnTimeOut = DEFAULT_IDLE_CONN_TIME_OUT;
    private int pingInterval = PING_INTERVAL;

    private String readMode = ReadMode.SLAVE.name();
    private Master master = new Master();
    private Slave slave = new Slave();

    public static class Master{
        private int connectionTimeOut = DEFAULT_CONNECT_TIME_OUT;
        private int timeOut = DEFAULT_TIME_OUT;
        private int maxIdle = DEFAULT_MAX_IDLE;
        private int minIdle = DEFAULT_MIN_IDLE;
        private int maxActive = DEFAULT_MAX_ACTIVE;
        private int maxWait = DEFAULT_MAX_WAIT;
    }

    public static class Slave{
        private int connectionTimeOut = DEFAULT_CONNECT_TIME_OUT;
        private int timeOut = DEFAULT_TIME_OUT;
        private int maxIdle = DEFAULT_MAX_IDLE;
        private int minIdle = DEFAULT_MIN_IDLE;
        private int maxActive = DEFAULT_MAX_ACTIVE;
        private int maxWait = DEFAULT_MAX_WAIT;
    }

    public int getConnectionTimeOut(){
        if(connectionTimeOut < MIN_CONNECT_TIME_OUT){
            return DEFAULT_CONNECT_TIME_OUT;
        }
        return connectionTimeOut;
    }
}

redisson初始化相关配置

@AutoConfigureBefore(JedisAutoConfiguration.class)表示在JedisAutoConfiguration之前配置这里面的Bean对象,因为在JedisAutoConfiguration中需要用到RedissonClient,主要作用就是往容器中注册一个Redisson的客户端。

@Configuration
@AutoConfigureBefore(JedisAutoConfiguration.class)
public class RedissonAutoConfiguration {
    @Resource
    private JedisProperties jedisProperties;

    @Bean
    public RedissonClient createRedissonClient() {
        Config config = new Config();
        String[] ipPortPair = jedisProperties.getSevers().split(":");
        if(ipPortPair.length == 0){
            return null;
        }
        String address = "redis://" + ipPortPair[0] + ":" +
                ipPortPair[1];
        config.useSingleServer().setAddress(address);
        String password = jedisProperties.getPassword();
        if (null != password && !"".equals(password.trim())) {
            config.useSingleServer().setPassword(password);
        }
        return Redisson.create(config);
    }
}

jedis初始化相关配置

通过@EnableConfigurationProperties({JedisProperties.class})将上一步配置的ConfigurationProperties生效,并且将JedisProperties注入容器中。

  • @Primary来覆盖掉spirng-data-redis中的RedisProperties
@EnableConfigurationProperties({JedisProperties.class})
public class JedisAutoConfiguration{
    @Resource
    private JedisProperties jedisProperties;

  @Primary
    @Bean
    public RedisProperties redisProperties(){
        RedisProperties redisProperties = new RedisProperties();
        redisProperties.setPassword(jedisProperties.getPassword());
        redisProperties.setTimeout(Duration.ofMillis(jedisProperties.getConnectionTimeOut()));
        if(RedisTypeEnum.CLUSTER.name().equalsIgnoreCase(jedisProperties.getType())){
            RedisProperties.Cluster cluster = new RedisProperties.Cluster();
            cluster.setMaxRedirects(5);
            String[] redisNodeString = jedisProperties.getSevers().split(",");
            List<String> redisNodes = Lists.newArrayList();
            redisNodes.addAll(Arrays.asList(redisNodeString));
            cluster.setNodes(redisNodes);
            redisProperties.setCluster(cluster);
        }else if(RedisTypeEnum.SENTINEL.name().equalsIgnoreCase(jedisProperties.getType())){

        }else{
            String[] ipPortPair = jedisProperties.getSevers().split(":");
            if(ipPortPair.length > 1){
                redisProperties.setHost(ipPortPair[0]);
                redisProperties.setPort(Integer.parseInt(ipPortPair[1]));
            } else{
                redisProperties.setHost(jedisProperties.getSevers());
                redisProperties.setPort(80);
            }
        }
        redisProperties.getJedis().setPool(initRedisPool());
        return redisProperties;
    }

    private RedisProperties.Pool initRedisPool(){
        RedisProperties.Pool pool = new RedisProperties.Pool();
        pool.setMaxIdle(jedisProperties.getMaxIdle());
        pool.setMinIdle(jedisProperties.getMinIdle());
        pool.setMaxActive(jedisProperties.getMaxActive());
        pool.setMaxWait(Duration.ofMillis(jedisProperties.getMaxWait()));
        return pool;
    }
}

分布式锁注解

我们定义一个Lock注解来使用

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface Lock {

    /**
     * 生成锁的key,支持spring EL表达式
     */
    @AliasFor("value")
    String value() default "default";

    /**
     * 生成锁key的前缀
     */
    @AliasFor("keyPrefix")
    String keyPrefix() default "prefix";

    /**
     * 没获取到锁是否直接返回,默认直接返回
     */
    boolean isBlock() default false;

    /**
     * 等待加锁的时间,单位毫秒
     */
    long waitTime() default 3000;

    /**
     * 锁过期时间,不指定,默认等待方法执行结束自动释放,单位毫秒
     */
    long expireTime() default -1L;

    /**
     * 获取锁失败,抛出此定义的异常
     */
    String lockFailMsg() default "系统正在处理,请稍后";
}

分布式锁接口

我们需要定义一个分布式锁的接口,在接口中定义锁的操作

  • 获取锁对象

  • 加锁

  • 加锁+过期时间

  • 尝试加锁,最多等待waitTime毫秒,上锁以后leaseTime自动解锁

  • 尝试加锁,最多等待waitTime, 毫秒

  • 解锁

  • 是否锁定状态

用法如下:

  1. 基本用法:lock.lock();需要手动解锁lock.unlock();
  2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁:lock.lock(10, TimeUnit.SECONDS);
  3. 尝试加锁,最多等待3秒,上锁以后10秒自动解锁 boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
public interface IDistributedLock {

    /**
     * 获取锁对象
     */
    RLock getLock(String lockKey);

    /**
     * 加锁
     */
    void lock(String lockKey);

    /**
     * 加锁+过期时间
     */
    void lock(String lockKey, long timeOut);

    /**
     * 加锁+过期时间
     */
    void lock(String lockKey, TimeUnit timeUnit, long timeOut);

    /**
     * 尝试加锁,最多等待waitTime毫秒,上锁以后leaseTime自动解锁
     */
    boolean tryLock(String lockKey, TimeUnit timeUnit, long waitTime, long leaseTime);

    /**
     * 尝试加锁,最多等待waitTime, 毫秒
     */
    boolean tryLock(String lockKey, TimeUnit timeUnit, long waitTime);

    /**
     * 解锁
     */
    void unlock(String lockKey);

    /**
     * 是否锁定状态
     */
    boolean isLocked(String lockKey);
}

实现Lock接口

@Component
@Slf4j
public class DistributeLockHelper implements IDistributedLock{

    @Resource
    private RedissonClient redissonClient;

    @Override
    public RLock getLock(String lockKey) {
        return redissonClient.getLock(lockKey);
    }

    @Override
    public void lock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock();
    }

    @Override
    public void lock(String lockKey, long timeOut) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeOut, TimeUnit.SECONDS);
    }

    @Override
    public void lock(String lockKey, TimeUnit timeUnit, long timeOut) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeOut,timeUnit);
    }

    @Override
    public boolean tryLock(String lockKey, TimeUnit timeUnit, long waitTime, long leaseTime){
        RLock lock = redissonClient.getLock(lockKey);
        try{
            return lock.tryLock(waitTime, leaseTime, timeUnit);
        }catch (Exception e){
            log.error("tryLock 系统异常!lockKey:{}, waitTime:{}, leaseTime:{}", lockKey, waitTime ,leaseTime);
            return false;
        }
    }

    @Override
    public boolean tryLock(String lockKey, TimeUnit timeUnit, long waitTime) {
        RLock lock = redissonClient.getLock(lockKey);
        try{
            return lock.tryLock(waitTime, timeUnit);
        }catch (Exception e){
            log.error("tryLock 系统异常!lockKey:{}, waitTime:{}", lockKey, waitTime);
            return false;
        }
    }

    @Override
    public void unlock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        // 只释放当前线程自己持有的锁
        if(lock.isHeldByCurrentThread()){
            lock.forceUnlock();
        }
    }

    @Override
    public boolean isLocked(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        return lock.isLocked();
    }
}

实现切面逻辑

定义切点:对于所有加了@Pointcut注解的方法生效

@Around环绕增强,在方法执行前加锁,在方法执行完成后尝试解锁。

在尝试加锁的时候存在三种情况:

  1. 非阻塞式,判断对当前key加锁是否成功,不成功直接抛出异常,比较适合管理后台保存数据重复提交的场景

  2. 阻塞式,没设置过期时间,判断在waitTime时间内是否加锁成功,加锁成功后,会根据业务执行的时间锁自动续期

  3. 阻塞式,设置了过期时间,判断在waitTime时间内是否加锁成功,加锁成功后,锁在expireTime时间自动过期,不在自动续期

@Slf4j
@Aspect
@Component
@Order(1)
public class DistributeLockAspect {
    @Resource
    private IDistributedLock distributedLock;

    @Pointcut("@annotation(com.example.distributelock.annotation.Lock)")
    private void pointcut(){
    }

    @Around("pointcut() && @annotation(lock)")
    public Object around(ProceedingJoinPoint point, Lock lock) throws Throwable{
        String key = DistributeLockAspectUtil.createLockKey(lock.value(), lock.keyPrefix(), point);
        // 非阻塞式,判断对当前key加锁是否成功,不成功直接抛出异常,比较适合管理后台保存数据重复提交的场景
        if(!lock.isBlock() && !distributedLock.tryLock(key, TimeUnit.MILLISECONDS, 0)){
            throw new DistributeLockException(StrUtil.isNotEmpty(lock.lockFailMsg())
                    ? lock.lockFailMsg() : LockConstant.OPERATE_QUICK);
        }
        // 阻塞式,没设置过期时间,判断在waitTime时间内是否加锁成功,加锁成功后,会根据业务执行的时间锁自动续期
        if (ObjectUtil.equal(lock.expireTime(), -1L)){
            if(!distributedLock.tryLock(key, TimeUnit.MILLISECONDS, lock.waitTime())){
                throw new DistributeLockException(StrUtil.isNotEmpty(lock.lockFailMsg())
                        ? lock.lockFailMsg() : LockConstant.OPERATE_QUICK);
            }
        } else{
            // 阻塞式,设置了过期时间,判断在waitTime时间内是否加锁成功,加锁成功后,锁在expireTime时间自动过期,不在自动续期
            if(!distributedLock.tryLock(key, TimeUnit.MILLISECONDS, lock.waitTime(),lock.expireTime())){
                throw new DistributeLockException(StrUtil.isNotEmpty(lock.lockFailMsg())
                        ? lock.lockFailMsg() : LockConstant.OPERATE_QUICK);
            }
        }
        log.info("加锁成功!key:{}", key);
        try{
            return point.proceed();
        }finally {
            distributedLock.unlock(key);
        }
    }


}

分布式锁切面工具类

createLockKey 方法用来计算key的值,会通过value值,锁前缀,方法名来计算出一个redis的key值。

@Slf4j
public class DistributeLockAspectUtil {
    private static  final String DEFAULT_PREFIX_KEY = "lock:";
    private static final String DEFAULT_STRING = "default";

    private static ExpressionParser expressionParser = new SpelExpressionParser();
    private static LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();

    public static String createLockKey(String value, String keyPrefix, ProceedingJoinPoint proceedingJoinPoint) throws
            NoSuchAlgorithmException {
        Method method = getMethod(proceedingJoinPoint);
        Object[] args = proceedingJoinPoint.getArgs();
        value = parse(value, method, args);
        if(DEFAULT_STRING.equalsIgnoreCase(keyPrefix)){
            keyPrefix = DEFAULT_PREFIX_KEY;
            keyPrefix = keyPrefix + method.getName()+":"+ md5(method.toString())+ ":";
        }
        return keyPrefix + value;
    }

    private static String md5(String data) throws NoSuchAlgorithmException{
        MessageDigest md = MessageDigest.getInstance("MD5");
        md.update(data.getBytes());
        return bytesToHex(md.digest());
    }

    private static String bytesToHex(byte[] ch){
        StringBuilder builder = new StringBuilder("");
        for(byte ach : ch){
            builder.append(bytesToHex(ch));
        }
        return builder.toString();
    }

    public static String parse(String key ,Method method, Object[] args){
        String[] params = discoverer.getParameterNames(method);
        if(params == null || Objects.equals("default", key)){
            return key;
        }
        EvaluationContext context = new StandardEvaluationContext();
        for(int i=0; i < params.length;i++){
            context.setVariable(params[i], args[i]);
        }
        String[] keys = key.split(",");
        StringBuilder result = new StringBuilder();
        for(String k : keys){
            result.append(expressionParser.parseExpression(k).getValue(context ,String.class));
            result.append(":");
        }
        return result.deleteCharAt(result.length() -1).toString();
    }

    private static String byteToHex(byte ch){
        String[] str = {"0","1","2","3","4","5","6","7","8","9", "A", "B", "C", "D", "E", "F"};
        return str[ch >> 4 & 0xF] + str[ch & 0xF];
    }

    private static Method getMethod(ProceedingJoinPoint point){
        Object target = point.getTarget();
        String methodName = point.getSignature().getName();
        Object[] args= point.getArgs();
        Class[] parameterTypes = ((MethodSignature)point.getSignature()).getMethod().getParameterTypes();
        Method m = null;
        try{
          m = target.getClass().getMethod(methodName, parameterTypes);
          if(m.isBridge()){
              for (int i=0; i<args.length;i++){
                  Class genClazz = GenericsUtil.getSuperClassGenricType(target.getClass());
                  if(args[i].getClass().isAssignableFrom(genClazz)){
                      parameterTypes[i] = genClazz;
                  }
              }
              m = target.getClass().getMethod(methodName , parameterTypes);
          }
        } catch (Exception e){
            log.error("参数类型反射异常!errMsg:{}", e.getMessage() ,e);
        }
        return m;
    }
}

分布式锁常量类

public class LockConstant {

    public static final String OPERATE_QUICK = "操作太频繁,请稍后重试";
    public static final String REPEATED_SUBMIT = "操作太频繁,请勿重复提交";

}

用到的一些工具类

GenericsUtil

这个类的作用是获取指定类的父类(包括泛型参数)中的指定位置的泛型参数的类型。其中 getSuperClassGenricType(Class clazz) 方法返回父类中第一个泛型参数的类型,而 getSuperClassGenricType(Class clazz, int index) 方法返回父类中指定位置的泛型参数的类型。


public class GenericsUtil {

    public static Class getSuperClassGenricType(Class clazz, int index){
        Type genType = clazz.getGenericSuperclass();
        if(!(genType instanceof ParameterizedType)){
            return Object.class;
        }
        Type[] params = ((ParameterizedType)genType).getActualTypeArguments();
        if(index >= params.length || index < 0){
            throw new RuntimeException("索引不对");
        }
        if(!((params[index]) instanceof Class)){
            return Object.class;
        }
        return (Class)params[index];
    }

    public static Class getSuperClassGenricType(Class clazz){
        return getSuperClassGenricType(clazz, 0);
    }
}

具体使用

我拿我们项目中一个方法举例,我们可以在方法上添加@Lock注解,设置过期时间为3秒,value值通过SpringEL表达式来获取请求参数里面的actNo,来实现接口的幂等性。

最后我们还需要将我们的写的这个工具类封装成一个springboot的starter,所以还需要在resources目录下新建一个META-INF/spring.factories文件,以便容器能够能够自动配置,如果不需要打成jar包被其他项目依赖可以省略这一步。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.exmaple.config.JedisAutoConfiguration,\
  com.example.config.RedissonAutoConfiguration

标签:String,int,lock,Redis,private,DEFAULT,分布式系统,public,分布式
From: https://www.cnblogs.com/loveletters/p/distributelock.html

相关文章

  • Redis常见问题答疑
    数据类型一个数据类型都对应了很多种底层数据结构。以List为例,什么情况下是双向链表,反之又在什么情况下是压缩列表呢?还是说是并存状态?1、Hash和ZSet是数据量少采用压缩列表存储,数据量变大转为哈希表或跳表存储2、但List不是这样,是并存的状态,List是双向链表+压缩列表key过期......
  • 大数据经典论文解读 - 分布式锁 Chubby
    Chubby在谷歌“三驾马车”中3个系统都是单Master系统,这个Master是系统的单点,一旦Master故障集群就无法提供服务。使用BackupMaster,通过监控机制进行切换。但是:如何实现BackupMaster和Master完全同步?监控程序也是单点,如何确定是Master宕机还是监控程序到Master的网络断了?后者......
  • Redis 持久化之RDB 和 AOF
     Redis有两种持久化方案,RDB(RedisDataBase)和AOF(AppendOnlyFile)。如果你想快速了解和使用RDB和AOF,可以直接跳到文章底部看总结。本章节通过配置文件,触发快照的方式,恢复数据的操作,命令操作演示,优缺点来学习Redis的重点知识持久化。RDB详解RDB是Redis默认的持久化方......
  • 安装redis
     1、下载安装reids-4.0.111、推荐进入到linux路径/usr/local/src2、$wgethttp://download.redis.io/releases/redis-4.0.10.tar.gz3、$tarxzfredis-4.0.10.tar.gz4、$cdredis-4.0.10/5、$make&&makeinstall2、启动客户端    完成了。  安装......
  • php redis 悲观锁
     悲观锁(PessimisticLock),顾名思义,就是每次处理redis数据都以最悲观的场景展开,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是......
  • redis 高级数据 hyperLogLog
     1、统计独立的UV2、用做    基数  统计的{1,3,5,7,8,7,5} 基数集:{1,3,5,7,8}     基数:5{1,1,1,1,1,7,1}基数集:{1,7}基数:2基数是数据集去重后元素个数  用于进行基数统计,不是集合,不保存数据,只记录数量而不是具体数据 核心是技术估算......
  • redis集群简介
     1.1       集群的概念所谓的集群,就是通过添加服务器的数量,提供相同的服务,从而让服务器达到一个稳定、高效的状态。1.1.1      使用redis集群的必要性问题:我们已经部署好了redis,并且能启动一个redis,实现数据的读写,为什么还要学习redis集群?答:(1)单个redis存在......
  • Redis常用命令
    命令参考:https://www.redis.net.cn字符串String操作命令Redis中字符串类型常用命令:*SETkeyvalue 设置指定key的值*GETkey  获取指定key的值*SETEXkeysecondsvalue设置指定key的值,并将key的过期时间设为secon......
  • 利用redis完成自动补全搜索功能(一)
     最近要做一个搜索自动补全的功能(目前只要求做最前匹配),自动补全就是自动提示,类似于搜索引擎,再上面输入一个字符,下面会提示多个关键词供参考,比如你输入nb2字符,会自动提示nba,nba录像,nba直播。能想到的一般有3种解决方案1.利用mysql来做,只能使用like'nb%'......
  • 使用Spring-data进行Redis操作
     Redis相信大家都听说过,它是一个开源的key-value缓存数据库,有很多Java的客户端支持,比较有名的有Jedis,JRedis等(见这里)。当然我们可以使用客户端的原生代码实现redis的操作,但实际上在spring中就已经集成了这些客户端的使用,下面我们就以Jedis为例来介绍一下Spring中关于Redis的配置。......