首页 > 其他分享 >调度器43—migration 内核线程

调度器43—migration 内核线程

时间:2023-02-13 23:15:28浏览次数:61  
标签:struct work stop 43 线程 migration done cpu fn

基于LInux-5.10

相关:Linux内核机制—smp_hotplug_thread:https://www.cnblogs.com/hellokitty2/p/17114737.html

 

一、相关数据结构

1. struct cpu_stop_done

struct cpu_stop_done {
    atomic_t        nr_todo;    /* nr left to execute */
    int            ret;        /* collected return value */
    struct completion    completion;    /* fired if nr_todo reaches 0 */
};

一个辅助结构,提供与同步调用,获取返回值等功能。成员解释:

nr_todo: 表示此次queue work需要等待几个CPU完成工作
ret: 用户queue的回调函数的执行返回值,见 cpu_stopper_thread()。
completion: 当用户queue的回调函数被执行完后,若 work->done != NULL 且 done->nr_todo - 1 ==0,则 complete这个完成量,可以唤醒以同步方式调用接口而阻塞的线程。见 cpu_stopper_thread().


2. struct cpu_stop_work

struct cpu_stop_work {
    struct list_head    list; /* cpu_stopper->works */
    cpu_stop_fn_t        fn;
    void            *arg;
    struct cpu_stop_done    *done;
};

作为一个work结构挂在 cpu_stopper->works 链表上,在migration/X这个stop调度类的内核线程从这个链表上取下work,执行fn回调。成员解释:

list: 通过它挂在 cpu_stopper->works 链表上。
fn: 用户queue的回调函数,此回调函数在stop调度类上下文中运行。
arg: 用户queue的回调函数的参数。
done: 若是使用同步的接口,用户queue完回调函数后在这个结构的 completion 上 TASK_UNINTERRUPTIBLE 休眠等待。异步接口 stop_one_cpu_async 中将其初始化为NULL.


3. struct multi_stop_data

struct multi_stop_data {
    cpu_stop_fn_t        fn;
    void            *data;
    unsigned int        num_threads;
    const struct cpumask    *active_cpus;

    enum multi_stop_state    state;
    atomic_t        thread_ack;
};

同时stop 2个CPU,调用两个CPU上的 migration/X 内核线程时使用。


4. struct cpu_stopper

struct cpu_stopper {
    struct task_struct    *thread; //指向per-cpu的"migration/X"内核线程

    raw_spinlock_t        lock;
    bool            enabled;    /* is this stopper enabled? */
    struct list_head    works;        /* list of pending works */

    struct cpu_stop_work    stop_work;    /* for stop_cpus */
};

参数解释:

thread: 指向本CPU上的"migration/X"内核线程。
enabled: 内核启动后enable,down cpu后设置为false。
works: 用户调用接口queue过来的work挂在这个链表上。
stop_work: 主要用于在CPU hotplug down CPU 时使用,调用路径如下:

            stop_machine_from_inactive_cpu //没有调用过
set_pelt_halflife //pelt.c 设置多少个周期衰减为0.5,一般不设置
    stop_machine
takedown_cpu //cpu.c  stop_machine_cpuslocked(take_cpu_down, NULL, cpumask_of(cpu)); CPU hotplug down CPU 时调用
    stop_machine_cpuslocked    //传参cpumask=cpu_online_mask,down所有online的cpu.
        stop_cpus
            __stop_cpus
                queue_stop_cpus_work
                    work = &per_cpu(cpu_stopper.stop_work, cpu); 
                    work->fn = fn
                    cpu_stop_queue_work(cpu, work)

 

二、migration线程的创建

1. 注册后会为每个CPU都创建一个per-cpu的 migration/X 内核线程,执行函数体为。在注册过程中(此时还是内核启动阶段,非进程上下文)会调用 create 回调将自己设置为stop调度类。

static struct smp_hotplug_thread cpu_stop_threads = {
    .store            = &cpu_stopper.thread, //per-cpu的 "migration/X" 线程存放在 cpu_stopper.thread 中
    .thread_should_run    = cpu_stop_should_run, //判断 thread_fn 回调是否应该运行
    .thread_fn        = cpu_stopper_thread,
    .thread_comm        = "migration/%u",
    .create            = cpu_stop_create, //会设置为stop调度类
    .park            = cpu_stop_park, //提供了park回调但并没有提供unpark回调,park也只是一个WARN_ON(),没有实际动作。
    .selfparking        = true, //创建后就是没有parked的状态
};

