首页 > 其他分享 >Xv6 Lab9: Locks

Xv6 Lab9: Locks

时间:2023-07-30 21:11:32浏览次数:49  
标签:key lock bucket Xv6 Locks victim table Lab9 bcache

Memory allocator

这一题很简单,主要任务,就是为每个 cpu 维护一个空闲物理内存的链表 freelist,xv6 默认使用的结构体 kmem,其中包含一个 freelist 供所有的 cpu 使用。我们要做的,就是把 freelist 修改成 freelist 的数组,即 struct run *freelist[NCPU],其中 NCPU 是定义于 kernel/params.h 的宏,对应 cpu 的个数。 kmem 中的 spinlock 是用来保护 freelist 的,既然 freelist 变成了数组,那么也需要有 NCPU 个 spinlock。因此,修改 kmem 为如下结构体:

struct {
    struct spinlock lock[NCPU];
    struct run *freelist[NCPU]; // for each cpu, allocate a freelist
} kmem;

接着,我们需要修改 kinit,让它初始化每个 spinlock。

void kinit() {
    char lockname[6] = {'k', 'm', 'e', 'm', '0', 0};
    for (int i = 0; i < NCPU; ++i) {
        lockname[4] = '0' + i;
        initlock(kmem.lock + i, lockname);
    }
    freerange(end, (void *)PHYSTOP);
}

接着,我们需要修改 freerange,使得 freerange 将范围内所有的空闲内存都分配给运行 freerange 的 cpu 的 freelist。这里我们定义了一个新的函数 kfree_cpu 来实现这一功能(记得在 defs.h 中声明),这里要注意一点,调用 cpuid() 以及使用它的结果之前,要关闭外部中断防止定时器中断切走线程,之后再开启中断。

void freerange(void *pa_start, void *pa_end) {
    char *p;
    p = (char *)PGROUNDUP((uint64)pa_start);
    push_off();
    int cid = cpuid();
    for (; p + PGSIZE <= (char *)pa_end; p += PGSIZE) {
        kfree_cpu(p, cid);
    }
    pop_off();
}
void kfree_cpu(void *pa, int cid) {
    struct run *r;
    if (((uint64)pa % PGSIZE) != 0 || (char *)pa < end || (uint64)pa >= PHYSTOP) {
        panic("kfree");
    }
    memset(pa, 1, PGSIZE);
    r = (struct run *)pa;
    r->next = kmem.freelist[cid];
    kmem.freelist[cid] = r;
}

kfree 也要作对应修改,将空闲内存添加到运行 kfree 的 cpu 的 freelist 中。

void kfree(void *pa) {
    struct run *r;
    if (((uint64)pa % PGSIZE) != 0 || (char *)pa < end || (uint64)pa >= PHYSTOP)
        panic("kfree");
    // Fill with junk to catch dangling refs.
    memset(pa, 1, PGSIZE);
    r = (struct run *)pa;
    push_off();
    int cid = cpuid();
    // acquire(&kmem.lock);
    r->next = kmem.freelist[cid];
    kmem.freelist[cid] = r;
    pop_off();
    // release(&kmem.lock);
}

最后还要修改 kalloc,从运行 kalloc 的 cpu 的 freelist 分配,如果 freelist 为空(即 r == 0),那么就从其它 cpu 的 freelist 处获取,这里存在 race condition,需要上锁,完成操作之后再解锁。

void *kalloc(void) {
    struct run *r;
    // alloc 的时候,需要上锁,考虑到当前 cpu 的 freelist 为空,需要使用锁
    // acquire(&kmem.lock);
    push_off();
    int cid = cpuid();
    r = kmem.freelist[cid];
    if (r) {
        kmem.freelist[cid] = r->next;
    } else {
        // 遍历其他 cpu 的 freelist,找一个有空闲块的 cpu 的 freelist
        for (int i = 1; i < NCPU; ++i) {
            int idx = (cid + i) % NCPU;
            acquire(&kmem.lock[idx]);
            r = kmem.freelist[idx];
            if (r) {
                kmem.freelist[idx] = r->next;
                release(&kmem.lock[idx]);
                break;
            }
            release(&kmem.lock[idx]);
        }
    }
    pop_off();
    // release(&kmem.lock);
    if (r)
        memset((char *)r, 5, PGSIZE); // fill with junk
    return (void *)r;
}

