首页 > 其他分享 >Paimon lookup store 实现

Paimon lookup store 实现

时间:2024-10-29 23:33:23浏览次数:1  
标签:MemorySlice return int length lookup store key Paimon block

Lookup Store 主要用于 Paimon 中的 Lookup Compaction 以及 Lookup join 的场景. 会将远程的列存文件在本地转化为 KV 查找的格式.

Hash

https://github.com/linkedin/PalDB

Sort

https://github.com/dain/leveldb
https://github.com/apache/paimon/pull/3770

Pasted image 20241029081723
整体文件结构:

Pasted image 20241029230800|182

相比于 Hash file 的优势

  • 一次写入, 避免了文件merge
  • 顺序写入, 保持原先的 key 的顺序, 后续如果按照 key 的顺序查找, 可提升缓存效率

SortLookupStoreWriter

SortLookupStoreWriter#put

put

@Override
public void put(byte[] key, byte[] value) throws IOException {
	dataBlockWriter.add(key, value);
	if (bloomFilter != null) {
		bloomFilter.addHash(MurmurHashUtils.hashBytes(key));
	}

	lastKey = key;

	// 当BlockWriter写入达到一定阈值, 默认是 cache-page-size=64kb.
	if (dataBlockWriter.memory() > blockSize) {
		flush();
	}

	recordCount++;
}

flush

private void flush() throws IOException {  
    if (dataBlockWriter.size() == 0) {  
        return;  
    }  
	// 将data block写入数据文件, 并记录对应的position和长度
    BlockHandle blockHandle = writeBlock(dataBlockWriter);  
    MemorySlice handleEncoding = writeBlockHandle(blockHandle);
    // 将BlockHandle 写入index writer, 这也通过是一个BlockWriter写的
    indexBlockWriter.add(lastKey, handleEncoding.copyBytes());  
}

writeBlock

private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException {
	// close the block
	// 获取block的完整数组, 此时blockWriter中的数组并不会被释放, 而是会继续复用
	MemorySlice block = blockWriter.finish();

	totalUncompressedSize += block.length();

	// attempt to compress the block
	BlockCompressionType blockCompressionType = BlockCompressionType.NONE;
	if (blockCompressor != null) {
		int maxCompressedSize = blockCompressor.getMaxCompressedSize(block.length());
		byte[] compressed = allocateReuseBytes(maxCompressedSize + 5);
		int offset = encodeInt(compressed, 0, block.length());
		int compressedSize =
				offset
						+ blockCompressor.compress(
								block.getHeapMemory(),
								block.offset(),
								block.length(),
								compressed,
								offset);

		// Don't use the compressed data if compressed less than 12.5%,
		if (compressedSize < block.length() - (block.length() / 8)) {
			block = new MemorySlice(MemorySegment.wrap(compressed), 0, compressedSize);
			blockCompressionType = this.compressionType;
		}
	}

	totalCompressedSize += block.length();

	// create block trailer
	// 每一块block会有一个trailer, 记录压缩类型和crc32校验码
	BlockTrailer blockTrailer =
			new BlockTrailer(blockCompressionType, crc32c(block, blockCompressionType));
	MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);

	// create a handle to this block
	// BlockHandle 记录了每个block的其实position和长度
	BlockHandle blockHandle = new BlockHandle(position, block.length());

	// write data
	// 将数据追加写入磁盘文件
	writeSlice(block);

	// write trailer: 5 bytes
	// 写出trailer
	writeSlice(trailer);

	// clean up state
	blockWriter.reset();

	return blockHandle;
}

close

public LookupStoreFactory.Context close() throws IOException {
	// flush current data block
	flush();

	LOG.info("Number of record: {}", recordCount);

	// write bloom filter
	@Nullable BloomFilterHandle bloomFilterHandle = null;
	if (bloomFilter != null) {
		MemorySegment buffer = bloomFilter.getBuffer();
		bloomFilterHandle =
				new BloomFilterHandle(position, buffer.size(), bloomFilter.expectedEntries());
		writeSlice(MemorySlice.wrap(buffer));
		LOG.info("Bloom filter size: {} bytes", bloomFilter.getBuffer().size());
	}

	// write index block
	// 将index数据写出至文件
	BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);

	// write footer
	// Footer 记录bloomfiler + index
	Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
	MemorySlice footerEncoding = Footer.writeFooter(footer);
	writeSlice(footerEncoding);

	// 最后关闭文件
	// close file
	fileOutputStream.close();

	LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize));
	LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize));
	return new SortContext(position);
}

BlockWriter

add

