Page由4部分组成
PageBody,PageFooter,FooterSize(4),CheckSum(4)
PageBody是由page类型决定的,可能是压缩的。
PageFooter是经过序列化的PageFooterPB。它包含page_type、未压缩的body大小和其他通用的元数据。如果PageBody的大小和未压缩的body大小一致,则表示这个page是未压缩的。
FooterSize表示PageFooter的长度。
CheckSum是前面3部分的crc32值。
对应的类为PageIO,包含的成员函数如下:
class PageIO { public: // Compress `body' using `codec' into `compressed_body'. // The size of returned `compressed_body' is 0 when the body is not compressed, this // could happen when `codec' is null or space saving is less than `min_space_saving'. /*BlockCompressionCodec是对压缩算法的封装,通过上面的注释可以看出该函数的作用是使用codec将body压缩成compressed_body*/ static Status compress_page_body(const BlockCompressionCodec* codec, double min_space_saving, const std::vector<Slice>& body, faststring* compressed_body); // Encode page from `body' and `footer' and write to `file'. // `body' could be either uncompressed or compressed. // On success, the file pointer to the written page is stored in `result'. /*通过上面的注释可以看出该函数的作用是将body和footer写到文件中,并返回文件指针*/ static Status write_page(WritableFile* wfile, const std::vector<Slice>& body, const PageFooterPB& footer, PagePointer* result); // Convenient function to compress page body and write page in one go. static Status compress_and_write_page(const BlockCompressionCodec* codec, double min_space_saving, WritableFile* wfile, const std::vector<Slice>& body, const PageFooterPB& footer, PagePointer* result) { DCHECK_EQ(footer.uncompressed_size(), Slice::compute_total_size(body)); faststring compressed_body; RETURN_IF_ERROR(compress_page_body(codec, min_space_saving, body, &compressed_body)); if (compressed_body.size() == 0) { // uncompressed return write_page(wfile, body, footer, result); } return write_page(wfile, {Slice(compressed_body)}, footer, result); } // Read and parse a page according to `opts'. // On success // `handle' holds the memory of page data, // `body' points to page body, // `footer' stores the page footer. /*通过上面的注释可以看出,该函数的作用是根据opts中指定的参数来解析page,返回page的body和footer*/ static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, Slice* body, PageFooterPB* footer); };
将数据写入page时,先调用compress_page_body对数据进行压缩,对应的代码如下:
Status PageIO::compress_page_body(const BlockCompressionCodec* codec, double min_space_saving, const std::vector<Slice>& body, faststring* compressed_body) { /*从这里可以看出body是由很多个Slice组成的,Slice其实就是包含data和length的结构,先计算出所有Slice的总长度,
下面会校验是否超过压缩算法支持的最大长度,通过代码中可以了解到LZ4格式LZ4_MAX_INPUT_SIZE的限制,
SNAPPY/LZ4FRAME/ZLIB/ZSTD没有限制,对于LZ4格式,如果超过了最大的长度,则不再对数据进行压缩。*/ size_t uncompressed_size = Slice::compute_total_size(body); auto cleanup = MakeScopedCleanup([&]() { compressed_body->clear(); }); if (codec != nullptr && codec->exceed_max_input_size(uncompressed_size)) { compressed_body->clear(); return Status::OK(); } if (codec != nullptr && uncompressed_size > 0) { /*对于LZ4_FRAME、ZSTD和LZ4,可以使用compression pool,作用是先将压缩的结果放到compression_buffer中,
然后再将compression_buffer中的数据拷贝到compressed_body中,这样做的好处是避免一开始就申请很大块的内存,然后再缩小。*/ if (use_compression_pool(codec->type())) { Slice compressed_slice; RETURN_IF_ERROR( codec->compress(body, &compressed_slice, true, uncompressed_size, compressed_body, nullptr)); } else { compressed_body->resize(codec->max_compressed_len(uncompressed_size)); Slice compressed_slice(*compressed_body); RETURN_IF_ERROR(codec->compress(body, &compressed_slice)); compressed_body->resize(compressed_slice.get_size()); } double space_saving = 1.0 - static_cast<double>(compressed_body->size()) / uncompressed_size; // return compressed body only when it saves more than min_space_saving if (space_saving > 0 && space_saving >= min_space_saving) { //空间节省率超过10%的时候执行shrink_to_fit,对空间进行重新整理。 compressed_body->shrink_to_fit(); cleanup.cancel(); return Status::OK(); } } return Status::OK(); }
压缩完毕之后,将数据写到文件中,如下面代码所示:
std::string footer_buf; // serialized footer + footer size footer.SerializeToString(&footer_buf); put_fixed32_le(&footer_buf, static_cast<uint32_t>(footer_buf.size())); //首先将body放到page中 std::vector<Slice> page = body; //然后将序列化后的footer放到page中 page.emplace_back(footer_buf); // checksum uint8_t checksum_buf[sizeof(uint32_t)]; uint32_t checksum = crc32c::Value(page); encode_fixed32_le(checksum_buf, checksum); //最后再将checksum值放到page中 page.emplace_back(checksum_buf, sizeof(uint32_t)); uint64_t offset = wfile->size(); //将这个page写到file中 RETURN_IF_ERROR(wfile->appendv(&page[0], page.size())); result->offset = offset; result->size = wfile->size() - offset;
至此,page的写入过程完成。
读取page的过程,在函数read_and_decompress_page中实现
auto cache = StoragePageCache::instance(); PageCacheHandle cache_handle; //CacheKey有两个信息组成:文件名称和文件偏移 StoragePageCache::CacheKey cache_key(opts.read_file->filename(), opts.page_pointer.offset); //在StoragePageCache中查找指定的CacheKey是否存在,如果存在,则使用Cache中缓存的page. if (opts.use_page_cache && cache->lookup(cache_key, &cache_handle)) { // we find page in cache, use it *handle = PageHandle(std::move(cache_handle)); opts.stats->cached_pages_num++; // parse body and footer Slice page_slice = handle->data(); uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4); //解析得到footer std::string footer_buf(page_slice.data + page_slice.size - 4 - footer_size, footer_size); if (!footer->ParseFromString(footer_buf)) { return Status::Corruption("Bad page: invalid footer"); } //解析得到body *body = Slice(page_slice.data, page_slice.size - 4 - footer_size); return Status::OK(); }
如果cache中不存在,则先拿到page的body,再拿到page的footer,如果body是压缩的,则解压,得到解压后的body,并将解压后的数据放到cache中,cache使用的是LRU Cache。
标签:body,StarRocks,footer,--,源码,compressed,codec,page,size From: https://www.cnblogs.com/snake-fly/p/17566624.html