首页 > 其他分享 >Rocksdb 的优秀代码(三)-- 工业级 线程池实现分享

Rocksdb 的优秀代码(三)-- 工业级 线程池实现分享

时间:2022-11-04 11:34:39浏览次数:104  
标签:优先级 -- Rocksdb 调度 priority 线程 Env


文章目录

  • ​​前言​​
  • ​​1. Rocksdb线程池概览​​
  • ​​2. Rocksdb 线程池实现​​
  • ​​2.1 基本数据结构​​
  • ​​2.2 线程池创建​​
  • ​​2.3 线程池 调度线程执行​​
  • ​​2.4 线程池销毁线程​​
  • ​​2.5 线程池优先级调度​​
  • ​​2.6 动态调整线程池 线程数目上限​​
  • ​​3. 总结​​

前言

Rocksdb 作为一个第三方库的形态嵌入到各个存储系统之中存储元数据,当rocksdb被使用的时候其内部会自启动一些线程,随着需要处理的用户数据越来越多,为了保证性能,rocksdb会让这一些线程也会不断增加。而在分布式存储场景,往往一个机器节点会有很多rocksdb实例(64个实例,每一个实例都会有compaction/flush线程),这个时候在Rocksdb内部使用合理的线程管理方式会节省系统CPU调度资源。

所以Rocksdb自实现的​​Thread Pool​​就是为了更好得管理Rocksdb内部线程,除了一些基本的线程调度之外,还会有可控制的线程优先级的调度,因为大多数场景Rocksdb让Flush线程的优先级高于Compaction线程,而有的场景则需要Compaction的优先级高于Flush,为了更快速的compaction清理掉旧数据。

接下来简单看一下Rocksdb 线程池的基本实现,本人已经将该线程池代码摘出来单独维护,可作为一个独立线程池去调度。

​https://github.com/BaronStack/ThreadPool​

线程池存在的目的 正如上面Rocksdb使用线程池的目的一样, 能够更加方便得管理我们应用中的线程,包括但不限于:线程创建,线程资源约束,线程优先级调度,线程销毁 等。

1. Rocksdb线程池概览

Rocksdb 实现的线程池支持的特性:

  • 创建/销毁线程
  • 动态增加、减少线程池线程数目上限(线程池数目需要设置上限,因为Compaction/Flush占用的资源也不能无限增加,需根据实际的Rocksdb 写入量来动态增加)
  • 支持动态调整 线程CPU 和 I/O优先级(为了暴露足够的接口给用户,来让用户选择两个功能调度的优先顺序)

2. Rocksdb 线程池实现

2.1 基本数据结构

// 线程池核心的数据结构
struct Impl {
private:
bool low_io_priority_; // I/O 优先级
bool low_cpu_priority_; // CPU 优先级
Env::Priority priority_; // 线程优先级
Env* env_; // 获取当前线程池的环境变量

int total_threads_limit_; // 线程池线程总数
std::atomic_uint queue_len_; // 当前线程池中执行线程的排队长度
bool exit_all_threads_; // 清理线程池时会调度所有未执行的线程
bool wait_for_jobs_to_complete_; // 等待所有线程池的线程执行完毕

// Entry per Schedule()/Submit() call
struct BGItem {
void* tag = nullptr;
std::function<void()> function; // 执行函数
std::function<void()> unschedFunction; // 不执行函数
};

using BGQueue = std::deque<BGItem>;
BGQueue queue_; // deque 保存线程池中调度的线程相关的信息:线程函数、函数参数

std::mutex mu_;
std::condition_variable bgsignal_; // 条件变量,唤醒正在睡眠的线程
std::vector<port::Thread> bgthreads_; // 保存需要调度的线程
}

线程池类:

class ThreadPoolImpl : public ThreadPool {
private:
std::unique_ptr<Impl> impl_;// 线程池核心数据结构
};

2.2 线程池创建

Rocksdb维护了一个Env 类,这个类再同一个进程中的多个rocksdb实例之间是能够共享的。所以Rocksdb将这个类作为线程池的入口,从而让Flush/Compaction 这样的线程调度过程中,多个db可以只使用同一个线程池。

