SDS 动态字符串
Redis 是 c 语言实现的,传统 c 字符串存在不可变导致的频繁内存分配,一些 API 函数可能引起缓冲区溢出等问题。 Redis 在 c 字符串的基础上,封装实现了 SDS动态字符串,能够根据每次存储关键字的大小自动申请额外缓冲区内存,避免频繁申请和缓冲区溢出问题。
SDS 定义
struct sdshdr {
// SDS缓冲区已经使用的字节数
int len;
// SDS缓冲区还未使用的字节数
int free;
// 字节数组
char buf[];
}
下图保存了一个5字节长的字符串,各字段信息如下,注意 SDS 也遵守 c 字符串的格式,末尾以 '\0' 空字符结尾,这样可以复用一些 c 字符串函数。
SDS 与 c 字符串区别
常数复杂度获取字符串长度:
SDS 的 len 属性可以直接查出字符串长度,而 c 字符串需要遍历每一个字符直到遇到空字符,复杂度 O(n);
杜绝缓冲区溢出:
考虑 char *strcat(char *dest, const char *src)
函数,它将 src 字符串中的内容拼接到 dest后边,如果用户在执行这个函数时没有为 dest 分配了足够多的内存,就有可能造成缓冲区溢出。
SDS 的空间分配策略杜绝了缓冲区溢出问题,当SDS的 API函数需要对 SDS 进行修改时,API 会首先检查 SDS 的空间是否满足修改的需求,如果不满足就会先扩展到合适的大小再执行。
减少修改字符带来的内存重分配次数:
C 字符串每次进行增加或缩短总会导致字符数组内存重分配操作,如果是增长字符串操作,程序需要先扩展底层数组大小;如果是截断操作,需要先释放空间。Redis 作为数据库如果经常重分配内存严重影响效率,SDS 采取了空间预分配策略和惰性空间释放策略。
空间预分配:(1)如果对SDS修改后,SDS的长度小于 1MB,那么程序分配和 len属性同样大小的未使用空间,例如调整后字符串 13Bytes,那么新分配 13Bytes + 13Bytes + 1Bytes;
(2)如果修改后大于 1MB,额外分配 1MB未使用空间。
惰性空间释放:例如 sdstrim()
函数接收一个 SDS 和 c字符串作为参数,功能是移除所有在 c字符串中出现的字符。执行后多余的空间不会释放,而是调整 free 属性的值备用。只有调用相关内存回收 API才会回收空间。
API介绍:
链表
定义
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value;
}
listNode 内嵌在 list 结构中,如下:
typedef struct list {
listNode *head;
listNode *tail;
unsigned long len;
void *(*dup)(void *ptr); //节点值复制
void (*free)(void *ptr); //节点值释放
int (*match)(void *ptr, void *key); //节点值对比
} list;
API
字典
dict 定义
Redis 的字典底层使用哈希表来实现,一个哈希表拥有多个节点,每个节点就是一个键值对,主要介绍哈希表、哈希表节点、字典实现。新版本把哈希表和字典设计到一个结构体中。
结构体定义
哈希表和字典:
//最新源码将哈希表和字典融合在一起
typedef struct dict {
dictEntry **table; // 节点列表,存的是指针数组
dictType *type; // 特定类型函数
unsigned long size; // 哈希表大小
unsigned long sizemask; // 哈希表大小掩码,用于计算索引值
unsigned long used; // 哈希表已经使用的节点数
void *privdata; // 特定类型函数的相关参数
} dict;
哈希表节点:
typedef struct dictEntry {
void *key; // key,void* 任意类型
void *val; // value,void* 任意类型
struct dictEntry *next; // 链表指向下一个节点
} dictEntry;
特定类型函数:
typedef struct dictType {
//哈希值计算
unsigned int (*hashFunction)(const void *key);
//键复制
void *(*keyDup)(void *privdata, const void *key);
//值复制
void *(*valDup)(void *privdata, const void *obj);
//键比较
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
//键销毁
void (*keyDestructor)(void *privdata, void *key);
//值销毁
void (*valDestructor)(void *privdata, void *obj);
} dictType;
上图为普通状态下字典的结构,其中 dictht 在最新版本代码已取消,dictht 数据直接嵌入到 dict 中。
哈希算法
当要添加新值到字典中,先计算对应的索引位置,步骤如下:
hash = dict->type->hashFunction(key);
//根据 key 计算哈希值h = hash & ht->sizemask;
//哈希值与掩码相与
源码中哈希值计算部分如下,Redis 是采用 MurmurHash
算法来计算哈希值的。
(1)先根据 dictType
特定类型函数指针调用 hashFunction(key)
函数来计算特定 Key 的哈希值 hash,然后通过 hash & ht->sizemask
计算该 Key 应该存放的位置;
(2)接着根据 dict
找到特定索引位置的指针,判断指针是否为空,若不为空则判断该位置节点的 Key是否和目标 Key相同,不同就链接在其后边。
rehash
随操作执行哈希表的键值对可能会增多或减少,为了让哈希表的负载因子能够维持在一个合理范围内,需要对哈希表进行扩容或收缩来增加查找效率。
执行 rehash 的步骤如下:
- 创建新的字典对象,字典大小取决于要做的操作和当前包含的键值对数量;(1)如果扩展操作,新空间大小等于
dict_old.used*2
的最小的 2的 n次幂;(2)如果是收缩操作,新空间大小等于dict_old.used
的最小的 2的 n次幂; - 将原字典的所有键值对重新计算索引,存储到新字典的指定位置上;
- 待迁移完所有键值对后,释放旧字典。
- 迁移操作为渐进式 rehash,键值对数量过大不可能暂停用户线程来专门执行迁移,在 rehash 期间每次对字典的添加/删除/查找/更新 操作除了执行指定操作外,还需要把键值对 rehash 到新字典上。
/* Expand or create the hashtable */
static int dictExpand(dict *ht, unsigned long size) {
dict n; /* the new hashtable */
// 获取 rehash 后的表长
unsigned long realsize = _dictNextPower(size), i;
/* the size is invalid if it is smaller than the number of
* elements already inside the hashtable */
if (ht->used > size)
return DICT_ERR;
_dictInit(&n, ht->type, ht->privdata);
n.size = realsize;
n.sizemask = realsize-1;
n.table = hi_calloc(realsize,sizeof(dictEntry*));
if (n.table == NULL)
return DICT_ERR;
// 从旧表拷贝数据到新表,并计算数据在新表中的位置,最后释放旧表空间
n.used = ht->used;
for (i = 0; i < ht->size && ht->used > 0; i++) {
dictEntry *he, *nextHe;
if (ht->table[i] == NULL) continue;
/* For each hash entry on this slot... */
he = ht->table[i];
while(he) {
unsigned int h;
nextHe = he->next;
/* Get the new element index */
h = dictHashKey(ht, he->key) & n.sizemask;
he->next = n.table[h];
n.table[h] = he;
ht->used--;
/* Pass to the next element */
he = nextHe;
}
}
assert(ht->used == 0);
hi_free(ht->table);
/* Remap the new hashtable in the old */
*ht = n;
return DICT_OK;
}
// 获取大于 size 的最小的 2的整次幂的数值
static unsigned long _dictNextPower(unsigned long size) {
unsigned long i = DICT_HT_INITIAL_SIZE;
if (size >= LONG_MAX) return LONG_MAX;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}
图示 rehash
的过程:
- 执行
reahsh
之前的字典:
- 为新哈希表分配空间后:
- 将旧哈希表中的数据迁移到新哈希表:
- 释放旧哈希表空间:
哈希表扩展和收缩的触发场景:
- 当服务器没有在执行
BGSAVE
命令或者BGREWRITEAOF
命令,并且哈希表的负载因子大于等于1; - 服务器目前正在执行
BGSAVE
命令或者BGREWRITEAOF
命令,并且哈希表的负载因子大于等于5;
负载因子的计算公式为: load_factor = dict.used / dict.size
;
根据程序是否在执行 BGSAVE
、BGREWRITEAOF
命令,服务器扩展操作所需的负载因子不同,这是因为执行上述命令时是通过创建子进程来操作的,并且大多数都是写时复制实现的,所以子进程存在时应该提高负载因子来尽量避免进行哈希表扩展操作,从而避免不必要的内存写入操作,最大限度地节约内存。另一方面,当哈希表负载因子小于 0.1时,程序开始自动对哈希表执行收缩操作。
渐进式 rehash
当哈希表数据较多时,rehash
过程不能很快完成,此时如果一次完成 rehash
就可能会导致服务停止。因此为了避免 rehash
过程对服务器性能造成影响,需要分多次渐进式地将旧表中的数据 rehash
到新表中。
渐进式rehash
的步骤如下:
- 分配一块新的内存空间 ht[1],此时字典同时拥有 ht[0]、ht[1];
- 在字典中维护一个
rehashidx
变量,将值设置为0 表示rehash
过程正式开始; - 在
rehash
期间,每次对字典进行添加、删除、查找、更新操作时,程序会检查是否需要进行渐进式rehash
,当rehash
操作完成后将rehashidx
值新增1; - 字典操作的不断进行,最终在某个时间点上旧表的所有键值都会
rehash
到新表上,当所有rehash
操作完成后,rehashidx
的值设置为 -1;
渐进式 rehash
过程中哈希表的操作:
- 渐进式
rehash
过程中会同时操作两个哈希表,此过程中字典的查找、删除、更新等操作都会在两个哈希表上进行。例如查找操作如果在旧表ht[0]
上没有查找到就会切换到ht[1]
上进行查找; - 渐进式
rehash
过程中新添加的字典的键值只会被保存到ht[1]
中,ht[0]
中不会再保存任何值,这一操作保证了ht[0]
包含的键值对数量只减不增。
源代码:
- 获取插入点位置时执行渐进式 rehash:
/* Finds and returns the position within the dict where the provided key should
* be inserted using dictInsertAtPosition if the key does not already exist in
* the dict. If the key exists in the dict, NULL is returned and the optional
* 'existing' entry pointer is populated, if provided. */
void *dictFindPositionForInsert(dict *d, const void *key, dictEntry **existing) {
unsigned long idx, table;
dictEntry *he;
// 计算哈希值
uint64_t hash = dictHashKey(d, key);
if (existing) *existing = NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
// 检查是否需要扩容哈希表
if (_dictExpandIfNeeded(d) == DICT_ERR)
return NULL;
for (table = 0; table <= 1; table++) {
idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
/* Search if this slot does not already contain the given key */
he = d->ht_table[table][idx];
while(he) {
void *he_key = dictGetKey(he);
if (key == he_key || dictCompareKeys(d, key, he_key)) {
if (existing) *existing = he;
return NULL;
}
he = dictGetNext(he);
}
if (!dictIsRehashing(d)) break;
}
/* If we are in the process of rehashing the hash table, the bucket is
* always returned in the context of the second (new) hash table. */
dictEntry **bucket = &d->ht_table[dictIsRehashing(d) ? 1 : 0][idx];
return bucket;
}
- 检查是否扩容:
static int _dictExpandIfNeeded(dict *d)
{
// 已经在执行渐进式 rehash操作,返回
if (dictIsRehashing(d)) return DICT_OK;
// 如果哈希表为空,先初始化表空间
if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (!dictTypeExpandAllowed(d)) //检查是否能够扩容??
return DICT_OK;
if ((dict_can_resize == DICT_RESIZE_ENABLE &&
d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0])) ||
(dict_can_resize != DICT_RESIZE_FORBID &&
d->ht_used[0] / DICTHT_SIZE(d->ht_size_exp[0]) > dict_force_resize_ratio))
{
// 扩容操作
return dictExpand(d, d->ht_used[0] + 1);
}
return DICT_OK;
}
- 扩容或者创建哈希表
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
if (malloc_failed) *malloc_failed = 0;
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht_used[0] > size)
return DICT_ERR;
/* the new hash table */
dictEntry **new_ht_table;
unsigned long new_ht_used;
signed char new_ht_size_exp = _dictNextExp(size); //计算需要扩容的空间大小的幂值
/* Detect overflows */
size_t newsize = 1ul<<new_ht_size_exp; //检查新扩容大小是否合理
if (newsize < size || newsize * sizeof(dictEntry*) < newsize)
return DICT_ERR;
/* Rehashing to the same table size is not useful. */
if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */
if (malloc_failed) {
new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*)); //分配新空间
*malloc_failed = new_ht_table == NULL;
if (*malloc_failed)
return DICT_ERR;
} else
new_ht_table = zcalloc(newsize*sizeof(dictEntry*)); //分配新的空间
new_ht_used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht_table[0] == NULL) {
d->ht_size_exp[0] = new_ht_size_exp;
d->ht_used[0] = new_ht_used;
d->ht_table[0] = new_ht_table;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
d->ht_size_exp[1] = new_ht_size_exp;
d->ht_used[1] = new_ht_used;
d->ht_table[1] = new_ht_table;
d->rehashidx = 0;
return DICT_OK;
}
字典API
跳表
skipList 介绍
- 跳跃表是一种有序的数据结构,通过在每个节点中维持多个指向其它节点的指针,实现快速访问节点的目的。
- 跳跃表平均查找时间复杂度 O(logN),最坏情况下为 O(N);
- 跳跃表和平衡树实现更简单,但都有很高的性能;
- Redis 采用跳跃表作为有序集合键的底层实现,如果一个有序集合包含的元素数量比较多或者内部成员是很长的字符串时,就采用跳表来实现。
- Redis 使用跳表只有两个场景,一是实现有序集合的键,另一个是在集群节点中用作内部的数据结构;
// 跳表节点
typedef struct zskiplistNode {
sds ele; //成员对象
double score; //分值
struct zskiplistNode *backward; //后退指针
struct zskiplistLevel { //层级指针数组
struct zskiplistNode *forward; //前进指针
unsigned long span; //跨度
} level[];
} zskiplistNode;
// 跳表列表
typedef struct zskiplist {
struct zskiplistNode *header, *tail; //头、尾指针
unsigned long length; //节点个数
int level; //最大层高
} zskiplist;
如上图以及源代码,跳表有以下重点属性:
zskiplistNode.level
:节点中用L1、L2、L3...
代表层级节点,每个层都有两个属性:前进指针和跨度,前进指针指向下一个随机节点,跨度描述了这两个节点间的距离。注意第一层用level[0]
存储、第二层用level[1]
等等。backward
:后退指针,每个节点除了层级节点拥有前进指针,自身还拥有一个后退指针,通过后退指针可以方便实现对调表的遍历和修改操作;score
:每个节点的分数,跳表中各个节点先按照分值由小到大存储,如果分值相同再按照保存的成员对象从小到大排序;
这里举一个遍历跳表的过程:
- 假设迭代程序首先访问跳表的第一个节点(表头),然后从第四层前进指针移动到表中的第二个节点;
- 从第二个节点,程序沿着第二层前进指针移动到表中的第三个节点;
- 从第三个节点,程序沿着第二层前进指针移动到表中第四个节点;
- 沿着第四个节点再访问一次前进指针,发现为 NULL,代表遍历完毕。
skipList 增删改查分析
skipList API
整数集合
整数集合(intset)是 Redis 用于保存整数值的集合抽象数据结构,它可以用来保存类型为 int16_t
、int32_t
、int64_t
的整数值,并且保存集合中不会出现重复元素。
typedef struct intset {
uint32_t encoding; //指明集合中具体的数据类型 int16_t、int32_t、int64_t
uint32_t length; //集合中元素个数
int8_t contents[]; //数据集合
} intset;
其中 encoding
属性可以取以下值:
INTSET_ENC_INT16
:int16_t 类型整数值(-32768 ~ 32768);INTSET_ENC_INT32
:int32_t 类型整数值(-2147483648 ~ 2147483647)INTSET_ENC_INT64
:int64_t 类型整数值(....);
整数集合可以对数据存储进行调整,如果初始存储的数据值分别是 1、3、5,它会采用 int16_t
类型来保存数据;此时如果插入了一个新的整数值 -123214532541324534,需要使用类型 int64_t
来保存,此时整数集合的所有元素都会转化为 int64_t
类型。
整数集合升级
升级一个整数集合然后添加新元素的操作步骤如下:
- 根据新元素类型,扩展整数集合的底层数组空间大小,分配新空间;
- 将底层数组现在有的元素都转化为与新元素相同的类型,并将类型转换后的元素插入到对应位置上;
- 将新元素添加到底层数组中去。
整数集合的升级操作能够尽可能地节约内存,并且在添加不同类型元素时更加灵活,底层能够自由转换。
整数集合降级
整数集合不支持降级操作,一旦对数组进行了升级就会一直保持升级后的状态。
压缩列表
压缩列表是列表键和哈希键的底层实现之一。当一个列表键只包含少量列表项,并且每个列表项要么就是小整数值,要么就是长度比较短的字符串,那么 Redis 就会使用压缩列表来作为列表键的底层实现。另外当一个哈希键只包含少量键值对、并且每个键值对的键之和是小整数值或者长度较短的字符串,就会使用压缩列表作为哈希键的实现。
例如:
redis > RPUSH lst 1 2 432 121 "aaaa"
(integer) 5
redis > OBJECT ENCODING lst
"ziplist"
redis > HMSET profile "name" "jack" "age" 235 534
OK
redis > OBJECT ENCODING profile
"ziplist"
压缩列表的组成
zlbytes
:整个压缩列表存储的字节数;zltail
:压缩列表的列表尾节点距离起始地址的偏移量,可以计算出列表尾节点的地址;zllen
:压缩列表包含的节点数量;extryX
:压缩列表的各个节点数据;zlend
:标记压缩列表的末端节点地址。
其中压缩列表的各个节点数据结构如下:
/* We use this function to receive information about a ziplist entry.
* Note that this is not how the data is actually encoded, is just what we
* get filled by a function in order to operate more easily. */
typedef struct zlentry {
//编码后的前一个节点的长度
unsigned int prevrawlensize; /* Bytes used to encode the previous entry len*/
//前一个节点的长度
unsigned int prevrawlen; /* Previous entry len. */
//节点类型/长度的编码字节单位,字符串为1/2/5,数字一般为1
unsigned int lensize; /* Bytes used to encode this entry type/len.
For example strings have a 1, 2 or 5 bytes
header. Integers always use a single byte.*/
//当前节点数据的长度
unsigned int len; /* Bytes used to represent the actual entry.
For strings this is just the string length
while for integers it is 1, 2, 3, 4, 8 or
0 (for 4 bit immediate) depending on the
number range. */
unsigned int headersize; /* prevrawlensize + lensize. */
//数据类型及长度
unsigned char encoding; /* Set to ZIP_STR_* or ZIP_INT_* depending on
the entry encoding. However for 4 bits
immediate integers this can assume a range
of values and must be range-checked. */
//节点保存的数据
unsigned char *p; /* Pointer to the very start of the entry, that
is, this points to prev-entry-len field. */
} zlentry;
prevrawlen
:以字节为单位,表示前一个节点的数据长度,大小可以是1字节或者5字节;如果前一个节点数据长度小于 254,用1个字节表示;如果前一个节点数据长度大于254,用5个字节表示,其中第一个字节为0xFE
,后四个字节保存前一个字节的长度;由于每个节点都记录了前一个节点的长度,所以在获取当前节点地址的前提下可以反向遍历整个压缩列表;encoding
:数据列类型及长度,其中包括(1)值最高位为00/01/10的代表字节数组(编码长度1/2/5),长度由除开最高两位后其它位记录;(2)值最高位为11代表整数(编码长度为1),节点数据值就是整数值;content
:节点保存的值,节点的值可以是一个字节数组或者整数,值的类型和长度由节点的encoding
属性决定。
连锁更新
若单个节点数据长度小于254,则用1个字节来表示前一个节点的长度值;若长度大于254,需要用5个字节表示节点的长度值。那么需要考虑这样一种情况,在一个压缩列表中有多个连续的长度介于 250字节到 253字节的连续节点 E1、E2、E3、E4.....;此时如果插入一个长度为255的新节点作为E1的前置节点,那么后续E1的 prevrawlen
需要扩大到 5个字节,由于E1 prevrawlen
属性扩大到了5字节,导致E1的数据长度也扩大了,那么E2的 prevrawlen
属性也需要相应扩大,如此连续下去,会触发整个压缩列表节点的连锁更新操作。
同时不仅插入节点会导致连锁更新,删除某个节点也会导致连锁更新,举例如下:
连锁更新需要对压缩列表执行 N次空间重分配操作,每次空间重分配的时间复杂度为 O(N),所以连锁更新最坏时间复杂度为 O(N2),那么应当尽量避免连锁更新的触发:
- 压缩列表只有恰好多个连续、长度介于
250~253
字节的节点,连锁更新才有可能会触发,实际情况不多见; - 连锁更新只要更新节点不多也不会对性能造成很大的影响。