static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper);

early_initcall(cpu_stop_init);
    cpu_stop_init
        smpboot_register_percpu_thread(&cpu_stop_threads)

 

2. 如何被设置为stop调度类的

cpu_stop_threads.create //stop_machine.c "migration/%u" 通过此方法设置为stop调度类
    cpu_stop_create
        sched_set_stop_task
            p->sched_class = &stop_sched_class;

 

3. 线程主要逻辑

可以理解为主要逻辑是在 smpboot_thread_fn() 中循环调用 thread_should_run 回调判断 thread_fn 回调是否应该运行,若是应该运行则调用 thread_fn() 回调。thread_fn() 回调里面会去遍历 cpu_stopper->works 链表,依次处
理用户在本CPU上注册的钩子函数。

 

三、对外接口

migration线程对外接口主要定义在 stop_machine.h 中,有如下接口,下面来看看每个接口的执行逻辑:

int stop_one_cpu(unsigned int cpu, cpu_stop_fn_t fn, void *arg);
int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *arg);
bool stop_one_cpu_nowait(unsigned int cpu, cpu_stop_fn_t fn, void *arg, struct cpu_stop_work *work_buf);
void stop_machine_park(int cpu);
void stop_machine_unpark(int cpu);
void stop_machine_yield(const struct cpumask *cpumask);
int stop_one_cpu_async(unsigned int cpu, cpu_stop_fn_t fn, void *arg, struct cpu_stop_work *work_buf, struct cpu_stop_done *done);
void cpu_stop_work_wait(struct cpu_stop_work *work_buf);

1. stop_one_cpu

(1) 函数实现

在queue work后会唤醒指定cpu上的 migration/X 线程的执行。这是一个同步调用,会等待queue的 work->func() 在migration/X线程中执行完成。

int stop_one_cpu(unsigned int cpu, cpu_stop_fn_t fn, void *arg)
{
    struct cpu_stop_done done;
    struct cpu_stop_work work = { .fn = fn, .arg = arg, .done = &done };

    cpu_stop_init_done(&done, 1); //赋值 done->nr_todo = 1
    if (!cpu_stop_queue_work(cpu, &work)) //调用这个接口去queue work,返回stopper是否enabled
        return -ENOENT;

    cond_resched(); //可抢占配置下是空函数
    /* 休眠等待操作完成 */
    wait_for_completion(&done.completion);
    return done.ret;
}

static bool cpu_stop_queue_work(unsigned int cpu, struct cpu_stop_work *work)
{
    struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
    DEFINE_WAKE_Q(wakeq);
    unsigned long flags;
    bool enabled;

    preempt_disable();
    raw_spin_lock_irqsave(&stopper->lock, flags);
    /* 有个开关决定是否能queue work */
    enabled = stopper->enabled;
    if (enabled)
        /* 将work挂在 stopper->works 链表上,然后将cpu的"migration/X"线程挂在wakeq上 */
        __cpu_stop_queue_work(stopper, work, &wakeq);
    else if (work->done)
        /* 若 done->nr_todo-1!=0 则complete(&done->completion) */
        cpu_stop_signal_done(work->done);
    raw_spin_unlock_irqrestore(&stopper->lock, flags);

    /* 唤醒"migration/X"线程线程 */
    wake_up_q(&wakeq);
    preempt_enable();

    return enabled;
}

(2) 主要调用路径

    set_cpus_allowed_ptr // core.c 【1】绑核
    sched_setaffinity //core.c 【2】设置CPU亲和性
        __set_cpus_allowed_ptr //core.c
    force_compatible_cpus_allowed_ptr //core.c 没有调用位置
        restrict_cpus_allowed_ptr //core.c
                __set_cpus_allowed_ptr_locked //core.c p正在运行或正在被唤醒过程中则调用
                    stop_one_cpu(cpu_of(rq), migration_cpu_stop, &arg);

do_execveat_common 【2】exec 方式创建一个进程时调用
kernel_execve
    bprm_execve //exec.c    
        sched_exec //core.c 若dest cpu不是当前cpu且是active就调用,自己迁移自己。
            stop_one_cpu(task_cpu(p), migration_cpu_stop, &arg);

        migrate_task_to //配置CONFIG_NUMA_BALANCING才生效(默认不生效)
            stop_one_cpu(curr_cpu, migration_cpu_stop, &arg);