Buffer cache

TODO(zwyyy): clockintr 中,为什么要 wakeup(&tickslock);

这个实验室需要我们重新设计 bcache 的数据结构,从单纯的链表变成哈希表,哈希表的 bucket 数量在提示中说明了,可以为 $13$。一开始的时候,我想着不如不使用 bcache,直接另外设计一个哈希表,哈希表的每个 bucket 是一个无虚拟头结点的双向链表,但是写好之后发现一直发生死锁,搞得 bget 非常复杂,尤其是哈希表的 bucket 中的元素要移动到另一个 bucket 中时。

后来将数据结构改成了如下形式:

struct entry {
    struct buf head; // val
    struct spinlock lock;
};

struct {
    struct spinlock lock;
    struct buf buf[NBUF];

    struct entry table[NBUCKET];

    // Linked list of all buffers, through prev/next.
    // Sorted by how recently the buffer was used.
    // head.next is most recent, head.prev is least.
} bcache;

bcache 有三个成员变量,lock、buf 数组 buf 和哈希表 tabletable 其实相当于是一个数组,数组中的元素是一个 spinlock 和一个 struct buf 链表的虚拟头结点,相比原始的 bcache,可以理解为把一个链表分成了 NBUCKET 个链表,同时每个链表有一个 spinlock 来保护链表的数据,链表中结点的插入、删除与原始的 bcache 类似。

但由于我们不需要通过链表的顺序来实现 lru 策略,因此,插入的时候直接作为对应链表的虚拟头结点的 next 即可,即作为实际的头结点。

bget 中,最难实现的部分,就是选择一个 victim buf,然后将它重用为当前 block 的 cache。在这里非常容易出现死锁之类的问题,事实上,为了保护这里的 invariant,即一个 block 只能有一份 block cache,并且只会选择 refcnt == 0 的 buf 作为 victim buf,需要用到 bcache.lock 和哈希表的 bucket 的 spinlock,而我一开始只使用了哈希表的 bucket 的 spinlock,加上没有虚拟头结点导致链表的插入、删除相对麻烦,因此不是发生 panic 就是死锁。

binit

明确了数据结构和思路之后,让我们来一步步实现,首先是 binit,它要实现的功能可以说分为两步:

  1. 初始化 bcache,即 bcache 的 spinlock 和哈希表,前面提到,哈希表可以理解为把原来 bcache 中的链表分解成了 NBUCKET 个链表,对每个链接,让头结点的 nextprev 都指向头结点自身来表示链表为空;
  2. 初始化哈希表中的 NBUCKET 个链表,以及每个链表的 spinlock。原始的 bio.c 中,需要把 NBUF 个 buf 插入到 bcache 的链表中去,而这里我们需要把 NBUF 个 buf 插入到这 NBUCKET 个链表中去,根据 buf 在 buf 数组中的索引对 NBUCKET 取的模的值,来判断它应该放到哪个链表中去。
void binit(void) {
    initlock(&bcache.lock, "bcache");
    for (int i = 0; i < NBUCKET; ++i) {
        bcache.table[i].head.next = &bcache.table[i].head;
        bcache.table[i].head.prev = &bcache.table[i].head;
        // lockname[6] = 'a' + i;
        initlock(&bcache.table[i].lock, "bucket");
    }
    struct buf *b;
    for (int i = 0; i < NBUF; ++i) {
        int key = i % NBUCKET;
        // 无论 table 中的链表是否为空,都可以这样来处理
        b = bcache.buf + i;
        b->key = key;
        b->next = bcache.table[key].head.next;
        b->prev = &bcache.table[key].head;
        initsleeplock(&b->lock, "buffer");
        bcache.table[key].head.next->prev = b;
        bcache.table[key].head.next = b;
    }
}

