Summary
在本章中,您将:
- 使用L0 flush实现LSM写路径。
- 实现逻辑以正确更新LSM状态。
要将测试用例复制到启动器代码中并运行它们,
cargo x copy-test --week 1 --day 6
cargo x scheck
Task1-Flush Memtable to SST
此时,我们已经准备好了所有内存中的东西和磁盘上的文件,并且存储引擎能够从所有这些结构中读取和合并数据。现在,我们将实现将东西从内存移动到磁盘的逻辑(所谓的flush),并完成Mini-LSM第1周教程。
在此任务中,您需要修改:
src/lsm_storage.rs src/mem_table.rs
您需要修改
LSMStorageInner::force_flush_next_imm_memtable
和MemTable::flush
。在LSMStorageInner::open
中,如果LSM数据库目录不存在,则需要创建它。要将memtable
刷新到磁盘,我们需要做三件事:
- 选择要flush的memtable。
- 创建memtable对应的SST文件。
- 将memtable从不可变memtable列表中移除,并将SST文件添加到L0 SST中。
我们暂时没有解释什么是L0(0级)SST。通常,它们是作为memtable flush的结果直接创建的SSTs文件的集合。在本教程的第1周中,我们将在磁盘上只有L0 SST。我们将在第2周深入探讨如何在磁盘上使用分层或分层结构有效地组织它们。
请注意,创建SST文件是一个计算量大、成本高的操作。同样,我们不希望长时间持有状态读/写锁,因为它可能会阻塞其他操作,并在LSM操作中产生巨大的延迟峰值。另外,我们使用state_lock互斥锁来序列化LSM树中的状态修改操作。在这个任务中,您需要仔细考虑如何使用这些锁来使LSM状态修改不受竞争条件限制,同时最小化临界部分。
我们没有并发测试用例,您需要仔细考虑您的实现。另外,请记住,不可变memtable列表中的最后一个memtable是最早的,也是您应该刷新的。
剧透:Flush L0伪代码
fn flush_l0(&self) { let _state_lock = self.state_lock.lock(); let memtable_to_flush; let snapshot = { let guard = self.state.read(); memtable_to_flush = guard.imm_memtables.last(); }; let sst = memtable_to_flush.flush()?; { let guard = self.state.write(); guard.imm_memtables.pop(); guard.l0_sstables.insert(0, sst); }; }
force_flush_next_imm_memtable
在开始这个任务前,建议回顾一下Week1Day1中的force_freeze_memtable
的实现。
也正如任务书中给的代码展示的那样,有以下几个步骤:
- 获取修改状态的锁,确保只有一个线程对状态进行修改
- 获取读锁
- 读取数据
- 释放读锁
- 进行flush操作
- 获取写锁
- 对状态进行修改
- 程序结束自动释放写锁
pub fn force_flush_next_imm_memtable(&self) -> Result<()> {
// 获取修改状态的锁,确保只有一个线程对状态进行修改
let _state_lock = self.state_lock.lock();
let memtable_to_flush;
{
// 获取读锁
let guard = self.state.read();
// 读取数据
memtable_to_flush = guard
.imm_memtables
.last()
.expect("no imm memtables!")
.clone();
// 释放读锁
};
// 进行flush操作
let mut sst_builder = SsTableBuilder::new(self.options.block_size);
memtable_to_flush.flush(&mut sst_builder)?;
let sst = Arc::new(sst_builder.build(
memtable_to_flush.id(),
Some(self.block_cache.clone()),
self.path_of_sst(memtable_to_flush.id()),
)?);
{
// 获取写锁
let mut guard = self.state.write();
let mut snapshot = guard.as_ref().clone();
snapshot.imm_memtables.pop();
snapshot.l0_sstables.insert(0, sst.sst_id());
snapshot.sstables.insert(sst.sst_id(), sst);
// 对状态进行修改
*guard = Arc::new(snapshot)
// 程序结束自动释放写锁
};
Ok(())
}
在完成这章节内容过程中,对下面问题进行思考:
1、为什么要释放读锁,再获取写锁?
因为读
、写
锁冲突,读锁不释放,获取不到写锁,可以运行下面这个小demo,只会输出get r_guard
:
use parking_lot::RwLock;
struct LsmStorageState {
number: RwLock<u16>,
}
fn main() {
let state = LsmStorageState {
number: RwLock::new(1)
};
let r_guard = state.number.read();
println!("get r_guard");
let w_guard = state.number.write();
println!("get w_guard");
let r_guard_2 = state.number.read();
println!("get r_guard_2");
}
2、为什么要先进行flush操作,再获取写锁?
因为flush
操作,涉及io操作比较耗时,避免线程长时间持有写锁,阻塞其他线程的读操作。
3、有了读写锁,为什么还要状态锁?
借用Week1Day1中任务书中考虑的:
考虑memtable即将达到容量限制的情况,两个线程成功地将两个键放入memtable中,它们都在放入两个键后发现memtable达到容量限制。他们都会对memtable进行大小检查,并决定冻结它。在这种情况下,我们可能会创建一个空的memtable,然后立即冻结。
4、为什么要clone出一个snapshot,再对snapshot修改,再将snapshot替换?
可以尝试运行下面的代码:
use std::sync::Arc;
use parking_lot::RwLock;
#[derive(Clone)]
struct LsmStorageState {
number: Vec<u16>,
}
struct LsmStorageInner {
state: Arc<RwLock<Arc<LsmStorageState>>>,
}
fn main() {
let lsm = LsmStorageInner {
state: Arc::new(
RwLock::new(Arc::new(LsmStorageState {
number: vec![16]
}))
)
};
let mut guard = lsm.state.write();
let mut snapshot = guard.as_ref();
snapshot.number.push(1);
}
会得到如下报错:
error[E0596]: cannot borrow `snapshot.number` as mutable, as it is behind a `&` reference
--> src/main.rs:23:5
|
23 | snapshot.number.push(1);
| ^^^^^^^^^^^^^^^ `snapshot` is a `&` reference, so the data it refers to cannot be borrowed as mutable
|
help: consider specifying this binding's type
|
22 | let mut snapshot: &mut LsmStorageState = guard.as_ref();
| ++++++++++++++++++++++
因为guard.as_ref()
只能获取到不可变的引用,所以要想修改其中的数据,只能复制一份,在复制的数据上面进行修改。
flush操作
只用将mentable
中的数据,添加进builder
构造者中
pub fn flush(&self, _builder: &mut SsTableBuilder) -> Result<()> {
for entry in self.map.iter() {
_builder.add(KeySlice::from_slice(&entry.key()[..]), &entry.value()[..]);
}
Ok(())
}
Task 2-Flush Trigger
在此任务中,您需要修改:
src/lsm_storage.rs src/compact.rs
当内存中的memtable(immutable+mutable)数量超过LSM存储选项中的num_memtable_limit时,应该将最早的memtable刷新到磁盘。这是由后台的flush线程完成的。flush线程将以MiniLSM结构启动。我们已经实现了启动线程和正确停止线程的必要代码。
在此任务中,您需要在Compact.rs中实现LsmStorageInner::trigger_flush,并在lsm_storage.rs中实现MiniLsm::close。trigger_flush将每50毫秒执行一次。如果memtable的数量超过了限制,应该调用force_flush_next_imm_memtable来刷新一个memtable。当用户调用close函数时,应该等到flush线程(和第2周的compaction线程)结束。
trigger_flush
先阅读定时线程函数spawn_flush_thread
。
crossbeam_channel::tick
创建一个定时发送信号的通道,每50毫秒发送一次信号。loop
循环将持续运行,直到接收到rx
通道的信号。crossbeam_channel::select!
宏用于非阻塞地选择从多个通道接收消息。这里有两个分支:recv(ticker) -> _
:当定时器发送信号时,调用this.trigger_flush()
触发刷新操作。如果有错误发生,打印错误信息。recv(rx) -> _
:当从rx通道接收到信号时,退出循环,从而终止线程。
可知每50ms会触发调用this.trigger_flush()
函数,所以自动转储的代码需要写在这个函数中。
fn trigger_flush(&self) -> Result<()> {
let res = {
let state = self.state.read();
state.imm_memtables.len() >= self.options.num_memtable_limit
};
if res {
self.force_flush_next_imm_memtable()?;
}
Ok(())
}
判断imm_memtables
是否操作参数限制,如果超过了则调用force_flush_next_imm_memtable
转储最老的那个mentable
。
Task 3-Filter the SSTs
现在您已经有了一个完全工作的存储引擎,并且您可以使用mini-lsm-cli与您的存储引擎进行交互。
cargo run --bin mini-lsm-cli -- --compaction none
然后
fill 1000 3000 get 2333 flush fill 1000 3000 get 2333 flush get 2333 scan 2000 2333
如果你填充了更多的数据,你可以看到你的flush线程正在工作并自动刷新L0 SST,而不使用flush命令。
最后,让我们在本周结束之前对SST进行一个简单的优化。根据用户提供的key范围,我们可以很容易的过滤掉一些不包含key范围的SST,这样我们就不需要在merge迭代器中读取它们了。
在此任务中,您需要修改:
src/lsm_storage.rs src/iterators/* src/lsm_iterator.rs
您需要更改读取路径函数以跳过不可能包含键/键范围的SST。您需要为您的迭代器实现num_active_iterator,以便测试用例可以检查您的实现是否正确。对于MergeIterator和TwoMergeIterator,它是所有子迭代器的num_active_iterator之和。需要注意的是,如果你没有修改MergeIterator的起始代码中的字段,记得还要考虑MergeIterator::current。对于LsmIterator和FusedIterator,只需从内部迭代器返回活动迭代器的数量。
你可以实现range_overlap和key_within这样的helper函数来简化你的代码。
这个任务在昨天的Week1Day5中已经实现了,发现实现num_active_iterator
函数后,用例便能通过。