Thread
线程通用接口,跨平台封装,会创建并持有RunLoop
对象
// base/threading/thread.h
raw_ptr<RunLoop> run_loop_ = nullptr;
// 这种写法可以抽离真正的消息循环逻辑到RunLoop中,并且保证这部分逻辑会随着线程主函数结束后销毁
RunLoop run_loop;
run_loop_ = &run_loop;
Run(run_loop_);
ThreadMain
作为线程入口,创建RunLoop
对象,并执行Run
RunLoop
在这里还有处理前置工作的BeforeRun
和AfterRun
两个函数,其中标识RunLoop
的运作状态的running_
则会分别在这两个处理函数中标记
交由RunLoop::Delegate
继续Run
,继承这个代理的就是ThreadControllerWithMessagePumpImpl
,会由ThreadControllerWithMessagePumpImpl
拉起MessagePump
中的Run
和DoRunLoop
真正开始任务循环
void RunLoop::Run(const Location& location) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// "test" tracing category is used here because in regular scenarios RunLoop
// trace events are not useful (each process normally has one RunLoop covering
// its entire lifetime) and might be confusing (they make idle processes look
// non-idle). In tests, however, creating a RunLoop is a frequent and an
// explicit action making this trace event very useful.
TRACE_EVENT("test", "RunLoop::Run", "location", location);
if (!BeforeRun())
return;
// If there is a RunLoopTimeout active then set the timeout.
// TODO(crbug.com/40602467): Use real-time for Run() timeouts so that they
// can be applied even in tests which mock TimeTicks::Now().
CancelableOnceClosure cancelable_timeout;
const RunLoopTimeout* run_timeout = GetTimeoutForCurrentThread();
if (run_timeout) {
cancelable_timeout.Reset(BindOnce(&OnRunLoopTimeout, Unretained(this),
location, run_timeout->on_timeout));
origin_task_runner_->PostDelayedTask(
FROM_HERE, cancelable_timeout.callback(), run_timeout->timeout);
}
DCHECK_EQ(this, delegate_->active_run_loops_.top());
const bool application_tasks_allowed =
delegate_->active_run_loops_.size() == 1U ||
type_ == Type::kNestableTasksAllowed;
delegate_->Run(application_tasks_allowed, TimeDelta::Max());
AfterRun();
}
MessagePump
有以下几种MessagePump的种类,在Thread创建的时候通过option传入确定
enum class MessagePumpType {
// This type of pump only supports tasks and timers.
DEFAULT,
// This type of pump also supports native UI events (e.g., Windows
// messages).
UI,
// User provided implementation of MessagePump interface
CUSTOM,
// This type of pump also supports asynchronous IO.
IO,
#if BUILDFLAG(IS_ANDROID)
// This type of pump is backed by a Java message handler which is
// responsible for running the tasks added to the ML. This is only for use
// on Android. TYPE_JAVA behaves in essence like TYPE_UI, except during
// construction where it does not use the main thread specific pump factory.
JAVA,
#endif // BUILDFLAG(IS_ANDROID)
#if BUILDFLAG(IS_APPLE)
// This type of pump is backed by a NSRunLoop. This is only for use on
// OSX and IOS.
NS_RUNLOOP,
#endif // BUILDFLAG(IS_APPLE)
};
} // namespace base
创建代码如下所示,通过type生产对应的MessagePump,十分的简洁明了
bool Thread::StartWithOptions(Options options) {
DCHECK(options.IsValid());
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
DCHECK(!delegate_);
DCHECK(!IsRunning());
DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
<< "not allowed!";
#if BUILDFLAG(IS_WIN)
DCHECK((com_status_ != STA) ||
(options.message_pump_type == MessagePumpType::UI));
#endif
// Reset |id_| here to support restarting the thread.
id_event_.Reset();
id_ = kInvalidThreadId;
SetThreadWasQuitProperly(false);
if (options.delegate) {
DCHECK(!options.message_pump_factory);
delegate_ = std::move(options.delegate);
} else if (options.message_pump_factory) {
delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
MessagePumpType::CUSTOM, options.message_pump_factory);
} else {
delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
options.message_pump_type,
BindOnce([](MessagePumpType type) { return MessagePump::Create(type); },
options.message_pump_type));
}
...
SequenceManagerThreadDelegate
在构造的时候会在对象内部构造SequenceManagerImpl
保存在对象内部。
并在线程主函数启动后Thread::ThreadMain
中有delegate_->BindToCurrentThread();
将MessagePump绑定到对象保存的SequenceManager上。
以MessagePumpWin
和MessagePumpForUI
举例,前者定义了一些基础接口,后者继承实现(还有其他用途的pump比如for IO之类的,因为UI特殊负责接受Windows的UI messages)
需要关注的接口DoRunLoop
,是消息循环的重要接口。
由Run
函数拉起DoRunLoop
循环。
MessagePumpForUI
消息循环的UI循环,工作内容就是执行任务(DoWork
),然后等待下一次任务的执行(WaitForWork(next_work_info);
)
void MessagePumpForUI::DoRunLoop() {
DCHECK_CALLED_ON_VALID_THREAD(bound_thread_);
// IF this was just a simple PeekMessage() loop (servicing all possible work
// queues), then Windows would try to achieve the following order according
// to MSDN documentation about PeekMessage with no filter):
// * Sent messages
// * Posted messages
// * Sent messages (again)
// * WM_PAINT messages
// * WM_TIMER messages
//
// Summary: none of the above classes is starved, and sent messages has twice
// the chance of being processed (i.e., reduced service time).
for (;;) {
// If we do any work, we may create more messages etc., and more work may
// possibly be waiting in another task group. When we (for example)
// ProcessNextWindowsMessage(), there is a good chance there are still more
// messages waiting. On the other hand, when any of these methods return
// having done no work, then it is pretty unlikely that calling them again
// quickly will find any work to do. Finally, if they all say they had no
// work, then it is a good time to consider sleeping (waiting) for more
// work.
in_native_loop_ = false;
bool more_work_is_plausible = ProcessNextWindowsMessage();
in_native_loop_ = false;
if (run_state_->should_quit)
break;
Delegate::NextWorkInfo next_work_info = run_state_->delegate->DoWork();
in_native_loop_ = false;
more_work_is_plausible |= next_work_info.is_immediate();
if (run_state_->should_quit)
break;
if (installed_native_timer_) {
// As described in ScheduleNativeTimer(), the native timer is only
// installed and needed while in a nested native loop. If it is installed,
// it means the above work entered such a loop. Having now resumed, the
// native timer is no longer needed.
KillNativeTimer();
}
if (more_work_is_plausible)
continue;
more_work_is_plausible = run_state_->delegate->DoIdleWork();
// DoIdleWork() shouldn't end up in native nested loops and thus shouldn't
// have any chance of reinstalling a native timer.
DCHECK(!in_native_loop_);
DCHECK(!installed_native_timer_);
if (run_state_->should_quit)
break;
if (more_work_is_plausible)
continue;
WaitForWork(next_work_info);
}
}
MessagePumpDefault
forUI可能还包含额外的UI处理,那么看一下Default方案里面的函数实现,会相应简单一些
void MessagePumpDefault::Run(Delegate* delegate) {
AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
for (;;) {
#if BUILDFLAG(IS_APPLE)
apple::ScopedNSAutoreleasePool autorelease_pool;
#endif
Delegate::NextWorkInfo next_work_info = delegate->DoWork();
bool has_more_immediate_work = next_work_info.is_immediate();
if (!keep_running_)
break;
if (has_more_immediate_work)
continue;
delegate->DoIdleWork();
if (!keep_running_) // 关注这里在执行完任务之后也做了任务循环是否继续的检查,避免在进入睡眠之前需要退出但是进入睡眠导致退出延后的问题。
break;
// 判断下个任务的等待时间,如果下个任务不存在或者不需要执行,需要无限制的等待新任务唤醒。
if (next_work_info.delayed_run_time.is_max()) {
event_.Wait();
} else {
event_.TimedWait(next_work_info.remaining_delay());
}
// Since event_ is auto-reset, we don't need to do anything special here
// other than service each delegate method.
}
}
// 其他线程唤醒当前线程任务循环使用的方法
void MessagePumpDefault::ScheduleWork() {
// Since this can be called on any thread, we need to ensure that our Run
// loop wakes up.
event_.Signal();
}
DoWork
和 DoIdleWork
这两个函数的执行都是靠delegate实现的,这个delegate就是ThreadControllerWithMessagePumpImpl
对象
把DoWork
看成普通任务执行的话,DoIdleWork
就如字面意思一样,执行空闲任务
ThreadControllerWithMessagePumpImpl
这个类型的对象通过RegisterDelegateForCurrentThread
注册到RunLoop
对象的delegate_成员上(这个操作是在上述BindToCurrentThread
操作时一同实现的),是真正Run
接口的实现类。
DoWork
接口的实现类,获取需要执行的任务并执行然后返回下一个任务信息
DoWorkImpl
内部实现,用来获取和执行任务,下面是循环执行任务的条件代码
for (int num_tasks_executed = 0;
(!batch_duration.is_zero() && work_executed < batch_duration) ||
(batch_duration.is_zero() &&
num_tasks_executed < main_thread_only().work_batch_size);
++num_tasks_executed) {
这个是用来获取任务并执行的循环,条件中显示,一次有最多执行的上限,如果这一次循环执行任务达到一定数量就会退出循环(当然如果全是延时任务并未到达执行时间点的话也会退出循环)
absl::optional<SequencedTaskSource::SelectedTask> selected_task =
main_thread_only().task_source->SelectNextTask(lazy_now_select_task,
select_task_option);
获取任务的代码
任务执行也是在这里实现,在循环中,获取到任务之后会执行任务
task_annotator_.RunTask("ThreadControllerImpl::RunTask",
selected_task->task,
[&selected_task](perfetto::EventContext& ctx) {
if (selected_task->task_execution_trace_logger)
selected_task->task_execution_trace_logger.Run(
ctx, selected_task->task);
SequenceManagerImpl::MaybeEmitTaskDetails(
ctx, selected_task.value());
});
SequenceManagerImpl
可以调用CreateTaskQueue
创建任务队列
获取任务的实现者,内部实现SelectNextTaskImpl
TaskQueue
高层抽象,提供对外的易用接口,持有4个主要队列
- 立刻执行队列
immediate_incoming_queue
,用于PostTask
immediate_work_queue
- 延迟执行队列
delayed_incoming_queue
,用于PostDelayedTask
delayed_work_queue
每种类型都准备了incoming队列,是因为这个队列是专门提供给其他线程访问的,用于其他线程发布任务,是需要保证线程安全的(加锁),同时还提供了普通队列,普通队列要求必须要从当前线程访问,这个队列是不加锁的,这样区分开了之后可以有效减少加锁。
immediate_work_queue
上述四种队列的类型都是底层的WorkQueue
TaskQueueSelector
提供任务队列的选择,Selector是一个提供笼统抽象的接口的类型,并不关心具体的TaskQueueImpl
对象,他的内部只会存储WorkQueue
,存储在WorkQueueSets
中
// base\task\task\sequence_manager\task_queue_selector.h
WorkQueueSets delayed_work_queue_sets_;
WorkQueueSets immediate_work_queue_sets_;
这些WorkQueue是在创建TaskQueueImpl对象的时候就已经创建并添加进对尹大哥selector当中了,值得注意的是,SequenceManagerImpl对selector的访问前都有main_thread_only()
标记,这意味着只允许在主线程内操作不加锁的任务队列。
所以在操作队列添加到selector的时候,也就只添加了某两个任务队列,是哪两个已经显而易见了
void TaskQueueSelector::AddQueueImpl(internal::TaskQueueImpl* queue,
TaskQueue::QueuePriority priority) {
#if DCHECK_IS_ON()
DCHECK(!CheckContainsQueueForTest(queue));
#endif
// 只添加了主线程可以访问的两个队列,PostTask可以访问的队列不在其中
delayed_work_queue_sets_.AddQueue(queue->delayed_work_queue(), priority);
immediate_work_queue_sets_.AddQueue(queue->immediate_work_queue(), priority);
#if DCHECK_IS_ON()
DCHECK(CheckContainsQueueForTest(queue));
#endif
}
那PostTask来的任务一定是在某个地方完成了队列的转移。
任务转移
需要把其他线程发布的任务执行的话需要怎么做呢?任务筛选器不会访问PostTask可以访问的队列,意味着任务必须要转移进当前线程可以访问的队列才行。
当immediate_work_queue
为空的时候,需要重新装弹!把immediate_incoming_queue
和immediate_work_queue
交换
void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() {
DCHECK(main_thread_only().immediate_work_queue->Empty());
main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks();
if (main_thread_only().throttler && IsQueueEnabled()) {
main_thread_only().throttler->OnHasImmediateTask();
}
}
void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) {
DCHECK(queue->empty());
// Now is a good time to consider reducing the empty queue's capacity if we're
// wasting memory, before we make it the `immediate_incoming_queue`.
queue->MaybeShrinkQueue();
base::internal::CheckedAutoLock lock(any_thread_lock_);
queue->swap(any_thread_.immediate_incoming_queue);
...
上述代码定义了这一行为,但是这个函数是以人物的形式绑定到SequenceManagerImpl
的,由其决定何时Reload
在选择下一个任务的实现代码中有重载任务相关的逻辑,但是不一定会触发重载
std::optional<SequenceManagerImpl::SelectedTask>
SequenceManagerImpl::SelectNextTaskImpl(LazyNow& lazy_now,
SelectTaskOption option) {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
"SequenceManagerImpl::SelectNextTask");
ReloadEmptyWorkQueues();
MoveReadyDelayedTasksToWorkQueues(&lazy_now);
// If we sampled now, check if it's time to reclaim memory next time we go
// idle.
if (lazy_now.has_value() &&
lazy_now.Now() >= main_thread_only().next_time_to_reclaim_memory) {
main_thread_only().memory_reclaim_scheduled = true;
}
...
ReloadEmptyWorkQueues
具体实现如下,他本身并没有重载人物的逻辑,他只是一个标记为,标记是否要被激活执行真正的任务重载callback
void SequenceManagerImpl::ReloadEmptyWorkQueues() {
work_tracker_.WillReloadImmediateWorkQueues();
// There are two cases where a queue needs reloading. First, it might be
// completely empty and we've just posted a task (this method handles that
// case). Secondly if the work queue becomes empty when calling
// WorkQueue::TakeTaskFromWorkQueue (handled there).
//
// Invokes callbacks created by GetFlagToRequestReloadForEmptyQueue above.
empty_queues_to_reload_.RunActiveCallbacks();
}
TaskQeueueImpl
类型的成员变量empty_queues_to_reload_handle_
在新的任务插入incoming queue的时候会在incoming queue为空的时候设置标记位为true,激活设置的任务重载callback,从而在选择下个任务的时候触发任务转移,这样做就不会有额外的消耗了。
empty_queues_to_reload_handle_
incoming任务队列被移空之后,标记位就会被设置为false,说明被搬空之后就不需要每次拿任务的时候转移了,直到标记为被再次设置。
WorkQueue
真正持有存储任务的队列,TaskQueueImpl::TaskDeque tasks_;
任务就存储在tasks_中,注意别和上面的TaskQueue
搞混,这里的TaskDeque
不是一个类型