bget

bget 的实现也可以分成两部分:

  1. block cache 存在于 bcache 中;
  2. block cache 不存在于 bcache 中,需要从 bcache 中寻找 victim buf,然后重用它。

block cache 存在 bcache 的情况下,处理很简单,我们首先由 (dev + 1) * blockno 计算出 target block cahce 的 key,然后 key % NBUCKET 确定该 block cache 位于哪个 bucket,然后对该 bucket 上锁(因为我们需要修改 block cache 的 b->refcnt),遍历 bucket 的链表,找到 devblockno 都对应的那个 block cache b,然后递增 b->refcnt,释放 bucket 的链表的锁。然后请求 b 的 sleeplock,这里主要是考虑到 b 可能要将内容写入到磁盘,需要花费很长时间,同时不希望 b 的内容被其他进程看到,更不希望被修改,所以请求 sleeplock,最后 return b

bcache 中没有这个 block 的 cache 的情况下,处理逻辑就非常复杂了,首先我们需要寻找一个 victim buf,victim buf 一定是 refcnt == 0 的所有 buf 中,ticks 值最小的那个。并且,我们需要保证,当前进程始终持有 victim 所在的 bucket 的锁。

我一开始是这样做的:每次遍历一个 bucket 的链表时持有锁,遍历完就释放,当遍历完所有 bucket 确定下来 victim 之后,需要将 victim 从所在 bucket 中移除,此时先申请bucket 的 spinlock,完成删除之后再释放;然后申请 key 对应的 bucket 的 spinlock,将 victim 插入到 key 对应的 bucket 中,然后释放 spinlock。

这样做的问题在于,找到了 victim 和将 victim 从当前 bucket 中移除时,有一段空隙,当前进程并没有持有 victim 所在的 bucket 的锁,那么此时其他进程就可能修改 victim 的值,例如将 refcnt 从 $0$ 设置为了 $1$,那么就会出现两个 disk block 对应同一个 block cache 的错误情况。

因此,必须保证当前进程始终持有 victim 所在 bucket 的锁,直到将 victim 从当前 bucket 中移除为止。

我是这样实现了,遍历所有的 bucket 来寻找 victim,先申请当前 bucket 的锁,假设没有满足条件的 victim,那么就释放正在遍历的 bucket 的锁,然后开始遍历下一个 bucket;假设找到了满足条件的 victim,如果旧的 victim 和这个新的 victim 不在同一个 bucket 中,那么释放旧 victim 的 bucket 的锁,更新 victim,否则如果旧 victim 和新 victim 位于同一个 bucket 中,那么只更新 victim,不释放锁。

这样操作后,在寻找 victim 和将 victim 从其原本所在的 bucket 中删除之前,进程会始终持有 victim 所在 bucket 的锁。然后申请 key 对应的 bucket 的锁,将 victim 插入到此 bucket 中。

综上, bget 的实现如下,从实现可以看出,其实没有 bcache.lock 的参与就能完成。

