首页 > 其他分享 >futex系统调用分析

futex系统调用分析

时间:2024-11-07 16:33:36浏览次数:1  
标签:task struct 系统 futex 调用 wake key hb

futex - fast user-space locking

futex是一个内核态和用户态共同参与实现的锁。它基于一个观察,大多数情况下可能并没有锁的争抢,所以没有必要每次都陷入内核态,可以首先在用户态查询一下锁是否被其他进程/线程占用,如果没有占用可直接返回,无需调用futex syscall。用户态这一部分实现比较简单,可以简单理解为使用原子操作读写比较一个共享变量,然后决定下一步的操作。内核态较为复杂,我们下面分析一下内核态的实现。在深入到内核代码之前,我们先了解一下如何使用它。

下面futex接口签名。

       long futex(uint32_t *uaddr, int futex_op, uint32_t val,
                 const struct timespec *timeout,   /* or: uint32_t val2 */
                 uint32_t *uaddr2, uint32_t val3);

uaddr表示futex会在该地址等待或唤醒等待在该地址的进程/线程,对于多进程必须使用共享内存,对于多线程就简单一些。futex_op表示各种操作,比如等待,唤醒,val是跟op相关的值,对于op为FUTEX_WAIT,只有addr指向的地址的值等于val才会等待,否则立即返回。对于FUTEX_WAKE,val表示唤醒进程/线程的个数。timeout表示超时时间,也就是唤醒有两种情形,一种是被其他进程/线程主动唤醒,另一种是超时后自动醒来。这里有一篇文章提供了一个例子可以参考:https://eli.thegreenplace.net/2018/basics-of-futexes/

下面我们进入主题,看一下kernel的实现。

futex syscall。

SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,
                const struct __kernel_timespec __user *, utime,
                u32 __user *, uaddr2, u32, val3)
{
...
        return do_futex(uaddr, op, val, tp, uaddr2, (unsigned long)utime, val3);
}

在用户态调用futex后系统调用函数会调用do_futex完成具体任务。

long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
        u32 __user *uaddr2, u32 val2, u32 val3)
{
    int cmd = op & FUTEX_CMD_MASK;
    unsigned int flags = 0;
...
    switch (cmd) {
    case FUTEX_WAIT:
        val3 = FUTEX_BITSET_MATCH_ANY;
        fallthrough;
    case FUTEX_WAIT_BITSET:
        return futex_wait(uaddr, flags, val, timeout, val3);
    case FUTEX_WAKE:
        val3 = FUTEX_BITSET_MATCH_ANY;
        fallthrough;
    case FUTEX_WAKE_BITSET:
        return futex_wake(uaddr, flags, val, val3);
    ...
    return -ENOSYS;
}

这里处理所有的op,本文只分析wake和wait。

先来分析wait。

int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset)
{
    struct hrtimer_sleeper timeout, *to;
    struct restart_block *restart;
    struct futex_hash_bucket *hb;
    struct futex_q q = futex_q_init;
    int ret;

    if (!bitset)
        return -EINVAL;
    q.bitset = bitset;

    to = futex_setup_timer(abs_time, &timeout, flags,
                   current->timer_slack_ns);
retry:
    /*
     * Prepare to wait on uaddr. On success, it holds hb->lock and q
     * is initialized.
     */
    ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
    if (ret)
        goto out;

    /* futex_queue and wait for wakeup, timeout, or a signal. */
    futex_wait_queue(hb, &q, to);
...
}

第一步,设置timer,如果timeout不为空;

第二步,获取futex_q和futex_hash_bucket,为在uaddr上等待做准备;

第三步,在futex queue上等待;

其余是唤醒后的操作。

先看一下futex_q和futex_hash_bucket结构。

//表示哈希futex队列entry,每个task都有一个
struct futex_q {
    struct plist_node list;  // 在该futex上等待的所有task组成的优先排序链表项,会挂到哈希桶上

    struct task_struct *task;   // 该结构所关联的task
    spinlock_t *lock_ptr;  // 哈希桶的锁
    union futex_key key;   // 该futex在哈希桶的key
    struct futex_pi_state *pi_state;
    struct rt_mutex_waiter *rt_waiter;
    union futex_key *requeue_pi_key;
    u32 bitset;    
    atomic_t requeue_state;
#ifdef CONFIG_PREEMPT_RT
    struct rcuwait requeue_wait;
#endif
} __randomize_layout;

在一个futex上等待的task可以有多个,每个task都有一个futex_q与之对应,他们组成一个链表,每个链表在哈希桶中有一个key表示。

看一下futex_key结构。

