首页 > 其他分享 >[Chromium] 多线程任务队列

[Chromium] 多线程任务队列

时间:2024-11-26 15:55:39浏览次数:9  
标签:task thread 队列 work queue 任务 多线程 Chromium

Thread

线程通用接口,跨平台封装,会创建并持有RunLoop对象

// base/threading/thread.h
raw_ptr<RunLoop> run_loop_ = nullptr;
// 这种写法可以抽离真正的消息循环逻辑到RunLoop中,并且保证这部分逻辑会随着线程主函数结束后销毁
RunLoop run_loop;
run_loop_ = &run_loop;
Run(run_loop_);

ThreadMain作为线程入口,创建RunLoop对象,并执行Run

RunLoop

在这里还有处理前置工作的BeforeRunAfterRun两个函数,其中标识RunLoop的运作状态的running_则会分别在这两个处理函数中标记

交由RunLoop::Delegate继续Run,继承这个代理的就是ThreadControllerWithMessagePumpImpl,会由ThreadControllerWithMessagePumpImpl拉起MessagePump中的RunDoRunLoop真正开始任务循环

void RunLoop::Run(const Location& location) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  // "test" tracing category is used here because in regular scenarios RunLoop
  // trace events are not useful (each process normally has one RunLoop covering
  // its entire lifetime) and might be confusing (they make idle processes look
  // non-idle). In tests, however, creating a RunLoop is a frequent and an
  // explicit action making this trace event very useful.
  TRACE_EVENT("test", "RunLoop::Run", "location", location);

  if (!BeforeRun())
    return;

  // If there is a RunLoopTimeout active then set the timeout.
  // TODO(crbug.com/40602467): Use real-time for Run() timeouts so that they
  // can be applied even in tests which mock TimeTicks::Now().
  CancelableOnceClosure cancelable_timeout;
  const RunLoopTimeout* run_timeout = GetTimeoutForCurrentThread();
  if (run_timeout) {
    cancelable_timeout.Reset(BindOnce(&OnRunLoopTimeout, Unretained(this),
                                      location, run_timeout->on_timeout));
    origin_task_runner_->PostDelayedTask(
        FROM_HERE, cancelable_timeout.callback(), run_timeout->timeout);
  }

  DCHECK_EQ(this, delegate_->active_run_loops_.top());
  const bool application_tasks_allowed =
      delegate_->active_run_loops_.size() == 1U ||
      type_ == Type::kNestableTasksAllowed;
  delegate_->Run(application_tasks_allowed, TimeDelta::Max());

  AfterRun();
}

MessagePump

有以下几种MessagePump的种类,在Thread创建的时候通过option传入确定

enum class MessagePumpType {
  // This type of pump only supports tasks and timers.
  DEFAULT,

  // This type of pump also supports native UI events (e.g., Windows
  // messages).
  UI,

  // User provided implementation of MessagePump interface
  CUSTOM,

  // This type of pump also supports asynchronous IO.
  IO,

#if BUILDFLAG(IS_ANDROID)
  // This type of pump is backed by a Java message handler which is
  // responsible for running the tasks added to the ML. This is only for use
  // on Android. TYPE_JAVA behaves in essence like TYPE_UI, except during
  // construction where it does not use the main thread specific pump factory.
  JAVA,
#endif  // BUILDFLAG(IS_ANDROID)

#if BUILDFLAG(IS_APPLE)
  // This type of pump is backed by a NSRunLoop. This is only for use on
  // OSX and IOS.
  NS_RUNLOOP,
#endif  // BUILDFLAG(IS_APPLE)
};

}  // namespace base

创建代码如下所示,通过type生产对应的MessagePump,十分的简洁明了

