首页 > 编程语言 >多线程编程同步:互斥锁和条件变量

多线程编程同步:互斥锁和条件变量

时间:2023-10-16 10:00:29浏览次数:27  
标签:线程 NULL 编程 include 互斥 mutex pthread shared 多线程

多线程同步

怎样同步多个线程或多个进程的活动?

为允许在线程或进程间共享数据,同步通常是必需的。而互斥锁和条件变量是同步的基本组成部分。

互斥锁用于保护 临界区(critical region),以保证任何时刻只有一个线程在执行其中的代码,或者任何时刻只有一个进程在执行其中的代码。

互斥锁用于上锁,条件变量则用于等待。这两种不同类型的同步都是需要的。

多生产者-单消费者(仅考虑多生产者之间的同步)

例子说明:多个生产者线程执行完毕,才开始消费者线程。故仅需考虑生产者线程之间的同步问题

目的:

1)测试互斥锁对于生产者线程的同步作用

2)测试互斥锁的性能开销(主要指时间开销)

prodscons_mutex.c

//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>

//==============================================================================
// Constants
#define MAX_N_ITEMS     100000000
#define MAX_N_THREADS   100

#define MAX(a,b) ((a)>(b) ? (a):(b)) 
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct SHARED_ST
{
    pthread_mutex_t mutex;
    int buff[MAX_N_ITEMS];
    int nPut;
    int nVal;
};

//==============================================================================
// global varibles
static int g_nItems; // read-only by producer and consumer
static struct SHARED_ST g_shared = {PTHREAD_MUTEX_INITIALIZER};

//==============================================================================
// global functions

static void *produce(void *arg);
static void *consume(void *arg);

//==============================================================================
// The main entry-point function.

