首页 > 数据库 >Redis核心设计原理(深入底层C源码)

Redis核心设计原理(深入底层C源码)

时间:2022-10-04 04:22:06浏览次数:58  
标签:return SDS int ziplist Redis unsigned 源码 节点 底层

Redis 基本特性

  1. 非关系型的键值对数据库,可以根据键以O(1) 的时间复杂度取出或插入关联值
  2. Redis 的数据是存在内存中的
  3. 键值对中键的类型可以是字符串,整型,浮点型等,且键是唯一的
  4. 键值对中的值类型可以是string,hash,list,set,sorted set 等
  5. Redis 内置了复制,磁盘持久化,LUA脚本,事务,SSL, ACLs,客户端缓存,客户端代理等功能
  6. 通过Redis哨兵和Redis Cluster 模式提供高可用性

 

Redis高性能的原因

  1.图示(换算时间:1s =1000 ms ,1ms=1000 us ,1us =1000 ns):

         

  2.对于内存数据库来说,本身数据就存在于内存里,避免了磁盘 I/O 的限制,无疑访问速度会远大于磁盘数据库。

  3.其次Redis,默认是采用一个线程执行指令任务的,既减少了线程上下文切换带来的开销,也避免并发问题。

  4.而且Redis中有多种数据类型,每种数据类型的底层都由一种或多种数据结构来支持。正是因为有了这些数据结构,Redis 在存储与读取上的速度才不受阻碍。

 

深入底层C源码分析Redis

  1.Redis是基于键值对存储数据的,像我们平时会使用的时候很容易觉得Redis的键值是多种数据类型的,其实不然,Redis的键值是String类型的,数据变成字节流(byte)基于网络传输的过程,传到Redis服务转成SDS(Simple Dynamic String【简单动态字符串】) String(Redis自定义的数据类型)。既然Redis是基于C语言写的,那么为什么不用原生的?

//如果我们想存储字符串:myname
C: char data[]="myname\0"; //而C语言中对于字符串是默认采用\0作为结尾的

而对于Redis,它是面向多种语言的,对于传过来的数据是不可控的:
    如果传输的视频流或者音频的流文件,大概率会出现"name\0orxxx"这种
    那么C语言只能读到“name”这部分遇到“\0”,则会视为结束了。(这明显是不合适,容易导致数据丢失)
    故,Redis采用sds结构:
    struct sdshdr {
        int len;    //存储的长度
        int free;  //剩余的空闲空间
        char buf[]; //数据存储的地方
    };

    这种数据结构的好处是:
    1.对于存储数据的准确性更高了,依靠len字段来标明准确数据的位置。【二进制安全的数据结构】
    2.采用以空间换时间的方式,每次扩容的时候可以适当分配大一点的空间,记录剩余时间是否够下一次的修改或者追加。(减少对象的销毁与创建的步骤)【提供了内存预分配机制,避免了频繁的内存分配】
    3.会在数据末尾依旧采用\0作为结尾【兼容C语言的函数库】

    说明:

      Redis自定义sdshdr数据结构具备三大特性:

        【1】二进制安全的数据结构

        【2】提供了内存预分配机制,避免了频繁的内存分配

        【3】兼容C语言的函数库

 

  2.String类型的数据结构

    1)代码展示

//redis 3.2 以前
struct sdshdr {
    int len;
    int free;
    char buf[];
};
//redis 3.2 后
//redis\deps\hiredis\sds.h文件
typedef char *sds;

//存在注释:sdshdr5 is never used, we just access the flags byte directly.However is here to document the layout of type 5 SDS strings. 
//意思大概是:sdshdr5从未使用过,我们只是直接访问标志字节。然而,这里是为了记录类型5 SDS字符串的布局
struct __attribute__ ((__packed__)) sdshdr5 {  // 对应的字符串长度小于 1<<5
    unsigned char flags; 
    char buf[];
};

//__attribute__ ((packed)) 的作用就是告诉编译器取消结构体在编译过程的优化对齐,按照实际占用字节数进行对齐

struct __attribute__ ((__packed__)) sdshdr8 { // 对应的字符串长度小于 1<<8
    uint8_t len;                              //目前字符串的长度
    uint8_t alloc;                            //分配的内存总长度
    unsigned char flags;                      //flag用3bit来标明类型,类型后续解释,其余5bit目前没有使用
    char buf[];                               //柔性数组,以'\0'结尾
};
struct __attribute__ ((__packed__)) sdshdr16 { // 对应的字符串长度小于 1<<16
    uint16_t len; 
    uint16_t alloc; 
    unsigned char flags; 
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 { // 对应的字符串长度小于 1<<32
    uint32_t len; 
    uint32_t alloc; 
    unsigned char flags; 
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 { // 对应的字符串长度小于 1<<64
    uint64_t len; 
    uint64_t alloc; 
    unsigned char flags; 
    char buf[];
};

#define SDS_TYPE_5  0
#define SDS_TYPE_8  1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4

static inline char sdsReqType(size_t string_size) {
    if (string_size < 1<<5)
        return SDS_TYPE_5;
    if (string_size < 1<<8)
        return SDS_TYPE_8;
    if (string_size < 1<<16)
        return SDS_TYPE_16;
#if (LONG_MAX == LLONG_MAX)  
    if (string_size < 1ll<<32)
        return SDS_TYPE_32;
    return SDS_TYPE_64;
#else
    return SDS_TYPE_32;
#endif
}

 

    2)发现说明

      【1】为什么要对原本的数据结构进行修改?(改版后的优化在哪里)

        因为int占据4个字节(8bit),也就是能存42亿左右的,但是在我们实际上,存储的数据大概率都是小数据,所以它存在浪费资源的嫌疑。

        所以进行优化的思维就是根据不同的数据范围,设置不同容量,如,uint8_t 表示占据1字节(8bit,在二进制中最大可以表示255),uint16_t 表示占据2字节(16bit,在二进制中最大可以表示65535)

      【2】官网上说String类型限制大小512M,是怎么限制的?

//位于t_string.c文件中
//为什么要限制,要知道512M已经是一个很大的值了(已经是一个bigkey了),在redis单线程操作中已经很容易阻塞线程
//故在追加命令appendCommand和设置命令setrangeCommand中都会进行校验
static int checkStringLength(client *c, long long size) {
    if (size > 512*1024*1024) {
        addReplyError(c,"string exceeds maximum allowed size (512MB)");
        return C_ERR;
    }
    return C_OK;
}

 