bool Thread::StartWithOptions(Options options) {
  DCHECK(options.IsValid());
  DCHECK(owning_sequence_checker_.CalledOnValidSequence());
  DCHECK(!delegate_);
  DCHECK(!IsRunning());
  DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
                     << "not allowed!";
#if BUILDFLAG(IS_WIN)
  DCHECK((com_status_ != STA) ||
         (options.message_pump_type == MessagePumpType::UI));
#endif

  // Reset |id_| here to support restarting the thread.
  id_event_.Reset();
  id_ = kInvalidThreadId;

  SetThreadWasQuitProperly(false);

  if (options.delegate) {
    DCHECK(!options.message_pump_factory);
    delegate_ = std::move(options.delegate);
  } else if (options.message_pump_factory) {
    delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
        MessagePumpType::CUSTOM, options.message_pump_factory);
  } else {
    delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
        options.message_pump_type,
        BindOnce([](MessagePumpType type) { return MessagePump::Create(type); },
                 options.message_pump_type));
  }
...

SequenceManagerThreadDelegate在构造的时候会在对象内部构造SequenceManagerImpl保存在对象内部。
并在线程主函数启动后Thread::ThreadMain中有delegate_->BindToCurrentThread();将MessagePump绑定到对象保存的SequenceManager上。
MessagePumpWinMessagePumpForUI举例,前者定义了一些基础接口,后者继承实现(还有其他用途的pump比如for IO之类的,因为UI特殊负责接受Windows的UI messages)

需要关注的接口DoRunLoop,是消息循环的重要接口。

Run函数拉起DoRunLoop循环。

MessagePumpForUI

消息循环的UI循环,工作内容就是执行任务(DoWork),然后等待下一次任务的执行(WaitForWork(next_work_info);)

void MessagePumpForUI::DoRunLoop() {
  DCHECK_CALLED_ON_VALID_THREAD(bound_thread_);

  // IF this was just a simple PeekMessage() loop (servicing all possible work
  // queues), then Windows would try to achieve the following order according
  // to MSDN documentation about PeekMessage with no filter):
  //    * Sent messages
  //    * Posted messages
  //    * Sent messages (again)
  //    * WM_PAINT messages
  //    * WM_TIMER messages
  //
  // Summary: none of the above classes is starved, and sent messages has twice
  // the chance of being processed (i.e., reduced service time).

  for (;;) {
    // If we do any work, we may create more messages etc., and more work may
    // possibly be waiting in another task group.  When we (for example)
    // ProcessNextWindowsMessage(), there is a good chance there are still more
    // messages waiting.  On the other hand, when any of these methods return
    // having done no work, then it is pretty unlikely that calling them again
    // quickly will find any work to do.  Finally, if they all say they had no
    // work, then it is a good time to consider sleeping (waiting) for more
    // work.

    in_native_loop_ = false;

    bool more_work_is_plausible = ProcessNextWindowsMessage();
    in_native_loop_ = false;
    if (run_state_->should_quit)
      break;

    Delegate::NextWorkInfo next_work_info = run_state_->delegate->DoWork();
    in_native_loop_ = false;
    more_work_is_plausible |= next_work_info.is_immediate();
    if (run_state_->should_quit)
      break;

    if (installed_native_timer_) {
      // As described in ScheduleNativeTimer(), the native timer is only
      // installed and needed while in a nested native loop. If it is installed,
      // it means the above work entered such a loop. Having now resumed, the
      // native timer is no longer needed.
      KillNativeTimer();
    }

    if (more_work_is_plausible)
      continue;

    more_work_is_plausible = run_state_->delegate->DoIdleWork();
    // DoIdleWork() shouldn't end up in native nested loops and thus shouldn't
    // have any chance of reinstalling a native timer.
    DCHECK(!in_native_loop_);
    DCHECK(!installed_native_timer_);
    if (run_state_->should_quit)
      break;

    if (more_work_is_plausible)
      continue;

    WaitForWork(next_work_info);
  }
}

MessagePumpDefault

forUI可能还包含额外的UI处理,那么看一下Default方案里面的函数实现,会相应简单一些

