# MIT 6.S081入门lab8 锁
一、参考资料阅读与总结
1.xv6 book书籍阅读(Chapter 7: Scheduling:7.5 to end)
5.sleep与wakeup
- xv6使用了sleep-wake的机制,实现了进程交互的抽象(序列协调/条件同步机制)
- 这一机制的核心是防止丢失唤醒(生产者还未睡眠时,资源更新并唤醒):
如果贸然在睡眠中加锁会导致死锁,因此采用采用加锁后在睡眠函数中释放锁/唤醒获得锁的机制;
6.代码:sleep和wakeup
-
条件同步: 等待1->发生2->发送信号2->唤醒2->继续运行1
-
xv6实现:
-
sleep:
获取自旋锁(celler程序)-> 获取进程锁 ->释放自旋锁(保证不会有wakeup被调用,避免了唤醒丢失)
->记录睡眠频道->设置状态->进入调度
注:当输入自旋锁未进程锁时,什么都不做(wait)
被唤醒->清空睡眠频道->释放进程锁->取会原始锁sleep kernel/proc.c
// Atomically release lock and sleep on chan. // Reacquires lock when awakened. void sleep(void *chan, struct spinlock *lk) { struct proc *p = myproc(); // Must acquire p->lock in order to // change p->state and then call sched. // Once we hold p->lock, we can be // guaranteed that we won't miss any wakeup // (wakeup locks p->lock), // so it's okay to release lk. if(lk != &p->lock){ //DOC: sleeplock0 acquire(&p->lock); //DOC: sleeplock1 release(lk); } // Go to sleep. p->chan = chan; p->state = SLEEPING; sched(); // Tidy up. p->chan = 0; // Reacquire original lock. if(lk != &p->lock){ release(&p->lock); acquire(lk); }
-
wakeup:
遍历进程->获取进程锁(防止唤醒丢失)->检查存在进程睡眠且满足唤醒条件时->修改状态
wakeup1对应的是lk为p->lock的情况,此时不用遍历进程,直接唤醒指定进程wakeup kernel/proc.c
// Wake up all processes sleeping on chan. // Must be called without any p->lock. void wakeup(void *chan) { struct proc *p; for(p = proc; p < &proc[NPROC]; p++) { acquire(&p->lock); if(p->state == SLEEPING && p->chan == chan) { p->state = RUNNABLE; } release(&p->lock); } } // Wake up p if it is sleeping in wait(); used by exit(). // Caller must hold p->lock. static void wakeup1(struct proc *p) { if(!holding(&p->lock)) panic("wakeup1"); if(p->chan == p && p->state == SLEEPING) { p->state = RUNNABLE; } }
-
-
锁机制详解:
sleep保证了当sleep之前,持有条件锁/进程锁/都有;wakeup保证了检查状态时候,需要同时持有条件锁和进程锁;sleep检查条件之前,wakeup使条件满足;要么在sleep睡眠之后,wakeup发现该进程并唤醒它
存在多进程休眠/唤醒时,sleep的acquire(lk)保证唤醒时获取数据是原子的,如果条件不满足,剩余进程仍须睡眠 -> sleep需要放在while循环中多次判断条件是否满足 -
优势: 实现的轻量级(不需要特殊数据结构,如睡眠管道),又提供了抽象。
7.代码:Pipes
-
代码的实际应用: 管道(从一端写入到内核缓冲区,从另一端读出),使用自旋锁+环形缓冲区实现。
满:nwrite==nread+PIPESIZE ;
空:nwrite==nread
;索引:buf[nread%PIPESIZE]
-
管道结构体:
struct pipe kernel/pipe.c
#define PIPESIZE 512 struct pipe { struct spinlock lock; char data[PIPESIZE]; uint nread; // number of bytes read uint nwrite; // number of bytes written int readopen; // read fd is still open int writeopen; // write fd is still open };
-
pipewrite:
获取管道锁-> 写入n字节数据->唤醒nread上的pipread进程
如缓存区满(循环开始)->判断是否出错->唤醒nread上的pipread进程 ->挂起在nwrite上睡眠(循环结束)Pipewrite kernel/pipe.c
int pipewrite(struct pipe *pi, uint64 addr, int n) { int i; char ch; struct proc *pr = myproc(); acquire(&pi->lock); for(i = 0; i < n; i++){ while(pi->nwrite == pi->nread + PIPESIZE){ //DOC: pipewrite-full if(pi->readopen == 0 || pr->killed){ release(&pi->lock); return -1; } wakeup(&pi->nread); sleep(&pi->nwrite, &pi->lock); } if(copyin(pr->pagetable, &ch, addr + i, 1) == -1) break; pi->data[pi->nwrite++ % PIPESIZE] = ch; } wakeup(&pi->nread); release(&pi->lock); return i; }
-
piperead:
获取管道锁-> 检查缓冲区(如为空检查进程状态后,直接挂在nread上睡眠) -> 读出缓存区所有字节 ->唤醒nwrite上的pipwrite进程->释放管道锁Piperead kernel/pipe.c
int piperead(struct pipe *pi, uint64 addr, int n) { int i; struct proc *pr = myproc(); char ch; acquire(&pi->lock); while(pi->nread == pi->nwrite && pi->writeopen){ //DOC: pipe-empty if(pr->killed){ release(&pi->lock); return -1; } sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep } for(i = 0; i < n; i++){ //DOC: piperead-copy if(pi->nread == pi->nwrite) break; ch = pi->data[pi->nread++ % PIPESIZE]; if(copyout(pr->pagetable, addr + i, &ch, 1) == -1) break; } wakeup(&pi->nwrite); //DOC: piperead-wakeup release(&pi->lock); return i; }
-
CPU并发调用情况: 管道锁保证了同时调用时缓存区操作的原子性,sleep/wakeup机制避免了死锁与唤醒丢失
-注意: 一定要使用循环检查被唤醒时条件是否满足
8.代码:wait,exit和kill
-
核心目的: 让父进程发现子进程终止
-
xv6实现:
子进程exit时,将其状态设置为ZOMBIE-> wait注意到终止子进程,将其标记为UNUSED,复制退出状态并返回PID
如父进程比子进程先exit ->子进程托管给init ->init不断调用wait清理终止子进程资源user/init.c main
char *argv[] = { "sh", 0 }; int main(void) { int pid, wpid; if(open("console", O_RDWR) < 0){ mknod("console", CONSOLE, 0); mknod("statistics", STATS, 0); open("console", O_RDWR); } dup(0); // stdout dup(0); // stderr for(;;){ printf("init: starting sh\n"); pid = fork(); if(pid < 0){ printf("init: fork failed\n"); exit(1); } if(pid == 0){ exec("sh", argv); printf("init: exec sh failed\n"); exit(1); } for(;;){ // this call to wait() returns if the shell exits, // or if a parentless process exits. wpid = wait((int *) 0); //不断调用wait函数 if(wpid == pid){ // the shell exited; restart it. break; } else if(wpid < 0){ printf("init: wait returned an error\n"); exit(1); } else { // it was a parentless process; do nothing. } } } }
-
wait:使用进程锁作为条件锁,防止唤醒丢失
申请进程锁-> 扫描所有进程查看子进程 -> 获取子进程锁 -> 检查子进程状态 -> 如为ZOMBIE则复制子进程退出状态到传入地址 ->freeproc清理子进程 -> 释放锁并返回pid;
如没有子进程/父进程被杀死 -> 直接释放锁/返回;
如子进程未完成 -> sleep挂起(释放调用进程锁);
防止死锁: 先对父进程上锁,再对子进程上锁;
注:当检查子进程时,未加锁的原因是为了防止死锁(防止检索到父进程),同时,由于子进程的父子关系只能被父进程所改变,因此这个操作是安全的wait kernel/proc.c
/ Wait for a child process to exit and return its pid. // Return -1 if this process has no children. int wait(uint64 addr) { struct proc *np; int havekids, pid; struct proc *p = myproc(); // hold p->lock for the whole time to avoid lost // wakeups from a child's exit(). acquire(&p->lock); for(;;){ // Scan through table looking for exited children. havekids = 0; for(np = proc; np < &proc[NPROC]; np++){ // this code uses np->parent without holding np->lock. // acquiring the lock first would cause a deadlock, // since np might be an ancestor, and we already hold p->lock. if(np->parent == p){ // np->parent can't change between the check and the acquire() // because only the parent changes it, and we're the parent. acquire(&np->lock); havekids = 1; if(np->state == ZOMBIE){ // Found one. pid = np->pid; if(addr != 0 && copyout(p->pagetable, addr, (char *)&np->xstate, sizeof(np->xstate)) < 0) { release(&np->lock); release(&p->lock); return -1; } freeproc(np); release(&np->lock); release(&p->lock); return pid; } release(&np->lock); } } // No point waiting if we don't have any children. if(!havekids || p->killed){ release(&p->lock); return -1; } // Wait for a child to exit. sleep(p, &p->lock); //DOC: wait-sleep } }
-
exit: 关闭所有文件-> 唤醒init进程(准备处理潜在的僵尸进程) -> 提取出p->parent防止死锁 ->将当前进程子进程托管给init -> 唤醒父进程 -> 设置当前进程为ZOMBIE ->调度
锁的控制:
设置状态并唤醒父进程时需要持有父进程的锁 -> 防止唤醒丢失;
exit自己持有自己进程的锁->防止被设为ZOMBIE后意外释放;
上锁规则->先父后子,防止死锁。exit reparent kernel/proc.c
// Exit the current process. Does not return. // An exited process remains in the zombie state // until its parent calls wait(). void exit(int status) { struct proc *p = myproc(); if(p == initproc) panic("init exiting"); // Close all open files. for(int fd = 0; fd < NOFILE; fd++){ if(p->ofile[fd]){ struct file *f = p->ofile[fd]; fileclose(f); p->ofile[fd] = 0; } } begin_op(); iput(p->cwd); end_op(); p->cwd = 0; // we might re-parent a child to init. we can't be precise about // waking up init, since we can't acquire its lock once we've // acquired any other proc lock. so wake up init whether that's // necessary or not. init may miss this wakeup, but that seems // harmless. acquire(&initproc->lock); wakeup1(initproc); release(&initproc->lock); // grab a copy of p->parent, to ensure that we unlock the same // parent we locked. in case our parent gives us away to init while // we're waiting for the parent lock. we may then race with an // exiting parent, but the result will be a harmless spurious wakeup // to a dead or wrong process; proc structs are never re-allocated // as anything else. acquire(&p->lock); struct proc *original_parent = p->parent; release(&p->lock); // we need the parent's lock in order to wake it up from wait(). // the parent-then-child rule says we have to lock it first. acquire(&original_parent->lock); acquire(&p->lock); // Give any children to init. reparent(p); // Parent might be sleeping in wait(). wakeup1(original_parent); p->xstate = status; p->state = ZOMBIE; release(&original_parent->lock); // Jump into the scheduler, never to return. sched(); panic("zombie exit"); } ... // Pass p's abandoned children to init. // Caller must hold p->lock. void reparent(struct proc *p) { struct proc *pp; for(pp = proc; pp < &proc[NPROC]; pp++){ // this code uses pp->parent without holding pp->lock. // acquiring the lock first could cause a deadlock // if pp or a child of pp were also in exit() // and about to try to lock p. if(pp->parent == p){ // pp->parent can't change between the check and the acquire() // because only the parent changes it, and we're the parent. acquire(&pp->lock); pp->parent = initproc; // we should wake up init here, but that would require // initproc->lock, which would be a deadlock, since we hold // the lock on one of init's children (pp). this is why // exit() always wakes init (before acquiring any locks). release(&pp->lock); } } }
-
kill:当前进程终止其他进程,由于直接终止会导致进程的混乱(终止进程情况不确定),因此依赖调度器结束进程
遍历进程找到目标进程 -> 将kill标记为1 ->如目标进程睡眠,调整状态为可运行(唤醒)
同时,再usertrap检查中,在系统调用前与系统调用后都需要检查该进程是否被杀死,如被杀死,直接调用exit终止进程
注意: 在调用sleep的循环中,应该加入对p->killed的检测;同时,在系统关键部分(如磁盘驱动),由于关键人物分多步完成,因此,不检查p->killed防止关键系统(如文件系统)出错kill kernel/proc.c
// Kill the process with the given pid. // The victim won't exit until it tries to return // to user space (see usertrap() in trap.c). int kill(int pid) { struct proc *p; for(p = proc; p < &proc[NPROC]; p++){ acquire(&p->lock); if(p->pid == pid){ p->killed = 1; if(p->state == SLEEPING){ // Wake process from sleep(). p->state = RUNNABLE; } release(&p->lock); return 0; } release(&p->lock); } return -1; }
usertrap kernel/trap.c
void usertrap(void) { // ... if(r_scause() == 8){ // system call if(p->killed) exit(-1); // ... syscall(); } // On the way out,usertrap checks if the process has been killed // or should yield the CPU (if this trap is a timer interrupt) if(p->killed) exit(-1); // ... usertrapret(); }
9.真实世界与总结
- 调度策略: xv6:轮转调度;真实操作系统:优先级调度等
- 复杂调度策略存在的问题:
优先级反转: 由于锁与优先级不匹配,因此导致高优先级等待时间变长;解决方案: 继承优先级,即所有正在访问资源的进程获得同样需要访问该资源的更高优先级进程的优先级,直到它们用完了有关资源,它们的优先级才恢复到原始值;
无限阻塞(饥饿): 低优先级会一直阻塞;解决方案: 老化机制,根据的等待时间增加优先级;
Lock Convoys::锁与优先级不匹配导致的频繁争抢锁带来的性能退化 - linux使用调度策略: CFS完全公平调度算法
- sleep-wakup防止唤醒丢失的解决方案: 早期Unix关闭中断(单CPU);xv6、FreeBSD:使用上下文显式锁;Plan9:持有锁的回调函数;Linux:使用带锁的显式进程队列(等待队列)。
- xv6唤醒机制存在的问题: 虚假唤醒(惊群效应)、解决方案:单一进程唤醒与所有进程唤醒分开
- 注:对于进程的终止并释放相当复杂,需要考虑不同的情况;
- 现代操作系统对进程索引的改进:使用红黑树实现常数时间的索引。
二、涉及函数
-
proc中函数基本已经介绍完毕,剩余的辅助函数分别是either_copyout、either_copyin(不定向拷贝[从内核/用户态])和procdump查看当前进程
either_copyout either_copyin procdump kernel/proc.c
// Copy to either a user address, or kernel address, // depending on usr_dst. // Returns 0 on success, -1 on error. int either_copyout(int user_dst, uint64 dst, void *src, uint64 len) { struct proc *p = myproc(); if(user_dst){ return copyout(p->pagetable, dst, src, len); } else { memmove((char *)dst, src, len); return 0; } } // Copy from either a user address, or kernel address, // depending on usr_src. // Returns 0 on success, -1 on error. int either_copyin(void *dst, int user_src, uint64 src, uint64 len) { struct proc *p = myproc(); if(user_src){ return copyin(p->pagetable, dst, src, len); } else { memmove(dst, (char*)src, len); return 0; } } // Print a process listing to console. For debugging. // Runs when user types ^P on console. // No lock to avoid wedging a stuck machine further. void procdump(void) { static char *states[] = { [UNUSED] "unused", [SLEEPING] "sleep ", [RUNNABLE] "runble", [RUNNING] "run ", [ZOMBIE] "zombie" }; struct proc *p; char *state; printf("\n"); for(p = proc; p < &proc[NPROC]; p++){ if(p->state == UNUSED) continue; if(p->state >= 0 && p->state < NELEM(states) && states[p->state]) state = states[p->state]; else state = "???"; printf("%d %s %s", p->pid, state, p->name); printf("\n"); } }
-
睡眠锁在cow部分已经介绍了,此处不在赘述;cow部分:https://www.cnblogs.com/David-Dong/p/18053283
三、课程视频观看笔记(lec13)
- 线程切换: 使用swtch()切换时,首先获取进程锁,之后由调度器释放锁 -> 防止一个进程被多个核心运行(由于多CPU共享统一内存)
- swtch()对锁的限制: 在调用swtch时,只能持有线程锁:防止进程间切换导致的死锁;
- 协调机制(睡眠-唤醒机制): 由于线程代码存在需要进行线程交互的的操作(管道/读取外设/等待),因此需要协调机制(自旋等待->等待时间较短)(睡眠-唤醒机制->等待时间较长)
- 例:UART驱动:由于UART反应较慢,因此在发送时需要使用发送休眠->发送完成中断唤醒机制,从而保证CPU的使用率;
- 唤醒丢失:由于访问共享设备/内存的问题,因此在访问独占信号会导致死锁;同时,由于如果锁不连续,会导致唤醒丢失,即在两个锁的间隙触发中断,即在sleep之前触发中断导致wakeup的空唤醒;
- 唤醒丢失的解决方案:
sleep:通过传递锁保护其等待的资源->通过资源锁和进程锁的传递,保证了sleep的原子性;
wakeup:通过获取进程锁保持查询状态的原子性; - 关闭线程:存在问题:其他线程可能被其他线程所依赖;线程可能存在使用内核结构/资源的过程中;
- 退出线程: 关闭打开文件->释放文件系统->init继承其子线程->唤醒父进程->设置状态为ZOMBIE->进入调度器
- wait():寻找子进程->判断子进程状态为ZOMBIE-> 保存返回值并释放资源;(init使用wait巡查进程插槽,实现进程的释放)
- 注:获取原始父进程是处理父子进程同时退出的情况
- kill(): 本质是标记进程并唤醒其睡眠状态,等待中断调度实现退出
- 注在while中可以根据情况,判断其是否能在sleep时候被kill(通过判断该进程的killed标记)
- 权限管理: 在xv6是没有权限管理的,在真实系统,如linux中,在kill时是由权限检查的。
四、完成lab及其代码
-
锁竞争优化一般思路:
- 只在必须共享的时候共享(对应为将资源从 CPU 共享拆分为每个 CPU 独立)(实验1)
- 必须共享时,尽量减少在关键区中停留的时间(对应“大锁化小锁”,降低锁的粒度)(实验2)
-
Memory allocator: 这个实验核心就是对cpu的内存锁进行分桶操作,使其能够实现并行申请内存
kernel/def.h
... // kalloc.c ... void * ksteal (int);\ ...
kernel/kalloc.c
// Physical memory allocator, for user processes, // kernel stacks, page-table pages, // and pipe buffers. Allocates whole 4096-byte pages. #include "types.h" #include "param.h" #include "memlayout.h" #include "spinlock.h" #include "riscv.h" #include "defs.h" void freerange(void *pa_start, void *pa_end); extern char end[]; // first address after kernel. // defined by kernel.ld. struct run { struct run *next; }; struct { struct spinlock lock; struct run *freelist; } kmem[NCPU]; //将内存分桶,实现单个cpu单个内存链表 void kinit() { for (int i = 0; i < NCPU; i++) { char lock_name[9] = {0}; snprintf(lock_name,8, "kmem_%d",i); initlock(&kmem[i].lock, lock_name); //锁的初始化 } // initlock(&kmem.lock, "kmem"); freerange(end, (void*)PHYSTOP); } void freerange(void *pa_start, void *pa_end) { char *p; p = (char*)PGROUNDUP((uint64)pa_start); for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE) kfree(p); } // Free the page of physical memory pointed at by v, // which normally should have been returned by a // call to kalloc(). (The exception is when // initializing the allocator; see kinit above.) void kfree(void *pa) { struct run *r; if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP) panic("kfree"); push_off(); //注意这里操作要关中断,防止被其他进程调度打断 int cpu = cpuid(); // Fill with junk to catch dangling refs. memset(pa, 1, PGSIZE); r = (struct run*)pa; acquire(&kmem[cpu].lock); r->next = kmem[cpu].freelist; kmem[cpu].freelist = r; release(&kmem[cpu].lock); pop_off(); } // Allocate one 4096-byte page of physical memory. // Returns a pointer that the kernel can use. // Returns 0 if the memory cannot be allocated. void * kalloc(void) { struct run *r; push_off(); int cpu = cpuid(); acquire(&kmem[cpu].lock); r = kmem[cpu].freelist; if(r) kmem[cpu].freelist = r->next; release(&kmem[cpu].lock); //这里需要释放自己的锁,但是保持关中断,防止cpu重复偷页导致的多次调度 if(r == 0) { r = ksteal(cpu); } if(r) memset((char*)r, 5, PGSIZE); // fill with junk pop_off(); return (void*)r; } //Try to steal a free physical memory page from another core // interrupt should be off //return address / NULL void * ksteal (int cpu) { struct run *r; for (int i = 0; i < NCPU; i++) { //注意这里牺牲了遍历性,即本cpu也遍历了,但是如果做循环遍历的方法,如果不第一时间获取锁会导致页丢失 acquire(&kmem[i].lock); r = kmem[i].freelist; if (r) //onepage steal kmem[i].freelist = r->next; release(&kmem[i].lock); if (r) break; } return (void*) r; }
-
Buffer cache
- 核心是bget()函数,这个函数实现三个功能:寻找缓存块;缓存块分配;缓存块重用。
- 最重要问题是如何调度锁,由其是保证持有两个以上锁的时候需要相应的策略保证不会导致死锁;
kernel/buf.h
struct buf { int valid; // has data been read from disk? int disk; // does disk "own" buf? uint dev; uint blockno; struct sleeplock lock; uint refcnt; struct buf *prev; // LRU cache list struct buf *next; uchar data[BSIZE]; uint timestamp; //the buf last using time };
kerenl/bio.c
// Buffer cache. // // The buffer cache is a linked list of buf structures holding // cached copies of disk block contents. Caching disk blocks // in memory reduces the number of disk reads and also provides // a synchronization point for disk blocks used by multiple processes. // // Interface: // * To get a buffer for a particular disk block, call bread. // * After changing buffer data, call bwrite to write it to disk. // * When done with the buffer, call brelse. // * Do not use the buffer after calling brelse. // * Only one process at a time can use a buffer, // so do not keep them longer than necessary. #include "types.h" #include "param.h" #include "spinlock.h" #include "sleeplock.h" #include "riscv.h" #include "defs.h" #include "fs.h" #include "buf.h" #define NBUCKET 13 #define HASH(blockno) (blockno % NBUCKET) extern uint ticks; struct { struct spinlock lock; struct buf buf[NBUF]; // Linked list of all buffers, through prev/next. // Sorted by how recently the buffer was used. // head.next is most recent, head.prev is least. struct spinlock hashlock; int used_size; //used buf size; struct buf buckets[NBUCKET]; //hash-table bucket struct spinlock buckets_lock[NBUCKET]; //hash-table bucket lock // struct buf head; } bcache; // init lock and count nums void binit(void) { struct buf *b; bcache.used_size = 0; initlock(&bcache.lock, "bcache"); initlock(&bcache.hashlock, "bcache_hashlock"); for (int i = 0; i < NBUCKET; i++) { char lock_name[16] = {0}; snprintf(lock_name,16, "bcache_hashlock%d",i); initlock(&bcache.buckets_lock[i], lock_name); bcache.buckets[i].next = 0; } // Create linked list of buffers // bcache.head.prev = &bcache.head; // bcache.head.next = &bcache.head; for(b = bcache.buf; b < bcache.buf+NBUF; b++){ // b->next = bcache.head.next; // b->prev = &bcache.head; initsleeplock(&b->lock, "buffer"); b->refcnt = 0; b->timestamp = 0; // bcache.head.next->prev = b; // bcache.head.next = b; } } // Look through buffer cache for block on device dev. // If not found, allocate a buffer. // In either case, return locked buffer. static struct buf* bget(uint dev, uint blockno) { struct buf *b, *pre_min_b = 0; int min_buckets_num = -1, key = HASH(blockno); //对搜寻当前链表,查看缓存是否命中 //(锁使用当前列表的锁buckets_lock[key]保护) acquire(&bcache.buckets_lock[key]); // Is the block already cached? for(b = bcache.buckets[key].next; b != 0; b = b->next){ if(b->dev == dev && b->blockno == blockno){ b->refcnt++; release(&bcache.buckets_lock[key]); // release(&bcache.lock); acquiresleep(&b->lock); return b; } } //搜寻是否存在空余缓存,进行进行缓存的分配 //(由bcache.lock大锁+buckets_lock[key]保护) // Not cached.to check unused buf acquire(&bcache.lock); if (bcache.used_size < NBUF) { b = &bcache.buf[bcache.used_size++]; b->dev = dev; b->blockno = blockno; b->valid = 0; b->refcnt = 1; b->next = bcache.buckets[key].next; bcache.buckets[key].next = b; release(&bcache.lock); release(&bcache.buckets_lock[key]); acquiresleep(&b->lock); return b; } release(&bcache.lock); release(&bcache.buckets_lock[key]); //如果未找到/分配的化释放全部锁,进入LRU // Recycle the least recently used (LRU) unused buffer. //first to seek again acquire(&bcache.hashlock); //保证哈希表的搜索 // Is the block already cached? 获取哈希锁后再进行搜寻,由于从硬盘读取时间成本远远大于搜索时间成本,因此检测是否存在其他进程实现了读取( //由bcache.hashlock大锁+buckets_lock[key]保护) for(b = bcache.buckets[key].next; b != 0; b = b->next){ if(b->dev == dev && b->blockno == blockno){ acquire(&bcache.buckets_lock[key]); b->refcnt++; release(&bcache.buckets_lock[key]); acquire(&bcache.hashlock); acquiresleep(&b->lock); return b; } } // 注意此处持有哈希锁 //遍历全局散列表,寻找相应的最小时间戳(全局LRU,搜寻成本较高, linux使用的是红黑树+链表实现,之后可以进行修改) //由bcache.hashlock大锁+buckets_lock[i]保护) for (int i = 0; i < NBUCKET; ++i) { acquire(&bcache.buckets_lock[i]); int signal_foundnew = 0; for(b = &bcache.buckets[i]; b->next != 0; b = b->next){ if(b->next->refcnt == 0 && (!pre_min_b || b->next->timestamp < pre_min_b->next->timestamp)) { pre_min_b = b; signal_foundnew = 1; } } if (!signal_foundnew) { release(&bcache.buckets_lock[i]); //这个散列表中没找到,释放(buckets_lock[i]) } else { if (min_buckets_num != -1) release(&bcache.buckets_lock[min_buckets_num]); //找到后更新持有的锁(注意此处由于后续需要对该散列表进行操作,因此需要保证目标散列表操作的原子性) min_buckets_num = i; } } //注意,此处是持有两个锁 哈希锁和目标散列表的锁 //获取目标缓存 if (!pre_min_b) { panic("bget: no buffers"); } else { b = pre_min_b->next; } //如果找到的缓存不再当前散列表中,需要进行切换 if (min_buckets_num != key) { pre_min_b->next = b->next; //修改被偷取的bucket的链表 release(&bcache.buckets_lock[min_buckets_num]); //释放目标散列表的锁 acquire(&bcache.buckets_lock[key]); //获取当前链表的锁(更新持有的锁) b->next = bcache.buckets[key].next; bcache.buckets[key].next = b; } //注意,此处持有两个锁,哈希锁和当前散列表的锁 //完成操作 b->dev = dev; b->blockno = blockno; b->valid = 0; b->refcnt = 1; release(&bcache.buckets_lock[key]); release(&bcache.hashlock); acquiresleep(&b->lock); return b; // release(&bcache.lock); } // Return a locked buf with the contents of the indicated block. struct buf* bread(uint dev, uint blockno) { struct buf *b; b = bget(dev, blockno); if(!b->valid) { virtio_disk_rw(b, 0); b->valid = 1; } return b; } // Write b's contents to disk. Must be locked. void bwrite(struct buf *b) { if(!holdingsleep(&b->lock)) panic("bwrite"); virtio_disk_rw(b, 1); } // Release a locked buffer. // Move to the head of the most-recently-used list. void brelse(struct buf *b) { int key; if(!holdingsleep(&b->lock)) panic("brelse"); releasesleep(&b->lock); // acquire(&bcache.lock); key = HASH(b->blockno); acquire(&bcache.buckets_lock[key]); b->refcnt--; if (b->refcnt == 0) { // no one is waiting for it. // b->next->prev = b->prev; // b->prev->next = b->next; // b->next = bcache.head.next; // b->prev = &bcache.head; // bcache.head.next->prev = b; // bcache.head.next = b; b->timestamp = ticks; } release(&bcache.buckets_lock[key]); // release(&bcache.lock); } void bpin(struct buf *b) { int key = HASH(b->blockno); acquire(&bcache.buckets_lock[key]); b->refcnt++; release(&bcache.buckets_lock[key]); } void bunpin(struct buf *b) { int key = HASH(b->blockno); acquire(&bcache.buckets_lock[key]); b->refcnt--; release(&bcache.buckets_lock[key]); }
参考文献
2020版xv6手册:http://xv6.dgs.zone/tranlate_books/book-riscv-rev1/c6/s0.html
xv6手册与代码笔记:https://zhuanlan.zhihu.com/p/353580321
xv6手册中文版:http://xv6.dgs.zone/tranlate_books/book-riscv-rev1/c7/s5.html
28天速通MIT 6.S081操作系统公开课:https://zhuanlan.zhihu.com/p/629372818
MIT6.s081操作系统笔记:https://juejin.cn/post/7021218568226209828