    3)分析是怎么创建的

//在sds.c文件内
//sds在创建的时候,buf数组初始大小为:struct结构体大小 + 字符串的长度+1, +1是为了在字符串末尾添加一个\0。
//在完成字符串到字符数组的拷贝之后,会在字符串末尾加一个\0,这样可以复用C语言的一些函数。
sds sdsnewlen(const void *init, size_t initlen) {
    void *sh;
    sds s;
    // 根据长度计算sds类型
    char type = sdsReqType(initlen);
    if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;  //为空时强制用sdshdr8
    // 获取结构体大小
    int hdrlen = sdsHdrSize(type);
    unsigned char *fp; /* flags pointer. */

    // 分配内存空间,初始大小为:struct结构体大小+字符串的长度+1,+1是为了在字符串末尾添加一个\0,兼容传统C语言
    sh = s_malloc(hdrlen+initlen+1);
    // sh在这里指向了这个刚刚分配的内存地址
    if (sh == NULL) return NULL;
    // 判断是否是init阶段
    if (!init)
        //init 不为空的话,将sh这块内存全部设置为0
        memset(sh, 0, hdrlen+initlen+1);
    // 指向buf数组的指针
    s = (char*)sh+hdrlen;
    //因为可以看到地址的顺序是 len,alloc,flag,buf,目前s是指向buf,那么后退1位,fp 正好指向了flag对应的地址
    fp = ((unsigned char*)s)-1;
    // 类型选择
    switch(type) {
        case SDS_TYPE_5: {
            *fp = type | (initlen << SDS_TYPE_BITS);
            break;
        }
        case SDS_TYPE_8: {
            SDS_HDR_VAR(8,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_16: {
            SDS_HDR_VAR(16,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_32: {
            SDS_HDR_VAR(32,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_64: {
            SDS_HDR_VAR(64,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
    }
    //如果两者都不为空,则init 这个对应的字符串,赋值给s
    if (initlen && init)
        memcpy(s, init, initlen); // 将字符串拷贝到buf数组
    s[initlen] = '\0';  // 字符串末尾添加一个\0
    return s;
}

// 获取结构体大小
static inline int sdsHdrSize(char type) {
    switch(type&SDS_TYPE_MASK) {
        case SDS_TYPE_5:
            return sizeof(struct sdshdr5);
        case SDS_TYPE_8:
            return sizeof(struct sdshdr8);
        case SDS_TYPE_16:
            return sizeof(struct sdshdr16);
        case SDS_TYPE_32:
            return sizeof(struct sdshdr32);
        case SDS_TYPE_64:
            return sizeof(struct sdshdr64);
    }
    return 0;
}

 

    4)怎么防止操作时缓冲区溢出

//先检查 SDS 的空间是否满足修改所需的要求
//如果不满足要求的话,API 会自动将 SDS 的空间扩展到执行修改所需的大小
//最后才是返回,去执行实际的修改操作
sds sdscatlen(sds s, const void *t, size_t len) {
    size_t curlen = sdslen(s);  //获取s已经使用过的空间字符数

    s = sdsMakeRoomFor(s,len);  //扩大s的空闲空间
    if (s == NULL) return NULL;  
    memcpy(s+curlen, t, len);  //拷贝数据
    sdssetlen(s, curlen+len);  //设置s的len
    s[curlen+len] = '\0'; //最后加上空字符串
    return s;
}

 

 

    5)分析是怎么扩容的

      代码展示

// 扩容sds
sds sdsMakeRoomFor(sds s, size_t addlen) {
    void *sh, *newsh;
    //获取剩余可用的空间
    size_t avail = sdsavail(s);
    size_t len, newlen;
    char type, oldtype = s[-1] & SDS_TYPE_MASK;
    int hdrlen;

    //如果可用空间大于需要增加的长度,那么直接返回
    if (avail >= addlen) return s;

    //len 已使用长度
    len = sdslen(s);
    //sh 回到指向了这个sds的起始位置。
    sh = (char*)s-sdsHdrSize(oldtype);
    // newlen 代表最小需要的长度
    newlen = (len+addlen);
    //Redis认为一旦被扩容了,那这个字符串被再次扩容的几率就很大,所以会在此基础上多加一些空间,防止频繁扩容
    if (newlen < SDS_MAX_PREALLOC)
        newlen *= 2;
    else
        newlen += SDS_MAX_PREALLOC;

    //获取新长度的类型
    type = sdsReqType(newlen);

    //如果是SDS_TYPE_5会被强行转为SDS_TYPE_8
    if (type == SDS_TYPE_5) type = SDS_TYPE_8;

    hdrlen = sdsHdrSize(type);
    if (oldtype==type) {
        //sh是开始地址,在开始地址的基础上,分配更多的空间,逻辑如同初始化部分,hdrlen 是head的长度,即struct本身大小。后面newlen 是buf 大小, +1 是为了结束符号,sds 通常情况下是可以直接打印的
        newsh = s_realloc(sh, hdrlen+newlen+1);
        if (newsh == NULL) {
            s_free(sh);
            return NULL;
        }
        s = (char*)newsh+hdrlen;
    } else {
        //如果类型发生变化,地址内容不可复用,所以找新的空间。
        newsh = s_malloc(hdrlen+newlen+1);
        if (newsh == NULL) return NULL;
        //复制原来的str到新的sds 上面,newsh+hdrlen 等于sds buf 地址开始的位置,s 原buf的位置,len+1 把结束符号也复制进来
        memcpy((char*)newsh+hdrlen, s, len+1);
        //释放前面的内存空间
        s_free(sh);
        //调整s开始的位置,即地址空间指向新的buf开始的位置
        s = (char*)newsh+hdrlen;
        //-1 正好到了flag的位置
        s[-1] = type;
        //分配len的值
        sdssetlen(s, len);
    }
    sdssetalloc(s, newlen);
    //返回新的sds
    return s;
}


// 给len 设值
static inline size_t sdsavail(const sds s) {
    unsigned char flags = s[-1];
    switch(flags&SDS_TYPE_MASK) {
        case SDS_TYPE_5: {
            return 0;
        }
        case SDS_TYPE_8: {
            SDS_HDR_VAR(8,s);
            return sh->alloc - sh->len;
        }
        case SDS_TYPE_16: {
            SDS_HDR_VAR(16,s);
            return sh->alloc - sh->len;
        }
        case SDS_TYPE_32: {
            SDS_HDR_VAR(32,s);
            return sh->alloc - sh->len;
        }
        case SDS_TYPE_64: {
            SDS_HDR_VAR(64,s);
            return sh->alloc - sh->len;
        }
    }
    return 0;
}

// 获取当前sds,可用的长度。
static inline void sdssetlen(sds s, size_t newlen) {
    unsigned char flags = s[-1];
    switch(flags&SDS_TYPE_MASK) {
        case SDS_TYPE_5:
            {
                unsigned char *fp = ((unsigned char*)s)-1;
                *fp = (unsigned char)(SDS_TYPE_5 | (newlen << SDS_TYPE_BITS));
            }
            break;
        case SDS_TYPE_8:
            SDS_HDR(8,s)->len = (uint8_t)newlen;
            break;
        case SDS_TYPE_16:
            SDS_HDR(16,s)->len = (uint16_t)newlen;
            break;
        case SDS_TYPE_32:
            SDS_HDR(32,s)->len = (uint32_t)newlen;
            break;
        case SDS_TYPE_64:
            SDS_HDR(64,s)->len = (uint64_t)newlen;
            break;
    }
}

// 获取alloc的长度
/* sdsalloc() = sdsavail() + sdslen() */
static inline size_t sdsalloc(const sds s) {
    unsigned char flags = s[-1];
    switch(flags&SDS_TYPE_MASK) {
        case SDS_TYPE_5:
            return SDS_TYPE_5_LEN(flags);
        case SDS_TYPE_8:
            return SDS_HDR(8,s)->alloc;
        case SDS_TYPE_16:
            return SDS_HDR(16,s)->alloc;
        case SDS_TYPE_32:
            return SDS_HDR(32,s)->alloc;
        case SDS_TYPE_64:
            return SDS_HDR(64,s)->alloc;
    }
    return 0;
}

// 给 alloc 设值
static inline void sdssetalloc(sds s, size_t newlen) {
    unsigned char flags = s[-1];
    switch(flags&SDS_TYPE_MASK) {
        case SDS_TYPE_5:
            /* Nothing to do, this type has no total allocation info. */
            break;
        case SDS_TYPE_8:
            SDS_HDR(8,s)->alloc = (uint8_t)newlen;
            break;
        case SDS_TYPE_16:
            SDS_HDR(16,s)->alloc = (uint16_t)newlen;
            break;
        case SDS_TYPE_32:
            SDS_HDR(32,s)->alloc = (uint32_t)newlen;
            break;
        case SDS_TYPE_64:
            SDS_HDR(64,s)->alloc = (uint64_t)newlen;
            break;
    }
}

 

      代码说明

        【1】sds内部buf的扩容机制:新buf长度 = (原buf长度 + 添加buf长度)*2,如果buf长度大于1M后,每次扩容也只会增大1M

        【2】对于类型改变的需要变换存储空间。

 

  3.RedisDb 数据结构

    1)代码展示

//位于server.h文件中
typedef struct redisDb {
    dict *dict;                 // 保存了当前数据库的键空间
    dict *expires;              //键空间中所有键的过期时间
    dict *blocking_keys;        //客户端等待数据的键(BLPOP)
    dict *ready_keys;           //保存着处于阻塞状态的键,value为NULL
    dict *watched_keys;         //监视键的MULTI/EXEC CAS
    int id;                     //数据库ID
    long long avg_ttl;          //键的平均过期时间
    unsigned long expires_cursor; //周期性删除过期键的游标
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

//位于dict.h文件中
typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2]; // ht[0] , ht[1] =null //方便渐进的rehash扩容,dict的hashtable ,其中​一个哈希表正常存储数据​,​另一个哈希表为空,空哈希表在 rehash 时使用
    long rehashidx; /* rehash 索引,当不在进行 rehash 时,值为 -1 */
    unsigned long iterators; //当前正在运行的迭代器的数量
} dict;

//位于dict.h文件中
/*这是我们的哈希表结构。每本字典都有两个这样的词,实现增量重哈希,从旧表到新表。* / typedef struct dictht { dictEntry **table; unsigned long size; // hashtable 容量 unsigned long sizemask; // size -1 unsigned long used; // hashtable 元素个数 used / size =1 } dictht; //位于dict.h文件中 typedef struct dictEntry { void *key; union { void *val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next; } dictEntry; //位于server.h文件中 //redisObject对象 : string , list ,set ,hash ,zset ... typedef struct redisObject { unsigned type:4; // 4 bit, sting , hash unsigned encoding:4; // 4 bit unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or LFU data (least significant 8 bits frequency * and most significant 16 bits access time). * 24 bit * */ int refcount; // 4 byte void *ptr; // 8 byte 总空间: 4 bit + 4 bit + 24 bit + 4 byte + 8 byte = 16 byte } robj;

 

    2)视图展示

            

 

 

    3)代码说明

      【1】由上可知redisDb,主要都是将数据存储在字典(dict)中,而且还是多个,固定存储,过期维护等多个字典。

      【2】dict字典结构,每个字典有两个哈希表结构的原因是为了用于渐进式扩容,当某个哈希表结构过于庞大的时候(按照hashMap的思维,必定是需要对数组进行扩容,增大数组长度,将链表长度缩小,加快遍历),其实它也需要进行扩容,但是再进行扩容操作的同时,容易出现阻塞线程的情况(如果时间太久),为此,dict中采用rehashidx标明是否正在处于扩容状态,且ht[1]会生成一个新的哈希表结构,容量是之前的两倍,然后把ht[0]中的数据按槽位一点一点的搬运过来【断断续续的操作,这样就不会一直阻塞住线程】,新的数据也会落到ht[1]中,直到搬完。然后将ht[1]指针指向ht[0],然后自己再指向null,rehashidx变为0,就完成了扩容操作。

      【3】dictEntry相当于hashMap中的节点(包含了key,value,和指向下个节点的指针),其中val会被进一步封装成redisObject。

      【4】redisObject中的type用于约束客户端命令,如set操作,会判断操作的值与操作的类型匹不匹配。encoding记录了值在redis底层是怎么样的编码形式。ptr指向内存的真实地址。

 

    4)分析String类型的编码

      【1】会存在:int,raw,embstr三种。

      【2】为什么会有int,因为整型值最大固定是64bit,其实与指针*ptr占据的大小一致,其实把数值存于这里可以减少了对空间的开辟。代码展示:

//server.c文件中封装了所有的客户端命令
//发现set命令会执行setCommand方法【该方法位于t_string.c文件中】,直接看核心部分
void setCommand(client *c) {
    ....
    // 完成编码  set:  key  value
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

//该方法位于object.c文件中
robj *tryObjectEncoding(robj *o) {
    long value;
    sds s = o->ptr;
    size_t len;

    /*确保这是一个字符串对象,我们在这个函数中编码的唯一类型。其他类型使用编码的内存高效表示,但由实现该类型的命令处理。* /
    serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

    //  只有类型为 原生sds类型 或者  embstr类型, 还有机会可以进一步编码,否则直接返回
    if (!sdsEncodedObject(o)) return o;

    // 如果其他地方有应用即当前对象为共享对象, 修改范围将扩大,所以放弃编码为整形操作
     if (o->refcount > 1) return o;

    //判断是否可以把该字符串转化为一个长整型
    len = sdslen(s);

    //   范围是否在 整型值得表示范围 , 0 - 2^64,最多不超过20 位
    if (len <= 20 && string2l(s,len,&value)) {
        /* 
         *   如果Redis的配置不要求运行LRU替换算法,且转成的long型数字的值又比较小
         *  (小于OBJ_SHARED_INTEGERS,在目前的实现中这个值是10000),
         *   那么会使用共享数字对象来表示。之所以这里的判断跟LRU有关,是因为LRU算法要求每个robj有不同的lru字段值,
         *   所以用了LRU就不能共享robj。shared.integers是一个长度为10000的数组,里面预存了10000个小的数字对象。
         *   这些小数字对象都是 encoding = OBJ_ENCODING_INT的string robj对象。
         * 
         * */
        // 没有设置内存淘汰策略,且数字范围在 缓存整型得范围内
        if ((server.maxmemory == 0 ||
            !(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
            value >= 0 &&
            value < OBJ_SHARED_INTEGERS)
        {
            decrRefCount(o); // 不需要用额外得对象来存储
            incrRefCount(shared.integers[value]);
            return shared.integers[value];  // 共享对象
        } else {
            // 如果前一步不能使用共享小对象来表示,那么将原来的robj编码成encoding = OBJ_ENCODING_INT,这时ptr字段直接存成这个long型的值。
            // 注意ptr字段本来是一个void *指针(即存储的是内存地址),
            // 因此在64位机器上有64位宽度,正好能存储一个64位的long型值。这样,除了robj本身之外,它就不再需要额外的内存空间来存储字符串值。
            if (o->encoding == OBJ_ENCODING_RAW) {
                sdsfree(o->ptr); // 释放空间
                o->encoding = OBJ_ENCODING_INT;
                // 用整形编码
                o->ptr = (void*) value;
                return o;
            } else if (o->encoding == OBJ_ENCODING_EMBSTR) {
                decrRefCount(o);
                return createStringObjectFromLongLongForValue(value);
            }
        }
    }

    // 数据长度 小于 OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44  的话, 用 embstr 进行编码
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
        robj *emb;

        if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
        emb = createEmbeddedStringObject(s,sdslen(s));
        decrRefCount(o);
        return emb;
    }

    trimStringObjectIfNeeded(o);

    /* Return the original object. */
    return o;
}

 

      【3】为什么会有embstr,代码展示

//CPU读取数据的时候其实是会有一个缓存行的概念(cache line,通常是64byte的空间),也就是一次性读取的大小

//而redisObject数据大小为16 byte 
typedef struct redisObject {
    unsigned type:4;        //  占4 bit
    unsigned encoding:4;    //  占4 bit 
    unsigned lru:LRU_BITS; // 占24 bit 
    int refcount;           // 4 byte  
    void *ptr;              // 8 byte  
} robj;
//总空间:  4 bit + 4 bit + 24 bit + 4 byte + 8 byte = 16 byte 

所以读取是会读【redisObject 16 byte,及其后面的48byte的数据(但是用不到)】
为了节约CPU成本,可不可以在创建的时候,将数据就存在后面呢?(为什么采用sdshdr8,因为最多存44个字符,sdshdr8可以容纳128个,满足条件,且消耗最小)
struct __attribute__ ((__packed__)) sdshdr8 { // 对应的字符串长度小于 1<<8
    uint8_t len;                              //占据1byte,表示128个
    uint8_t alloc;                            //占据1byte
    unsigned char flags;                      //占据1byte
    char buf[];                               //以'\0'结尾,这个字符也会占据1byte
};
所以如果把他们都存于一个64byte的内存中是不是读取对象的时候顺便可以把值也拿出来了,减少了一次IO。

 

      【4】而raw便是表示:字符串将以简单动态字符串(SDS)的形式存储,需要​两次 malloc 分配内存​,redisObject 对象头和 SDS 对象在内存地址上一般是不连续的。

      

    5)发现说明

      【1】会有人疑问为什么DB默认是16?

        因为Redis的配置文件redis/redis.conf中的databases属性默认是16。所以Redis启动的时候默认会创建16个数据库且拿数据库索引为0的数据库作为默认数据库。这些都是可以通过配置调整的。

 

  4.List 数据结构(Redis采用quicklist(双端链表) 和 ziplist 作为List的底层实现)

    1)介绍

      【1】List是一个有序(按加入的时序排序)的数据结构,Redis采用quicklist(双端链表) 和 ziplist 作为List的底层实现。以通过设置每个ziplist的最大容量,quicklist的数据压缩范围,提升数据存取效率。