void MessagePumpDefault::Run(Delegate* delegate) {
  AutoReset<bool> auto_reset_keep_running(&keep_running_, true);

  for (;;) {
#if BUILDFLAG(IS_APPLE)
    apple::ScopedNSAutoreleasePool autorelease_pool;
#endif

    Delegate::NextWorkInfo next_work_info = delegate->DoWork();
    bool has_more_immediate_work = next_work_info.is_immediate();
    if (!keep_running_)
      break;

    if (has_more_immediate_work)
      continue;

    delegate->DoIdleWork();
    if (!keep_running_) // 关注这里在执行完任务之后也做了任务循环是否继续的检查,避免在进入睡眠之前需要退出但是进入睡眠导致退出延后的问题。
      break;
    
    // 判断下个任务的等待时间,如果下个任务不存在或者不需要执行,需要无限制的等待新任务唤醒。
    if (next_work_info.delayed_run_time.is_max()) {
      event_.Wait();
    } else {
      event_.TimedWait(next_work_info.remaining_delay());
    }
    // Since event_ is auto-reset, we don't need to do anything special here
    // other than service each delegate method.
  }
}

// 其他线程唤醒当前线程任务循环使用的方法
void MessagePumpDefault::ScheduleWork() {
  // Since this can be called on any thread, we need to ensure that our Run
  // loop wakes up.
  event_.Signal();
}

DoWorkDoIdleWork

这两个函数的执行都是靠delegate实现的,这个delegate就是ThreadControllerWithMessagePumpImpl对象
DoWork看成普通任务执行的话,DoIdleWork就如字面意思一样,执行空闲任务

ThreadControllerWithMessagePumpImpl

这个类型的对象通过RegisterDelegateForCurrentThread注册到RunLoop对象的delegate_成员上(这个操作是在上述BindToCurrentThread操作时一同实现的),是真正Run接口的实现类。

DoWork接口的实现类,获取需要执行的任务并执行然后返回下一个任务信息

DoWorkImpl内部实现,用来获取和执行任务,下面是循环执行任务的条件代码