可以看到,设置任务CPU亲和性进行绑核的时候,若此时正在运行或正在被唤醒过程中则进行调用进行主动迁移。或当exec执行一个新任务时会主动迁移自己。这是同步迁移,当函数返回后迁移已经完成了。


2. stop_two_cpus

(1) 函数实现

int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *arg)
{
    struct cpu_stop_done done;
    struct cpu_stop_work work1, work2;
    struct multi_stop_data msdata;

    msdata = (struct multi_stop_data){
        .fn = fn,
        .data = arg,
        .num_threads = 2,
        .active_cpus = cpumask_of(cpu1),
    };

    /* 初始化两个work */
    work1 = work2 = (struct cpu_stop_work){
        .fn = multi_cpu_stop,
        .arg = &msdata,
        .done = &done
    };

    cpu_stop_init_done(&done, 2);
    set_state(&msdata, MULTI_STOP_PREPARE);

    /* 保证cpu1的数值比cpu2大,可能为了防死锁 */
    if (cpu1 > cpu2)
        swap(cpu1, cpu2);
    /* 向两个cpu上queue work,然后唤醒两个cpu上的migration/X线程执行 */
    if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
        return -ENOENT;

    wait_for_completion(&done.completion);

    return done.ret;
}

(2) 主要调用路径

check_for_migration //eas_plus.c
    task_check_for_rotation //rotate.c
        task_rotate_work_func //rotate.c
        task_numa_migrate //fair.c 若使能 CONFIG_NUMA_BALANCING 才存在,默认不存在
            migrate_swap
                stop_two_cpus(arg.dst_cpu, arg.src_cpu, migrate_swap_stop, &arg)

可以看到,在rotate机制中,要在大核和小核之间交换两个 misfit 任务,使每个 misfit 任务的运行时间均等分布,以便可以用于多核并行线程以减少执行时间时会用到。


3. stop_one_cpu_nowait

(1) 函数实现

此函数只是queue一个work,但是当前进程不必等待任务执行完成,相当于异步迁移,调用函数执行完时迁移可能并没有完成。此时调用者需要保证 @work_buf 当前未被使用,并且在stopper开始执行 @fn 之前保持不变。

bool stop_one_cpu_nowait(unsigned int cpu, cpu_stop_fn_t fn, void *arg, struct cpu_stop_work *work_buf)
{
    *work_buf = (struct cpu_stop_work){ .fn = fn, .arg = arg, };
    /* 仅仅是queue一个work,并不等待work执行完成 */
    return cpu_stop_queue_work(cpu, work_buf);
}

(2) 主要调用路径

newidle_balance //fair.c 【1】 new idle balnace调用路径
    trace_android_rvh_sched_newidle_balance
        mtk_sched_newidle_balance //mtk/fair.c
scheduler_tick //core.c 【2】 tick中的主动迁移
    trace_android_vh_scheduler_tick
        check_for_migration //eas_plus.c
            migrate_running_task //mtk/fair.c 判断是主动迁移则调用
                stop_one_cpu_nowait(cpu_of(target), mtk_active_load_balance_cpu_stop, p, &target->active_balance_work); 


                newidle_balance //fair.c 【7】一个CPU新进入idle调用
                    nohz_newidle_balance
                run_rebalance_domains 【8】如下,SCHED_SOFTIRQ 的软中断回调函数
                    nohz_idle_balance //fair.c 为所有tick停止的cpu调用
                        _nohz_idle_balance //fair.c 为所有idle cpu调用,传参 (rq, CPU_IDLE)
                scheduler_tick 【6】 tick中触发
                    trigger_load_balance //fair.c
scheduler_tick //core.c 【5】tick中触发
    trigger_load_balance //fair.c
        nohz_balancer_kick //fair.c
    newidle_balance //fair.c 【4】一个CPU新进入idle调用
        nohz_newidle_balance //fair.c
            kick_ilb //fair.c kick一个cpu做nohz balance
                smp_call_function_single_async
                    nohz_csd_func //core.c 作为 rq->nohz_csd 的回调函数
                        run_rebalance_domains //fair.c SCHED_SOFTIRQ 的软中断回调函数,传参 (rq, CPU_IDLE/CPU_NOT_IDLE)
                            rebalance_domains //fair.c
                            newidle_balance //fair.c 【3】一个CPU新进入idle调用
                                load_balance //fair.c
                                    stop_one_cpu_nowait(cpu_of(busiest), active_load_balance_cpu_stop, busiest, &busiest->active_balance_work);