//当值为正数时,表示quicklistNode节点上的ziplist的长度。比如当这个值为5时,每个quicklistNode节点的ziplist最多包含5个数据项
//当值为负数时,表示按照字节数来限制quicklistNode节点上的ziplist的的长度,可选值为-1到-5,每个值的含义如下
//-1  ziplist节点最大为4kb
//-2  ziplist节点最大为8kb
//-3  ziplist节点最大为16kb
//-4  ziplist节点最大为32kb
//-5  ziplist节点最大为64kb
list-max-ziplist-size  -2        //  单个ziplist节点最大能存储  8kb  ,超过则进行分裂,将数据存储在新的ziplist节点中


//对节点中间的数据进行压缩,进一步节省内存
//0 特殊值,表示都不压缩
//1   quicklist两端各有1个节点不压缩,中间的节点压缩
//2   quicklist两端各有2个节点不压缩,中间的节点压缩
//n   quicklist两端各有n个节点不压缩,中间的节点压缩
list-compress-depth  1        //  0 代表所有节点,都不进行压缩,1, 代表从头节点往后走一个,尾节点往前走一个不用压缩,其他的全部压缩,以此类推

 

 

 

    2)ziplist 分析详解

      【1】介绍

        1.ziplist是一个经过特殊编码的双向链表,它的设计目标就是为了提高存储效率;

        2.ziplist可以用于存储字符串或整数,其中整数是按真正的二进制表示进行编码的,而不是编码成字符串序列。它能以O(1)的时间复杂度在表的两端提供push和pop操作;

        3.因为ziplist是一个内存连续的集合,所以ziplist遍历只要通过当前节点的指针 加上 当前节点的长度 或 减去 上一节点的长度 ,即可得到下一个节点的数据或上一个节点的数据,这样就省去的指针从而节省了存储空间,又因为内存连续所以在数据读取上的效率也远高于普通的链表。

      【2】代码展示