public void add(byte[] key, byte[] value) {
	int startPosition = block.size();
	// 写入key长度
	block.writeVarLenInt(key.length);
	// 写入key
	block.writeBytes(key);
	// 写入value长度
	block.writeVarLenInt(value.length);
	// 写入value
	block.writeBytes(value);
	int endPosition = block.size();

	// 使用一个int数组记录每个KV pair的起始位置作为索引
	positions.add(startPosition);
	// 是否对齐. 是否对齐取决于每个KV对的长度是否一样
	if (aligned) {
		int currentSize = endPosition - startPosition;
		if (alignedSize == 0) {
			alignedSize = currentSize;
		} else {
			aligned = alignedSize == currentSize;
		}
	}
}
  • 这里的 block 对应于一块可扩容的 MemorySegment, 也就是 byte[] , 当写入长度超过当前数组的长度时, 就会扩容

finish

public MemorySlice finish() throws IOException {
	if (positions.isEmpty()) {
		throw new IllegalStateException();
	}
	// 当通过BlockWriter写出的数据长度都是对齐的时, 就不需要记录各个Position的index了, 只需要记录一个对齐长度, 读取时自己可以计算.
	if (aligned) {
		block.writeInt(alignedSize);
	} else {
		for (int i = 0; i < positions.size(); i++) {
			block.writeInt(positions.get(i));
		}
		block.writeInt(positions.size());
	}
	block.writeByte(aligned ? ALIGNED.toByte() : UNALIGNED.toByte());
	return block.toSlice();
}

小结

整个文件的写出过程非常简单, 就是按 block 写出, 并且记录每个 block 的位置, 作为 index.

SortLookupStoreReader

读取的过程, 主要就是为了查找 key 是否存在, 以及对应的 value 或者对应的行号.

