C 多线程
C 程序中经常同时执行多项任务。例如,一个程序可能:
(1) 在执行程序过程中通过完成并行任务来提高性能。
(2) 在处理用户输入的同时,在后台进行耗时的数据通信和实时操作。
通过并行执行(concurrent execution)程序中的部分代码,可以实现不同任务同时进行。特别是在多处理器系统(当然也包括多核处理器)上,程序通过并行机制更有效地使用系统资源,其意义越来越重大。
C11 标准以前,C 开发人员必须依赖操作系统或相应链接库来实现并行。C11 标准发布以后,使得 C 程序可便捷地实现并行。C11 支持多线程执行(multithreaded execution)。
多线程指的是在一个程序进程中处理控制流的多路并行通道,它在所有操作系统上为运行该程序提供了相同程度的并发性。为此,C11 标准定义了一个相应的内存模型(memory model),并且支持原子操作(atomic operation)。
在 C11 标准下,对于多线程和原子操作的支持是可选的。如果支持 C11 标准的实现版本定义了宏 _STDC_NO_THREADS_ 和 _STDC_NO_ATOMICS_,则表示该实现版本不支持多线程与原子操作。
你可能曾使用过针对 C 语言的 POSIX 线程扩展(简称 pthreads),该扩展是根据 UNIX 可移植操作系统接口标准(POSIX)——IEEE 1003.1c——实现多线程编程的链接库。如果使用过该扩展,你会发现 C11 线程编程的接口在很多方面与 POSIX 标准类似。
C语言线程和进程
当启动一个程序时,操作系统创建一个进程,并在该进程中执行程序。一个进程包括一个或多个线程。每个线程又是一个局部进程,它以独立于其他局部进程的方式执行一个命令序列。
当进程启动时,它的主线程则成为活动线程。这时,任何正在运行的线程都可以启动其他线程。当进程终止时,例如,通过在 main()函数中执行一个 return 语句或通过调用 exit()函数,所有已开启但还未结束的线程都会被终止。
系统调度器为所有可运行的线程平均分配可用的 CPU 时间。通常,调度器是抢占式的:它会中断正在执行的线程,给中央处理单元(CPU)留出可用的短暂时间,并将 CPU 分配给其他线程使用一段时间。
这种调度的结果是:即使是在单处理系统上,在用户面前运行的线程看上去像是在同时执行,实际上,只有在多处理器系统中,几个线程才可能真正地同时执行。
每一个进程在内存中都有自己的地址空间,并拥有独占的资源,例如,打开的文件。一个进程中的所有线程都继承该进程的资源。最具有意义的是,在一个进程中的几个线程共享一个地址空间。这使得在一个进程中的任务切换比在不同进程间的任务切换要简单得多。
然而,为了在不同线程间切换任务,每个线程也拥有自己的资源:包括栈存储器和 CPU 寄存器。这些资源允许每个线程在不受其他线程干扰的条件下,处理自身的本地数据。此外,一个线程也可以具有线程专用的永久内存。
对于一个给定进程,由于它内部的所有线程均使用相同的地址空间,所以它们共享全局数据与静态数据。然而,这也意味着,同一个进程中的两个不同线程可以同时访问同一个内存单元。这种情况在 C 标准中被称为数据竞争(data race),或者通常称之为竞态条件(race condition)。
为了防止在共享数据时出现冲突,当这些不同线程使用内存中相同位置时,程序员必须明确地同步这些不同线程的写操作或读写操作。
C语言创建线程thread_create()
在头文件 threads.h 中,定义和声明了支持多线程的宏、类型和函数。所有直接与线程相关的标识符,均以前缀 thrd_ 作为开头。例如,thrd_t 是一个对象类型,它标识了一个线程。
函数 thrd_create()用于创建并开始执行一个新线程。函数 thrd_create()的其中一个参数为在新线程中需要被执行的函数 thrd_create()的其中一个参数为在新线程中需要被执行的函数。thrd_create()的完整原型是:
int thrd_create(thrd_t *thr, thrd_start_t func, void *arg);
参数 func 是一个指针,它指向在新线程需要被执行的函数,而 void 指针 arg 用于向该函数传递参数。换句话说,新线程将执行函数调用 func(arg)。
参数 func 的类型为 thrd_start_t,它被定义为 int(*)(void*)(这是一个函数指针,指向一个 void 指针作为其参数并返回一个 int 值的函数),因此,该线程执行的函数返回一个 int 类型的值。
程序在后续过程中可以通过调用函数 thread_join()获得这个 int 类型的返回值(必要时,需等待该线程执行完)。
如果一个线程启动成功,函数 thread_create()将新线程写入一个对象进行标识,并通过参数 thr 指向该对象,然后返回宏值 thread_success。
在大多数情况下,后续的其他操作均依赖于该线程的执行结果,并且只有当该线程完成后,才能执行其他操作。函数 thread_join()用于确保一个线程已完成。它的原型是:
int thrd_join(thrd_t thr, int *result);
调用 thread_join()的线程会被阻塞,直到通过 thr 标识的线程执行完成,这里“阻塞”(block)指的是:线程会在调用 thread_join()的位置停留必要的时间。然后,thread_join()将线程 thr 中执行函数的返回值写入指针 result 所引用的 int 变量中,假设 result 不是一个空指针。最后,thread_join()释放属于线程 thr 的所有资源。
如果程序逻辑上并不需要等待线程 thr 结束,则应该调用以下函数:
int thrd_detach(thrd_t thr);
thrd_detach()使得当线程 thr 执行完成后,自动释放线程占用的所有资源。一旦一个线程执行了分离操作(调用 thrd_detach()),将不用程序等待其结束,程序也不会获得该线程执行函数的返回值。对于每个创建的线程,调用 thread_join()或 thread_detach()不得超过一次。
在例 1 中的程序展示了使用并行操作处理数组的一种方式。各个线程先自行处理数组的各部分,然后将它们的处理结果组合在一起。该程序仅需计算一个数字序列的总和。
函数 sum()首先根据创建线程的最大数量确定划分数组所得的各组元素的最大数量,然后调用递归辅助函数 parallel_sum()。
函数 parallel_sum()将数组平均分为两半,将其中的一半交给一个新线程处理,同时调用自身来处理另一半数组。如该例所示,一个线程函数需要多个参数,这些参数通常采用结构进行封装。
【例1】在几个并行线程中计算数组元素的和
#include <stdbool.h> #include <threads.h> #define MAX_THREADS 8 // 1、2、4、8……所创建线程数量的最大值 #define MIN_BLOCK_SIZE 100 // 一个数组块的最小值 typedef struct // 函数parallel_sum()的参数 { float *start; // 传递给parallel_sum()的数组块的起始地址 int len; // 数组块长度 int block_size; // 最小数组块的大小 double sum; // 求和结果 } Sum_arg; int parallel_sum(void *arg); // 线程函数的原型 // --------------------------------------------------------------- // 计算数组元素的和,并写入*sumPtr // sum()调用函数parallel_sum()进行并行处理 // 返回值:如果没有发生错误,则返回true;否则,返回false bool sum(float arr[], int len, double* sumPtr) { int block_size = len / MAX_THREADS; if (block_size < MIN_BLOCK_SIZE) block_size = len; Sum_arg args = { arr, len, block_size, 0.0 }; if (parallel_sum(&args)) { *sumPtr = args.sum; return true; } else return false; } // --------------------------------------------------------------- // 递归辅助函数,用以将工作分解到几个线程中处理 int parallel_sum(void *arg) { Sum_arg *argp = (Sum_arg*)arg; // 指向参数的指针 if (argp->len <= argp->block_size) // 如果length <= block_size, // 对所有元素求和 { for (int i = 0; i < argp->len; ++i) argp->sum += argp->start[i]; return 1; } else // 如果length > block_size, // 分解数组 { int mid = argp->len / 2; Sum_arg arg2 = { argp->start+mid, argp->len-mid, argp->block_size, 0}; // 指定后一半数组 argp->len = mid; // 前一半数组的长度 thrd_t th; // 在新线程中处理前一半数组 int res = 0; if (thrd_create(&th, parallel_sum, arg) != thrd_success) return 0; // 没能成功创建新线程 if (!parallel_sum(&arg2)) // 在当前线程下,以递归方式处理后一半数组 { thrd_detach(th); return 0; // 递归调用失败 } thrd_join(th, &res); if (!res) return 0; // 同级线程报告执行失败 argp->sum += arg2.sum; return 1; } }
C语言线程函数
除了创建线程的 thread_create()函数、获取返回值的 thread_join()函数和释放线程占用资源的 thread_detach()函数,C11 还提供了另外用于线程控制的 5 个函数:
- thrd_t thrd_current(void);
该函数返回其所在线程的线程标识。
- int thrd_equal(thrd_t thr0,thrd_t thr1);
仅当两个线程标识符 thr0、thr1 分别引用了两个不同线程时,返回 0。
- int thrd_sleep(const struct timespec*duration,struct timespec*remaining);
使得正在调用的线程等待一段时间,等待时间由 duration 指定。仅当该函数收到唤醒的信号时,它才提前返回。在这种情况下,该函数将剩余倒数时间保留在 remaining 引用的对象中,假设 remaining 不是一个空指针。指针 duration 和 remaining 不得指向同一个对象。
结构参数 timespec 有两个成员,分别用于存储秒和纳秒:
- time_t tv_sec; // 秒≥0
- long tv_nsec; // 0 ≤纳秒≤999 999 999
结构中成员的顺序未被指定。在下面的例子中,除非通过信号来唤醒,否则正在调用的线程将等待至少 100 毫秒:
- struct timespec duration = {0};
- duration.tv_nsec = 100*1E6; // 1毫秒 = 1 000 000纳秒
- thrd_sleep(&duration,NULL); // 休眠100毫秒
如果倒计时完成,则 thrd_sleep()返回 0;如果线程函数由一个信号唤醒,即实现提前返回,则 thrd_sleep()返回 -1。其他负数返回值表示错误。
- void thrd_yield(void);
该函数建议操作系统调度器中断当前调用的线程,并将 CPU 时间分给另一个线程。
- _Noreturn void thrd_exit(int result);
以 result 作为结果值结束正在调用线程。在线程中执行的所有函数都可以调用 thrd_exit()。该函数调用相当于在线程函数中执行语句 return result;。最后一个线程退出后,整个程序将正常退出,换句话说,它类似于调用具有参数 EXIT_SUCCESS 的函数 exit()
C语言线程互斥和原子操作
如果多个线程访问相同的数据,并且它们中至少有一个修改了数据,那么对共享数据的所有访问必须同步以防止数据竞争。但是,一个正在读取共享数据的线程可能中断另一个正在修改相同共享数据的线程,因此,可能导致线程读取到不一致的数据。
甚至,由于程序在每次执行时系统可能调度不同的线程,导致每次运行程序时错误消息只能间歇地反映当时情况,很难在测试中复现错误。如例 1 所示,哪怕是自增一个计数器这样的简单操作,都可能产生数据竞争。
【例1】没有同步下的并行存储访问
#include <stdio.h> #include <threads.h> #define COUNT 10000000L long counter = 0 void incFunc(void) { for (long i = 0; i < COUNT; ++i) ++counter; } void decFunc(void) { for (long i = 0; i < COUNT; ++I) --counter; } int main(void) { clock_t cl = clock() thrd_t th1, th2; if (thrd_create(&th1, (thrd-start_t)incFunc, NULL) != thrd_success || thrd_create(&th2, (thrd_start_t)decFunc, NULL) != thrd_success) { fpintf(stderr,"Error creating thread/n"); return -1; } thrd_join(th1, NULL); thrd_join(th2, NULL); printf("Counter: %ld \t", counter); printf("CPU time: %ld ms\n", (clock()-cl)*1000L/CLOCKS_PER_SEC); return 0; }
在程序结束时,计数器应为 0。然而,在没有同步的情况下,结果则不同:程序每次运行时,最终获得计数器值都是不同的。下面是一个典型的输出示例:
Counter: -714573 CPU time: 59 ms
为了保障同步,C 标准库提供了互斥操作(mutex operation)和原子操作(atomic operation)。
互斥
互相排斥(mutex exclusion)技术,简称为互斥(mutex),它用于防止多个线程同时访问共享资源。互斥技术采用一个对象控制独占访问权限,该对象称之为互斥。配合条件变量(condition variable),互斥可以实现广泛的同步访问控制。例如,它们允许程序员为数据访问操作指定执行次序。
在 C 程序中,一个互斥采用类型为 mtx_t 的对象表示,它能在一段时间内被一个线程锁定,而其他线程必须等待,直到它被解锁。在头文件 threads.h 中,包括了关于互斥操作的所有声明。最重要的互斥函数有:
int mtx_init(mtx_t*mtx,int mutextype);
创建一个互斥,该互斥的属性由 mutextype 指定。如果成功创建了一个新互斥,函数 mtx_init()会将新互斥写入由参数 mtx 引用的对象,然后返回宏值 thrd_success。
参数 mutextype 的取值可以是以下 4 个:
mtx_plain
mtx_timed
mtx_plain | mtx_recursive
mtx_timed | mtx_recursive
mtx_plain 表示请求一个简单的互斥,它既不支持超时也不支持递归,而其他 3 个值则表示支持超时和(或)递归。
void mtx_destroy(mtx_t*mtx);
销毁 mtx 引用的互斥,并释放它的所有资源。
int mtx_lock(mtx_t*mtx);
阻塞正在调用的线程,直到该线程获得参数 mtx 引用的互斥。除该互斥支持递归的情况以外,正在调用的线程不能是已持有该互斥的线程。如果调用成功获得互斥,则函数返回值 thrd_success,否则,返回值 thrd_error。
int mtx_unlock(mtx_t*mtx);
释放参数 mtx 引用的互斥。在调用函数 mtx_unlock()之前,调用者必须持有该互斥。如果调用释放互斥成功,则函数返回值 thrd_success,否则,返回值 thrd_error。
通常情况下,在代码某个关键区间(critical section)的起始点调用函数 mtx_lock(),在其结束点调用函数 mtx_unlock(),在这段区间中只有一个线程执行。
函数 mtx_lock()还有两个替代的选择:一个选择是函数 mtx_trylock(),如果该互斥恰好未被其他任何线程获取,它则为当前线程获得互斥,如果该互斥被其他线程获取,它也不会阻塞当前线程;另一个选择是函数 mtx_timedlock(),它仅在指定的时间内阻塞线程。所有这些函数都通过其返回值表明调用它们后,是否成功地获得了互斥。
例 2 中的程序是例 1 的修改版本,它展示了如何使用互斥来消除对变量 counter 的数据竞争。
【例2】在例 1 的程序中添加一个互斥
#include <stdio.h> #include <threads.h> #define COUNT 10000000L long counter = 0; mtx_t mtx; // 为访问counter而设立的互斥 void incFunc(void) { for (long i = 0; i < COUNT; ++i) { mtx_lock(&mtx); ++counter; mtx_unlock(&mtx); } } void decFunc(void) { for (long i = 0; i < COUNT; ++i) { mtx_lock(&mtx); --counter; mtx_unlock(&mtx); } } int main(void) { if (mtx_init(&mtx, mtx_plain) != thrd_success) { fprintf(stderr, "Error initializing the mutex.\n"); return -1; } // 如例14-2所示,启动线程,等待它们完成,打印输出 mtx_destroy(&mtx); return 0; }
函数 incFunc()和 decFunc()将不再并行地访问 counter,因为一次只有其中一个可以锁定互斥(为保障可读性,省略错误检查)。现在,在程序结束时,计数器具有正确的值:0。下面是一个典型的输出示例:
Counter: 0 CPU time: 650 ms
实现同步性需要付出代价。较高的 CPU 时间表明:修改后的程序需要大约 10 倍于原来的时间来运行。其原因是,通过锁定互斥实现同步性远比自增和自减一个变量具有更为复杂的操作。在不需要互斥锁定的情况下,使用原子对象可以获得更好的性能。
原子对象
原子对象(atomic object)是一个可通过原子操作(atomic operation)被读取或修改的对象。原子操作是指不能被并行线程中断的操作。在C11标准下,可以使用类型限定符_Atomic声明一个原子对象(如果实现版本定义了宏__STDC_NO_ATOMICS__,则表明该实现版本不支持原子操作,自然也不能声明原子对象)。例如,在例14-2程序中的变量counter可以通过以下方式声明它为原子对象:
_Atomic long counter = ATOMIC_VAR_INIT(0L);
上述声明定义了原子化的 long 类型变量 counter,并将其值初始化为 0。在头文件 stdatomic.h 中定义了宏 ATOMIC_VAR_INIT,以及其他所有用于原子对象的宏、类型和声明。特别是,stdatomic.h 中还定义了对应于所有整数类型的原子类型缩写。例如,类型 atomic_uchar 等效于 _Atomic unsigned char。
语法 _Atomic(T)也可用于为给定的非原子类型 T 指定其对应的原子类型。数组和函数类型不能为原子类型。然而,原子类型可以具有不同于其对应的非原子类型的空间大小和对齐方式。
原子操作
读取或写入一个原子对象是一个原子操作,也就是说它是不能被中断的操作。这意味着:不同的线程可以同时访问一个原子对象而不引起竞态条件。对于每个原子对象,对象的所有修改以一个确定的全局化次序执行,这称为该对象的修改次序(modification order)。
具有结构或联合类型的原子对象只能被作为一个整体读取或写入:为了安全地访问单个成员,原子结构或联合应首先复制到等效的非原子对象中。
注意,无论是使用宏 ATOMIC_VAR_INIT,还是通过泛型函数 ATOMIC_INIT(),一个原子对象的初始化不是一个原子操作。
原子操作通常用于进行读-修改-写操作。例如,后缀自增和自减运算符 ++ 和 --,当它们应用于原子对象时,是原子化的读-修改-写操作。同样,复合赋值运算符,如 +=,当其原子化使用时,它们的左操作数是一个原子对象。
例 1 中的程序可以通过声明变量 counter 作为原子对象,在不受任何其他影响下执行正确的计数,以最终获得 0 值。该方案计时结果显示,使用原子类型变量 counter 比例 2 所使用的互斥方法要快两倍多。
除了已经提到的运算符,还有许多函数可以执行原子操作,包括 atomic_store()、atomic_exchange()和 atomic_compare_exchange_strong()。
原子类型具有无锁(lock-free)属性,它表示不使用锁定和解锁操作实现对一个原子对象的原子访问。该方式只需要使用类型 atomic_flag(它是一个结构类型)以确保实现无锁,atomic_flag 有“设置”和“清除”两种状态。宏 ATOMIC_FLAG_INIT 将一个 atomic_flag 对象初始化为“清除”状态,如以下示例声明所示:
atomic_flag done = ATOMIC_FLAG_INIT;
C11 提供了函数 atomic_flag_test_and_set()和 atomic_flag_clear(),由此对一个 atomic_flag 对象执行状态操作。整型原子类型通常也都是无锁的。要确定一个给定的类型是否是无锁的,程序可以检查宏 ATOMIC_type_LOCK_FREE,其中 type 是一个指定整数类型的大写缩写,如 BOOL、INT,或 LLONG。
与指针类型对应的宏是 ATOMIC_POINTER_LOCK_FREE。所有这些宏的值可能为 0、1 或 2。值为 0,表示该类型不是无锁的;值为 1,表示该类型对特定对象是无锁的;值为 2,表示该类型始终是无锁的。或者,可以调用泛型函数来确定一个给定的原子对象是否是无锁的:
_Bool atomic_is_lock_free(const volatile A *obj);
在函数参数声明中的占位符A代表任一原子类型。因此,参数 obj 为指针,它指向任一给定原子对象。
C语言原子操作的应用(内存次序,内存屏障)
内存次序
为优化程序代码,编译器和处理器可以自由地对任何无相互依赖关系的命令进行重新排列。例如,两个分配语句 a=0;B=1;,它们可以以任一顺序执行。然而,在多线程环境下,由于不同线程内存操作之间的依赖性对于编译器或处理器通常是不可见的,所以对编译器或处理器执行命令重新排序可能会引发错误。
使用原子对象可以默认地防止此类重新排序。但是,防止优化意味着可能会牺牲速度。有经验的程序员可以在较低的内存次序请求下,通过明确地使用原子操作提高性能。
对于每个执行原子操作的函数(例如 atomic_store()),都有另一个版本,这些函数的名称以 _explicit 结尾,如 atomic_store_explicit(),它们增加了一个类型为 memory_order 的参数。
memory_order 类型是一个枚举,它定义了以下常量,以指定特定的内存次序请求:
memory_order_relaxed
调用者指定无任何内存次序请求,从而使编译器可以自由地改变操作的顺序。
memory_order_release
指定在当前线程 T1 中对一个原子对象A进行写访问时执行释放操作(release operation)。释放操作的作用是:当另一个线程 T2 对 A 执行捕获操作时(读访问),所有 T1 曾对 A 执行的操作在 T2 捕获 A 以后,对 T2 都是可见的。
memory_order_acquire
指定对一个原子对象进行读访问时执行捕获操作(acquire operation)。它确保在该函数调用前,后续的内存访问操作不发生重新排列。
memory_order_consume
一个消耗操作(consume operation)的限制小于一个捕获操作:它仅当后续内存访问操作直接依赖读取原子变量时,防止重新排序。
memory_order_acq_rel
同时具有捕获和释放操作。
memory_order_seq_cst
顺序一致性(sequential consistency)请求包括对 memory_order_acq_rel 的捕获和释放操作。此外,它还指定了所有操作按一个次序严格执行,该次序为所包含原子对象的修改次序。顺序一致性是默认的内存顺序请求,如果没有显式指定更低的请求,这种请求会应用到所有原子操作。
如果将 counter 声明为原子对象,自增和自减 counter 都是独立于其他操作的,因此不必指定内存访问次序。换句话说,在下述语句位置:
- ++counter; // 隐含memory_order_seq_cst
下面语句充分且允许编译器执行更多的优化:
- atomic_fetch_add_explicit( &counter, 1, memory_order_relaxed );
释放和捕获操作是在命令间建立 happens-before 关系的有效途径。换句话说,如下例所示,_explicit 函数确保一个线程完成操作 A 后才能执行操作 B:
- struct Data *dp = NULL, data;
- atomic_intptr_t aptr = ATOMIC_VAR_INIT(0);
- // 线程1
- data = ...; // 操作A
- atomic_store_explicit( &aptr, (intptr_t)&data,
- memory_order_release );
- // 线程2
- dp = (struct Data*)atomic_load_explicit( &aptr,
- memory_order_acquire );
- if( dp != NULL)
- // 处理*dp所引用的数据
- // 操作B
- else
- // *dp所引用的数据还不可获得
对于一个使用互斥同步的程序,当互斥锁定时,隐含了一个捕获操作,当互斥解锁时,隐含了一个释放操作。这意味着:如果一个线程 T1 使用互斥来保护一个操作 A,而另一个线程 T2 使用相同的互斥来保护一个操作 B,假如 T1 先锁定互斥,那么操作 A 将先完成执行,然后才执行操作 B。相反,假如 T2 先锁定互斥,那么当 T1 执行操作 A 时,通过操作 B 所执行的所有修改,对于线程 T1 是可见的。
栅栏(内存屏障)
对于一个原子操作的内存次序请求,也可以通过一个原子操作单独指定。这种技术被称为建立一个栅栏(fence)或内存屏障(memory barrier)。要设置一个栅栏,C11 提供了以下函数:
- void atomic_thread_fence(memory_order order);
若参数值为 memory_order_release,函数 atomic_thread_fence()建立一个释放栅栏(releas fence)。在这种情况下,原子写操作必须在释放栅栏之后发生。
若参数值为 memory_order_acquire 或 memory_order_consume,函数 atomic_thread_fence()建立一个捕获栅栏(acquire fence)。在这种情况下,原子读操作必须在捕获栅栏之前发生。
栅栏允许更大程度的内存顺序优化。
- // 线程2
- dp = (struct Data*)atomic_load_explicit( &aptr, memory_order_relaxed );
- if( dp != NULL)
- {
- atomic_thread_fence(memory_order_acquire);
- // 操作B:处理*dp所引用的数据
- }
- else
- // *dp所引用的数据还不可获得
C语言线程间通信
C11 标准为线程间通信提供了条件变量(condition variable)。线程可以使用条件变量,以等待来自另一个线程的通知,通知告知了指定的条件已被满足。例如,这类通知可能代表某些数据已经准备好进行处理。
条件变量由类型为 cnd_t 的对象表示,并配合互斥一起使用。一般过程如下:线程获得互斥,然后测试条件。如果条件不满足,则线程继续等待条件变量(释放互斥),直到另一个线程再次唤醒它,然后该线程再次获得互斥,并再次测试条件,重复上述过程,直到条件满足。
头文件 threads.h 定义了使用条件变量的函数,它们如下所示:
int cnd_init(cnd_t*cond);
初始化 cond 引用的条件变量。
void cnd_destroy(cnd_t*cond);
释放指定条件变量使用的所有资源。
int cnd_signal(cnd_t*cond);
在等待指定条件变量的任意数量的线程中,唤醒其中一个线程。
int cnd_broadcast(cnd_t*cond);
唤醒所有等待指定条件变量的线程。
int cnd_wait(cnd_t*cond,mtx_t*mtx);
阻塞正在调用的线程,并释放指定的互斥。在调用 cnd_wait()之前,线程必须持有互斥。如果另一线程通过发送一个信号解除当前线程的阻塞(也就是说,通过指定同样的条件变量作为参数调用 cond_signal()或 cnd_broadcast()),那么调用 cnd_wait()的线程在 cnd_wait()返回之前会再次获得互斥。
int cnd_timedwait(cnd_t*restrict cond,mtx_t*restrict mtx,const struct timespec*restrict ts);
与 cnd_wait()类似,cnd_timedwait()阻塞调用它们的线程,但仅维持由参数 ts 指定的时间。可以通过调用函数 timespec_get()获得一个 struct timespec 对象,它表示当前时间。
除 cnd_destroy()以外的所有条件变量函数,如果它们引发错误,则返回值 thrd_error,否则返回值 thrd_success。当时间达到限定值时,函数 cnd_timedwait()也会返回值 thrd_timedout。
例 1 与例 2 中的程序展示了在常见的“生产者-消费者”模型中使用条件变量。程序为每个生产者和消费者开启一个新线程。生产者将一个新产品(在我们的示例中,新产品为一个 int 变量)放入一个环形缓冲区中,假设这个缓冲区没有满,然后通知等待的消费者:产品已经准备好。每个消费者从该缓冲区中取出产品,然后将实际情况通知给正在等待的生产者。
在任一特定时间,只有一个线程可以修改环形缓冲器。因此,在函数 bufPut()和 bufGet()间将存在线程同步问题,函数 bufPut()将一个元素插入到缓冲区,函数 buf-Get()将一个元素从缓冲区移除。
有两个条件变量:生产者等待其中一个条件变量,以判断缓冲器是否满了;消费者等待另一个条件变量,以判断缓冲器是否空了。缓冲区的所有必需元素都包括在结构 Buffer 中。函数 bufInit()初始化具有指定大小的 Buffer 对象,而函数 bufDestroy()销毁 Buffer 对象。
【例1】用于“生产者-消费者”模型的环形缓冲区
/* buffer.h * 用于线程安全缓冲区的所有声明 */ #include <stdbool.h> #include <threads.h> typedef struct Buffer { int *data; // 指向数据数组的指针 size_t size, count; // 元素数量的最大值和当前值 size_t tip, tail; // tip = 下一个空点的索引 mtx_t mtx; // 一个互斥 cnd_t cndPut, cndGet; // 两个条件变量 } Buffer; bool bufInit( Buffer *bufPtr, size_t size ); void bufDestroy(Buffer *bufPtr); bool bufPut(Buffer *bufPtr, int data); bool bufGet(Buffer *bufPtr, int *dataPtr, int sec); /* ------------------------------------------------------------- * buffer.c * 定义用于处理Buffer的函数 */ #include "buffer.h" #include <stdlib.h> // 为了使用malloc()和free() bool bufInit( Buffer *bufPtr, size_t size) { if ((bufPtr->data = malloc( size * sizeof(int))) == NULL) return false; bufPtr->size = size; bufPtr->count = 0; bufPtr->tip = bufPtr->tail = 0; return mtx_init( &bufPtr->mtx, mtx_plain) == thrd_success && cnd_init( &bufPtr->cndPut) == thrd_success && cnd_init( &bufPtr->cndGet) == thrd_success; } void bufDestroy(Buffer *bufPtr) { cnd_destroy( &bufPtr->cndGet ); cnd_destroy( &bufPtr->cndPut ); mtx_destroy( &bufPtr->mtx ); free( bufPtr->data ); } // 在缓冲区中插入一个新元素 bool bufPut(Buffer *bufPtr, int data) { mtx_lock( &bufPtr->mtx ); while (bufPtr->count == bufPtr->size) if (cnd_wait( &bufPtr->cndPut, &bufPtr->mtx ) != thrd_success) return false; bufPtr->data[bufPtr->tip] = data; bufPtr->tip = (bufPtr->tip + 1) % bufPtr->size; ++bufPtr->count; mtx_unlock( &bufPtr->mtx ); cnd_signal( &bufPtr->cndGet ); return true; } // 从缓冲区中移除一个元素 // 如果缓冲区是空的,则等待不超过sec秒 bool bufGet(Buffer *bufPtr, int *dataPtr, int sec) { struct timespec ts; timespec_get( &ts, TIME_UTC ); // 当前时间 ts.tv_sec += sec; // + sec秒延时 mtx_lock( &bufPtr->mtx ); while ( bufPtr->count == 0 ) if (cnd_timedwait(&bufPtr->cndGet, &bufPtr->mtx, &ts) != thrd_success) return false; *dataPtr = bufPtr->data[bufPtr->tail]; bufPtr->tail = (bufPtr->tail + 1) % bufPtr->size; --bufPtr->count; mtx_unlock( &bufPtr->mtx ); cnd_signal( &bufPtr->cndPut ); return true; }
例 2 中的 main()函数创建了一个缓冲区,并启动了若干个生产者和消费者线程,给予每个线程一个识别号码和一个指向缓冲区的指针。每个生产者线程创建一定数量的“产品”,然后用一个 return 语句退出。一个消费者线程如果在给定延时期间无法获得产品以进行消费,则直接返回。
【例2】启动生产者和消费者线程
- // producer_consumer.c
- #include "buffer.h"
- #include <stdio.h>
- #include <stdlib.h>
- #define NP 2 // 生产者的数量
- #define NC 3 // 消费者的数量
- int producer(void *); // 线程函数
- int consumer(void *);
- struct Arg { int id; Buffer *bufPtr; }; // 线程函数的参数
- _Noreturn void errorExit(const char* msg)
- {
- fprintf(stderr, "%s\n", msg); exit(0xff);
- }
- int main(void)
- {
- printf("Producer-Consumer Demo\n\n");
- Buffer buf; // 为5个产品创建一个缓冲区
- bufInit( &buf, 5 );
- thrd_t prod[NP], cons[NC]; // 线程
- struct Arg prodArg[NP], consArg[NC]; // 线程的参数
- int i = 0, res = 0;
- for ( i = 0; i < NP; ++i ) // 启动生产者
- {
- prodArg[i].id = i+1, prodArg[i].bufPtr = &buf;
- if (thrd_create( &prod[i], producer, &prodArg[i] ) != thrd_success)
- errorExit("Thread error.");
- }
- for ( i = 0; i < NC; ++i ) // 启动消费者
- {
- consArg[i].id = i+1, consArg[i].bufPtr = &buf;
- if ( thrd_create( &cons[i], consumer, &consArg[i] ) != thrd_success)
- errorExit("Thread error.");
- }
- for ( i = 0; i < NP; ++i ) // 等待线程结束
- thrd_join(prod[i], &res),
- printf("\nProducer %d ended with result %d.\n", prodArg[i].id, res);
- for ( i = 0; i < NC; ++i )
- thrd_join(cons[i], &res),
- printf("Consumer %d ended with result %d.\n", consArg[i].id, res);
- bufDestroy( &buf );
- return 0;
- }
- int producer(void *arg) // 生产者线程函数
- {
- struct Arg *argPtr = (struct Arg *)arg;
- int id = argPtr->id;
- Buffer *bufPtr = argPtr->bufPtr;
- int count = 0;
- for (int i = 0; i < 10; ++i)
- {
- int data = 10*id + i;
- if (bufPut( bufPtr, data ))
- printf("Producer %d produced %d\n", id, data), ++count;
- else
- { fprintf( stderr,
- "Producer %d: error storing %d\n", id, data);
- return -id;
- }
- }
- return count;
- }
- int consumer(void *arg) // 消费者线程函数
- {
- struct Arg *argPtr = (struct Arg *)arg;
- int id = argPtr->id;
- Buffer *bufPtr = argPtr->bufPtr;
- int count = 0;
- int data = 0;
- while (bufGet( bufPtr, &data, 2 ))
- {
- ++count;
- printf("Consumer %d consumed %d\n", id, data);
- }
- return count;
- }
C语言线程对象和线程存储
当每个线程为各自的变量使用全局标识符时,为保留这些变量各自的数据,可以采用线程对象(thread-local object)和线程存储(thread-specific storage)。
这两项技术允许在一个给定线程中执行的函数可以共享数据而不造成冲突,即便当其他线程也在执行同样函数的情况下。
使用线程对象
线程对象是在声明中包含新存储类修饰符 _Thread_local 的全局或静态对象。这意味着:每一个线程拥有属于自己的线程对象实例,它在线程启动时创建并初始化。对象的存储周期等于线程的运行时间。在一个线程内表达式里面的线程对象名,将引用这个对象在当前线程下的本地实例。
修饰符 _Thread_local 可以与修饰符 static 或 extern 同时使用。头文件 threads.h 定义了 thread_local 作为 _Thread_local 的同义词。在例 1 中,主线程和新启动线程各自拥有线程本地变量 var 的一个实例。
【例1】使用一个线程对象
#include <stdio.h> #include <threads.h> thread_local int var = 10; void print_var(void){ printf("var = %d\n", var); } int func(void *); // 线程函数 int main(int argc, char *argv[]) { thrd_t th1; if ( thrd_create( &th1, func, NULL ) != thrd_success ){ fprintf(stderr,"Error creating thread.\n"); return 0xff; } print_var(); // 输出:var = 10 thrd_join(th1, NULL); return 0; } int func(void *arg) // 线程函数 { var += 10; // 线程本地变量 print_var(); // 输出:var = 20 return 0 }
使用线程存储
线程存储技术要比线程对象更加灵活。例如,独立线程可以使用不同大小的内存。它们可以动态地分配内存,并通过调用析构函数再次释放内存。同时,可以使用相同的标识符访问这些独立线程所在的不同内存区域。
这种灵活性通过初始创建一个全局的键(key)实现,该键表示了一个指向线程存储的指针。然后,独立线程通过指定其线程存储的位置加载这个指针。该全局键值是类型为 tss_t 的对象。头文件 threads.h 包含了该类型的定义以及 4 个用于管理线程存储(简称 TSS)函数的声明:
int tss_create(tss_t*key,tss_dtor_t dtor);
通过析构函数 dtor 生成一个新的 TSS 指针,并且将 key 引用的对象设置为唯一标识该 TSS 指针的值。类型 tss_dtor_t 是一个函数指针,定义为 void(*)(void*)(它指的是一个函数指针,该函数参数为 void 指针,并且该函数没有返回值)。dtor 的返回值可以是一个空指针。
void tss_delete(tss_t key);
释放 TSS 键 key 所使用的所有资源。
int tss_set(tss_t key,void*val);
对于调用 tss_set()的线程,将 key 所标识的 TSS 指针设置为 val 所引用的内存地址。
void*tss_get(tss_t key);
返回指向内存块的指针,该内存块为正在调用的线程通过函数 tss_set()设置。如果发生错误,tss_get()返回 NULL。
如果函数 tss_create()和 tss_set()发生错误,则返回 thrd_error;否则,返回 thrd_success。
例 2 中的程序在动态分配的线程存储中,保留线程的名称。
【例2】使用线程存储
- #include <threads.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- tss_t key; // 用于TSS指针的全局键
- int thFunc(void *arg); // 线程函数
- void destructor(void *data); // 析构函数
- int main(void)
- {
- thrd_t th1, th2;
- int result1 = 0, result2 = 0;
- // 创建一个TSS密钥
- if (tss_create(&key, destructor) != thrd_success)
- return -1;
- // 创建线程
- if (thrd_create(&th1, thFunc, "Thread_1") != thrd_success
- || thrd_create(&th2, thFunc, "Thread_2") != thrd_success)
- return -2;
- thrd_join(th1, &result1); thrd_join(th2, &result2);
- if ( result1 != 0 || result2 != 0 )
- fputs("Thread error\n", stderr);
- else
- puts("Threads finished without error.");
- tss_delete(key); // 释放TSS指针所有的资源
- return 0;
- }
- void print(void) // 显示线程存储
- {
- printf( "print: %s\n", (char*)tss_get(key) );
- }
- int thFunc( void *arg )
- {
- char *name = (char*)arg;
- size_t size = strlen(name)+1;
- // 设置线程存储
- if ( tss_set(key, malloc(size)) != thrd_success )
- return -1;
- // 存储数据
- strcpy((char*)tss_get(key), name);
- print();
- return 0;
- }
- void destructor(void *data)
- {
- printf("Destructor for %s\n", (char*)data);
- free(data); // 释放内存
- }
标签:mtx,int,thrd,bufPtr,互斥,线程,多线程 From: https://www.cnblogs.com/596014054-yangdongsheng/p/10248288.html