看来 active_load_balance_cpu_stop() 是CFS调度中主动迁移的执行函数。


4. stop_one_cpu_async

(1) 函数实现

int stop_one_cpu_async(unsigned int cpu, cpu_stop_fn_t fn, void *arg,
               struct cpu_stop_work *work_buf, struct cpu_stop_done *done)
{
    cpu_stop_init_done(done, 1);

    work_buf->done = done;
    work_buf->fn = fn;
    work_buf->arg = arg;

    if (cpu_stop_queue_work(cpu, work_buf))
        return 0;

    /* 区别在这里,将done赋值为NULL */
    work_buf->done = NULL;

    return -ENOENT;
}

(2) 调用路径

do_core_ctl //core_ctl.c 【1】内核core ctl接口
    try_to_pause //core_ctl.c
eas_ioctl_impl //perf_ioctl.c 【2】对上层的接口
    core_ctl_force_pause_cpu //core_ctl.c
        sched_pause_cpu //core_pause.c
            pause_cpus //cpu.c
                __pause_drain_rq //cpu.c 对参数cpus中的每个cpu都调用
                    sched_cpu_drain_rq //core.c
                        stop_one_cpu_async(cpu, drain_rq_cpu_stop, NULL, rq_drain, rq_drain_done);
                __wait_drain_rq(cpus) //cpu.c 对cpus中的每个cpu都调用。同在 pause_cpus 下调用,等待上面的 stop_one_cpu_async() 注册的回调执行完
                    sched_cpu_drain_rq_wait(cpu) //core.c
                        if (work->done)
                            cpu_stop_work_wait(rq_drain);

可以看到主要是在CPU isolate路径中调用,把能迁移走的所有调度类的任务迁移走,把不能迁移走的(只绑定这个单个CPU的内核线程)deactive掉。实测绑定单核的非内核线程,无论isolate还是offline都会将其cpu mask复位为所有CPU,
并迁移到其它CPU上继续运行。


5. cpu_stop_work_wait

等待由 stop_one_cpu_async() 发起的回调。

(1) 函数实现

void cpu_stop_work_wait(struct cpu_stop_work *work_buf)
{
    struct cpu_stop_done *done = work_buf->done;

    wait_for_completion(&done->completion);
    work_buf->done = NULL;
}

(2) 调用路径

见上面 stop_one_cpu_async() 的调用路径。


6. stop_machine_park

(1) 函数实现

void stop_machine_park(int cpu)
{
    struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
    /*
     * 无锁执行。 cpu_stopper_thread() 中将持有 stopper->lock
     * 并在停放它之前flush pending的works,直到可以将排队新works。
     */
    stopper->enabled = false;
    /* kthread->flags |= KTHREAD_SHOULD_PARK, 然后在loop
     * 里将自己设置为 TASK_PARKED 然后切走。
     * 若thread不是当前正在执行的任务,则会先唤醒,然后等待其进
     * 入TASK_PARKED状态。
     */
    kthread_park(stopper->thread);
}

(2) 调用路径

cpuhp_hp_states[CPUHP_TEARDOWN_CPU].teardown.single 回调
    takedown_cpu //cpu.c 也将下面函数作为回调在migration/X内核线程上下文执行
        take_cpu_down //cpu.c 先回调一些hotplug回调,这些回调是在migration/X内核线程上下文执行的,函数最后才park
            stop_machine_park(cpu);

可以看到在CPU hotplug down CPU 的时候,会将很多hotplug状态对应的回调放到migration/X内核线程上下文中去执行,执行完后就把migration/X内核线程给park了。


7. stop_machine_unpark

(1) 函数实现

void stop_machine_unpark(int cpu)
{
    struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);

    stopper->enabled = true;
    /*
     * 将此线程重新bind到cpu上,然后  从 kthread->flags 中清除
     * KTHREAD_SHOULD_PARK 标志,然后唤醒处于 TASK_PARKED 状态
     * 的 migration/X 线程。
     * 唤醒后便会去 cpu_stopper.works 链表上取任务执行了。
     */
    kthread_unpark(stopper->thread);
}

