文章目录
本系列
- 本地缓存库分析(一):golang-lru
- 本地缓存库分析(二):bigcache(本文)
- 本地缓存库分析(三):freecache(未完待续)
- 本地缓存库分析(四):fastcache(未完待续)
- 本地缓存库分析(五):groupcache(未完待续)
前言
本文阅读源码:https://github.com/allegro/bigcache,版本:v3.1.0
整体设计
处理并发访问
本地缓存必然要支持并发访问,如果用一个sync.Mutex
锁住整个缓存,会导致整个缓存的读写串行化,性能很低。于是BigCache使用了分片机制
表面上bigcache中所有的数据是存在一个大cache里面,但实际上底层数据分成了N个不互重合的部分,每一个部分称为一个shard
在Set或者Get数据时,先对key计算hash值,根据hash值取余得到目标shard,之后所有的读写操作都是在各自的shard上进行
当一个shard被上锁,其他shard的读写不受影响,锁粒度被大幅度减小,因为锁范围从全局缓存缩小到了单个shard中
另外,BigCache规定shard个数必须是2的平方数。这么做的好处是,在计算hash值应该分配到哪个shard时,v % len
和 v & (n-1)
等价,因此对2的平方数取余可以改成位运算比传统的%
快很多
减少GC开销
golang里实现缓存最简单的方式是map来存储元素,例如map[string]Item
其缺点是在GC的标记阶段,会访问map的每个key,value,当map里存储了大量数据的时候会把大量cpu耗费在标记对象上面,降低程序性能
go在1.5版本增加了一个特性:如果使用map的key和value中都不包含指针,那么GC会忽略这个map
但这无法解决我们的问题,因为基本上任何缓存数据都有指针,例如用到了string,slice的结构。只有基本类型,例如int或bool不包含指针,也就是只能用map[int]int类型的map
解决办法就是:把真正key的hash值当做索引map的key,把缓存对象序列化后放到一个预先分配的字节数组中,然后将它在数组中的开始位置 作为map[int]int的 value
BigCache设计了2层结构,一个map[uint64]uint32类型的索引map
,和一个[]byte类型的底层数组
存放数据
使用[]byte只会给GC增加了一个额外对象,由于byte切片除了自身对象并不包含其他指针数据,所以GC对于整个对象的标记时间是O(1)的
value在底层数组中紧凑存储
从这里可以看出:
- key的hash值冲突了老的就会被覆盖。首先不同key hash冲突的概率非常小,几乎可以忽略。其次就算真的冲突了,这种处理方式在缓存场景能接受
- value需要被序列化成[]byte才能存储
读写流程
于是在BigCache中获取数据的流程为:
- 计算key的hash值
- 定位到所属shard
- 从索引map中找到entry的下标index
- 去底层数据array中,从index位置开始读value
新增缓存流程:
- 计算key的hash值
- 定位到所属shard
- 往底层数据尾部追加value
- 将key的hash值和value的开始下标保存到索引map
删除缓存流程
- 计算key的hash值
- 定位到所属shard
- 从索引map中找到entry的下标index
- 在底层数组中将value置为已删除
- 从索引map中删除该key的hash值,这样以后就查不到了
缓存淘汰
新增数据和对老数据的修改,都往底层数组尾部追加。同时整个BigCache的所有KV共用一个过期时间,这样就满足规律:从头部到尾部,数据越来越新
那么要在定时任务重淘汰过期数据时就好办了:
-
如果头部没过期(entry中存了加入时间),那么所有数据都没过期,退出过期任务
-
否则不断弹出头部元素,直到遇到不过期的头部为止
- 弹出头部数据的操作本身非常轻量,将head指针指向下一个entry,同时在索引map中删除该hash值即可(entry中存了key的hash值)
如果底层数组装满了,无法继续追加数据,怎么办?
-
如果当前数组容量capacity没达到maxCapacity,那就翻倍扩容
-
如果不能扩容了,就从头部弹出数据,直到腾出足够的空间为止
- 此时不管头部元素是否过期都会弹出,但头部数据是最早会过期的,这样选择也合理
但这种FIFO的数据过期淘汰模式给BigCache带来的局限性:相比LRU、LFU来说缓存命中率会低一些
性能优化
用varint编码
底层数据保存每个entry的长度时,用了无符号varint编码,其优点是当数据比较小时,能用更少的空间保存长度:
数字大小 | uvarint编码需要的字节数 |
---|---|
小于127 | 1 |
小于16382 | 2 |
小于2097149 | 3 |
小于268435452 | 4 |
可以看出,entry长度小于16382时,只用2个字节就能装下长度,相比int的8个字节来说能节省一些空间
复用buffer
在把K,V,key的hash值等信息包装成[]byte类型的entry时,每个shard都中的请求都写到同一个entryBuffer中
首先这么做没有并发问题,因为同时只会有1个请求访问某个shard
并且这样避免了每次要包装entry时都new一个slice,减少了内存分配
栈上计算hash值
BigCache自己封装了一个hash计算工具:这么做的目的是避免内存分配
func newDefaultHasher() Hasher {
return fnv64a{}
}
type fnv64a struct{}
const (
offset64 = 14695981039346656037
prime64 = 1099511628211
)
// 没有内存逃逸,栈上计算
func (f fnv64a) Sum64(key string) uint64 {
var hash uint64 = offset64
for i := 0; i < len(key); i++ {
hash ^= uint64(key[i])
hash *= prime64
}
return hash
}
如果用go官方的hash工具,每次计算key的hash值时都需要new一个对象,然后在这个对象上计算
// 计算hash值,写到s对象里,而不是在栈上运算,所以每次都要new一个sum32
func (s *sum32) Write(data []byte) (int, error) {
hash := *s
for _, c := range data {
hash *= prime32
hash ^= sum32(c)
}
*s = hash
return len(data), nil
}
因为返回指针,发生内存逃逸,会被分配到堆上
func New32() hash.Hash32 {
var s sum32 = offset32
return &s
}
数据结构
BigCache比较简单,就是持有一堆shard,主要逻辑在shard上
type BigCache struct {
// 所有shard
shards []*cacheShard
// 数据有效时间
lifeWindow uint64
clock clock
hash Hasher
config Config
// 用于快速计算在哪个key应该在哪个shard
shardMask uint64
close chan struct{}
}
每个shard包含一个hashmap索引,和entries底层数组:
type cacheShard struct {
// key:真正的key的hash值,value:在entries开始位置的下标
hashmap map[uint64]uint32
// 底层数组
entries queue.BytesQueue
lock sync.RWMutex
// 用于每次put时暂存entry,避免每次put时都分配内存
entryBuffer []byte
// 删除时的回调函数
onRemove onRemoveCallback
/**
...
**/
// 数据有效时间
lifeWindow uint64
// 是否开启定时扫描清理过期key
cleanEnabled bool
}
entries不是简单的一个字节数组,还包含了最老的entry在什么位置head
,下次从哪个位置开始写tail
等信息:
type BytesQueue struct {
// 是否满了
full bool
// 真正放数据的数组
array []byte
// 当前array的容量
capacity int
// 最大能扩到多大容量
maxCapacity int
// 最早的一个entry的开始位置
head int
// 下次往尾部push时,从哪个位置开始,也等于最后一个有效数据的下标 + 1
tail int
// KV对的个数
count int
// 右边界
rightMargin int
// 用于往array存header时提高性能
headerBuffer []byte
}
把value放入底层数组时,将其包装成entry
,内存布局如下:
其中blockSize用无符号varint编码
存储,先把blockSize读出来,接下来8个字节是时间戳,再8个字节是key的hash值,再2个字节是key的长度
-
blcokSize:整个entry占多少字节
-
插入时间戳:用于判断是否过期,如果
插入时间戳 + lifeWindow < now
,就代表过期 -
key的hash值:
- 后台定时任务清理过期entry时,需要根据这个值作为key去hashmap中删除索引
- 用来执行软删,如果已经被删除,这个值置为0,后面清理时就不会执行删除回调
-
key长度:用于计算读key时要读多少字节
-
没有存value的长度,因为可以根据
blockSize - header长度 - key长度
计算出来
Get
BigCache.Get
func (c *BigCache) Get(key string) ([]byte, error) {
// 先计算key的hash
hashedKey := c.hash.Sum64(key)
// 根据hash计算在哪个shard
shard := c.getShard(hashedKey)
return shard.get(key, hashedKey)
}
cacheShard.Get
func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
s.lock.RLock()
wrappedEntry, err := s.getWrappedEntry(hashedKey)
if err != nil {
s.lock.RUnlock()
return nil, err
}
// 从entry读key值,和参数中的key比较,如果不一样说明hash冲突了,返回没找到
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
s.lock.RUnlock()
s.collision()
return nil, ErrEntryNotFound
}
// key匹配,读value
entry := readEntry(wrappedEntry)
s.lock.RUnlock()
s.hit(hashedKey)
return entry, nil
}
怎么根据key的hash值去底层数据找entry:
func (s *cacheShard) getWrappedEntry(hashedKey uint64) ([]byte, error) {
// 先从hashmap中找到在底层数据哪个下标位置
itemIndex := s.hashmap[hashedKey]
// 等于0表示没有,hashmap中并不会真正用到0下标,都是从1下标开始放数据
if itemIndex == 0 {
s.miss()
return nil, ErrEntryNotFound
}
// 从itemIndex位置读完整的entry
wrappedEntry, err := s.entries.Get(int(itemIndex))
if err != nil {
s.miss()
return nil, err
}
return wrappedEntry, err
}
BytesQueue.Get
// 根据index。去底层数组中找entry
func (q *BytesQueue) Get(index int) ([]byte, error) {
data, _, err := q.peek(index)
return data, err
}
BytesQueue.peek:从index下标读entry
func (q *BytesQueue) peek(index int) ([]byte, int, error) {
// 校验index是否合法
err := q.peekCheckErr(index)
if err != nil {
return nil, 0, err
}
// 先读blockSize,binary.Uvarint返回用无符号varint编码的blockSize
// 以及blockSize占用了多少个字节
blockSize, n := binary.Uvarint(q.array[index:])
// 返回entry的完整数据,entry占用的字节数(包括blockSize占用的)
return q.array[index+n : index+int(blockSize)], int(blockSize), nil
}
entry的blockSize以无符号varint编码
,以节省空间
binary.Uvarint方法从slice中读取以无符号varint编码的数字,以及这个数组占slice的前几个字节
读到entry后,再从里面找key和value
从entry读key值:从header后面读keySize个字节
func readKeyFromEntry(data []byte) string {
// key的size
length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])
// copy on read
dst := make([]byte, length)
copy(dst, data[headersSizeInBytes:headersSizeInBytes+length])
return bytesToString(dst)
}
从entry读value:也就是从key后面读剩下的字节
func readEntry(data []byte) []byte {
// key的长度
length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])
// 总长度 - 头部长度 - key的长度,等于 value的长度
dst := make([]byte, len(data)-int(headersSizeInBytes+length))
// 从哪开始读?header长度 + key的长度
// 读多少字节? len(dst),也就是value的长度
copy(dst, data[headersSizeInBytes+length:])
return dst
}
Set
BigCache.Set:先计算key的hash值,定位到在哪个shard
func (c *BigCache) Set(key string, entry []byte) error {
hashedKey := c.hash.Sum64(key)
shard := c.getShard(hashedKey)
return shard.set(key, hashedKey, entry)
}
cacheShard.set
func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
// 当前时刻秒级时间戳
currentTimestamp := uint64(s.clock.Epoch())
s.lock.Lock()
// 看这个key的hash值之前在不在cache中,
if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 {
// 如果在,读出之前的entry
if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil {
// 将之前entry的hash值部分置为0,相当于标记删除
resetKeyFromEntry(previousEntry)
// 并且从hashmap中删除,这样之前存的entry就再也查不到了
delete(s.hashmap, hashedKey)
}
}
// 如果没有开启后台定时清理
if !s.cleanEnabled {
// 检查最老的一个entry是否过期, 如果过期就删除
if oldestEntry, err := s.entries.Peek(); err == nil {
s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry)
}
}
// 包装成entry
w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer)
for {
// push到entry中
if index, err := s.entries.Push(w); err == nil {
s.hashmap[hashedKey] = uint32(index)
s.lock.Unlock()
return nil
}
// 空间满了,需要淘汰, 从最老的位置开始淘汰,也就是从head位置
if s.removeOldestEntry(NoSpace) != nil {
s.lock.Unlock()
return fmt.Errorf("entry is bigger than max shard size")
}
}
}
可以看到在发生hash冲突时会把老的entry覆盖:将老的entry置位无效,并且直接将其索引删除。在本次set完成后,在索引hashmap中该hashkey会指向新的entry下标
检查最老的数据是否过期:
通过Peek方法获取head位置的entry:
func (q *BytesQueue) Peek() ([]byte, error) {
data, _, err := q.peek(q.head)
return data, err
}
然后判断该entry是否过期:
func (s *cacheShard) isExpired(oldestEntry []byte, currentTimestamp uint64) bool {
// 读entry的前8个字节,就是插入时间戳
oldestTimestamp := readTimestampFromEntry(oldestEntry)
// 判断缓存已存在的时间是否 > lifeWindow
return currentTimestamp-oldestTimestamp > s.lifeWindow
}
如果过期了,从头部弹出该entry,删除其在hashmap的映射,并执行删除回调
func (s *cacheShard) removeOldestEntry(reason RemoveReason) error {
oldest, err := s.entries.Pop()
if err == nil {
hash := readHashFromEntry(oldest)
// 之前已经被标记删除了
if hash == 0 {
return nil
}
// 在hash表中删除
delete(s.hashmap, hash)
s.onRemove(oldest, reason)
if s.statsEnabled {
delete(s.hashmapStats, hash)
}
return nil
}
return err
}
再看看怎么从头部弹出:
// 读取最老的一个entry,并将head指针移动到下一个entry
func (q *BytesQueue) Pop() ([]byte, error) {
data, blockSize, err := q.peek(q.head)
if err != nil {
return nil, err
}
// head指向下一个entry的开头位置
q.head += blockSize
q.count--
// 如果head右边没有entry了,让head回到开头
if q.head == q.rightMargin {
q.head = leftMarginIndex
// 如果整个数组都空了,tail也回到开头
if q.tail == q.rightMargin {
q.tail = leftMarginIndex
}
// 将右边缘更新为tail,此时head一定 <= tail,右边缘设置为tail合理
q.rightMargin = q.tail
}
q.full = false
return data, nil
}
再看看往尾部Push一个entry:
func (q *BytesQueue) Push(data []byte) (int, error) {
// 计算总共需要多少字节:entry + blockSize
neededSize := getNeededSize(len(data))
// 如果尾部的空间不够
if !q.canInsertAfterTail(neededSize) {
// 看能否插在head的前面
if q.canInsertBeforeHead(neededSize) {
// 能插在head前面,更新tail为1,表示下次从1开始往后push
// 也就是底层数组循环了了一圈
q.tail = leftMarginIndex
} else if q.capacity+neededSize >= q.maxCapacity && q.maxCapacity > 0 {
// 空间不够了,需要淘汰最老的
return -1, &queueError{"Full queue. Maximum size limit reached."}
} else {
// 还没到maxCapacity,执行扩容
q.allocateAdditionalMemory(neededSize)
}
}
// 到这说明尾部空间够,插在尾部
index := q.tail
q.push(data, neededSize)
return index, nil
}
尾部空间够时,调push往tail后面放entry:
func (q *BytesQueue) push(data []byte, len int) {
headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len))
// 先push长度
q.copy(q.headerBuffer, headerEntrySize)
// 再push entry
q.copy(data, len-headerEntrySize)
// 如果没有绕一圈
if q.tail > q.head {
// 更新右边缘 = tail
q.rightMargin = q.tail
}
// 如果q.tail == q.head, 说明绕了一圈,且没有空间了
if q.tail == q.head {
q.full = true
}
q.count++
}
其中在copy内部会增加tail的值,更新为本次塞完后,下一次应该往哪塞:
func (q *BytesQueue) copy(data []byte, len int) {
q.tail += copy(q.array[q.tail:], data[:len])
}
随着业务的进行,底层数据可能呈现下面两种形态:
- 状态1:有效数据区域是连续的,此时
head < tail
- 状态2:有效数据区域不连续,在头部和尾部各有一部分,此时
tail < head
rightMargin
的作用是判断后面还有没有entry。如果pop后,head == rightMargin,说明head后面没有元素了,需要将head指向leftMargin,也就是绕了一圈重新回到开头
状态1中,tail等于rightMargin。状态2中,rightMargin等于之前某次tail的值,因为后面的空间无法再装了,tail才从头开始,此时rightMargin依然代表右边缘
怎么判断是状态1还是状态2?如果tail > head就是状态1,反之是状态2
当状态1发现tail后面的空间不够(q.capacity > q.tai < need
),而head前面的空间够(q.head-1 >= need
)时,会把tail移动到leftMargin位置,相当于环形数组绕了一圈
怎么判断能否插在tail后面?
func (q *BytesQueue) canInsertAfterTail(need int) bool {
// 已经满了,不够
if q.full {
return false
}
// 情况1
if q.tail >= q.head {
// tail后面的剩下的空间容量是否大于需要的长度
return q.capacity-q.tail >= need
}
// 情况2
// 1.如果恰好能装下need,那就装
// 2.如果剩下的空间扣减need后,不够装minimumHeaderSize了,就不装 主要用于在扩容时填充漏洞的最小size,本次分析不关心
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
}
怎么判断能否插在head前面?
func (q *BytesQueue) canInsertBeforeHead(need int) bool {
if q.full {
return false
}
// 情况1
if q.tail >= q.head {
// head前面还有足够多的空间
return q.head-leftMarginIndex == need || q.head-leftMarginIndex >= need+minimumHeaderSize
}
// 情况2
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
}
Delete
BigCache.Delete
func (c *BigCache) Delete(key string) error {
hashedKey := c.hash.Sum64(key)
shard := c.getShard(hashedKey)
return shard.del(hashedKey)
}
cacheShard.del:
这里只是把entry置为标记删除,并删除索引,非常轻量
为啥不在底层数据删除该entry?因为那样就涉及到把后面的数据整个拷贝到前面了,比较重,且没必要。这里删除索引后,在外部看来就是删除了
// 删除
func (s *cacheShard) del(hashedKey uint64) error {
s.lock.Lock()
{
// 拿到hash值在array中索引
itemIndex := s.hashmap[hashedKey]
// 如果不存在,不删除
if itemIndex == 0 {
s.lock.Unlock()
s.delmiss()
return ErrEntryNotFound
}
wrappedEntry, err := s.entries.Get(int(itemIndex))
if err != nil {
s.lock.Unlock()
s.delmiss()
return err
}
// 删除索引
delete(s.hashmap, hashedKey)
s.onRemove(wrappedEntry, Deleted)
if s.statsEnabled {
delete(s.hashmapStats, hashedKey)
}
// 清空entry中的hash值
resetKeyFromEntry(wrappedEntry)
}
s.lock.Unlock()
s.delhit()
return nil
}
那底层的entry在什么时机删除?
- 容量不够,需要从头部开始弹出时
- 过期时
过期
如果配置了过期,在初始化BigCache时会起一个后台任务,定时清理过期的entry
if config.CleanWindow > 0 {
go func() {
ticker := time.NewTicker(config.CleanWindow)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
fmt.Println("ctx done, shutting down bigcache cleanup routine")
return
case t := <-ticker.C:
cache.cleanUp(uint64(t.Unix()))
case <-cache.close:
return
}
}
}()
}
BigCache.cleanUp:对每个shard执行cleanUp
func (c *BigCache) cleanUp(currentTimestamp uint64) {
for _, shard := range c.shards {
shard.cleanUp(currentTimestamp)
}
}
cacheShard.cleanUp:从head开始弹出所有过期的entry,取出其中的hash值,作为key去hashmap中删除索引
func (s *cacheShard) cleanUp(currentTimestamp uint64) {
s.lock.Lock()
for {
if oldestEntry, err := s.entries.Peek(); err != nil {
break
} else if evicted := s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry); !evicted {
break
}
}
s.lock.Unlock()
}
总结
最后看看BigCache解决了哪些原生缓存的问题:
问题 | 解决 |
---|---|
锁竞争严重 | 解决,用了多个shard |
大量缓存写入,导致gc标记阶段占用cpu多 | 解决 |
内存占用不可控 | 解决 |
不支持缓存按时效性淘汰 | 按照FIFO的方式淘汰,效果不如LRU,LFU |
不支持缓存过期 | # 支持 |
缓存数据可以被污染 | 解决,使用序列化后的字节数组 |