Rocksdb实现了多个环境变量:​​HdfsEnv​​​,​​PosixEnv​​​等,方便Rocksdb的文件操作/线程操作 接口在不同的环境平台下进行扩展,当然如果用户变更了新的平台,只需要支持​​Env​​基类的接口,就能扩展到用户的新平台。

​Env​​​默认实例是​​PosixEnv​​,为了保证多db实例间共享同一个环境变量,PosixEnv仅维护一个单例。

// 创建Env,初始化几个类的单例
// 这里注意调用的顺序,先调用ThreadLocalPtr实例的初始化,再调用PosixEnv的
// 这样在Env析构的时候能够反方向析构,从而保证ThreadLocal的信息最后一个被清理
Env* Env::Default() {
ThreadLocalPtr::InitSingletons(); // Threadlocal 实例数据,用来访问当前db实例运行的线程状态信息
CompressionContextCache::InitSingleton();
INIT_SYNC_POINT_SINGLETONS();
static PosixEnv default_env; // 创建posix env
return &default_env;
}

紧接着通过 PosixEnv的构造函数创建线程池

// 根据Env设置的线程优先级,为每一个优先级创建一个线程池(方便优先级线程池的调度)
// 创建多个线程池: enum Priority { BOTTOM, LOW, HIGH, USER, TOTAL };
std::vector<ThreadPoolImpl> thread_pools_;

PosixEnv::PosixEnv()
: checkedDiskForMmap_(false),
forceMmapOff_(false),
page_size_(getpagesize()),
thread_pools_(Priority::TOTAL),
allow_non_owner_access_(true) {
ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
// 根据优先级创建线程池,默认创建四个线程池,但一般只会用到两个(LOW,HIGH)
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority(
static_cast<Env::Priority>(pool_id));
// This allows later initializing the thread-local-env of each thread.
thread_pools_[pool_id].SetHostEnv(this);
}
thread_status_updater_ = CreateThreadStatusUpdater();
}

2.3 线程池 调度线程执行

线程池调度栈如下:从入口到具体的线程函数的执行

Env::Schedule() // Env对外接口
PosixEnv::Schedule()
ThreadPoolImpl::Schedule() // 线程池的调度入口
ThreadPoolImpl::Impl::Submit() // 将线程函数、参数、线程回收函数封装,添加到待调度队列queue_
ThreadPoolImpl::Impl::StartBGThreads()
ThreadPoolImpl::Impl::BGThreadWrapper() // 更新当前执行的线程状态并启动一个调度队列中的线程
ThreadPoolImpl::Impl::BGThread()// 从待调度队列queue_中调度线程
func() // 执行线程函数

Env的实例调用​​Schedule​​接口,接收待调度的线程执行函数,参数,所属优先级线程池,以及线程销毁函数及其参数。

virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW, void* tag = nullptr,
void (*unschedFunction)(void* arg) = nullptr) = 0;

后续会执行到​​ThreadPoolImpl::Impl::Submit()​

void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
std::function<void()>&& unschedule, void* tag) {

// 后续需要更新当前线程池的线程调度队列,需要保证更新过程的原子性
std::lock_guard<std::mutex> lock(mu_);

// 需要销毁线程池了,不接受新的线程加入
if (exit_all_threads_) {
return;
}

// 启动线程
StartBGThreads();

// 更新线程函数相关的信息 到线程调度队列尾部(双端队列)
queue_.push_back(BGItem());
// 更新
auto& item = queue_.back();
item.tag = tag;
item.function = std::move(schedule);
item.unschedFunction = std::move(unschedule);

queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);

// 如果正在执行的线程没有超过线程池线程数限制,则唤醒一个正在休眠的线程
if (!HasExcessiveThread()) {
// Wake up at least one waiting thread.
bgsignal_.notify_one();
} else { // 。。。这个逻辑不太懂,超过限制之后 不应该就不唤醒了吗?
// Need to wake up all threads to make sure the one woken
// up is not the one to terminate.
WakeUpAllThreads();
}
}

后续的执行就是按照以上调用栈进行的,从线程调度队列头部取线程函执行。

2.4 线程池销毁线程

线程池的销毁也就是Env变量的析构函数,db被destory或者close,则会进入该逻辑,Env的默认环境变量是PosixEnv,即Env的子类。则会先调用PosixEnv 的析构函数,其中线程池相关的清理逻辑:

整体的调用栈如下:

~PosixEnv()
ThreadPoolImpl::JoinAllThreads()
ThreadPoolImpl::Impl::JoinThreads()

在析构函数中调用相关的线程清理工作:

~PosixEnv() override {
// 通过Posix startthread 的接口调度的线程函数并发执行完毕
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
}
// 让不同优先级线程池中待执行线程执行完
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].JoinAllThreads();
}

// 放置Posix析构过程中不应该thread_status_updater_ ,防止一些子线程更新线程状态出错
// Delete the thread_status_updater_ only when the current Env is not
// Env::Default(). This is to avoid the free-after-use error when
// Env::Default() is destructed while some other child threads are
// still trying to update thread status.
if (this != Env::Default()) {
delete thread_status_updater_;
}
}

其中​​JoinAllThreads​​函数用来唤醒所有子线程的执行,并设置标记防止接收新的线程

void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {

std::unique_lock<std::mutex> lock(mu_);
assert(!exit_all_threads_);

wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
// 原子(加锁)方式更新如下变量,用作在submit函数中屏蔽接收新的线程
exit_all_threads_ = true;
// prevent threads from being recreated right after they're joined, in case
// the user is concurrently submitting jobs.
// 重置线程池的线程上限,防止用户并发调用submit添加待调度线程
total_threads_limit_ = 0;

lock.unlock();

bgsignal_.notify_all(); //唤醒所有等待在bgsignal_的线程

for (auto& th : bgthreads_) {// join 执行,直到执行完。
th.join();
}

bgthreads_.clear();

exit_all_threads_ = false;
wait_for_jobs_to_complete_ = false;
}

2.5 线程池优先级调度

之前说过Rocksdb线程池支持 用户针对不同LOW/HIGH 线程池的I/O或者CPU的优先级设置。

比如 设置LOW线程池具有更低的I/O优先级和CPU优先级

target_->LowerThreadPoolIOPriority(Env::Priority::LOW);
target_->LowerThreadPoolCPUPriority(Env::Priority::LOW);

具体底层的设置方式是针对之前提到的线程数据结构中的两个参数​​Impl::low_io_priority_​​​和​​Impl::low_c pu_priority_​​​进行置位​​true​​​。在​​ThreadPoolImpl::Impl::BGThread​​​调度函数执行之前,会通过系统调用​​setpriority​​​和​​syscall(SYS_ioprio_set,,,)​​设置当前线程的I/O和CPU优先级。

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
bool low_io_priority = false;
bool low_cpu_priority = false;

while (true) {
// Wait until there is an item that is ready to run
std::unique_lock<std::mutex> lock(mu_);
...

bool decrease_io_priority = (low_io_priority != low_io_priority_);
bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
lock.unlock();

#ifdef OS_LINUX
// Linux 系统支持 设置CPU优先级
if (decrease_cpu_priority) {
setpriority(
PRIO_PROCESS,
// Current thread.
0,
// Lowest priority possible.
19);
low_cpu_priority = true;
}


if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
// Put schedule into IOPRIO_CLASS_IDLE class (lowest)
// These system calls only have an effect when used in conjunction
// with an I/O scheduler that supports I/O priorities. As at
// kernel 2.6.17 the only such scheduler is the Completely
// Fair Queuing (CFQ) I/O scheduler.
// To change scheduler:
// echo cfq > /sys/block/<device_name>/queue/schedule
// Tunables to consider:
// /sys/block/<device_name>/queue/slice_idle
// /sys/block/<device_name>/queue/slice_sync
// 设置I/O优先级
syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS
0, // current thread
IOPRIO_PRIO_VALUE(3, 0));
low_io_priority = true;
}
#else
// 非Linux系统的话就不做任何处理了,仅仅保证变量被使用而已,防止编译warning
(void)decrease_io_priority; // avoid 'unused variable' error
(void)decrease_cpu_priority;
#endif
func();
}
}

2.6 动态调整线程池 线程数目上限

支持动态调整线程池可调度的线程数目上限,这个能够限制线程池资源的占用,主要用作Rocksdb 中调整Flush和Compaction的各自所处的HIGH和LOW线程池中的线程数目上限。能够根据db的工作负载,动态增加或者减少线程池中可调度的线程数目。

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
impl_->SetBackgroundThreadsInternal(num, false);
}