public byte[] lookup(byte[] key) throws IOException {
	// 先通过bloomfilter提前进行判断
	if (bloomFilter != null && !bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {
		return null;
	}

	MemorySlice keySlice = MemorySlice.wrap(key);
	// seek the index to the block containing the key
	indexBlockIterator.seekTo(keySlice);

	// if indexIterator does not have a next, it means the key does not exist in this iterator
	if (indexBlockIterator.hasNext()) {
		// seek the current iterator to the key
		// 根据从index block中读取到的key value的位置(BlockHandle), 读取对应的value block
		BlockIterator current = getNextBlock();
		// 在value的iterator中再次二分查找寻找对应block中是否存在match的key, 如果存在则返回对应的数据
		if (current.seekTo(keySlice)) {
			return current.next().getValue().copyBytes();
		}
	}
	return null;
}
  • 查找一次 key 会经历两次二分查找(index + value).

BlockReader

// 从block创建一个iterator
public BlockIterator iterator() {
	BlockAlignedType alignedType =
			BlockAlignedType.fromByte(block.readByte(block.length() - 1));
	int intValue = block.readInt(block.length() - 5);
	if (alignedType == ALIGNED) {
		return new AlignedIterator(block.slice(0, block.length() - 5), intValue, comparator);
	} else {
		int indexLength = intValue * 4;
		int indexOffset = block.length() - 5 - indexLength;
		MemorySlice data = block.slice(0, indexOffset);
		MemorySlice index = block.slice(indexOffset, indexLength);
		return new UnalignedIterator(data, index, comparator);
	}
}

SliceCompartor

这里面传入了 keyComparator, 用于进行 key 的比较. 用于在 index 中进行二分查找. 这里的比较并不是直接基于原始的数据, 而是基于 MemorySlice 进行排序.

比较的过程会将 key 的各个字段从 MemorySegment 中读取反序列化出来, cast 成 Comparable 进行比较.

public SliceComparator(RowType rowType) {
	int bitSetInBytes = calculateBitSetInBytes(rowType.getFieldCount());
	this.reader1 = new RowReader(bitSetInBytes);
	this.reader2 = new RowReader(bitSetInBytes);
	this.fieldReaders = new FieldReader[rowType.getFieldCount()];
	for (int i = 0; i < rowType.getFieldCount(); i++) {
		fieldReaders[i] = createFieldReader(rowType.getTypeAt(i));
	}
}

@Override
public int compare(MemorySlice slice1, MemorySlice slice2) {
	reader1.pointTo(slice1.segment(), slice1.offset());
	reader2.pointTo(slice2.segment(), slice2.offset());
	for (int i = 0; i < fieldReaders.length; i++) {
		boolean isNull1 = reader1.isNullAt(i);
		boolean isNull2 = reader2.isNullAt(i);
		if (!isNull1 || !isNull2) {
			if (isNull1) {
				return -1;
			} else if (isNull2) {
				return 1;
			} else {
				FieldReader fieldReader = fieldReaders[i];
				Object o1 = fieldReader.readField(reader1, i);
				Object o2 = fieldReader.readField(reader2, i);
				@SuppressWarnings({"unchecked", "rawtypes"})
				int comp = ((Comparable) o1).compareTo(o2);
				if (comp != 0) {
					return comp;
				}
			}
		}
	}
	return 0;
}

查找的实现就是二分查找的过程, 因为写入的 key 是有序写入的.

public boolean seekTo(MemorySlice targetKey) {
	int left = 0;
	int right = recordCount - 1;

	while (left <= right) {
		int mid = left + (right - left) / 2;

		// 对于aligned iterator, 就直接seek record * recordSize
		// 对于unaligned iterator, 就根据writer写入的索引表来跳转
		seekTo(mid);
		// 读取一条key value pair
		BlockEntry midEntry = readEntry();
		int compare = comparator.compare(midEntry.getKey(), targetKey);

		if (compare == 0) {
			polled = midEntry;
			return true;
		} else if (compare > 0) {
			polled = midEntry;
			right = mid - 1;
		} else {
			left = mid + 1;
		}
	}

	return false;
}

小结

查找过程

  • 先过一遍 bloom filter
  • index 索引查找对应 key 的 block handle
  • 根据第二步的 handle, 读取对应的 block, 在 block 中查找对应的 key value.

标签:MemorySlice,return,int,length,lookup,store,key,Paimon,block
From: https://www.cnblogs.com/Aitozi/p/18514737

相关文章

  • Excel-多表数据查找匹配(VLOOKUP)
    ......
  • Excel-多表数据查找匹配(VLOOKUP)
    ......
  • 题解:CF1666J Job Lookup
    被迫来写篇题解。首先,第一个要求我们只需要在递归构造的时候保证子树对应区间连续即可,现在考虑第二个要求。就题目中的二叉树而言,想要确定其结构,我们只需要关注这段区间,即这棵子树根节点的编号,又因为子树区间连续,所以我们不难想到区间动态规划。设\(dp_{l,r}\)表示\(l\simr......
  • 【expo 库】expo-secure-store 安全存储库
    expo-secure-store是一个用于在移动应用程序中安全存储敏感数据的库。它提供了一组简单的API,使开发人员可以轻松地存储和检索敏感数据,如用户凭据、令牌和其他机密信息。这个库是Expo框架的一部分,Expo是一个开源平台,用于构建、部署和发布原生移动应用程序。在底层实现上,expo-se......
  • 处理容器报错:[ERROR] .. Get “http://safeline-fvm/skynetinto“: dial tp: lookup s
    雷池社区版(WAF)是基于容器部署的在容器化应用的部署和运行过程中,我们常常会遇到各种报错信息。其中,形如“[ERROR]detect/skynet.go:114Get“http://safeline-fvm/skynetinto":dialtp:lookupsafeline-fvmon127.0.0.11:53:servermisbehaving”以及“panic:Get......
  • LookupViT:类似SE的token压缩方案,加速还能丰富特征 | ECCV'24
    视觉变换器(ViT)已成为众多工业级视觉解决方案的事实标准选择。但由于每一层都计算自注意力,这导致其推理成本对许多场景而言是不可接受的,因为自注意力在标记数量上具有平方的计算复杂度。另一方面,图像中的空间信息和视频中的时空信息通常是稀疏和冗余的。LookupViT旨在利用这种信......
  • Apache Paimon介绍
    目录背景诞生应用场景实时数据分析与查询流批一体处理低成本高效存储具体业务场景示例总结系统架构存储层元数据管理计算层数据摄入和输出查询优化扩展性和可靠性生态系统集成总结核心概念表(Table)模式(Schema)分区(Partition)快照(Snapshot)清单文件(Manifest......
  • [快速阅读八] Matlab中bwlookup的实现及其在计算二值图像的欧拉数、面积及其他morph变
    以前看过matlab的bwlookup函数,但是总感觉有点神秘,一直没有去仔细分析,最近在分析计算二值图像的欧拉数时,发现自己写的代码和matlab的总是对不少,于是又去翻了下matlab的源代码,看到了matlab里实现欧拉数的代码非常简单,如下所示:ifn==4lut=4*[00.250.2500.250.5-......
  • 如何恢复 Windows 上 PostgreSQL 14 中被误删的 pg_restore.exe
    如何恢复Windows上PostgreSQL14中被误删的pg_restore.exe方法1:重新安装PostgreSQL14下载PostgreSQL14安装包:前往PostgreSQL官方网站。下载与操作系统版本匹配的PostgreSQL14安装包。安装PostgreSQL:运行安装程序,进行标准安装。安装过程中,选择“Cli......
  • Get “https://registry-1.docker.io/v2/“: proxyconnect tcp: dial tcp: lookup pro
    docker通过代理配置上网无法pullanbox使用代理配置文件解决1.创建代理配置文件运行以下命令创建配置文件:sudomkdir-p/etc/systemd/system/docker.service.dsudotouch/etc/systemd/system/docker.service.d/http-proxy.conf2.编辑配置文件使用nano文本编辑器打......