robj *createZiplistObject(void) {
    unsigned char *zl = ziplistNew();
    robj *o = createObject(OBJ_LIST,zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

robj *createObject(int type, void *ptr) {
    robj *o = zmalloc(sizeof(*o));
    o->type = type;
    o->encoding = OBJ_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;  
    } else {
        o->lru = LRU_CLOCK();   // 获取 24bit 当前时间秒数
    }
    return o;
}

//以下为ziplist.c文件中
#define ZIPLIST_BYTES(zl)       (*((uint32_t*)(zl)))  //获取ziplist的zlbytes的指针(ziplist 所占空间字节数)
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t)))) //获取ziplist的zltail的指针
#define ZIPLIST_LENGTH(zl)      (*((uint16_t*)((zl)+sizeof(uint32_t)*2))) //获取ziplist的zllen的指针
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))  //ziplist头大小
#define ZIPLIST_END_SIZE        (sizeof(uint8_t))  // ziplist结束标志位大小
#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)  // 获取第一个元素的指针
#define ZIPLIST_ENTRY_TAIL(zl)  ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) // 获取最后一个元素的指针
#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)  // 获取结束标志位指针

unsigned char *ziplistNew(void) { // 创建一个压缩表
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;  // zip头加结束标识位数
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);  // 大小端转换
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;  // len赋值为0
    zl[bytes-1] = ZIP_END;  // 结束标志位赋值
    return zl;
}

