基于LInux-5.10
相关:Linux内核机制—smp_hotplug_thread:https://www.cnblogs.com/hellokitty2/p/17114737.html
一、相关数据结构
1. struct cpu_stop_done
struct cpu_stop_done { atomic_t nr_todo; /* nr left to execute */ int ret; /* collected return value */ struct completion completion; /* fired if nr_todo reaches 0 */ };
一个辅助结构,提供与同步调用,获取返回值等功能。成员解释:
nr_todo: 表示此次queue work需要等待几个CPU完成工作
ret: 用户queue的回调函数的执行返回值,见 cpu_stopper_thread()。
completion: 当用户queue的回调函数被执行完后,若 work->done != NULL 且 done->nr_todo - 1 ==0,则 complete这个完成量,可以唤醒以同步方式调用接口而阻塞的线程。见 cpu_stopper_thread().
2. struct cpu_stop_work
struct cpu_stop_work { struct list_head list; /* cpu_stopper->works */ cpu_stop_fn_t fn; void *arg; struct cpu_stop_done *done; };
作为一个work结构挂在 cpu_stopper->works 链表上,在migration/X这个stop调度类的内核线程从这个链表上取下work,执行fn回调。成员解释:
list: 通过它挂在 cpu_stopper->works 链表上。
fn: 用户queue的回调函数,此回调函数在stop调度类上下文中运行。
arg: 用户queue的回调函数的参数。
done: 若是使用同步的接口,用户queue完回调函数后在这个结构的 completion 上 TASK_UNINTERRUPTIBLE 休眠等待。异步接口 stop_one_cpu_async 中将其初始化为NULL.
3. struct multi_stop_data
struct multi_stop_data { cpu_stop_fn_t fn; void *data; unsigned int num_threads; const struct cpumask *active_cpus; enum multi_stop_state state; atomic_t thread_ack; };
同时stop 2个CPU,调用两个CPU上的 migration/X 内核线程时使用。
4. struct cpu_stopper
struct cpu_stopper { struct task_struct *thread; //指向per-cpu的"migration/X"内核线程 raw_spinlock_t lock; bool enabled; /* is this stopper enabled? */ struct list_head works; /* list of pending works */ struct cpu_stop_work stop_work; /* for stop_cpus */ };
参数解释:
thread: 指向本CPU上的"migration/X"内核线程。
enabled: 内核启动后enable,down cpu后设置为false。
works: 用户调用接口queue过来的work挂在这个链表上。
stop_work: 主要用于在CPU hotplug down CPU 时使用,调用路径如下:
stop_machine_from_inactive_cpu //没有调用过 set_pelt_halflife //pelt.c 设置多少个周期衰减为0.5,一般不设置 stop_machine takedown_cpu //cpu.c stop_machine_cpuslocked(take_cpu_down, NULL, cpumask_of(cpu)); CPU hotplug down CPU 时调用 stop_machine_cpuslocked //传参cpumask=cpu_online_mask,down所有online的cpu. stop_cpus __stop_cpus queue_stop_cpus_work work = &per_cpu(cpu_stopper.stop_work, cpu); work->fn = fn cpu_stop_queue_work(cpu, work)
二、migration线程的创建
1. 注册后会为每个CPU都创建一个per-cpu的 migration/X 内核线程,执行函数体为。在注册过程中(此时还是内核启动阶段,非进程上下文)会调用 create 回调将自己设置为stop调度类。
static struct smp_hotplug_thread cpu_stop_threads = { .store = &cpu_stopper.thread, //per-cpu的 "migration/X" 线程存放在 cpu_stopper.thread 中 .thread_should_run = cpu_stop_should_run, //判断 thread_fn 回调是否应该运行 .thread_fn = cpu_stopper_thread, .thread_comm = "migration/%u", .create = cpu_stop_create, //会设置为stop调度类 .park = cpu_stop_park, //提供了park回调但并没有提供unpark回调,park也只是一个WARN_ON(),没有实际动作。 .selfparking = true, //创建后就是没有parked的状态 }; static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper); early_initcall(cpu_stop_init); cpu_stop_init smpboot_register_percpu_thread(&cpu_stop_threads)
2. 如何被设置为stop调度类的
cpu_stop_threads.create //stop_machine.c "migration/%u" 通过此方法设置为stop调度类 cpu_stop_create sched_set_stop_task p->sched_class = &stop_sched_class;
3. 线程主要逻辑
可以理解为主要逻辑是在 smpboot_thread_fn() 中循环调用 thread_should_run 回调判断 thread_fn 回调是否应该运行,若是应该运行则调用 thread_fn() 回调。thread_fn() 回调里面会去遍历 cpu_stopper->works 链表,依次处
理用户在本CPU上注册的钩子函数。
三、对外接口
migration线程对外接口主要定义在 stop_machine.h 中,有如下接口,下面来看看每个接口的执行逻辑:
int stop_one_cpu(unsigned int cpu, cpu_stop_fn_t fn, void *arg); int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *arg); bool stop_one_cpu_nowait(unsigned int cpu, cpu_stop_fn_t fn, void *arg, struct cpu_stop_work *work_buf); void stop_machine_park(int cpu); void stop_machine_unpark(int cpu); void stop_machine_yield(const struct cpumask *cpumask); int stop_one_cpu_async(unsigned int cpu, cpu_stop_fn_t fn, void *arg, struct cpu_stop_work *work_buf, struct cpu_stop_done *done); void cpu_stop_work_wait(struct cpu_stop_work *work_buf);
1. stop_one_cpu
(1) 函数实现
在queue work后会唤醒指定cpu上的 migration/X 线程的执行。这是一个同步调用,会等待queue的 work->func() 在migration/X线程中执行完成。
int stop_one_cpu(unsigned int cpu, cpu_stop_fn_t fn, void *arg) { struct cpu_stop_done done; struct cpu_stop_work work = { .fn = fn, .arg = arg, .done = &done }; cpu_stop_init_done(&done, 1); //赋值 done->nr_todo = 1 if (!cpu_stop_queue_work(cpu, &work)) //调用这个接口去queue work,返回stopper是否enabled return -ENOENT; cond_resched(); //可抢占配置下是空函数 /* 休眠等待操作完成 */ wait_for_completion(&done.completion); return done.ret; } static bool cpu_stop_queue_work(unsigned int cpu, struct cpu_stop_work *work) { struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu); DEFINE_WAKE_Q(wakeq); unsigned long flags; bool enabled; preempt_disable(); raw_spin_lock_irqsave(&stopper->lock, flags); /* 有个开关决定是否能queue work */ enabled = stopper->enabled; if (enabled) /* 将work挂在 stopper->works 链表上,然后将cpu的"migration/X"线程挂在wakeq上 */ __cpu_stop_queue_work(stopper, work, &wakeq); else if (work->done) /* 若 done->nr_todo-1!=0 则complete(&done->completion) */ cpu_stop_signal_done(work->done); raw_spin_unlock_irqrestore(&stopper->lock, flags); /* 唤醒"migration/X"线程线程 */ wake_up_q(&wakeq); preempt_enable(); return enabled; }
(2) 主要调用路径
set_cpus_allowed_ptr // core.c 【1】绑核 sched_setaffinity //core.c 【2】设置CPU亲和性 __set_cpus_allowed_ptr //core.c force_compatible_cpus_allowed_ptr //core.c 没有调用位置 restrict_cpus_allowed_ptr //core.c __set_cpus_allowed_ptr_locked //core.c p正在运行或正在被唤醒过程中则调用 stop_one_cpu(cpu_of(rq), migration_cpu_stop, &arg); do_execveat_common 【2】exec 方式创建一个进程时调用 kernel_execve bprm_execve //exec.c sched_exec //core.c 若dest cpu不是当前cpu且是active就调用,自己迁移自己。 stop_one_cpu(task_cpu(p), migration_cpu_stop, &arg); migrate_task_to //配置CONFIG_NUMA_BALANCING才生效(默认不生效) stop_one_cpu(curr_cpu, migration_cpu_stop, &arg);
可以看到,设置任务CPU亲和性进行绑核的时候,若此时正在运行或正在被唤醒过程中则进行调用进行主动迁移。或当exec执行一个新任务时会主动迁移自己。这是同步迁移,当函数返回后迁移已经完成了。
2. stop_two_cpus
(1) 函数实现
int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *arg) { struct cpu_stop_done done; struct cpu_stop_work work1, work2; struct multi_stop_data msdata; msdata = (struct multi_stop_data){ .fn = fn, .data = arg, .num_threads = 2, .active_cpus = cpumask_of(cpu1), }; /* 初始化两个work */ work1 = work2 = (struct cpu_stop_work){ .fn = multi_cpu_stop, .arg = &msdata, .done = &done }; cpu_stop_init_done(&done, 2); set_state(&msdata, MULTI_STOP_PREPARE); /* 保证cpu1的数值比cpu2大,可能为了防死锁 */ if (cpu1 > cpu2) swap(cpu1, cpu2); /* 向两个cpu上queue work,然后唤醒两个cpu上的migration/X线程执行 */ if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2)) return -ENOENT; wait_for_completion(&done.completion); return done.ret; }
(2) 主要调用路径
check_for_migration //eas_plus.c task_check_for_rotation //rotate.c task_rotate_work_func //rotate.c task_numa_migrate //fair.c 若使能 CONFIG_NUMA_BALANCING 才存在,默认不存在 migrate_swap stop_two_cpus(arg.dst_cpu, arg.src_cpu, migrate_swap_stop, &arg)
可以看到,在rotate机制中,要在大核和小核之间交换两个 misfit 任务,使每个 misfit 任务的运行时间均等分布,以便可以用于多核并行线程以减少执行时间时会用到。
3. stop_one_cpu_nowait
(1) 函数实现
此函数只是queue一个work,但是当前进程不必等待任务执行完成,相当于异步迁移,调用函数执行完时迁移可能并没有完成。此时调用者需要保证 @work_buf 当前未被使用,并且在stopper开始执行 @fn 之前保持不变。
bool stop_one_cpu_nowait(unsigned int cpu, cpu_stop_fn_t fn, void *arg, struct cpu_stop_work *work_buf) { *work_buf = (struct cpu_stop_work){ .fn = fn, .arg = arg, }; /* 仅仅是queue一个work,并不等待work执行完成 */ return cpu_stop_queue_work(cpu, work_buf); }
(2) 主要调用路径
newidle_balance //fair.c 【1】 new idle balnace调用路径 trace_android_rvh_sched_newidle_balance mtk_sched_newidle_balance //mtk/fair.c scheduler_tick //core.c 【2】 tick中的主动迁移 trace_android_vh_scheduler_tick check_for_migration //eas_plus.c migrate_running_task //mtk/fair.c 判断是主动迁移则调用 stop_one_cpu_nowait(cpu_of(target), mtk_active_load_balance_cpu_stop, p, &target->active_balance_work); newidle_balance //fair.c 【7】一个CPU新进入idle调用 nohz_newidle_balance run_rebalance_domains 【8】如下,SCHED_SOFTIRQ 的软中断回调函数 nohz_idle_balance //fair.c 为所有tick停止的cpu调用 _nohz_idle_balance //fair.c 为所有idle cpu调用,传参 (rq, CPU_IDLE) scheduler_tick 【6】 tick中触发 trigger_load_balance //fair.c scheduler_tick //core.c 【5】tick中触发 trigger_load_balance //fair.c nohz_balancer_kick //fair.c newidle_balance //fair.c 【4】一个CPU新进入idle调用 nohz_newidle_balance //fair.c kick_ilb //fair.c kick一个cpu做nohz balance smp_call_function_single_async nohz_csd_func //core.c 作为 rq->nohz_csd 的回调函数 run_rebalance_domains //fair.c SCHED_SOFTIRQ 的软中断回调函数,传参 (rq, CPU_IDLE/CPU_NOT_IDLE) rebalance_domains //fair.c newidle_balance //fair.c 【3】一个CPU新进入idle调用 load_balance //fair.c stop_one_cpu_nowait(cpu_of(busiest), active_load_balance_cpu_stop, busiest, &busiest->active_balance_work);
看来 active_load_balance_cpu_stop() 是CFS调度中主动迁移的执行函数。
4. stop_one_cpu_async
(1) 函数实现
int stop_one_cpu_async(unsigned int cpu, cpu_stop_fn_t fn, void *arg, struct cpu_stop_work *work_buf, struct cpu_stop_done *done) { cpu_stop_init_done(done, 1); work_buf->done = done; work_buf->fn = fn; work_buf->arg = arg; if (cpu_stop_queue_work(cpu, work_buf)) return 0; /* 区别在这里,将done赋值为NULL */ work_buf->done = NULL; return -ENOENT; }
(2) 调用路径
do_core_ctl //core_ctl.c 【1】内核core ctl接口 try_to_pause //core_ctl.c eas_ioctl_impl //perf_ioctl.c 【2】对上层的接口 core_ctl_force_pause_cpu //core_ctl.c sched_pause_cpu //core_pause.c pause_cpus //cpu.c __pause_drain_rq //cpu.c 对参数cpus中的每个cpu都调用 sched_cpu_drain_rq //core.c stop_one_cpu_async(cpu, drain_rq_cpu_stop, NULL, rq_drain, rq_drain_done); __wait_drain_rq(cpus) //cpu.c 对cpus中的每个cpu都调用。同在 pause_cpus 下调用,等待上面的 stop_one_cpu_async() 注册的回调执行完 sched_cpu_drain_rq_wait(cpu) //core.c if (work->done) cpu_stop_work_wait(rq_drain);
可以看到主要是在CPU isolate路径中调用,把能迁移走的所有调度类的任务迁移走,把不能迁移走的(只绑定这个单个CPU的内核线程)deactive掉。实测绑定单核的非内核线程,无论isolate还是offline都会将其cpu mask复位为所有CPU,
并迁移到其它CPU上继续运行。
5. cpu_stop_work_wait
等待由 stop_one_cpu_async() 发起的回调。
(1) 函数实现
void cpu_stop_work_wait(struct cpu_stop_work *work_buf) { struct cpu_stop_done *done = work_buf->done; wait_for_completion(&done->completion); work_buf->done = NULL; }
(2) 调用路径
见上面 stop_one_cpu_async() 的调用路径。
6. stop_machine_park
(1) 函数实现
void stop_machine_park(int cpu) { struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu); /* * 无锁执行。 cpu_stopper_thread() 中将持有 stopper->lock * 并在停放它之前flush pending的works,直到可以将排队新works。 */ stopper->enabled = false; /* kthread->flags |= KTHREAD_SHOULD_PARK, 然后在loop * 里将自己设置为 TASK_PARKED 然后切走。 * 若thread不是当前正在执行的任务,则会先唤醒,然后等待其进 * 入TASK_PARKED状态。 */ kthread_park(stopper->thread); }
(2) 调用路径
cpuhp_hp_states[CPUHP_TEARDOWN_CPU].teardown.single 回调 takedown_cpu //cpu.c 也将下面函数作为回调在migration/X内核线程上下文执行 take_cpu_down //cpu.c 先回调一些hotplug回调,这些回调是在migration/X内核线程上下文执行的,函数最后才park stop_machine_park(cpu);
可以看到在CPU hotplug down CPU 的时候,会将很多hotplug状态对应的回调放到migration/X内核线程上下文中去执行,执行完后就把migration/X内核线程给park了。
7. stop_machine_unpark
(1) 函数实现
void stop_machine_unpark(int cpu) { struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu); stopper->enabled = true; /* * 将此线程重新bind到cpu上,然后 从 kthread->flags 中清除 * KTHREAD_SHOULD_PARK 标志,然后唤醒处于 TASK_PARKED 状态 * 的 migration/X 线程。 * 唤醒后便会去 cpu_stopper.works 链表上取任务执行了。 */ kthread_unpark(stopper->thread); }
(2) 调用路径
rest_init //main.c cpu_startup_entry //idle.c cpuhp_online_idle //cpu.c 在开始进入idle loop之前对stopper thread进行unpark stop_machine_unpark(smp_processor_id());
四、总结
stop_machine.c 中实现的per-cpu的 migration/X 内核线程是stop调度类的,线程函数体是 smpboot_thread_fn(),其被唤醒后主要是调用 thread_should_run 回调判断是否需要调用 thread_fn 回调,若是需要则调用。
migration/X 内核线程对外是以queue work的方式向用户提供接口的,用户可以通过向其queue work的方式使work func回调运行在stop调度类上下文中。在任务迁移(绑核、负载均衡)、cpu hotplug等相关机制中使用较多。
负载均衡中的任务迁移注册的回调函数是 active_load_balance_cpu_stop().
当一个CPU被isolate或offline时,会将其工作队列上的任务移到其它CPU上,对于只绑定单个CPU的内核线程,会对其deactive而不会迁移到其它CPU,对于只绑定单个CPU的用户线程,会将其cpumask reset到所有CPU上并迁移走。
参考:
How migration thread works inside of Linux Kernel: https://www.systutorials.com/migration-thread-works-inside-linux-kernel/
标签:struct,work,stop,43,线程,migration,done,cpu,fn From: https://www.cnblogs.com/hellokitty2/p/17118216.html