(2) 调用路径

rest_init //main.c
    cpu_startup_entry //idle.c
        cpuhp_online_idle //cpu.c 在开始进入idle loop之前对stopper thread进行unpark
            stop_machine_unpark(smp_processor_id());

 

四、总结

stop_machine.c 中实现的per-cpu的 migration/X 内核线程是stop调度类的,线程函数体是 smpboot_thread_fn(),其被唤醒后主要是调用 thread_should_run 回调判断是否需要调用 thread_fn 回调,若是需要则调用。

migration/X 内核线程对外是以queue work的方式向用户提供接口的,用户可以通过向其queue work的方式使work func回调运行在stop调度类上下文中。在任务迁移(绑核、负载均衡)、cpu hotplug等相关机制中使用较多。

负载均衡中的任务迁移注册的回调函数是 active_load_balance_cpu_stop().

当一个CPU被isolate或offline时,会将其工作队列上的任务移到其它CPU上,对于只绑定单个CPU的内核线程,会对其deactive而不会迁移到其它CPU,对于只绑定单个CPU的用户线程,会将其cpumask reset到所有CPU上并迁移走。

 


参考:
How migration thread works inside of Linux Kernel: https://www.systutorials.com/migration-thread-works-inside-linux-kernel/

 

标签:struct,work,stop,43,线程,migration,done,cpu,fn
From: https://www.cnblogs.com/hellokitty2/p/17118216.html

相关文章

  • python 多线程 join
    当一个进程启动之后,会默认产生一个线程。这个线程就是主线程。如果是多线程时,主线程会创建多个线程。主线程执行完成后就会退出。多线程执行完成后也会退出。注意:只有守护......
  • P2430 严酷的训练 题解
    题目背景Lj的朋友WKY是一名神奇的少年,在同龄人之中有着极高的地位。。。题目描述他的老师老王对他的程序水平赞叹不已,于是下决心培养这名小子。老王的训练方式很奇怪,他......
  • C/C++多线程实现龟兔赛跑
    题⽬:⻳兔赛跑跑道距离50⽶乌⻳(⼀个线程)每秒3⽶不睡觉;兔⼦(⼀个线程)每秒5⽶每跑15⽶睡2秒钟。请模拟⽐赛情况:#include<iostream>#include<thread>#include<......
  • 为什么leveldb/rocksdb只允许一个线程修改memtable?
    leveldb/rocksdb采用deque控制多线程只允许一个线程修改memtable,是由于memtable不存在读-写冲突(采用MVCC,保证Sequence一定不同,memtable的key就一定不同,update/delete/ins......
  • java中的多线程
    多线程​​1、线程概述​​​​1.1进程​​​​1.2线程​​​​2、线程的创建和启动​​​​2.1继承Thread类​​​​2.2实现Runnable接口​​​​2.3继承Thread类和实......
  • JavaScript的原型、原型链、异步与单线程复习回顾
     原型和原型链有对象的地方就有原型,每个对象都会在其内部初始化一个属性,就是prototype(原型),原型中存储共享的属性和方法。当我们访问一个对象的属性时,js引擎会先看当......
  • 关于适配服务器创建多线程任务的一些见解
    1、首先理解I/O密集和CPU密集CPU密集型CPU密集型,也叫计算密集型,一般是指服务器的硬盘、内存硬件性能相对CPU好很多,或者使用率低很多。系统运行CPU读写I/O(硬盘/内存)时可......
  • react18-学习笔记43-接受无限多的参数
    classnames可以接受很多参数报告  ......
  • 多线程2(设置线程名,同步代码块:给线程加锁)
    继承,不能继承父类的构造方法,如果想要使用,需要重写构造器,然后调用super关键字,就可使用父类的构造方法了(setName方法)给线程设置名字,1用setName方法,2构造器方法   同......
  • JAVA多线程(一)--实现/创建方式
    JAVA多线程(一)--实现/创建方式一、继承Thread类Thread类本质上是一个实现了Runnable接口的实例,代表一个线程的实例。启动线程的唯一方法是调用Thread类的start()方法,sta......