/*
 * 压缩列表节点 对应 上文中 Ziplist 中的 entry
 * zlentry每个节点由三部分组成:Previous entry len、encoding、data
 *      prevlengh: 记录上一个节点的长度,为了方便反向遍历ziplist
 *      encoding: 编码,由于 ziplist 就是用来节省空间的,所以 ziplist 有多种编码,用来表示不同长度的字符串或整数。
 *      data: 用于存储 entry 真实的数据
 * 结构体定义了7个字段,主要还是为了满足各种可变因素
 */
typedef struct zlentry {
    unsigned int prevrawlensize;  //prevrawlensize是描述prevrawlen的大小,有1字节和5字节两种
    unsigned int prevrawlen;     //prevrawlen是前一个节点的长度,
    unsigned int lensize;       //lensize为编码len所需的字节大小
    unsigned int len;         //len为当前节点长度
    unsigned int headersize;   //当前节点的header大小
    unsigned char encoding;    //节点的编码方式
    unsigned char *p;          //指向节点的指针  
} zlentry;

 

 

 

      【3】图示:

        

 

 

         【4】图示参数说明

zlbytes:32bit,表示ziplist占用的字节总数。
zltail:    32bit,表示ziplist表中最后一项(entry)在ziplist中的偏移字节数。通过zltail我们可以很方便地找到最后一项,从而可以在ziplist尾端快速地执行push或pop操作
zlen:     16bit, 表示ziplist中数据项(entry)的个数。
entry:表示真正存放数据的数据项,长度不定
zlend: ziplist最后1个字节,是一个结束标记,值固定等于255。
prerawlen: 前一个entry的数据长度。
len: entry中数据的长度
data: 真实数据存储

         【5】说明

          1.Ziplist的设计结构,保障了空间的节省与查询的高效,但是当出现zlentry增加或删除时,Ziplist是不能直接在原有空间上进行修改,每一次变动都需要重新开辟空间去拷贝、修改。这样的场景下Ziplist一旦内部元素过多,将会导致性能的急剧下滑。因此Redis 在实现上做了一层优化,当Ziplist过大时,会将其分割成多个Ziplist,然后再通过一个双向链表将其串联起来。

 

 

    3)quicklist 分析详解

      【1】介绍

        1.Redis quicklist是Redis 3.2版本以后针对链表和压缩列表进行改造的一种数据结构,是 zipList 和 linkedList 的混合体,相对于链表它压缩了内存,进一步的提高了效率。

 

      【2】代码展示

robj *createQuicklistObject(void) {
    quicklist *l = quicklistCreate();
    robj *o = createObject(OBJ_LIST,l);
    o->encoding = OBJ_ENCODING_QUICKLIST;
    return o;
}

//处于quicklist.c文件中
quicklist *quicklistCreate(void) {
    struct quicklist *quicklist;

    quicklist = zmalloc(sizeof(*quicklist));
    quicklist->head = quicklist->tail = NULL;
    quicklist->len = 0;
    quicklist->count = 0;
    quicklist->compress = 0;
    quicklist->fill = -2;
    quicklist->bookmark_count = 0;
    return quicklist;
}