for (int num_tasks_executed = 0;
       (!batch_duration.is_zero() && work_executed < batch_duration) ||
       (batch_duration.is_zero() &&
        num_tasks_executed < main_thread_only().work_batch_size);
       ++num_tasks_executed) {

这个是用来获取任务并执行的循环,条件中显示,一次有最多执行的上限,如果这一次循环执行任务达到一定数量就会退出循环(当然如果全是延时任务并未到达执行时间点的话也会退出循环)

absl::optional<SequencedTaskSource::SelectedTask> selected_task =
        main_thread_only().task_source->SelectNextTask(lazy_now_select_task,
                                                       select_task_option);

获取任务的代码

任务执行也是在这里实现,在循环中,获取到任务之后会执行任务

    task_annotator_.RunTask("ThreadControllerImpl::RunTask",
                            selected_task->task,
                            [&selected_task](perfetto::EventContext& ctx) {
                              if (selected_task->task_execution_trace_logger)
                                selected_task->task_execution_trace_logger.Run(
                                    ctx, selected_task->task);
                              SequenceManagerImpl::MaybeEmitTaskDetails(
                                  ctx, selected_task.value());
                            });

SequenceManagerImpl

可以调用CreateTaskQueue创建任务队列
获取任务的实现者,内部实现SelectNextTaskImpl

TaskQueue

高层抽象,提供对外的易用接口,持有4个主要队列

  • 立刻执行队列
    • immediate_incoming_queue,用于PostTask
    • immediate_work_queue
  • 延迟执行队列
    • delayed_incoming_queue,用于PostDelayedTask
    • delayed_work_queue

每种类型都准备了incoming队列,是因为这个队列是专门提供给其他线程访问的,用于其他线程发布任务,是需要保证线程安全的(加锁),同时还提供了普通队列,普通队列要求必须要从当前线程访问,这个队列是不加锁的,这样区分开了之后可以有效减少加锁。

immediate_work_queue

上述四种队列的类型都是底层的WorkQueue

TaskQueueSelector

提供任务队列的选择,Selector是一个提供笼统抽象的接口的类型,并不关心具体的TaskQueueImpl对象,他的内部只会存储WorkQueue,存储在WorkQueueSets

  // base\task\task\sequence_manager\task_queue_selector.h
  WorkQueueSets delayed_work_queue_sets_;
  WorkQueueSets immediate_work_queue_sets_;

这些WorkQueue是在创建TaskQueueImpl对象的时候就已经创建并添加进对尹大哥selector当中了,值得注意的是,SequenceManagerImpl对selector的访问前都有main_thread_only()标记,这意味着只允许在主线程内操作不加锁的任务队列。
所以在操作队列添加到selector的时候,也就只添加了某两个任务队列,是哪两个已经显而易见了

void TaskQueueSelector::AddQueueImpl(internal::TaskQueueImpl* queue,
                                     TaskQueue::QueuePriority priority) {
#if DCHECK_IS_ON()
  DCHECK(!CheckContainsQueueForTest(queue));
#endif
  // 只添加了主线程可以访问的两个队列,PostTask可以访问的队列不在其中
  delayed_work_queue_sets_.AddQueue(queue->delayed_work_queue(), priority);
  immediate_work_queue_sets_.AddQueue(queue->immediate_work_queue(), priority);
#if DCHECK_IS_ON()
  DCHECK(CheckContainsQueueForTest(queue));
#endif
}

那PostTask来的任务一定是在某个地方完成了队列的转移。

任务转移

需要把其他线程发布的任务执行的话需要怎么做呢?任务筛选器不会访问PostTask可以访问的队列,意味着任务必须要转移进当前线程可以访问的队列才行。

immediate_work_queue为空的时候,需要重新装弹!把immediate_incoming_queueimmediate_work_queue交换

void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() {
  DCHECK(main_thread_only().immediate_work_queue->Empty());
  main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks();

  if (main_thread_only().throttler && IsQueueEnabled()) {
    main_thread_only().throttler->OnHasImmediateTask();
  }
}


void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) {
  DCHECK(queue->empty());
  // Now is a good time to consider reducing the empty queue's capacity if we're
  // wasting memory, before we make it the `immediate_incoming_queue`.
  queue->MaybeShrinkQueue();

  base::internal::CheckedAutoLock lock(any_thread_lock_);
  queue->swap(any_thread_.immediate_incoming_queue);
  ...

上述代码定义了这一行为,但是这个函数是以人物的形式绑定到SequenceManagerImpl的,由其决定何时Reload

在选择下一个任务的实现代码中有重载任务相关的逻辑,但是不一定会触发重载

std::optional<SequenceManagerImpl::SelectedTask>
SequenceManagerImpl::SelectNextTaskImpl(LazyNow& lazy_now,
                                        SelectTaskOption option) {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
  TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
               "SequenceManagerImpl::SelectNextTask");

  ReloadEmptyWorkQueues();
  MoveReadyDelayedTasksToWorkQueues(&lazy_now);

  // If we sampled now, check if it's time to reclaim memory next time we go
  // idle.
  if (lazy_now.has_value() &&
      lazy_now.Now() >= main_thread_only().next_time_to_reclaim_memory) {
    main_thread_only().memory_reclaim_scheduled = true;
  }
  ...

ReloadEmptyWorkQueues具体实现如下,他本身并没有重载人物的逻辑,他只是一个标记为,标记是否要被激活执行真正的任务重载callback

void SequenceManagerImpl::ReloadEmptyWorkQueues() {
  work_tracker_.WillReloadImmediateWorkQueues();

  // There are two cases where a queue needs reloading.  First, it might be
  // completely empty and we've just posted a task (this method handles that
  // case). Secondly if the work queue becomes empty when calling
  // WorkQueue::TakeTaskFromWorkQueue (handled there).
  //
  // Invokes callbacks created by GetFlagToRequestReloadForEmptyQueue above.
  empty_queues_to_reload_.RunActiveCallbacks();
}

TaskQeueueImpl类型的成员变量empty_queues_to_reload_handle_在新的任务插入incoming queue的时候会在incoming queue为空的时候设置标记位为true,激活设置的任务重载callback,从而在选择下个任务的时候触发任务转移,这样做就不会有额外的消耗了。
empty_queues_to_reload_handle_incoming任务队列被移空之后,标记位就会被设置为false,说明被搬空之后就不需要每次拿任务的时候转移了,直到标记为被再次设置。

WorkQueue

真正持有存储任务的队列,TaskQueueImpl::TaskDeque tasks_;任务就存储在tasks_中,注意别和上面的TaskQueue搞混,这里的TaskDeque不是一个类型

标签:task,thread,队列,work,queue,任务,多线程,Chromium
From: https://www.cnblogs.com/lenomirei/p/18570345

相关文章

  • 请为什么说js是单线程,而不是多线程呢?
    JavaScript的单线程性质主要源于其最初的设计目标:操作浏览器中的DOM(文档对象模型)。如果JavaScript是多线程的,并且多个线程同时尝试修改DOM,就可能会出现竞态条件,导致DOM处于不一致或损坏的状态。想象一下,一个线程试图添加一个元素,而另一个线程同时试图删除同一个元素的父元素,这会导......
  • Java面试之多线程&并发篇(8)
    前言本来想着给自己放松一下,刷刷博客,突然被几道面试题难倒!引用类型有哪些?有什么区别?说说ThreadLocal原理?线程池原理知道吗?以及核心参数?线程池的拒绝策略有哪些?似乎有点模糊了,那就大概看一下面试题吧。好记性不如烂键盘***12万字的java面试题整理******java核心面试知识整理......
  • JavaEE 【知识改变命运】03 多线程(2)
    文章目录复习1.1进程和线程的区别1.2线程创建的方式1.3两者创建的区别2多线程2.1多线程的优势-增加了运行的速度2.2Thread类及常用的方法2.2.1常用见的构造方法2.2.2获取当前类的信息2.2.3Thread的⼏个常⻅属性1演示后台线程2线程是否存活3名称4线程中断5等待......
  • 机器翻译(队列版)
    小晨的电脑上安装了一个机器翻译软件,他经常用这个软件来翻译英语文章。这个翻译软件的原理很简单,它只是从头到尾,依次将每个英文单词用对应的中文含义来替换。对于每个英文单词,软件会先在内存中查找这个单词的中文含义,如果内存中有,软件就会用它进行翻译;如果内存中没有,软件就会......
  • 一文搞懂 volatile:多线程编程的关键基础
    1.引言1.1什么是volatile?volatile是一个常用于多线程编程的关键字,其主要作用是确保线程对共享变量的访问保持最新状态。在现代计算机中,由于CPU缓存和编译器优化的存在,线程可能会读取到共享变量的旧值,导致逻辑错误。通过声明变量为volatile,我们可以告诉编译器和运行......
  • 栈与队列 408相关
    栈与队列一、栈的全面解析(一)栈的基本概念栈(Stack)作为一种特殊的线性表,其核心特性是遵循后进先出(LastInFirstOut,LIFO)原则。想象一个垂直放置的容器,只能在顶端进行元素的插入与移除操作,这个顶端就是所谓的栈顶(top),而底部则为栈底(bottom)。例如,一摞叠放的餐盘,最后放置上去......
  • Python学习笔记(4)Python多线程
    线程可以分为:内核线程:由操作系统内核创建和撤销。用户线程:不需要内核支持而在用户程序中实现的线程。Python3线程中常用的两个模块为:_threadthreading(推荐使用)_thread提供了低级别的、原始的线程以及一个简单的锁,它相比于threading模块的功能还是比较有限的......
  • springboot 整合 rabbitMQ (延迟队列)
    前言:延迟队列是一个内部有序的数据结构,其主要功能体现在其延时特性上。这种队列存储的元素都设定了特定的处理时间,意味着它们需要在规定的时间点或者延迟之后才能被取出并进行相应的处理。简而言之,延时队列被设计用于存放那些需要在特定时间到达时才处理的元素。使用场景:1、......
  • 代码随想录算法训练营第十一天|LC150.逆波兰表达式求值|LC239.滑动窗口最大值|LC347.
    150.逆波兰表达式求值-力扣(LeetCode)题目要求:    1、整数除法只保留整数部分;    2、该表达式总会得出有效数值且部存在除数为0的情况;    3、逆波兰表达式:是一种后缀表达式,所谓后缀就是指运算符写在后面。fromtypingimportListfromoperato......
  • 【JavaEE初阶 — 多线程】定时器的应用及模拟实现
         目录  1.标准库中的定时器      1.1Timer的定义      1.2Timer的原理      1.3Timer的使用     1.4Timer的弊端      1.5ScheduledExecutorService     2.模拟实现定时器    ......