int main(int argc, char **argv)
{
    int i = 0, nThreads = 0;
    int count[MAX_N_THREADS] = {0};
    pthread_t tid_produce[MAX_N_THREADS] = {0};
    pthread_t tid_consume = 0;

    if (argc != 3)
    {
        printf("usage: %s <#item> <#threads>\n", argv[0]);
        exit(1);
    }
    g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
    nThreads = MIN(atoi(argv[2]), MAX_N_THREADS);

	if (pthread_setconcurrency(nThreads) != 0)
    {
        printf("set thread concurrency failed\n");
        exit(1);
    }

    time_t time1, time2;
    time1 = time((time_t *) NULL);
    /* start all the producer threads */
    for (i = 0; i < nThreads; i++)
    {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
   
    /* wait for all the producer threads */
    for (i = 0; i < nThreads; i++)
    {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    time2 = time((time_t *) NULL);
    printf("%d producer produce %d data, use %.2f s\n", nThreads, g_nItems, difftime(time2, time1));
    
    /* start, then wait for the consumer thread */
    pthread_create(&tid_consume, NULL, consume, NULL);
    pthread_join(tid_consume, NULL);


    exit(0);
}

static void *produce(void *arg)
{
    while (1)
    {
        pthread_mutex_lock (&g_shared.mutex);

        if (g_shared.nPut >= g_nItems)
        {
            pthread_mutex_unlock(&g_shared.mutex);
            
            return (NULL); // array is full, we are done
        }
        g_shared.buff[g_shared.nPut] = g_shared.nVal;
        g_shared.nPut++;
        g_shared.nVal++;

        pthread_mutex_unlock(&g_shared.mutex);
        *((int *) arg) += 1;
    }

}

static void *consume(void *arg)
{
    int i = 0;
    for (i = 0; i < g_nItems; i++)
    {
        if (g_shared.buff[i] != i)
        {
            printf("buff[%d] = %d\n", i, g_shared.buff[i]);
        }
    }

    return (NULL);
}

使用3个线程生产1千万条数据:

使用1个线程生产1千万条数据:

取消互斥锁

在代码中注释互斥锁语句,重新编译运行。

使用3个线程生产100条数据:

count[i]的所有元素的值之和不等于100,生产数据出错。
buff[i] != i,意味着消费者线程消费数据时检测到生产者线程之间生产数据出错(正确的数据要求:buff[i] == i)。

互斥锁开销

生产者线程数量 生产数据数量 耗时(s)
1 1亿 2s
2 1亿 18s
3 1亿 9s
4 1亿 15s
5 1亿 16s

由上述结果可以得出结论:生产同样量的数据,为保证数据同步,使用多线程的互斥锁会产生额外的开销。

多生产者-单消费者(考虑多生产者之间的同步、生产者和消费者的同步)

例子说明:在所有生产者线程都启动后,立即启动消费者线程。不仅要考虑生产者线程之间的同步,而且还要考虑生产者线程和消费者线程之间的同步。

prodscons_mutex2.c

/*
 * @Description: consider between producer threads and consumer synchronization.
 * when producer threads started, the consumer start right now.
 * @Author: caojun
 * @version: 
 * @Date: 2023-10-13 10:01:24
 * @LastEditors: caojun
 * @LastEditTime: 2023-10-13 10:31:11
 */

//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>

//==============================================================================
// Constants
#define MAX_N_ITEMS     100000000
#define MAX_N_THREADS   100

#define MAX(a,b) ((a)>(b) ? (a):(b)) 
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct SHARED_ST
{
    pthread_mutex_t mutex;
    int buff[MAX_N_ITEMS];
    int nPut;
    int nVal;
};

//==============================================================================
// global varibles
static int g_nItems; // read-only by producer and consumer
static struct SHARED_ST g_shared = {PTHREAD_MUTEX_INITIALIZER};

//==============================================================================
// global functions

static void *produce(void *arg);
static void *consume(void *arg);

//==============================================================================
// The main entry-point function.

int main(int argc, char **argv)
{
    int i = 0, nThreads = 0;
    int count[MAX_N_THREADS] = {0};
    pthread_t tid_produce[MAX_N_THREADS] = {0};
    pthread_t tid_consume = 0;

    if (argc != 3)
    {
        printf("usage: %s <#item> <#threads>\n", argv[0]);
        exit(1);
    }
    g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
    nThreads = MIN(atoi(argv[2]), MAX_N_THREADS);

	if (pthread_setconcurrency(nThreads + 1) != 0)
    {
        printf("set thread concurrency failed\n");
        exit(1);
    }

    time_t time1, time2;
    time1 = time((time_t *) NULL);
    /* start all the producer threads and one consumer thread */
    for (i = 0; i < nThreads; i++)
    {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
    pthread_create(&tid_consume, NULL, consume, NULL);
   
    /* wait for all the producer threads and the consumer */
    for (i = 0; i < nThreads; i++)
    {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    pthread_join(tid_consume, NULL);

    time2 = time((time_t *) NULL);
    printf("%d producer and 1 consumer handle %d data, use %.2f s\n", nThreads, g_nItems, difftime(time2, time1));

    exit(0);
}

static void *produce(void *arg)
{
    while (1)
    {
        pthread_mutex_lock (&g_shared.mutex);

        if (g_shared.nPut >= g_nItems)
        {
            pthread_mutex_unlock(&g_shared.mutex);
            
            return (NULL); // array is full, we are done
        }
        g_shared.buff[g_shared.nPut] = g_shared.nVal;
        g_shared.nPut++;
        g_shared.nVal++;

        pthread_mutex_unlock(&g_shared.mutex);
        *((int *) arg) += 1;
    }

}

static void consume_wait(int i)
{
    while (1)
    {
        pthread_mutex_lock(&g_shared.mutex);
        if (i < g_shared.nPut)
        {
            pthread_mutex_unlock(&g_shared.mutex);
            return ; // an item is ready
        }
        pthread_mutex_unlock(&g_shared.mutex);
    }
}

static void *consume(void *arg)
{
    int i = 0;
    for (i = 0; i < g_nItems; i++)
    {
        consume_wait(i);
        if (g_shared.buff[i] != i)
        {
            printf("buff[%d] = %d\n", i, g_shared.buff[i]);
        }
    }

    return (NULL);
}

启动3个生产者线程生产一千万条数据:

对于消费者线程来说,每次消费一条数据都要取询问数据是否准备好,若数据还在准备,就要不停的询问。这种轮询方式比较耗费cpu资源,应该修改为数据准备好后,唤醒消费者线程(引入条件变量)。

多生产者-单消费者 (条件变量)

例子说明:将生产者相关的变量和互斥锁组合到一个结构体中,把计数器、条件变量和互斥锁组合到另一个结构体中。
生产者每生产一条数据后,检查计数器nready是否为0,若nready==0,则唤醒消费者线程,然后将计数器nready加1。
消费者每消费一条数据前,都要检查计数器nready是否为0,若nready==0,则线程投入睡眠并等待唤醒信号。

prodscons_mutex_cond.c

/*
 * @Description: consider between producer threads and consumer synchronization.
 * when producer threads started, the consumer start right now.
 * @Author: caojun
 * @version: 
 * @Date: 2023-10-13 10:01:24
 * @LastEditors: caojun
 * @LastEditTime: 2023-10-13 10:31:11
 */

//==============================================================================
// Include files
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>

//==============================================================================
// Constants
#define MAX_N_ITEMS     100000000
#define MAX_N_THREADS   100

#define MAX(a,b) ((a)>(b) ? (a):(b)) 
#define MIN(a,b) ((a)<(b) ? (a):(b))
//==============================================================================
// types
struct PUT_ST
{
    pthread_mutex_t mutex;
    int nPut; // next index to store
    int nVal; // next value to store
};

struct READY_ST
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int nready; // number ready for consumer
};
//==============================================================================
// global varibles

static int g_nItems; // read-only by producer and consumer
static int g_buff[MAX_N_ITEMS];
static struct PUT_ST g_put = {PTHREAD_MUTEX_INITIALIZER};
static struct READY_ST g_ready = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};

//==============================================================================
// global functions

static void *produce(void *arg);
static void *consume(void *arg);

//==============================================================================
// The main entry-point function.

int main(int argc, char **argv)
{
    int i = 0, nThreads = 0;
    int count[MAX_N_THREADS] = {0};
    pthread_t tid_produce[MAX_N_THREADS] = {0};
    pthread_t tid_consume = 0;

    if (argc != 3)
    {
        printf("usage: %s <#item> <#threads>\n", argv[0]);
        exit(1);
    }
    g_nItems = MIN(atoi(argv[1]), MAX_N_ITEMS);
    nThreads = MIN(atoi(argv[2]), MAX_N_THREADS);

	if (pthread_setconcurrency(nThreads + 1) != 0)
    {
        printf("set thread concurrency failed\n");
        exit(1);
    }

    time_t time1, time2;
    time1 = time((time_t *) NULL);
    /* start all the producer threads and one consumer thread */
    for (i = 0; i < nThreads; i++)
    {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
    pthread_create(&tid_consume, NULL, consume, NULL);
   
    /* wait for all the producer threads and the consumer */
    for (i = 0; i < nThreads; i++)
    {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    pthread_join(tid_consume, NULL);

    time2 = time((time_t *) NULL);
    printf("%d producer and 1 consumer handle %d data, use %.2f s\n", nThreads, g_nItems, difftime(time2, time1));

    exit(0);
}

static void *produce(void *arg)
{
    while (1)
    {
        pthread_mutex_lock (&g_put.mutex);
        if (g_put.nPut >= g_nItems)
        {
            pthread_mutex_unlock(&g_put.mutex);
            
            return (NULL); // array is full, we are done
        }
        g_buff[g_put.nPut] = g_put.nVal;
        g_put.nPut++;
        g_put.nVal++;
        pthread_mutex_unlock(&g_put.mutex);

        pthread_mutex_lock(&g_ready.mutex);
        if (g_ready.nready == 0)
        {
            pthread_cond_signal(&g_ready.cond);
        }
        g_ready.nready++;
        pthread_mutex_unlock(&g_ready.mutex);

        *((int *) arg) += 1;
    }
}

static void *consume(void *arg)
{
    int i = 0;
    for (i = 0; i < g_nItems; i++)
    {
        pthread_mutex_lock(&g_ready.mutex);
        while (g_ready.nready == 0)
        {
            pthread_cond_wait(&g_ready.cond, &g_ready.mutex);
        }
        g_ready.nready--;
        pthread_mutex_unlock(&g_ready.mutex);
        if (g_buff[i] != i)
        {
            printf("buff[%d] = %d\n", i, g_buff[i]);
        }
    }

    return (NULL);
}

启动3个生产者,生产1千万条数据:

参考引用

UNIX网络编程 卷2 进程间通信 第2版 【Richard Stevens】

标签:线程,NULL,编程,include,互斥,mutex,pthread,shared,多线程
From: https://www.cnblogs.com/caojun97/p/17761168.html

相关文章

  • 《Python计算机视觉编程》高清高质量电子书PDF
    下载:https://pan.quark.cn/s/3c386f89afec......
  • 经典多线程题目
    1.三种线程按顺序执行publicclassTest1{//privatestaticLoggerlog=Logger.getLogger(Test2.class);publicstaticvoidmain(String[]args)throwsInterruptedException{//创建三个线程按照线程a,b,c执行Threada=newPrintThread()......
  • 研发必会-异步编程利器之CompletableFuture(含源码 中)
    微信公众号访问地址:研发必会-异步编程利器之CompletableFuture(含源码中)近期热推文章:    1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表;    2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;    3、基于Redis的Geo实现附......
  • 同步和互斥概念
       ......
  • 《Unix/linux系统编程》教材第11章学习笔记
    第11章:EXT2文件系统EXT2文件系统Linux一直使用EXT2作为默认文件系统。EXT2文件系统数据结构创建虚拟硬盘mke2fs[-bblksize-Nninodes]devicenblockseg:ddif=/dev/zeroof=vdiskbs=1024count=1440mke2fsvdisk1440在一个名为vdisk的虚拟磁盘文件上创建一个EXT2文......
  • Python初学者指南:一步一步学习编程
    引言:欢迎来到Python的世界!Python是一种高级编程语言,以其简洁、易读的代码和广泛的应用领域而闻名。无论你是首次接触编程,还是已经熟悉其他语言,Python都是一个极好的选择。本文将为你提供一个Python的初学者指南,帮助你一步一步开始你的编程旅程。一、为什么选择Python?Python的设计......
  • 《Unix/Linux系统编程》教材学习笔记第十一章
    chapter11EXT2文件系统Linux一直使用EXT2(Card等1995)作为默认文件系统。EXT3(EXT3,2014)是EXT2的扩展。EXT3中增加的主要内容是一个日志文件,它将文件系统的变更记录在日志中。日志可在文件系统崩溃时更快地从错误中恢复。没有错误的EXT3文件系统与EXT2文件系统相同。EXT3的最新......
  • 实验三 互斥锁
    不加锁的多线程售票系统存在的问题售票系统实现代码#include<stdio.h>#include<pthread.h>#include<unistd.h>intticketAmout=2;//票的数量:全局变量void*ticketAgent(void*arg){intt=ticketAmout;if(t>0){printf("Oneticketsold\n");t--;}el......
  • 23-Vue组件化编程-非单文件组件和单文件组件
    非单文件组件一个文件中包含有n个组件 Vue中使用组件的三大步骤1.定义组件(也就是创建组件)2.注册组件(这里有局部注册和全局注册)3.使用组件(编写组件标签) 注册组件(局部注册)靠newVue的时候传入components选项<!DOCTYPEhtml><htmllang="en"><head><meta......
  • 免费 AI 编程助手 Amazon CodeWhisperer 体验
    文章作者:米菲爸爸2022年6月23亚马逊云科技就已经推出了AmazonCodeWhisperer(预览版)。经过不到一年的测试和AIGC的飓风在2023年4月18日实时AI编程助手AmazonCodeWhisperer正式可用AmazonCodeWhisperer是一种采用机器学习(ML)的服务,可以根据开发人员用自......