//处于quicklist.h文件中
//quicklist 是一个 40 字节的结构(在 64 位系统上),描述了一个快速列表。
typedef struct quicklist {
    quicklistNode *head;  //指向头节点(左侧第一个节点)的指针。 
    quicklistNode *tail;  //指向尾节点(右侧第一个节点)的指针。
    unsigned long count;  // 所有 quicklistNode 节点中所有的 entry 个数     
    unsigned long len;     // quickListNode 节点个数,也就是 quickList 的长度
    int fill : QL_FILL_BITS;          //单个节点的填充因子,也就是 ziplist 的大小       
    unsigned int compress : QL_COMP_BITS;  // 保存压缩成都只,配置文件设置,64位操作系统占 16bit , 6 表示压缩
    unsigned int bookmark_count: QL_BM_BITS;
    quicklistBookmark bookmarks[];
} quicklist;

//quicklistNode 是一个 32 字节的结构,描述了一个快速列表的 ziplist。
typedef struct quicklistNode {
    struct quicklistNode *prev;  // 双向链表前驱节点
    struct quicklistNode *next;  // 双向链表的后节点
    unsigned char *zl;       //数据指针。如果当前节点的数据没有压缩,那么它指向一个ziplist结构;否则,它指向一个quicklistLZF结构。
    unsigned int sz;         // 压缩列表 ziplist 的总长度   
    unsigned int count : 16;     // 每个 ziplist 中 entry 的个数
    unsigned int encoding : 2;    // 表示是否采用了 LZF 压缩 quickList 节点 1 表示压缩过,2 表示没有压缩站 2bit 长度
    unsigned int container : 2;  // 表示是否开启 ziplist 进行压缩
    unsigned int recompress : 1;   // 表示该节点是否被压缩过
    unsigned int attempted_compress : 1;  // 测试使用
    unsigned int extra : 10;    // 额外拓展位,占 10bit 长度
} quicklistNode;

//当指定使用 lzf 压缩算法压缩 ziplist entry 节点时,quicklistNode 结构的 zl 成员执行 quicklistLZF 结构
typedef struct quicklistLZF {
    unsigned int sz;  //表示被LZF 压缩后的 ziplist 的大小
    char compressed[];  // 压缩有的数据,柔性数组
} quicklistLZF;

 

 

      【3】图示:

              

 

 

 

      【4】说明

        1.通过控制ziplist 的大小,则很好的解决了超大ziplist 的拷贝情况下对性能的影响。每次改动只需要针对具体的小段ziplist 进行操作。

 

 

    4)发现说明

      【1】为什么不采用两个指针指向前后数据的方式,而是要采用复合的数据结构完成?

        1.采用双指针的方式,那就必须赋予两个指针pre和next,一个指针占据了8byte,故两个指针就需要消耗16byte。如果list存在大量数据,所以就需要消耗相当多的内存在指针方面(胖指针问题)。

        2.采用双链表的话数据可能会分的很散,因为指针就是采用不连续的存储空间来存储数据,容易造成大量的内存碎片。

        3.采用quicklist 和 ziplist 混合,达到减少指针消耗的空间,其次连续的存储空间读取起来效率高于不连续的存储空间,节省IO。

        4.通过控制ziplist 的大小,则很好的解决了超大ziplist 的拷贝情况下对性能的影响。每次改动只需要针对具体的小段ziplist 进行操作。

        

  5.Hash 数据结构

    1)介绍

      【1】Hash 数据结构底层实现为一个字典( dict ),也是RedisBb用来存储K-V的数据结构,当数据量比较小,或者单个元素比较小时,底层用ziplist存储,数据大小和元素数量阈值可以通过如下参数设置。

hash-max-ziplist-entries  512    //  ziplist 元素个数超过 512 ,将改为hashtable编码 
hash-max-ziplist-value    64      //  单个元素大小超过 64 byte时,将改为hashtable编码

 

 

 

    2)发现说明

      【1】为什么数据量小的时候采用ziplist存储?

        1.ziplist使用紧凑的连续内存块顺序存储数据,在list或者hash结构中,未使用listNode(24字节)和dictEntry(24字节)结构体来存储元素项,因此会节省内存。

        2.ziplist结构元素访问采用的是后向遍历(从后往前),因此在hash中可将热点的key或者在list中将热点的元素项放在最后,可以提升性能。

        3.因为ziplist的内存结构中,仅仅只使用了额外的11个字节来存储ziplist的属性,另外很重要的是ziplist使用后向遍历,当list或者hash中的元素较多时,可以根据元素的冷热性调整元素存储顺序。

        4.而在dictht结构体中,存储属性需要32个字节,其中元素dictEntry也是每个占用24个字节。

 

  6.Set 数据结构

    1)介绍

      【1】Set 为无序的,自动去重的集合数据类型,Set 数据结构底层实现为一个value 为 null 的 字典( dict ),当数据可以用整形表示时,Set集合将被编码为intset数据结构。

//在配置文件中设置
set-max-intset-entries 512       // intset 能存储的最大元素个数,超过则用hashtable编码

      【2】两个条件任意满足时Set将用hashtable存储数据。1, 元素个数大于 set-max-intset-entries , 2 , 元素无法用整形表示。

 

    2)intset数据结构

//intset内部其实是一个数组(int8_t coentents[]数组),而且存储数据的时候是有序的,因为在查找数据的时候是通过二分查找来实现的。
typedef struct intset {
    uint32_t encoding;  // 编码方式
    uint32_t length;   // 集合包含的元素数量
    int8_t contents[];  // 保存元素的数组
} intset;

 

    3)set存储过程

// set添加元素的处理函数,在文件t_set.c中
//过程汇总
//检查set是否存在不存在则创建一个set结合。
//根据传入的set集合一个个进行添加,添加的时候需要进行内存压缩。
//setTypeAdd执行set添加过程中会判断是否进行编码转换。
void saddCommand(client *c) {
    robj *set;
    int j, added = 0;

    // 取出集合对象
    set = lookupKeyWrite(c->db,c->argv[1]);
    // 对象不存在,创建一个新的,并将它关联到数据库
    if (set == NULL) {
        set = setTypeCreate(c->argv[2]->ptr);
        dbAdd(c->db,c->argv[1],set);
    } 
    // 对象存在,检查类型
    else {
        if (set->type != OBJ_SET) {
            addReply(c,shared.wrongtypeerr);
            return;
        }
    }
    // 将所有输入元素添加到集合中
    for (j = 2; j < c->argc; j++) {
        // set 类型 添加元素
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    }
    // 如果有至少一个元素被成功添加,那么执行以下程序
    if (added) {
        // 发送键修改信号
        signalModifiedKey(c,c->db,c->argv[1]);
        // 发送事件通知
        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    }
    // 将数据库设为脏
    server.dirty += added;
    // 返回添加元素的数量
    addReplyLongLong(c,added);
}