void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
bool allow_reduce) {
std::unique_lock<std::mutex> lock(mu_);
// 如果线程池已经要被销毁了,就不用增加线程池的调度线程数目上限了
if (exit_all_threads_) {
lock.unlock();
return;
}

// 增加线程数目或者减少线程数目
// 唤醒休眠的线程并调度后台线程继续执行。
if (num > total_threads_limit_ ||
(num < total_threads_limit_ && allow_reduce)) {
total_threads_limit_ = std::max(0, num);
WakeUpAllThreads();
StartBGThreads();
}
}

3. 总结

到此整个线程池的基本实现就描述完成了,这是一个非常成熟的线程池(经历过接近十年的工业级考验,2012年facebook开始开发rocksdb),规模虽小,但五脏俱全。其能够支撑引擎级别的线程调度压力,保证引擎的核心逻辑flush和compaction的高效调度。

目前该线程池的独立实现已经放在了​​https://github.com/BaronStack/ThreadPool​​ 中,拥有完备的线程池调度/销毁,优先级配置,欢迎star。


标签:优先级,--,Rocksdb,调度,priority,线程,Env
From: https://blog.51cto.com/u_13456560/5823169

相关文章

  • 单机 “5千万以上“ 工业级 LRU cache 实现
    文章目录​​前言​​​​工业级LRUCache​​​​1.基本架构​​​​2.基本操作​​​​2.1insert操作​​​​2.2高并发下insert的一致性/性能保证​​​​2.3L......
  • 案例3:JAVA GUI 随机点名程序
    先开发一个姓名维护的界面,输入学生的姓名,每行录入一个学生姓名,点击保存的时候将学生的姓名保存到一个txt文件中。再开发一个点名的程序,从维护好的txt文件中,随机读取一个学......
  • centos下将vim配置为强大的源码阅读器
    每日杂事缠身,让自己在不断得烦扰之后终于有了自己的清静时光来熟悉一下我的工具,每次熟悉源码都需要先在windows端改好,拖到linux端,再编译。出现问题,还得重新回到windows端,这......
  • Docker安装Nacos
    创建本地的映射文件mkdir-p/root/nacos/init.d/root/nacos/logstouch/root/nacos/init.d/custom.properties在文件中写入配置management.endpoints.web.expo......
  • 贪心算法简单实践 -- 分糖果、钱币找零、最多区间覆盖、哈夫曼编解码
    1.贪心算法概览贪心算法是一种算法思想。希望能够满足限制的情况下将期望值最大化。比如:Huffman编码,Dijkstra单源最短路径问题,Kruskal最小生成树等问题都希望满足限制的情......
  • Andorid Jetpack Hilt
    前言现代开发语言,低代码,减少开发中模板代码的编写越来越被一线技术开发所提倡,google官方在这方面也下了很大的功夫推出jectpack架构组件,而Hilt依赖注入就是一个减少样板......
  • grafana jpprof 试用
    grafanajpprof是为了方便grafanaphlare对于java语言支持开发的一个包,可以让java语言方便的自持pprof格式的内容,进行持续性能优化以下是一个简单的试用环境准备......
  • 在PetaLinux工程中导出所有关键模块代码
    PetaLinux工程会自动下载代码并编译。很多时候,工程师需要修改代码,加入调试信息。使用下列脚本,可以一次性导出所有关键模块的代码。脚本中,为了保持兼容性,导出了MPSoC和Vers......
  • 从 unlink/rm 底层实现来看Linux文件系统管理
    文章目录​​1.前言​​​​2.文件系统结构​​​​3.Unlink实现​​文中涉及到的内核源代码版本是3.10.1。1.前言工作中听到一个同事对unlink系统调用的描述,unlink并......
  • WiredTiger引擎编译 及 LT_PREREQ(2.2.6)问题解决
    近期需要为异构引擎做准备,wiredtiger以其优异的性能(B-tree和LSM-tree都支持)和稳定性(Mongodb的默认存储引擎)被我们备选为异构引擎里的一个子引擎,后续将深入wiredtiger......