static struct buf *bget(uint dev, uint blockno) {
    struct buf *b;
    int i;
    int key = ((dev + 1) * blockno) % NBUCKET;
    // acquire(&bcache.lock);
    acquire(&bcache.table[key].lock);
    for (b = bcache.table[key].head.next; b != &bcache.table[key].head; b = b->next) {
        if (b->dev == dev && b->blockno == blockno) {
            ++b->refcnt;
            release(&bcache.table[key].lock);
            // release(&bcache.lock);
            acquiresleep(&b->lock);
            return b;
        }
    }
    release(&bcache.table[key].lock);
    // Not cached.
    uint stmp = kUintMax;
    int cur_key = 0;
    struct buf *victim;
    // acquire(&bcache.lock);
    // bcache.table[i].lock 必须交错 release,即确保 victim 被更换之前,victim 对应的锁不能被释放
    // 不如直接扫描 bcache 中的所有 buf
    for (i = 0; i < NBUCKET; ++i) {
        acquire(&bcache.table[i].lock);
        int find_flag = 0;
        for (b = bcache.table[i].head.next; b != &bcache.table[i].head; b = b->next) {
            if (b->refcnt == 0 && b->stamp < stmp) {
                stmp = b->stamp;
                if (cur_key != i && holding(&bcache.table[cur_key].lock)) {
                    release(&bcache.table[cur_key].lock);
                }
                victim = b;
                cur_key = i;
                find_flag = 1;
            }
        }
        if (!find_flag) {
            release(&bcache.table[i].lock);
        }
    }
    if (stmp != kUintMax && victim != 0) {
        // acquire(&bcache.table[cur_key].lock);
        victim->dev = dev;
        victim->blockno = blockno;
        victim->valid = 0;
        victim->refcnt = 1;
        victim->key = key;
        // 可能需要移动当前 buf 到其他 bucket
        victim->next->prev = victim->prev;
        victim->prev->next = victim->next; // 从当前链表删除
        release(&bcache.table[cur_key].lock);

        // 插入到新的链表
        acquire(&bcache.table[key].lock);
        victim->next = bcache.table[key].head.next;
        victim->prev = &bcache.table[key].head;
        bcache.table[key].head.next->prev = victim;
        bcache.table[key].head.next = victim;
        release(&bcache.table[key].lock);
        // release(&bcache.lock);
        acquiresleep(&victim->lock);
        return victim;
    }

    panic("bget: no buffers");
}

注意,我们需要修改 struct buf,为其添加 uint stampint key 两个字段,前者一看便知,后者则是说,假如此 buf 在内存中存在 cache block,那么这个 cache block 位于索引为 key 的 bucket 中。

brelsebpinbunpin

最麻烦的函数其实就是 bget 了,剩下这三个都很好处理。brelse 处理 lru 不再需要执行链表的插入删除,只需要递减 buf->refcnt 并将当前 ticks 赋值给 b->stamp 即可,这里要申请 buf 所在 bucket 的锁,以及 tickslock

void brelse(struct buf *b) {
    if (!holdingsleep(&b->lock)) {
        // printf("panic, brelese\n");
        panic("brelse");
    }
    releasesleep(&b->lock);
    acquire(&bcache.table[b->key].lock);
    b->refcnt--;

    acquire(&tickslock);
    b->stamp = ticks;
    release(&tickslock);

    release(&bcache.table[b->key].lock);
}

对于 bpinbunpin,只需要修改持有和释放的锁即可,变成持有 buf 对应的 bucket 的锁。

void bpin(struct buf *b) {
    acquire(&bcache.table[b->key].lock);
    b->refcnt++;
    release(&bcache.table[b->key].lock);
}

void bunpin(struct buf *b) {
    acquire(&bcache.table[b->key].lock);
    b->refcnt--;
    release(&bcache.table[b->key].lock);
}

总结

第一个实验很简单,就不再重复了。第二个实验属实非常酸爽,各种死锁、panic,针对这种死锁的定位和调试,不知道各位有没有什么好办法。另外,个人感觉,在涉及锁的并行编程中,一个精心设计的数据结构能够让你在实现的时候少很多麻烦,例如这里使用带虚拟头结点的循环双向链表来作为 bucket 中的数据结构,而不是简单的双向链表(带虚拟头结点的虚拟尾结点的双向链表应该也可以),因为简单双向链表,边界条件处理起来很麻烦,本身并行编程为了防止 race condition 和 deadlock 就已经很麻烦了,两个麻烦叠加起来绝对是 $1 + 1 > 2$。

另外一点,则是要考虑清楚,我使用锁,到底要保护一个什么样的 invariant,在这题中,我要保证 victim 始终是一个合法的 victim,它的 refcnt 一定不能中途被其他进程修改。

标签:key,lock,bucket,Xv6,Locks,victim,table,Lab9,bcache
From: https://www.cnblogs.com/zwyyy456/p/17592045.html