//元素已经存在 直接返回 0 , 否则添加元素 返回 1 
//过程汇总
//如果能够转成int的对象(isObjectRepresentableAsLongLong),那么就用intset保存。
//如果用intset保存的时候,如果长度超过512(REDIS_SET_MAX_INTSET_ENTRIES)就转为hashtable编码。
//其他情况统一用hashtable进行存储。
int setTypeAdd(robj *subject, sds value) {
    long long llval;
    // 字典
    if (subject->encoding == OBJ_ENCODING_HT) {
        // 将 value 作为键, NULL 作为值,将元素添加到字典中
        dict *ht = subject->ptr;
        dictEntry *de = dictAddRaw(ht,value,NULL);
        if (de) {
            dictSetKey(ht,de,sdsdup(value));
            dictSetVal(ht,de,NULL);
            return 1;
        }
    } 
    // intset
    else if (subject->encoding == OBJ_ENCODING_INTSET) {
         // 判断是否可以用整形编码,可以的话用intset 编码 
        if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
            uint8_t success = 0;
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                //如果元素个数超过  set-max-intset-entries[ 默认 512 ]   时,将转化为 hashtable 数据结构
                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                return 1;
            }
        } else {
            //转整形失败,直接用hashtable存储
            setTypeConvert(subject,OBJ_ENCODING_HT);

            // 执行添加操作
            serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
            return 1;
        }
    } else {
        // 未知编码
        serverPanic("Unknown set encoding");
    }
    return 0;
}

 

  7.ZSet 数据结构

     1)介绍

      【1】ZSet  为有序的,自动去重的集合数据类型,ZSet 数据结构底层实现为 字典(dict) + 跳表(skiplist) ,当数据比较少时,用ziplist编码结构存储。 

zset-max-ziplist-entries  128    // 元素个数超过128 ,将用skiplist编码
zset-max-ziplist-value     64     //  单个元素大小超过 64 byte, 将用 skiplist编码

      【2】数据比较少时,用ziplist编码结构存储的图示:

        

 

 

 

    2)skiplist 分析解析

      【1】数据结构代码

// 创建zset 数据结构: 字典 + 跳表
robj *createZsetObject(void) {
    zset *zs = zmalloc(sizeof(*zs));
    robj *o;
    // dict用来查询数据到分数的对应关系, 如 zscore 就可以直接根据 元素拿到分值 
    zs->dict = dictCreate(&zsetDictType,NULL);
    
    // skiplist用来根据分数查询数据(可能是范围查找)
    zs->zsl = zslCreate();  
    // 设置对象类型 
    o = createObject(OBJ_ZSET,zs);  
    // 设置编码类型 
    o->encoding = OBJ_ENCODING_SKIPLIST;
    return o;
}

//位于edis/src/server.h 中
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */

typedef struct zskiplistNode {
    sds ele;   //存储字符串类型数据 redis3.0版本中使用robj类型表示,但是在redis4.0.1中直接使用sds类型表示
    double score;   //存储排序的分值
    struct zskiplistNode *backward;  //指向上一个节点,用于zrevrange命令
    struct zskiplistLevel {
        struct zskiplistNode *forward;  //指向下一个节点
        unsigned long span;  //到达后一个节点的跨度(两个相邻节点span为1)
    } level[];  //该节点在各层的信息,柔性数组成员
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;  // 跳跃表头尾节点
    unsigned long length;  //节点个数
    int level;  //除头结点外最大的层数
} zskiplist;

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

 

      【2】追踪添加函数

//在server.c发现跳表的添加函数为zaddCommand
//去t_zset.c文件查看流程
void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}

void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0;      //新添加元素的数量
    int updated = 0;    //更新分数的元素数量
    int processed = 0;  //被处理的元素数量

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    scoreidx = 2;
    
    // 输入参数解析 
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;

    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. */
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReply(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }

    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist.
    
        查询对应的 key 在对应的 db 即 hash table  中,是否存在 

     */
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */

        // 如果 zset_max_ziplist_entries ==0
        //        // 或者 zadd 元素的长度 > zset_max_ziplist_value
        //        // 则直接创建 skiplist 数据结构
        //        // 否则创建ziplist 压缩列表数据结构
        
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetZiplistObject();
        }
        // 关联对象到db 
        dbAdd(c->db,key,zobj);
    } else {
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }
    // 处理所有元素 
    for (j = 0; j < elements; j++) {
        double newscore;
        // 分值 
        score = scores[j];

        int retflags = flags;
        // 元素 
        ele = c->argv[scoreidx+1+j*2]->ptr;

        // 往  zobj 添加元素   
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

// 创建zset 数据结构: 字典 + 跳表
robj *createZsetObject(void) {
    zset *zs = zmalloc(sizeof(*zs));
    robj *o;
    // dict用来查询数据到分数的对应关系, 如 zscore 就可以直接根据 元素拿到分值 
    zs->dict = dictCreate(&zsetDictType,NULL);
    
    // skiplist用来根据分数查询数据(可能是范围查找)
    zs->zsl = zslCreate();

    // 设置对象类型 
    o = createObject(OBJ_ZSET,zs);

     // 设置编码类型 
    o->encoding = OBJ_ENCODING_SKIPLIST;
    return o;
}

// 创建zset 数据结构: ZipList 
robj *createZsetZiplistObject(void) {
    unsigned char *zl = ziplistNew();
    robj *o = createObject(OBJ_ZSET,zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
    /* Turn options into simple to check vars. 
     可选参数解析 
    */
    int incr = (*flags & ZADD_INCR) != 0;
    int nx = (*flags & ZADD_NX) != 0;
    int xx = (*flags & ZADD_XX) != 0;
    *flags = 0; /* We'll return our response flags. */
    double curscore;

    /* NaN as input is an error regardless of all the other parameters. 
       数值判断 
    */
    if (isnan(score)) {
        *flags = ZADD_NAN;
        return 0;
    }

    /* Update the sorted set according to its encoding. 
     数据类型为ziplist 的情况 
    */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;

        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* Remove and re-insert when score changed. 
             元素 score 有变化,则删除老节点,重新插入
            */
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* Optimize: check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
            if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
                sdslen(ele) > server.zset_max_ziplist_value)
               // 元素个数 或者 单个元素大小超过阈值  任意条件满足就转化为skiplist      

                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (newscore) *newscore = score;
            *flags |= ZADD_ADDED;
            return 1;
        } else {
            *flags |= ZADD_NOP;
            return 1;
        }
   
   // 数据类型为 跳表的情况       
  } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
       
        // 获取值指针
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        
        // O(1) 的时间复杂度,获取到元素   
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. 
            NX 互斥 
            */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }
            // 当前分值 
            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            // 递增 
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* Remove and re-insert when score changes. 
                分值不同的场景 
            */
            if (score != curscore) {
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);

                /* Note that we did not removed the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score.
                 *  hash 表中不需要移除元素, 修改分值就可以了 
                 * 
                 *  */
                dictGetVal(de) = &znode->score; /* Update score ptr. */
                *flags |= ZADD_UPDATED;
            }
            return 1;

        // 元素不存在      
        } else if (!xx) {

            ele = sdsdup(ele);

            // 插入新元素  
            znode = zslInsert(zs->zsl,score,ele);
            
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *flags |= ZADD_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            *flags |= ZADD_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}

