从std::mutex到futex机制
我原本出发点是C++ std::mutex的实现原理,但是经过一番查阅,发现C++ std::mutex是对glibc-nptl库pthread_mutex的封装;而nptl库在用户态完成了futex机制的一部分;最后在以linux为内核的操作系统中,又提供了futex系统调用给glibc-nptl给与底层支撑。
因此本文先泛泛讲一讲C++mutex的实现,然后着重介绍为了实现futex机制,glibc-nptl库所作的工作,最后再大致地说一说linuxfutex系统调用的实现。
注意,futex系统调用不等于futex机制。futex机制需要依靠用户态代码与内核态代码的通力合作,其中用户态的部分glibc库已经替我们完成了,我们只需要调用它给的接口就可以了。
glibc版本:2.25
linux内核版本 : 4.19.277
C++ std::mutex
随意ctrl + 鼠标左键点击一个C++ std::mutex变量的声明,能够跳转到mutex的头文件中:
class mutex : private __mutex_base // 基类为__mutex
{
// ...
void
lock()
{
int __e = __gthread_mutex_lock(&_M_mutex); // _M_mutex在基类中。 实际上调用glibc c库的pthread_mutex_lock
// ...
}
void
unlock()
{
__gthread_mutex_unlock(&_M_mutex);
}
// ...
}
class __mutex_base // mutexd的基类
{
protected:
typedef __gthread_mutex_t __native_type;
__native_type _M_mutex = __GTHREAD_MUTEX_INIT; // 初始化宏
constexpr __mutex_base() noexcept = default;
// ...
}
typedef pthread_mutex_t __gthread_mutex_t; // _M_mutex的原始类型就是pthread_mutex_t
#define __GTHREAD_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER // 初始化宏是glibc库的初始化宏
如上所示,C++的std::mutex只是对glibc-nptl库的pthread_mutex_t的一层封装。std::mutex.lock()也只是调用该库的pthread_mutex_lock函数。
下面的内容就着重于glibc-nptl库的mutex实现。
glibc-nptl futex机制
futex全称为fast userspace mutex,译为“快速用户空间锁”,意味着该机制需要用户空间参与而不仅仅是操作系统内核的责任。它的设计要点主要是:
当没有发生竞争时,将在用户空间快速完成任务,当竞争发生时,将通过系统调用进入内核进行仲裁。
数据结构
pthread_mutex_t其实是个union结构,它的主干部分是其中的__pthread_mutex_s,去掉一些
typedef union {
struct __pthread_mutex_s {
int __lock; // !< 表示是否被加锁,是否存在竞争
unsigned int __count; //!< __kind代表可重入锁时,重复上锁会对__count进行递增。
int __owner; //!< 持有线程的线程ID。
unsigned int __nusers;
int __kind; //!< 上锁类型。
int __spins;
__pthread_list_t __list;
} __data;
} pthread_mutex_t;
pthread_mutex_t.__data._lock的值有3
- 0, 表示还没有进程/线程获得这把锁
- 1, 表示有进程/线程已经获得了这个锁
- 2, 当lock值已经为1,且又有进程/线程要获取这个锁时,将lock置为2,表示发生了竞争
futex锁机制使用3个值做一些性能的优化,主要体现在尽可能减少系统调用的次数。
pthread_mutex_t.__data._kind的是一个32位的整数,库的设计者将这个32字节分成几个部分来提通不同的分类方式
-
其中的第1到第7位,代表一个锁的类别。
PTHREAD_MUTEX_TIMED_NP
是mutex的默认类型,它是非递归的,这也是我之后要着重分析的锁类别。而PTHREAD_MUTEX_RECURSIVE_NP
则表示可重入锁。其他类别不展开了(能力也不够)
-
它的第8位表示该锁是用在进程间同步还是线程间同步,通常情况下我们在线程同步中使用mutex,此时只需要将mutex声明在全局数据段即可;但如果是进程间的同步,则需要先开辟一个共享内存,将mutex放入共享内存中,然后不同进程才能操作同一个mutex。这些前置工作是用户要完成的,可以参考这篇博客。
判断该位的宏如下所示:
# define PTHREAD_MUTEX_PSHARED(m) \ ((m)->__data.__kind & 128)
-
其他位的意义我暂时还没有去了解
pthread_mutex_t.__data._count是给可重入锁使用的,如果同一个线程重复对同一个mutex加锁,则对该属性进行递增即可。
pthread_mutex_lock
pthread_mutex_lock在库函数中实际名称为__pthread_mutex_lock:
int
__pthread_mutex_lock (pthread_mutex_t *mutex)
{
// ...
if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP)) /* Normal mutex. */
{
FORCE_ELISION (mutex, goto elision);
simple:
LLL_MUTEX_LOCK (mutex); // 调用LLL_MUTEX_LOCK
assert (mutex->__data.__owner == 0);
}
如果锁的类型是默认类型,则调用LLL_MUTEX_LOCK,它是个宏:
# define LLL_MUTEX_LOCK(mutex) \
lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex))
最终将调用的___lll_lock, 传递的两个参数分别为:
- mutex锁中表示改锁是否上锁、是否存在竞争的状态变量__lock
- mutex用在线程同步还是进程同步(见上一小节对pthread_mutex_t.__data._kind概述)
___lll_lock也是个宏:
#define __lll_lock(futex, private) \
((void) \
({ \
int *__futex = (futex); \
if (__glibc_unlikely \
(atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wait_private (__futex); \
else \
__lll_lock_wait (__futex, private); \
} \
}))
atomic_compare_and_exchange_bool_acq是个CAS操作,可以简写为 CAS(lock, new_val, old_val), 能够达成的效果是,如果old_val == lock,则将new_val赋值给lock,并将old_val返回。
那么CAS(__futex, 1, 0)将会在mutex还没有上锁时,返回0,已经被上锁时返回1。如果lock旧值为0表示没有线程获得锁,那么这一次的pthread_mutex_lock调用就到此结束了,仅在用户空间完成;如果lock旧值非0,表示改锁已被其他线程获取,则根据mutex是否LLL_PRIVATE进一步调用其他函数,这里我们关注线程同步的情况,因此下一步是__lll_lock_wait_private,是一个汇编函数:
__lll_lock_wait_private:
cfi_startproc
pushq %r10
cfi_adjust_cfa_offset(8)
pushq %rdx
cfi_adjust_cfa_offset(8)
cfi_offset(%r10, -16)
cfi_offset(%rdx, -24)
xorq %r10, %r10 /* No timeout. */
movl $2, %edx
LOAD_PRIVATE_FUTEX_WAIT (%esi)
cmpl %edx, %eax /* NB: %edx == 2 */
jne 2f
1: LIBC_PROBE (lll_lock_wait_private, 1, %rdi)
movl $SYS_futex, %eax
syscall # futex系统调用
2: movl %edx, %eax
xchgl %eax, (%rdi) /* NB: lock is implied */
testl %eax, %eax
jnz 1b
# ....
说实话,看不太懂,但最终要的是该汇编代码会在lock = 2时,进行futex系统调用。如果lock不等于2(也不等于0,为1),表示之前有线程获取了这把锁,那么先将lock值设为2,表示发生了竞争,然后再调回去执行futex系统调用。
pthread_mutex_unlock
略过一些前置内容,pthread_mutex_unlock最后将调用__lll_unlock,它也是个宏:
#define __lll_unlock(futex, private) \
((void) \
({ \
int *__futex = (futex); \
int __private = (private); \
int __oldval = atomic_exchange_rel (__futex, 0); \
if (__glibc_unlikely (__oldval > 1)) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wake_private (__futex); \
else \
__lll_lock_wake (__futex, __private); \
} \
}))
它将调用atomic_exchange_rel将lock的值原子地交换为0,且返回lock的旧值
- 如果lock的旧值为1, 表示只有当前线程占有了该锁,没有发生竞争,那么此次调用就结束了,没有进入内核态
- 如果lock的旧值为2,表示发生了竞争,那么就要调用__lll_lock_wake_private继续下去,该函数会进行futex系统调用进入内核,唤醒其他被阻塞的线程。
从pthread_mutex_lock 和 pthread_mutex_unlock的行为我们可以初步地体会到futex机制的妙处:它尽可能识别没有发生竞争的场景,仅在用户态使用原子指令达到同步的目的,而避免使用系统调用进入内核,极大地提高了程序运行效率。
万一真的发生了竞争,那也没啥好说的,使用futex系统调用进入内核,寻求内核的帮助。接下来就进入linux内核,瞧一瞧它对futex机制的支持。
linux内核对futex机制的支持
Linux内核对futex机制的支持主要体现在futex系统调用上。futex系统调用不等于futex机制,前文已经看到过,库函数已经在用户态做了许多优化措施,尽量避免系统调用进入内核,这些也是futex机制的重要组成部分。
futex系统调用
futex系统调用的声明如下:
long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);
这里主要看前四个参数
- SYS_futex,没什么好说的,这时系统调用更好
- uint32_t *uaddr, 是一个地址,指向上节所说的 pthread_mutex_t.__data._lock 属性
- int futex_op, 分成两个部分
- 第一个部分,指明对futex的具体操作,我这里主要关注 FUTEX_WAIT 和 FUTEX_WAKE 操作,分别是请求内核将当前线程阻塞、将其他等待该futex的其他线程唤醒。
- 第二个部分,指明futex的属性或者指定它的某些行为。比如FUTEX_PRIVATE_FLAG属性表示这个futex是用于线程同步的,而不是进程同步,同时也对应glibc库中的pthread_mutex_t.__data._kind的第8位(见上一节)。对于线程同步的futex,内核有额外的性能优化方案。
- 更多内容请参考futex的man page
- int val, 它的意义据futex_op而定
通过系统调用进入内核后,会调用do_futex进一步处理,它将识别futex_op, 进一步调用其他处理函数,本文主要关注futex_wait与futex_wake函数
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
u32 __user *uaddr2, u32 val2, u32 val3)
{
int cmd = op & FUTEX_CMD_MASK;
unsigned int flags = 0;
if (!(op & FUTEX_PRIVATE_FLAG))
flags |= FLAGS_SHARED; // 用于线程间同步
// ...
switch (cmd) {
case FUTEX_WAIT:
val3 = FUTEX_BITSET_MATCH_ANY;
/* fall through */
case FUTEX_WAIT_BITSET:
return futex_wait(uaddr, flags, val, timeout, val3); // futex_wait
case FUTEX_WAKE:
val3 = FUTEX_BITSET_MATCH_ANY;
/* fall through */
case FUTEX_WAKE_BITSET:
return futex_wake(uaddr, flags, val, val3); // futex_wake
// ....
数据结构
在深入具体的函数之前,首先浏览一些涉及的数据结构。( 之一部分参考了这篇博客的写法)
首先引用一张图片作为总览:
当一个线程需要等待一个锁而陷入睡眠时,内核会为其分配一个futex_q结构,一切的唤醒、睡眠操作都需要先找到这个futex_q结构。
但如果争抢情况比较激烈,那么内核存在的futex_q结构会有很多,如何快速查找对应的futex_q? -- 内核的解决办法是使用哈希表,因为哈希表的查询、插入操作都是O(1)的时间复杂度。如上图所示,内核使用开链法处理哈希碰撞。
当一个线程需要等待一个锁而陷入睡眠时,内核会为其分配一个futex_q结构:
struct futex_q {
struct plist_node list; // 通过这个属性链入hash表的某个链表
struct task_struct *task; // 被挂起的线程的task_struct结构
spinlock_t *lock_ptr; // 自旋锁,控制链表访问
union futex_key key; // hashtable的key
//下面三个与优先级继承相关,在此不多介绍
struct futex_pi_state *pi_state;
struct rt_mutex_waiter *rt_waiter;
union futex_key *requeue_pi_key;
u32 bitset; //类似掩码匹配
};
- task_struct *task, 指向被挂起线程的task struct
- futex_key key, 传入哈希函数的key
- plist_node, futex_q通过这个属性链入hash表的某个链表
哈希表涉及的数据结构如下所示:
struct futex_hash_bucket {
//当前自旋等待哈希桶的waiter数目
atomic_t waiters;
//自旋锁,用于控制chain的访问,
//struct futex_q中lock_ptr,就是引用其所在的bucket的自旋锁
spinlock_t lock;
//优先级链,与传统等待队列不同,futex使用优先级链表来实现等待队列,
//是为了实现优先级继承,从而解决优先级翻转问题
struct plist_head chain;
} ____cacheline_aligned_in_smp;
//全局哈希表
static struct {
struct futex_hash_bucket *queues;
unsigned long hashsize;
} __futex_data __read_mostly __aligned(2*sizeof(long));
#define futex_queues (__futex_data.queues)
#define futex_hashsize (__futex_data.hashsize)
最后是futex_q中的futex_key,它被传入hash函数将futex_q散列到不同的bucket中。这个结构是一个联合结构,如果futex被用来进行进程间同步,则可被解释为 futex_key.shared, 如果futex被用来进行线程同步,则可被解释为futex_key.private。
union futex_key {
struct {
u64 i_seq;
unsigned long pgoff;
unsigned int offset;
} shared; //不同进程间通过文件共享futex变量,表明该变量在文件中的位置
struct {
struct mm_struct *mm;
unsigned long address;
unsigned int offset;
} private; //同一进程的不同线程共享futex变量,表明该变量在进程地址空间中的位置
struct {
u64 ptr;
unsigned long word;
unsigned int offset;
} both;
};
如果被用来进行线程间同步,那么futex的虚拟地址在不同的线程间是相同的,因此只需要使用虚拟地址就可以唯一地标识同一线程组(进程)中的某个futex。但是内核中的hash表是系统中的所有线程共用的,不只对一个线程组服务,因此需要额外的值来标识不同线程组,linux内核选择使用 mm_struct的虚拟地址(若两个线程属于同一进程,则它们的mm_struct的地址相同,若两个线程分属不同进程,则它们的mm_struct的地址不同)。
最终linux使用(mm_struct的虚拟地址, 锁的虚拟地址)来表示一个private futex的key。
如果futex被用来进行进程间同行,那么futex的虚拟地址在不同的进程间不一定相同,因此linux内核需要计算它的物理地址,才可能唯一地表示futex。
linux使用( inode->i_sequence, page->index, offset_within_page)标识一个shared futex地key。其中inode是用来作为共享内存载体的文件inode, page则是futex虚拟地址所对应的struct page结构(page结构在内核中唯一地对应了一个物理页),offest就是futex结构在实际物理页中偏移量。
总而言之,由于不同进程的futex虚拟地址不同,因此内核选择使用物理地址作为hash表的key
。
从这里就可以看出,用来进程间通信的futex所需的计算步骤比用来线程间通信的futex多了很多。将这两种实现区分开来,本身就属于一种性能优化了,因为大部分情况下,当然也是默认情况下我们使用mutex是用来做线程间同步工作的。
futex_wait
搞清楚了数据结构,那么函数实现就相对好理解一些了。当然,我略过了timeout机制:
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;
// 这里掠过timie_out机制
// 获取对应的某个hash表链表结构
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)
goto out;
/* 从陷入阻塞 */
futex_wait_queue_me(hb, &q, to);
/* 如果执行到这一步,说明被唤醒了 */
ret = 0;
// ...
return ret;
}
比较简单不是吗,再看看futex_wait_queue_me函数:
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
set_current_state(TASK_INTERRUPTIBLE); // 将本线程状态设置为TASK_INTERRUPTIBLE
queue_me(q, hb);
/* Arm the timer */
if (timeout)
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
/*
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
if (likely(!plist_node_empty(&q->list))) {
/*
* If the timer has already expired, current will already be
* flagged for rescheduling. Only call schedule if there
* is no timeout, or if it has yet to expire.
*/
if (!timeout || timeout->task)
freezable_schedule(); ///
}
// 没有陷入阻塞或者被唤醒后执行到这里
__set_current_state(TASK_RUNNING);
}
主要逻辑为:
- 调用set_current_state将进程状态设置为TASK_INTERRUPTIBLE
- 调用queue_me将futex_q挂载到hash链表上
- 调用freezable_schedule进行冲调度,此后除非有其他线程唤醒本线程,本线程不会被调度器选择执行。这就是陷入了阻塞状态的含义。
当然这里还有一些timeout机制和提前返回不陷入阻塞的额外处理,这里就不展开了。
futex_wake
futex_wake函数则与futex_wait相反,它会找到对应的futex_q,将其记录的task_struct的进程状态设置为TASK_RUNNING。源码如下
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
union futex_key key = FUTEX_KEY_INIT;
int ret;
DEFINE_WAKE_Q(wake_q); // 申明一个wake_up队列,所有满足条件的线程都存放于此
// hash处理得到某个hahs链表
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);
hb = hash_futex(&key);
spin_lock(&hb->lock);
// 遍历链表查找匹配的key
plist_for_each_entry_safe(this, next, &hb->chain, list) {
if (match_futex (&this->key, &key)) {
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}
/* Check if one of the bits is set in both bitsets */
if (!(this->bitset & bitset))
continue;
mark_wake_futex(&wake_q, this); // 将符合条件的task_struct放入wake_up队列
if (++ret >= nr_wake) // 达到唤醒的最大总数
break;
}
}
spin_unlock(&hb->lock);
wake_up_q(&wake_q); // 遍历wake_up队列, 将其中所有的线程唤醒(将他们的进程状态改为TASK_RUNNING)
return ret;
}
参考资料
关于C++ MUTEX
标准库的互斥锁 std::mutex 是如何实现的 - 阅微堂 (zhiqiang.org)
关于glibc mutex
glibc nptl库pthread_mutex_lock和pthread_mutex_unlock浅析_lcjmsr的博客-CSDN博客
pthread包的mutex实现分析_天下无敌笨笨熊的博客-CSDN博客
关于Linux futex
futex(2) - Linux manual page (man7.org)
linux 内核的futex - bbqz007 - 博客园 (cnblogs.com)
futex-based pthread_cond 源代码分析 - bbqz007 - 博客园 (cnblogs.com)
linux--futex原理分析 (openeuler.org)
如何使用pthread_mutex完成进程同步而不是线程同步
使用互斥锁mutex进行进程间同步的说明和示例_c++ mutex 进程间_土豆西瓜大芝麻的博客-CSDN博客
标签:__,struct,lock,C++,futex,线程,mutex From: https://www.cnblogs.com/HeyLUMouMou/p/17481385.html