相关文章

  • STM8S编译错误unable to allocate space for sections/blocks with a total
    STM8S编译错误unabletoallocatespaceforsections/blockswithatotal#欢迎使用Markdown编辑器笔者用IARforSTM8开发碰到很多奇怪的问题,都是因为优化等级引起的首先看这个用dubug编译错误提示:unabletoallocatespaceforsections/blockswithatotalestimatedm......
  • Xv6 Lab7: Multithreading
    Uthread:switchingbetweenthreads这个题还是对的起它moderate的难度了,如果认真看了book-riscv-rev2.pdf的Scheduling章节,以及看了这个课程翻译,那么这题可以很快做出来,个人觉得pdf讲得更加清楚一些。这个题甚至帮你把需要添加代码的地方都标注出来了,参照题目说明,主要......
  • 云原生基础设施实践:NebulaGraph 的 KubeBlocks 集成故事
    像是NebulaGraph这类基础设施上云,通用的方法一般是将线下物理机替换成云端的虚拟资源,依托各大云服务厂商实现“服务上云”。但还有一种选择,就是依托云数据基础设施,将数据库产品变成为云生态的一环,不只是提供自身的数据云服务,还能同其他的数据库一起分析挖掘业务数据价值。在本......
  • [未解决] vue transform-blocks解析源代码报错:Illegal tag name. Use '<' to print '<
    报错内容:[vite]Internalservererror:Illegaltagname.Use'<'toprint'<'.用的是这篇博文的源代码展示方法:如何用vite的vueCustomBlockTransforms(自定义块转换)实现源代码展示使用时突然遇到某一个vue文件添加<demo></demo>标签后报错,但其他vue文件可以正常读取和展示......
  • Xv6 Lab6: Copy-on-Write Fork for xv6
    思路经过lab5:lazypageallocation之后,对xv6的pagefault的处理,算是有所了解了。今天这个COW实验,在2020年的课程视频中有对思路的讲解,可以先看看课程翻译,厘清一下思路。整体思路其实也不难,默认情况下,fokr会调用uvmcopy,将父进程的PP(物理页)复制一份,将这个PP的......
  • Xv6 Lab2
    系统调用Lab1主要是基于提供的系统调用接口来编写一些小工具程序,而Lab2则是要我们自己实现系统调用,并提供系统调用的接口。以本次Lab要我们实现的trace调用为例,说明一下系统调用的流程:在user/trace.c的第$15$行,调用了属于systemcall的trace函数,当前执行makeq......
  • Xv6 Lab4
    RISC-VassemblyWhichregisterscontainargumentstofunctions?Forexample,whichregisterholds13inmain'scalltoprintf?a2寄存器,函数调用时,参数从左到右会依次保存在a0,a1,a2,a3寄存器,似乎是一直到寄存器a7的。Whereisthecalltofunctionfinthe......
  • Xv6 Lab2
    系统调用Lab1主要是基于提供的系统调用接口来编写一些小工具程序,而Lab2则是要我们自己实现系统调用,并提供系统调用的接口。以本次Lab要我们实现的trace调用为例,说明一下系统调用的流程:在user/trace.c的第$15$行,调用了属于systemcall的trace函数,当前执行makeq......
  • Xv6 操作系统组织架构
    进程概述64位的RISC-V的VAS是39位的,即VA只有39位,而Xv6则只有38位,最大虚拟地址为#defineMAXVA0x3fffffffff。VAS的顶端,即最高位存放了两个page,一个是用于trampoline,一个用于mappingtheprocess'strapframe。Xv6使用这两个page来切换到内核以及返回。......
  • MYSQL 执行update语句时报错: The total number of locks exceeds the lock table size
    由于数据量较大导致报错:‘’Thetotalnumberoflocksexceedsthelocktablesize‘’。这句话翻译过来大概是这个意思:总数已经超过锁定表的大小。解决办法:输入查询:showvariableslike"%_buffer%";找到innodb_buffer_pool_size对应的值默认为8388608也就是8兆。我们将其设置......