Task1-Two Merge Iterator
在此任务中,您需要修改:
src/iterators/two_merge_iterator.rs
你已经在Week1Day2中实现了一个合并迭代器,它合并相同类型的迭代器(如:memtable迭代器)。既然我们已经实现了SST格式,我们就同时拥有了磁盘上的SST结构和内存中的memtable。当我们从存储引擎扫描时,我们需要将memtable迭代器和SST迭代器的数据合并到一个迭代器中。在这种情况下,我们需要一个
TwoMergeIterator<X, Y>
来合并两个不同类型的迭代器。可以在two_merge_iter.rs中实现TwoMergeIterator。因为我们这里只有两个迭代器,所以我们不需要维护二叉堆。相反,我们可以简单地使用一个标志来指示要读取哪个迭代器。与MergeIterator类似,如果在两个迭代器中找到相同的键,则第一个迭代器优先。
skip_b
跳过迭代器B中与迭代A重复的数据,已迭代器A为主。
choose_a
选择A或者B迭代器
疑问:
1、为什么只有skip_b
,没有skip_a
?
因为在出现重复的时候,需要以迭代器A为主。
2、为什么先执行skip_b
,在执行choose_a
需要先跳过迭代器中重复的数据,避免重复数据输出。
use anyhow::Result;
use super::StorageIterator;
pub struct TwoMergeIterator<A: StorageIterator, B: StorageIterator> {
a: A,
b: B,
choose_a: bool,
}
impl<
A: 'static + StorageIterator,
B: 'static + for<'a> StorageIterator<KeyType<'a> = A::KeyType<'a>>,
> TwoMergeIterator<A, B>
{
fn choose_a(a: &A, b: &B) -> bool {
if !a.is_valid() {
return false;
}
if !b.is_valid() {
return true;
}
a.key() < b.key()
}
fn skip_b(&mut self) -> Result<()> {
if self.a.is_valid() && self.b.is_valid() && self.b.key() == self.a.key() {
self.b.next()?;
}
Ok(())
}
pub fn create(a: A, b: B) -> Result<Self> {
let mut iter = Self {
choose_a: false,
a,
b,
};
iter.skip_b()?;
iter.choose_a = Self::choose_a(&iter.a, &iter.b);
Ok(iter)
}
}
impl<
A: 'static + StorageIterator,
B: 'static + for<'a> StorageIterator<KeyType<'a> = A::KeyType<'a>>,
> StorageIterator for TwoMergeIterator<A, B>
{
type KeyType<'a> = A::KeyType<'a>;
fn key(&self) -> Self::KeyType<'_> {
if self.choose_a {
self.a.key()
} else {
self.b.key()
}
}
fn value(&self) -> &[u8] {
if self.choose_a {
self.a.value()
} else {
self.b.value()
}
}
fn is_valid(&self) -> bool {
if self.choose_a {
self.a.is_valid()
} else {
self.b.is_valid()
}
}
fn next(&mut self) -> Result<()> {
if self.choose_a {
self.a.next()?;
} else {
self.b.next()?;
}
self.skip_b()?;
self.choose_a = Self::choose_a(&self.a, &self.b);
Ok(())
}
}
Task 2: Read Path - Scan
在此任务中,您需要修改:
src/lsm_iterator.rs src/lsm_storage.rs
在实现TwoMergeIterator之后,我们可以将LsmIteratorInner更改为具有以下类型:
type LsmIteratorInner = TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>;
因此,我们的LSM存储引擎的内部迭代器将是一个结合来自memtable和SST的数据的迭代器。
请注意,我们的SST迭代器不支持传递它的右边界。因此,您需要在LsmIterator中手动处理end_bound。您需要修改LsmIterator逻辑,以便在内部迭代器的键到达结束边界时停止。
我们的测试用例将在l0_sstables中生成一些memtables和SST,您需要在此任务中正确地扫描出所有这些数据。到下一章之前不需要修改SST。因此,您可以继续修改您的LsmStorageInner::scan接口,以在所有memtable和SST上创建一个合并迭代器,从而完成您的存储引擎的读取路径。
因为SsTableIterator::create涉及到I/O操作,可能会很慢,所以我们不想在状态临界部分中这样做。因此,首先应该读取状态并克隆LSM状态快照的Arc。然后,你应该放下锁。之后,您可以遍历所有L0 SST并为每个SST创建迭代器,然后创建一个合并迭代器来检索数据。
fn scan(&self) { let snapshot = { let guard = self.state.read(); Arc::clone(&guard) }; // create iterators and seek them }
在LSM存储状态下,我们只将SST id存储在l0_sstables向量中。您需要从sstables哈希映射中检索实际的SST对象。
range_overlap
判断user_begin到user_end的范围和table_begin到table_end的范围是否有交集。
fn range_overlap(
user_begin: Bound<&[u8]>,
user_end: Bound<&[u8]>,
table_begin: KeySlice,
table_end: KeySlice,
) -> bool {
match user_end {
Bound::Excluded(key) if key <= table_begin.raw_ref() => {
return false;
}
Bound::Included(key) if key < table_begin.raw_ref() => {
return false;
}
_ => {}
}
match user_begin {
Bound::Excluded(key) if key >= table_end.raw_ref() => {
return false;
}
Bound::Included(key) if key > table_end.raw_ref() => {
return false;
}
_ => {}
}
true
}
-
Included
当一个边界是
Included
时,这意味着该边界值是包含在范围内的。例如,如果你有一个范围1..=5
,那么数字5
就是被包含在内的。 -
Excluded
当一个边界是Excluded
时,这意味着该边界值是不包含在范围内的。例如,如果你有一个范围1..5
,那么数字5
是不包含在内的。 -
Unbounded
当一个边界是Unbounded
时,这意味着该端点是没有定义的。这通常用于表示一个开放的边界,例如1..
或..5
这样的范围。
scan
- 遍历
sstables
,将与_lower到_upper范围有交集的SsTable
创建迭代器SsTableIterator
。 - 使用
MergeIterator
将多个SsTableIterator
合并 - 使用
TwoMergeIterator
将mentable
的合并迭代器和SsTableIterator
的合并迭代器合并
// SST迭代器
let mut sst_iters = Vec::with_capacity(snapshot.l0_sstables.len());
for table_id in snapshot.l0_sstables.iter() {
let table = snapshot.sstables[table_id].clone();
if range_overlap(
_lower,
_upper,
table.first_key().as_key_slice(),
table.last_key().as_key_slice(),
) {
let iter = match _lower {
Bound::Included(key) => {
SsTableIterator::create_and_seek_to_key(table, KeySlice::from_slice(key))?
}
Bound::Excluded(key) => {
let mut iter = SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(key),
)?;
if iter.is_valid() && iter.key().raw_ref() == key {
iter.next()?;
}
iter
}
Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table)?,
};
sst_iters.push(Box::new(iter));
}
}
let l0_iter = MergeIterator::create(sst_iters);
let iter = TwoMergeIterator::create(merge_memtable_iter, l0_iter)?;
Ok(FusedIterator::new(LsmIterator::new(
iter,
map_bound(_upper),
)?))
LsmIterator
先修改类型定义:
type LsmIteratorInner =
TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>;
修改构造函数:
pub(crate) fn new(iter: LsmIteratorInner, upper: Bound<Bytes>) -> Result<Self> {
let mut lsm = Self { inner: iter, upper };
if lsm.is_valid() && lsm.value().is_empty() {
lsm.next();
}
Ok(lsm)
}
修改is_valid
函数:
fn is_valid(&self) -> bool {
if !self.inner.is_valid() {
return false;
}
let mut is_valid = true;
match self.upper.as_ref() {
Bound::Included(upper) => is_valid = self.inner.key().raw_ref() <= upper.as_ref(),
Bound::Excluded(upper) => is_valid = self.inner.key().raw_ref() < upper.as_ref(),
Bound::Unbounded => {}
}
is_valid
}
在值有效的情况下,判断右边界是否满足要求。
Task 3: Read Path - Get
在此任务中,您需要修改:
src/lsm_storage.rs
对于get请求,它将被处理为在memtable中查找,然后在SST上扫描。您可以在探测所有memtable之后创建一个覆盖所有SST的合并迭代器。您可以查找到用户想要查找的键。寻找有两种可能:键与用户探测的键相同,键不相同/不存在。你应该只返回值给用户,当键存在,并且是相同的探测。您还应该像上一节一样减少状态锁的临界区。还要记住处理删除的键。
get函数
在扫描imm_memtables
之后,返回之前:
for table in self.state.read().l0_sstables.iter() {
let table = self.state.read().sstables[table].clone();
let iter = SsTableIterator::create_and_seek_to_key(table, KeySlice::from_slice(_key))?;
if iter.is_valid() && iter.key().raw_ref() == _key {
if iter.value().is_empty() {
return Ok(None);
}
return Ok(Some(Bytes::copy_from_slice(iter.value())));
}
}
遍历每个SsTable
,使用seek_to_key
创建迭代器,如果找到一样的key
,且value
值不是空字符串则返回值。