union futex_key {
    struct {
        u64 i_seq;
        unsigned long pgoff;
        unsigned int offset;
    } shared;
    struct {
        union {
            struct mm_struct *mm;
            u64 __tmp;
        };
        unsigned long address;
        unsigned int offset;
    } private;
    struct {
        u64 ptr;
        unsigned long word;
        unsigned int offset;
    } both;
};

这是一个unin,用来标识futex的位置,根据futex所在位置属于私有还是共享来选择使用哪一项。每一项可以看成是表示位置的三元组。

//所有哈希到同一个位置的futex key共享一个哈希桶,每个key有多个futex_q结构,每个futex_q代表某个task在一个futex上等待
struct futex_hash_bucket {
    atomic_t waiters;    //等待者的数量
    spinlock_t lock;
    struct plist_head chain;  // futex_q list的头
} ____cacheline_aligned_in_smp;

 

futex_wait_setup会准备好futex等待队列或将uaddr加入等待队列。 

int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
             struct futex_q *q, struct futex_hash_bucket **hb)
{
...
retry:
    ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, FUTEX_READ);

retry_private:
    *hb = futex_q_lock(q);

    ret = futex_get_value_locked(&uval, uaddr);
...
	if (uval != val) {
		futex_q_unlock(*hb);
		ret = -EWOULDBLOCK;
	}

	return ret;
}

第一步针对uaddr获取一个futex_key;

第二步获取根据futex_q获取哈希桶hb;

第三步,获取uaddr指向的值,跟入参val比较,如果不相等就返回-EWOUBLOCK的错误,也就是在用户态传入参数时如果futex指向的值不等于val就会立即返回的原因。

get_futex_key会根据uaddr是私有还是共享找出表示该位置的三元组,对于私有:key为{current->mm, address, 0},其中address是uaddr所在虚拟页框的地址;对于共享:key为 {inode->i_sequence, page->index, offset within page}。私有映射很容的得到key,共享映射需要复杂的步骤才能得到key,因此线程模型比进程模型在使用futex上更高效。

futex_hash_bucke根据上面得到的futex_q生成futex_hash_bucket结构。

struct futex_hash_bucket *futex_q_lock(struct futex_q *q)
    __acquires(&hb->lock)
{
    struct futex_hash_bucket *hb;

    hb = futex_hash(&q->key);   //从全局哈希队列中找到一个hash桶

    /*
     * Increment the counter before taking the lock so that
     * a potential waker won't miss a to-be-slept task that is
     * waiting for the spinlock. This is safe as all futex_q_lock()
     * users end up calling futex_queue(). Similarly, for housekeeping,
     * decrement the counter at futex_q_unlock() when some error has
     * occurred and we don't end up adding the task to the list.
     */
    futex_hb_waiters_inc(hb); /* implies smp_mb(); (A) */  // 增加哈希桶的waiters计数

    q->lock_ptr = &hb->lock;

    spin_lock(&hb->lock); //锁住哈希桶
    return hb;
}

这个哈希桶是从全局的futex_queue中找到的。

struct futex_hash_bucket *futex_hash(union futex_key *key)
{
    u32 hash = jhash2((u32 *)key, offsetof(typeof(*key), both.offset) / 4,
              key->both.offset);

    return &futex_queues[hash & (futex_hashsize - 1)];
}

futex_queues是在futex_init时初始化的。

static int __init futex_init(void)
{
    unsigned int futex_shift;
    unsigned long i;

#if CONFIG_BASE_SMALL
    futex_hashsize = 16;
#else
    futex_hashsize = roundup_pow_of_two(256 * num_possible_cpus());
#endif

    futex_queues = alloc_large_system_hash("futex", sizeof(*futex_queues),
                           futex_hashsize, 0, 0,
                           &futex_shift, NULL,
                           futex_hashsize, futex_hashsize);
    futex_hashsize = 1UL << futex_shift;

    for (i = 0; i < futex_hashsize; i++) {
        atomic_set(&futex_queues[i].waiters, 0);
        plist_head_init(&futex_queues[i].chain);
        spin_lock_init(&futex_queues[i].lock);
    }

    return 0;
}
core_initcall(futex_init);

这个全局hash_queues的数量是初始化的时候决定的,对于一个拥有256个cpu的机器也只会分配16项。

现在已经准备好哈希桶和futex_q,可以wait了。

void futex_wait_queue(struct futex_hash_bucket *hb, struct futex_q *q,
                struct hrtimer_sleeper *timeout)
{
    /*
     * The task state is guaranteed to be set before another task can
     * wake it. set_current_state() is implemented using smp_store_mb() and
     * futex_queue() calls spin_unlock() upon completion, both serializing
     * access to the hash list and forcing another memory barrier.
     */
    set_current_state(TASK_INTERRUPTIBLE|TASK_FREEZABLE);    //设置当前task的状态为interruptable加freezable
    futex_queue(q, hb);       //将futex加入等待队列

