多线程同步
怎样同步多个线程或多个进程的活动?
为允许在线程或进程间共享数据,同步通常是必需的。而互斥锁和条件变量是同步的基本组成部分。
互斥锁用于保护 临界区(critical region)
,以保证任何时刻只有一个线程在执行其中的代码,或者任何时刻只有一个进程在执行其中的代码。
互斥锁用于上锁,条件变量则用于等待。这两种不同类型的同步都是需要的。
多生产者-单消费者(仅考虑多生产者之间的同步)
例子说明:多个生产者线程执行完毕,才开始消费者线程。故仅需考虑生产者线程之间的同步问题
目的:
1)测试互斥锁对于生产者线程的同步作用
2)测试互斥锁的性能开销(主要指时间开销)
prodscons_mutex.c
//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
//==============================================================================
// Constants
#define MAX_N_ITEMS 100000000
#define MAX_N_THREADS 100
#define MAX(a,b) ((a)>(b) ? (a):(b))
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct SHARED_ST
{
pthread_mutex_t mutex;
int buff[MAX_N_ITEMS];
int nPut;
int nVal;
};
//==============================================================================
// global varibles
static int g_nItems; // read-only by producer and consumer
static struct SHARED_ST g_shared = {PTHREAD_MUTEX_INITIALIZER};
//==============================================================================
// global functions
static void *produce(void *arg);
static void *consume(void *arg);
//==============================================================================
// The main entry-point function.
int main(int argc, char **argv)
{
int i = 0, nThreads = 0;
int count[MAX_N_THREADS] = {0};
pthread_t tid_produce[MAX_N_THREADS] = {0};
pthread_t tid_consume = 0;
if (argc != 3)
{
printf("usage: %s <#item> <#threads>\n", argv[0]);
exit(1);
}
g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
nThreads = MIN(atoi(argv[2]), MAX_N_THREADS);
if (pthread_setconcurrency(nThreads) != 0)
{
printf("set thread concurrency failed\n");
exit(1);
}
time_t time1, time2;
time1 = time((time_t *) NULL);
/* start all the producer threads */
for (i = 0; i < nThreads; i++)
{
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
/* wait for all the producer threads */
for (i = 0; i < nThreads; i++)
{
pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
time2 = time((time_t *) NULL);
printf("%d producer produce %d data, use %.2f s\n", nThreads, g_nItems, difftime(time2, time1));
/* start, then wait for the consumer thread */
pthread_create(&tid_consume, NULL, consume, NULL);
pthread_join(tid_consume, NULL);
exit(0);
}
static void *produce(void *arg)
{
while (1)
{
pthread_mutex_lock (&g_shared.mutex);
if (g_shared.nPut >= g_nItems)
{
pthread_mutex_unlock(&g_shared.mutex);
return (NULL); // array is full, we are done
}
g_shared.buff[g_shared.nPut] = g_shared.nVal;
g_shared.nPut++;
g_shared.nVal++;
pthread_mutex_unlock(&g_shared.mutex);
*((int *) arg) += 1;
}
}
static void *consume(void *arg)
{
int i = 0;
for (i = 0; i < g_nItems; i++)
{
if (g_shared.buff[i] != i)
{
printf("buff[%d] = %d\n", i, g_shared.buff[i]);
}
}
return (NULL);
}
使用3个线程生产1千万条数据:
使用1个线程生产1千万条数据:
取消互斥锁
在代码中注释互斥锁语句,重新编译运行。
使用3个线程生产100条数据:
count[i]的所有元素的值之和不等于100,生产数据出错。
buff[i] != i,意味着消费者线程消费数据时检测到生产者线程之间生产数据出错(正确的数据要求:buff[i] == i)。
互斥锁开销
生产者线程数量 | 生产数据数量 | 耗时(s) |
---|---|---|
1 | 1亿 | 2s |
2 | 1亿 | 18s |
3 | 1亿 | 9s |
4 | 1亿 | 15s |
5 | 1亿 | 16s |
由上述结果可以得出结论:生产同样量的数据,为保证数据同步,使用多线程的互斥锁会产生额外的开销。
多生产者-单消费者(考虑多生产者之间的同步、生产者和消费者的同步)
例子说明:在所有生产者线程都启动后,立即启动消费者线程。不仅要考虑生产者线程之间的同步,而且还要考虑生产者线程和消费者线程之间的同步。
prodscons_mutex2.c
/*
* @Description: consider between producer threads and consumer synchronization.
* when producer threads started, the consumer start right now.
* @Author: caojun
* @version:
* @Date: 2023-10-13 10:01:24
* @LastEditors: caojun
* @LastEditTime: 2023-10-13 10:31:11
*/
//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
//==============================================================================
// Constants
#define MAX_N_ITEMS 100000000
#define MAX_N_THREADS 100
#define MAX(a,b) ((a)>(b) ? (a):(b))
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct SHARED_ST
{
pthread_mutex_t mutex;
int buff[MAX_N_ITEMS];
int nPut;
int nVal;
};
//==============================================================================
// global varibles
static int g_nItems; // read-only by producer and consumer
static struct SHARED_ST g_shared = {PTHREAD_MUTEX_INITIALIZER};
//==============================================================================
// global functions
static void *produce(void *arg);
static void *consume(void *arg);
//==============================================================================
// The main entry-point function.
int main(int argc, char **argv)
{
int i = 0, nThreads = 0;
int count[MAX_N_THREADS] = {0};
pthread_t tid_produce[MAX_N_THREADS] = {0};
pthread_t tid_consume = 0;
if (argc != 3)
{
printf("usage: %s <#item> <#threads>\n", argv[0]);
exit(1);
}
g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
nThreads = MIN(atoi(argv[2]), MAX_N_THREADS);
if (pthread_setconcurrency(nThreads + 1) != 0)
{
printf("set thread concurrency failed\n");
exit(1);
}
time_t time1, time2;
time1 = time((time_t *) NULL);
/* start all the producer threads and one consumer thread */
for (i = 0; i < nThreads; i++)
{
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
pthread_create(&tid_consume, NULL, consume, NULL);
/* wait for all the producer threads and the consumer */
for (i = 0; i < nThreads; i++)
{
pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
pthread_join(tid_consume, NULL);
time2 = time((time_t *) NULL);
printf("%d producer and 1 consumer handle %d data, use %.2f s\n", nThreads, g_nItems, difftime(time2, time1));
exit(0);
}
static void *produce(void *arg)
{
while (1)
{
pthread_mutex_lock (&g_shared.mutex);
if (g_shared.nPut >= g_nItems)
{
pthread_mutex_unlock(&g_shared.mutex);
return (NULL); // array is full, we are done
}
g_shared.buff[g_shared.nPut] = g_shared.nVal;
g_shared.nPut++;
g_shared.nVal++;
pthread_mutex_unlock(&g_shared.mutex);
*((int *) arg) += 1;
}
}
static void consume_wait(int i)
{
while (1)
{
pthread_mutex_lock(&g_shared.mutex);
if (i < g_shared.nPut)
{
pthread_mutex_unlock(&g_shared.mutex);
return ; // an item is ready
}
pthread_mutex_unlock(&g_shared.mutex);
}
}
static void *consume(void *arg)
{
int i = 0;
for (i = 0; i < g_nItems; i++)
{
consume_wait(i);
if (g_shared.buff[i] != i)
{
printf("buff[%d] = %d\n", i, g_shared.buff[i]);
}
}
return (NULL);
}
启动3个生产者线程生产一千万条数据:
对于消费者线程来说,每次消费一条数据都要取询问数据是否准备好,若数据还在准备,就要不停的询问。这种轮询方式比较耗费cpu资源,应该修改为数据准备好后,唤醒消费者线程(引入条件变量)。
多生产者-单消费者 (条件变量)
例子说明:将生产者相关的变量和互斥锁组合到一个结构体中,把计数器、条件变量和互斥锁组合到另一个结构体中。
生产者每生产一条数据后,检查计数器nready是否为0,若nready==0,则唤醒消费者线程,然后将计数器nready加1。
消费者每消费一条数据前,都要检查计数器nready是否为0,若nready==0,则线程投入睡眠并等待唤醒信号。
prodscons_mutex_cond.c
/*
* @Description: consider between producer threads and consumer synchronization.
* when producer threads started, the consumer start right now.
* @Author: caojun
* @version:
* @Date: 2023-10-13 10:01:24
* @LastEditors: caojun
* @LastEditTime: 2023-10-13 10:31:11
*/
//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
//==============================================================================
// Constants
#define MAX_N_ITEMS 100000000
#define MAX_N_THREADS 100
#define MAX(a,b) ((a)>(b) ? (a):(b))
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct PUT_ST
{
pthread_mutex_t mutex;
int nPut; // next index to store
int nVal; // next value to store
};
struct READY_ST
{
pthread_mutex_t mutex;
pthread_cond_t cond;
int nready; // number ready for consumer
};
//==============================================================================
// global varibles
static int g_nItems; // read-only by producer and consumer
static int g_buff[MAX_N_ITEMS];
static struct PUT_ST g_put = {PTHREAD_MUTEX_INITIALIZER};
static struct READY_ST g_ready = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
//==============================================================================
// global functions
static void *produce(void *arg);
static void *consume(void *arg);
//==============================================================================
// The main entry-point function.
int main(int argc, char **argv)
{
int i = 0, nThreads = 0;
int count[MAX_N_THREADS] = {0};
pthread_t tid_produce[MAX_N_THREADS] = {0};
pthread_t tid_consume = 0;
if (argc != 3)
{
printf("usage: %s <#item> <#threads>\n", argv[0]);
exit(1);
}
g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
nThreads = MIN(atoi(argv[2]), MAX_N_THREADS);
if (pthread_setconcurrency(nThreads + 1) != 0)
{
printf("set thread concurrency failed\n");
exit(1);
}
time_t time1, time2;
time1 = time((time_t *) NULL);
/* start all the producer threads and one consumer thread */
for (i = 0; i < nThreads; i++)
{
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
pthread_create(&tid_consume, NULL, consume, NULL);
/* wait for all the producer threads and the consumer */
for (i = 0; i < nThreads; i++)
{
pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
pthread_join(tid_consume, NULL);
time2 = time((time_t *) NULL);
printf("%d producer and 1 consumer handle %d data, use %.2f s\n", nThreads, g_nItems, difftime(time2, time1));
exit(0);
}
static void *produce(void *arg)
{
while (1)
{
pthread_mutex_lock (&g_put.mutex);
if (g_put.nPut >= g_nItems)
{
pthread_mutex_unlock(&g_put.mutex);
return (NULL); // array is full, we are done
}
g_buff[g_put.nPut] = g_put.nVal;
g_put.nPut++;
g_put.nVal++;
pthread_mutex_unlock(&g_put.mutex);
pthread_mutex_lock(&g_ready.mutex);
if (g_ready.nready == 0)
{
pthread_cond_signal(&g_ready.cond);
}
g_ready.nready++;
pthread_mutex_unlock(&g_ready.mutex);
*((int *) arg) += 1;
}
}
static void *consume(void *arg)
{
int i = 0;
for (i = 0; i < g_nItems; i++)
{
pthread_mutex_lock(&g_ready.mutex);
while (g_ready.nready == 0)
{
pthread_cond_wait(&g_ready.cond, &g_ready.mutex);
}
g_ready.nready--;
pthread_mutex_unlock(&g_ready.mutex);
if (g_buff[i] != i)
{
printf("buff[%d] = %d\n", i, g_buff[i]);
}
}
return (NULL);
}
启动3个生产者,生产1千万条数据:
参考引用
UNIX网络编程 卷2 进程间通信 第2版 【Richard Stevens】
标签:线程,NULL,编程,include,互斥,mutex,pthread,shared,多线程 From: https://www.cnblogs.com/caojun97/p/17761168.html