// 往跳表中 新增元素  
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    // 数值判断 
    serverAssert(!isnan(score));
    
    x = zsl->header;
    
    // 遍历所有层高 ,寻找插入点: 高位 -> 低位  
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position 
         存储排位, 便于更新
        */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||  // 找到第一个比新分值大的节点,前面一个位置即是插入点 
                    (x->level[i].forward->score == score &&    
                    sdscmp(x->level[i].forward->ele,ele) < 0)))  //相同分值则按字典序排序  
        {
            rank[i] += x->level[i].span;  // 累加排位分值 
            x = x->level[i].forward;
        }
        update[i] = x;  // 每一层的拐点  
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. 
     * 
     * */
    level = zslRandomLevel();    // 幂次定律, 随机生成层高 ,越高的层出现概率越低 
    if (level > zsl->level) {   //  随机层高大于当前的最大层高,则初始化新的层高 
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;  //header 最层都是跳表的长度
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);  // 创建新的节点 

    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;  // 插入新节点
        update[i]->level[i].forward = x; 

        /* update span covered by update[i] as x is inserted here 
           更新 span  信息 
        */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels 
       新加入节点, 更新顶层 span 
     */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    // 更新后退指针 和尾指针      
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

//返回一个随机的层数,不是level的索引是层数
int zslRandomLevel(void) { 
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))  //有1/4的概率加入到上一层中
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

 

      【3】示图展示

               

 

 

      【4】示图说明

        1.默认会构造一个不存数据的拥有32层高度的头结点,而每加一个结点,会自身去概率生成层数(概率为1/4),这样就可以通过头结点快速查找数据了。

标签:return,SDS,int,ziplist,Redis,unsigned,源码,节点,底层
From: https://www.cnblogs.com/chafry/p/16747719.html

相关文章

  • 003-Redis 中的 BitMaps
    1.BitMap1.1bitcount1.1.1基本信息BITCOUNTkey[startend]summary:Countsetbitsinastringsince:2.6.0Countthenumberofsetbits(populationcounti......
  • Redis学习目录
     目录持续更新...​​Redis简介​​​​Redis安装及基本配置​​​​Redis持久化​​​​Redis开发及管理实战​​​​Redis高可用及集群​​​​Redis多API开发​​ ......
  • Docker 启动 Redis 就停止解决方案(2022-3)
    启动命令如下:dockerrun-itd\-p6379:6379\--namemyredis\-v/home/redis/redis.conf:/etc/redis/redis.conf\-v/home/redis/data:/data\redis:latest\re......
  • 源码角度了解Skywalking之AbstractClassEnhancePluginDefine插件增强定义
    源码角度了解Skywalking之AbstractClassEnhancePluginDefine插件增强定义AbstractClassEnhancePluginDefine是所有插件的抽象类,我们在分析Skywalking初始化流程的时候见到......
  • 源码角度了解Skywalking之ClassEnhancePluginDefine拦截构造方法和类实例方法
    源码角度了解Skywalking之ClassEnhancePluginDefine拦截构造方法和类实例方法上篇文章我们分析到ClassEnhancePluginDefine的拦截静态方法进行增强,而Skywalking初始化的时......
  • 2022-10-03-SpringMVC执行流程梳理及结合源码断点调试过程源码分析
    SpringMVC执行流程梳理接口方式控制器实现流程分析控制器层代码实现控制器配置SpringMVC.xml配置文件客户端浏览器发起请求,按回车前端控制器拦截所有请求/......
  • thinkphp 知识付费小程序源码 后台免费一键更新教程资源
    知识付费小程序源码thinkphp后台管理数据,带800条真实教程资源数据,后台免费一键更新教程资源  thinkphp3.2后台,小程序前端已修改很多bug,会员收藏,会员下载记录,会员积......
  • 002-Redis的 String 使用
    Redis命令十分丰富,包括的命令组有Cluster、Connection、Geo、Hashes、HyperLogLog、Keys、Lists、Pub/Sub、Scripting、Server、Sets、SortedSets、Strings、Transactions......
  • redis 允许远程访问
    1.取消绑定本地地址找到redis配置文件,redis.conf,注释掉指定的bind,当不指定时表示允许所有访问。   2.关闭保护模式在redis服务器上使用redis-cli,执行命令C......
  • Redis高性能怎么做到的?
    Redis的高性能怎么做到的?Redis这个NOSQL数据库在计算机界可谓是无人不知,无人不晓。只要涉及到数据那么就需要数据库,数据库类型很多,但是NOSQL的kv内存数据库也很多,redis作为......