    /* Arm the timer */
    if (timeout)               // 如果设置了超时,开始计时
        hrtimer_sleeper_start_expires(timeout, HRTIMER_MODE_ABS);

    /*
     * If we have been removed from the hash list, then another task
     * has tried to wake us, and we can skip the call to schedule().
     */
    if (likely(!plist_node_empty(&q->list))) {         // 我们应该还是队列里面没人这么快就唤醒我们,如果没有设置超时或者超时时间还没到就主动调度
        /*
         * If the timer has already expired, current will already be
         * flagged for rescheduling. Only call schedule if there
         * is no timeout, or if it has yet to expire.
         */
        if (!timeout || timeout->task)
            schedule();
    }
__set_current_state(TASK_RUNNING); //这次我们真的活过来了,设置当前task的状态为TASK_RUNNING }

第一步,设置当前进程的state为可中断和可冻结;

第二步,将futex_q加入哈希桶的链上;

第三步,如果设置了超时,开启timer;

第四步,如果现在还没被唤醒或者超时没到就主动调度;

第五步,再次醒来,设置本task的state为TASK_RUNNING;

futex_queue会调用__futex_queue将当前futex_q加入哈希桶的队列。

void __futex_queue(struct futex_q *q, struct futex_hash_bucket *hb)
{
    int prio;

    /*
     * The priority used to register this element is
     * - either the real thread-priority for the real-time threads
     * (i.e. threads with a priority lower than MAX_RT_PRIO)
     * - or MAX_RT_PRIO for non-RT threads.
     * Thus, all RT-threads are woken first in priority order, and
     * the others are woken last, in FIFO order.
     */
    prio = min(current->normal_prio, MAX_RT_PRIO);

    plist_node_init(&q->list, prio);      //初始化futex_q的list,这是一个优先排序链表
    plist_add(&q->list, &hb->chain);     //将futex_q加入哈希桶的链
    q->task = current;
}

futex_wait在进程醒来之后的代码暂时不分析了。至此我们已经了解了futex的等待机制。搞清楚几个数据结构基本就了解了其中的原理,futex_q, futex_hash_bucket, futex_key。画一张图表示一下:

再看一下唤醒的情况。

long futex(uint32_t *uaddr, int futex_op, uint32_t val,
                 const struct timespec *timeout,   /* or: uint32_t val2 */
                 uint32_t *uaddr2, uint32_t val3);

futex_op为FUTEX_WAKE, val代表要唤醒的进程的个数。其他的参数会被忽略。

当我们在用户态调用使用FUTEX_WAKE作为入参调用futex后调用链是do_futex->futex_wake

int futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
    struct futex_hash_bucket *hb;
    struct futex_q *this, *next;
    union futex_key key = FUTEX_KEY_INIT;
    int ret;
    DEFINE_WAKE_Q(wake_q);     // 定义一个唤醒队列

    if (!bitset)
        return -EINVAL;

    ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ);    //根据uaddr找到futex_key
    if (unlikely(ret != 0))
        return ret;

    hb = futex_hash(&key);      //根据futex_key找到哈希桶

    /* Make sure we really have tasks to wakeup */
    if (!futex_hb_waiters_pending(hb))      //check一下是不是有task在等待
        return ret;

    spin_lock(&hb->lock);        //锁住哈希桶

    plist_for_each_entry_safe(this, next, &hb->chain, list) {      //在哈希桶的chain上循环查找futex_q
        if (futex_match (&this->key, &key)) {                      //每次比对一下futex_key,看看是不是我们要找的
            if (this->pi_state || this->rt_waiter) {
                ret = -EINVAL;
                break;
            }

            /* Check if one of the bits is set in both bitsets */
            if (!(this->bitset & bitset))                       //位图也要对的上
                continue;

            futex_wake_mark(&wake_q, this);          //将futex_q从对应哈希桶的链上取下来,将task加入到wake_q中
            if (++ret >= nr_wake)                    //如果已经唤醒的数量大于需要唤醒的数量break
                break;
        }
    }

    spin_unlock(&hb->lock);
    wake_up_q(&wake_q);                             //这里开始真正的唤醒动作
    return ret;
}

 第一步,获取futex_key和哈希桶;

第二步,在哈希桶上循环找对应futex_key的futex_q,找到后将futex_q从哈希桶上取下来,将对应的task加入到wake_q

第三步,调用wake_up_q唤醒wake_q中的task。

void wake_up_q(struct wake_q_head *head)
{
    struct wake_q_node *node = head->first;

    while (node != WAKE_Q_TAIL) {
        struct task_struct *task;

        task = container_of(node, struct task_struct, wake_q);
        /* Task can safely be re-inserted now: */
        node = node->next;
        task->wake_q.next = NULL;

        /*
         * wake_up_process() executes a full barrier, which pairs with
         * the queueing in wake_q_add() so as not to miss wakeups.
         */
        wake_up_process(task);
        put_task_struct(task);
    }
}

wake_up_q会循环查找wake_q中的task,调用wake_up_process执行唤醒动作,这是另外一个话题了,futex的wait和wake就分析到这里。

标签:task,struct,系统,futex,调用,wake,key,hb
From: https://www.cnblogs.com/banshanjushi/p/18466499

相关文章

  • 皮带跑偏识别智慧矿山一体机皮带危险区域人员违规闯入识别非煤矿山视频智能监控系统
    在当今快速发展的工业时代,矿山行业作为国家经济的重要支柱之一,面临着日益严峻的安全挑战。随着技术的进步,传统的矿山安全管理方式已无法满足现代矿山对于高效、智能化管理的需求。为了提高矿山安全管理水平,减少事故发生,保障矿工生命安全,同时提升矿山生产效率和经济效益,迫切需要一......
  • 鸿蒙开发进阶(HarmonyOS )分布式文件系统
     鸿蒙NEXT开发实战往期必看文章:一分钟了解”纯血版!鸿蒙HarmonyOSNext应用开发!“非常详细的”鸿蒙HarmonyOSNext应用开发学习路线!(从零基础入门到精通)HarmonyOSNEXT应用开发案例实践总结合(持续更新......)HarmonyOSNEXT应用开发性能优化实践总结(持续更新......)分布式......
  • 超赞的微信管理系统:你还不心动?
    在微信广泛应用于商业和团队协作的今天,一款优秀的微信管理系统对于运营和管理来说至关重要。今天,我们就来深入了解一下这类系统强大的管理监控功能。1、在数据统计方面,这个系统堪称强大。它能够自动统计总好友量,让你对人脉资源有清晰的了解。同时,新增好友、添加好友的数量......
  • ndis.sys 是 Windows 操作系统中的一个关键系统文件,属于 网络驱动接口规范(NDIS,Network
    ndis.sys是Windows操作系统中的一个关键系统文件,属于网络驱动接口规范(NDIS,NetworkDriverInterfaceSpecification)的一部分。它是一个用于管理网络驱动程序和网络接口卡(NIC)之间通信的核心组件,主要负责网络设备驱动与操作系统的接口。具体来说,ndis.sys充当了操作系统与网络......
  • 系统调用
    系统调用‍​​‍系统调用的概念和作用操作系统作为用户和计算机硬件之间的接口,需要向上提供一些简单易用的服务。主要包括命令接口和程序接口。其中,程序接口由一组系统调用组成。应用程序可通过系统调用来请求获得操作系统内核的服务。‍——系统调用与库函数的区别​​......
  • 计算机系统体系结构
    计算机系统体系结构​​​​​​‍大内核(宏内核)与微内核​​CPU在内核态和用户态之间的转换需要耗费时间,影响性能。​​‍分层结构​​‍模块化​​‍外核理解:未经抽象的内存空间资源未虚拟化的,即连续地址的内存空间资源→减少了虚拟硬件资源的“映射层”,提......
  • 操作系统的运行机制
    操作系统的运行机制‍​​‍一、内核程序与应用程序​​‍二、特权指令与非特权指令特权指令,如内存清零指令等。非特权指令,如加法指令、减法指令等。CPU设计和生产的时候就划分了特权指令和非特权指令,因此CPU执行一条指令前就能判断出其类型。‍三、内核态与用户......
  • 操作系统的运行机制
    操作系统的运行机制‍​​‍一、内核程序与应用程序​​‍二、特权指令与非特权指令特权指令,如内存清零指令等。非特权指令,如加法指令、减法指令等。CPU设计和生产的时候就划分了特权指令和非特权指令,因此CPU执行一条指令前就能判断出其类型。‍三、内核态与用户......
  • node.js毕设自助收银系统的分析与研究(程序+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带文档lw万字以上,文末可获取源码系统程序文件列表开题报告内容一、选题背景随着信息技术的迅猛发展,自助收银系统在零售业中的应用越来越广泛。传统的收银方式通常需要大量的人力资源,且容易出现人为错误,效率较低。特别是在一些中......
  • SpringBoot推荐图书阅读系统6c613 带论文文档1万字以上,文末可获取,
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表系统内容;用户,图书分类,图书信息开题报告内容一、课题背景与意义随着信息时代的到来,图书馆和在线阅读平台的书籍数量急剧增加,用户在寻找适合自己的阅读材料时......