生产者与消费者问题:
(一)基础:
(1.0)生产者消费者的背景
1》为了平衡生产者和消费者的处理能力,起到一个数据缓存的作用,同时也达到了一个解耦的作用
在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,因为生产那么多也没有地方放啊;同理如果消费者的速度大于生产者那么消费者就会经常处理等待状态,所以为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式;
(1.1)生产者消费者的优点
1》解耦:
将生产者类和消费者类进行解耦,消除代码之间的依赖性,简化工作负载的管理;
2》复用:
通过将生产者类和消费者类独立开来,那么可以对生产者类和消费者类进行独立的复用与扩展
3》调整并发数:
由于生产者和消费者的处理速度是不一样的,可以调整并发数,给予慢的一方多的并发数,来提高任务的处理速度;
4》异步:
对于生产者和消费者来说能够各司其职,生产者只需要关心缓冲区是否还有数据,不需要等待消费者处理完;同样的对于消费者来说,也只需要关注缓冲区的内容,不需要关注生产者,通过异步的方式支持高并发,将一个耗时的流程拆成生产和消费两个阶段,这样生产者因为执行put()的时间比较短,而支持高并发;
5》支持分布式:
生产者和消费者通过队列进行通讯,所以不需要运行在同一台机器上,在分布式环境中可以通过redis的list作为队列,而消费者只需要轮询队列中是否有数据。同时还能支持集群的伸缩性,当某台机器宕掉的时候,不会导致整个集群宕掉;
(1.2)为什么要有缓冲区
不让生产者直接调用消费者的某个函数,引入缓冲区好处是:
1. 解耦;
2. 支持并发;(生产者不用等待消费者完成)
3. 支持忙闲不均;
(1.3)环形队列缓冲区
对于常见的队列实现:在每次 push 时,可能涉及到【堆内存】的分配;在每次 pop 时,可能涉及【堆内存】的释放。假如流量很大,频繁地 push、pop,那内存分配的开销就有可能很大;
从上图可以看出,环形缓冲区所有的 push/pop 操作都是在一个【固定】的存储空间内进行。而队列缓冲区在 push 的时候,可能会分配存储空间用于存储新元素;在 pop 时,可能会释放废弃元素的存储空间。所以环形方式相比队列方式,少掉了对于缓冲区元素所用存储空间的分配、释放。这是环形缓冲区的一个主要优势。
和线程中的队列缓冲区类似,线程中的环形缓冲区也要考虑线程安全的问题。除非你使用的环形缓冲区的库已经帮你实现了线程安全,否则你还是得自己动手搞定。
(1.4)双缓冲区
(1.5)生产者和消费者,生产者和生产者,消费者和消费者的关系:
生产者与消费者模型是用来解决资源供求的问题,生产者与消费者其实就可以分别看做是资源的提供者与使用者。通过这个,我们可以分析出他们之间的关系:
1)生产者与消费者之间是供求关系(同步)
2)两个生产者之间是竞争关系(互斥)
3)两个消费者之间也是竞争关系(互斥)
(1.6)总结:
通过这个关系我们可以看到生产者与消费者之间需要进行资源的交易,生产者生产出来的资源要被消费者使用,那么就需要一个交易场所,也就是一个消费场所。
所以我们提供一个“321”原则用来记忆;3代表3种关系,2代表2个角色,1代表一个消费场所。
我这里给一个大概的结构图来解释生产者与消费者模型:
(二)信号量实现解析:
1.生产者只有存在空闲的缓冲区单元时才可以向其中放产品,令空闲缓冲区的信号量为empty;
2.消费者只有存在满缓冲区时才可以取产品,令满缓存区的信号量为full.
3.缓冲区为一个临界资源,使用一个互斥信号量mutex来保证生产者和消费者不能同时访问缓冲区。
ps:此中以及下面的注意是使用信号量实现生产者、消费者的分析;
(三)信号量实现注意:
1.full+empty=缓冲区的个数;
2.如果缓冲区单元的个数>1,则需要设置互斥信号量;
如果缓冲区的单元个数只有1个,则可以不设置互斥信号量;与生产者,消费者的人数无关。
3.先后顺序1:
向缓冲区放产品->从缓冲区取产品:
所以:向缓冲区放产品,V(full),P(full),从缓冲区取产品;
V(full)为发生消息,P(full)为接受消息;
4.先后顺序2:
从缓冲区取产品->向缓冲区放产品;
所以:从缓冲区取产品,V(empty),P(empty),向缓冲区放产品;
V发送消息,P接受消息。
5.对临界资源的保护:
P->临界资源->V
6.生产者,消费者问题是典型的进程同步的问题;
(四)实现:
(4.1)信号量+ 互斥锁 实现生产者、消费者
(4.1.1)思路
信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程/进程(车辆)都会将该整数减一(通过它当然是为了使用资源),当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。当一个线程调用Wait操作时,它要么得到资源然后将信号量减一,要么一直等下去(指放入阻塞队列),直到信号量大于等于一时。Release(释放)实际上是在信号量上执行加操作,对应于车辆离开停车场,该操作之所以叫做“释放”是因为释放了由信号量守护的资源。
(4.1.2)相关函数
sem_t 类型:信号量变量类型;
sem_init();
sem_destory();
int sem_wait(sem_t * sem);
int sem_post(sem_t * sem);
1)设定两个信号量,empty用来表示空槽的个数,full用来表示占有的个数;
而互斥锁仅仅是为了防止多个线程同时对队列进行操作,造成未知的结果。
2)生产者在向任务队列里放资源时,调用sem_wait(&full)来检查队列是否已满,如果满的话,就阻塞,直到有消费者从里面取资源再苏醒,如果不满,就放资源,并通知消费者来取。
3)消费者在从任务队列里取资源时,调用sem_wait(&empty)来检查队列是否为空,如果空的话,就阻塞,直到有生产者向里面放资源再苏醒,如果不空,就取资源,并通知生产者来放。
(4.1.3)伪代码:
/*多个生产者,多个消费者,n个缓冲区单元*/
Semaphore full=0;//满缓冲区单元个数
Semaphore empty=n;//空缓冲区单元个数
Semaphore mutex=1;//控制对临界区的互斥信号量main(){
producer();
consumer();
}
//生产者进程
producer(){
while(1){
生产一个产品produce; p(empty);//空单元个数-1
p(mutex);//保护临界资源
buffer[i]=produce;//将产品放入缓冲区;
i=(i+1)%n;
v(mutex); v(full);//满单元个数+1;
}
}//消费者进程
consumer(){
while(1){
p(full);//满单元个数-1p(mutex);//保护临界资源
producer=buffer[out];//从缓冲区中取一个产品
out=(out+1)%n;
v(mutex); v(empty);
}
}(4.1.4)代码实现
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#define MAX 5 //队列长度
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
sem_t full; //填充的个数,初始化为0;
sem_t empty; //空槽的个数,初始化为Max;
int top = 0; //队尾
int bottom = 0; //队头
void* produce(void* arg)
{
int i;
for ( i = 0; i < MAX*2; i++)
{
printf("producer is preparing data\n");
sem_wait(&empty);//若空槽个数低于0阻塞
pthread_mutex_lock(&mutex);
top = (top+1) % MAX;
printf("now top is %d\n", top);
pthread_mutex_unlock(&mutex);
sem_post(&full);
}
return (void*)1;
}
void* consume(void* arg)
{
int i;
for ( i = 0; i < MAX*2; i++)
{
printf("consumer is preparing data\n");
sem_wait(&full);//若填充个数低于0阻塞
pthread_mutex_lock(&mutex);
bottom = (bottom+1) % MAX;
printf("now bottom is %d\n", bottom);
pthread_mutex_unlock(&mutex);
sem_post(&empty);
}
return (void*)2;
}
int main(int argc, char *argv[])
{
pthread_t thid1;
pthread_t thid2;
pthread_t thid3;
pthread_t thid4;
int ret1;
int ret2;
int ret3;
int ret4;
sem_init(&full, 0, 0);
sem_init(&empty, 0, MAX);
pthread_create(&thid1, NULL, produce, NULL);
pthread_create(&thid2, NULL, consume, NULL);
pthread_create(&thid3, NULL, produce, NULL);
pthread_create(&thid4, NULL, consume, NULL);
pthread_join(thid1, (void**)&ret1);
pthread_join(thid2, (void**)&ret2);
pthread_join(thid3, (void**)&ret3);
pthread_join(thid4, (void**)&ret4);
return 0;
}
注:如果把sem_wait()和sem_post()放到pthread_mutex_lock()与pthread_mutex_unlock()之间会如何呢?
答:死锁,因为我们不能预知线程进入共享区顺序,如果消费者线程先对mutex加锁,并进入,sem_wait()发现队列为空,阻塞,而生产者在对mutex加锁时,发现已上锁也阻塞,双方永远无法唤醒对方。
(4.2)互斥量+线程条件变量实现生产者与消费者
(4.2.1)思路
条件变量的常见用法是在不满足某些条件时,阻塞自己,直到有线程通知自己醒来。而互斥量在这里的作用依然还是防止多线程对共享资源同时操作,造成未知结果。
生产者消费者的行为与之前相同,只不过原来只调用sem_wait()可以完成两步,1是检查条件,2是阻塞;
现在条件变量需要我们自己来设定条件
(所以说条件变量配合互斥锁比信号量的功能更强大,因为它可以自定义休眠条件,但是这对使用者的要求也提高了,必须理清逻辑关系避免死锁)
(4.2.2)相关函数
pthread_mutex_t 变量类型;
pthread_mutex_unlock();
pthread_mutex_lock();
pthread_cond_t 条件变量类型;
pthread_cond_wait();
pthread_cond_notify();
初始化:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t notfull = PTHREAD_COND_INITIALIZER; //是否队满
pthread_cond_t notempty = PTHREAD_COND_INITIALIZER; //是否队空
条件变量与互斥锁一般来说是同步的,目的就是为了防止资源的竞争。其主要作用就是为了在消费者申请到锁,但是条件变量为假的时候,及时的释放锁资源。其实就是为了避免死锁;
(4.2.3)代码实现1
#include <stdio.h>
#include <pthread.h>
#define MAX 5
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t notfull = PTHREAD_COND_INITIALIZER; //是否队满
pthread_cond_t notempty = PTHREAD_COND_INITIALIZER; //是否队空
int top = 0;
int bottom = 0;
void* produce(void* arg)
{
int i;
for ( i = 0; i < MAX*2; i++)
{
pthread_mutex_lock(&mutex);
while ((top+1)%MAX == bottom)
{
printf("full! producer is waiting\n");
pthread_cond_wait(notfull, &mutex);//队列满时,等待队不满
}
top = (top+1) % MAX;
printf("now top is %d\n", top);
pthread_cond_signal(notempty);//发出队非空的消息
pthread_mutex_unlock(&mutex);
}
return (void*)1;
}
void* consume(void* arg)
{
int i;
for ( i = 0; i < MAX*2; i++)
{
pthread_mutex_lock(&mutex);
while ( top%MAX == bottom)
{
printf("empty! consumer is waiting\n");
pthread_cond_wait(notempty, &mutex);//队列空时,等待队不空
}
bottom = (bottom+1) % MAX;
printf("now bottom is %d\n", bottom);
pthread_cond_signal(notfull);//发出队不满的消息
pthread_mutex_unlock(&mutex);
}
return (void*)2;
}
int main(int argc, char *argv[])
{
pthread_t thid1;
pthread_t thid2;
pthread_t thid3;
pthread_t thid4;
int ret1;
int ret2;
int ret3;
int ret4;
pthread_create(&thid1, NULL, produce, NULL);
pthread_create(&thid2, NULL, consume, NULL);
pthread_create(&thid3, NULL, produce, NULL);
pthread_create(&thid4, NULL, consume, NULL);
pthread_join(thid1, (void**)&ret1);
pthread_join(thid2, (void**)&ret2);
pthread_join(thid3, (void**)&ret3);
pthread_join(thid4, (void**)&ret4);
return 0;
}
说明:
Q: 为什么信号量在互斥区外,而条件变量在互斥区内呢?
A: 因为互斥锁本质上是二元信号量,和信号量互斥的原理相同,而且放在互斥区会死锁,而条件变量是和互斥锁协同配合的,
我们从pthread_cond_wait()和pthread_cond_signal()的内部实现就可以看出
pthread_cond_wait()是先将互斥锁解开,并陷入阻塞,直到pthread_signal()发出信号后pthread_cond_wait()再加上锁,然后退出,可以看到它们在设计时就是为了协同配合,而互斥锁和信号量都是由Linux下的futex机制实现的,这里就不展开说了
ps: 所以,使用条件变量时,条件变量在互斥锁内是不会死锁;
Q: 此中的条件变量+ 互斥量 与 上面的互斥量+信号量的实现效果的区别:
A:
(4.2.4)代码实现2
#include<stdio.h>
#include<stdlib.h>
#include<time.h>
#include<pthread.h>
#define BUFFER_SIZE 8
struct prodcons {
int buffer[BUFFER_SIZE];
pthread_mutex_t lock; //互斥LOCK
int readpos , writepos;
pthread_cond_t notempty; //缓冲区非空条件判断
pthread_cond_t notfull; //缓冲区未满条件判断
};
void init(struct prodcons * b){
pthread_mutex_init(&b->lock,NULL);
pthread_cond_init(&b->notempty,NULL);
pthread_cond_init(&b->notfull,NULL);
b->readpos=0;
b->writepos=0;
}
void put(struct prodcons* b,int data){
pthread_mutex_lock(&b->lock);
if((b->writepos + 1) % BUFFER_SIZE == b->readpos)
{
pthread_cond_wait(&b->notfull, &b->lock) ;
}
b->buffer[b->writepos]=data;
b->writepos++;
if(b->writepos >= BUFFER_SIZE)
b->writepos=0;
pthread_cond_signal(&b->notempty);
pthread_mutex_unlock(&b->lock);
}
int get(struct prodcons *b){
int data;
pthread_mutex_lock(&b->lock);
if(b->writepos == b->readpos)
{
pthread_cond _wait(&b->notempty, &b->lock);
}
data = b->buffer[b->readpos];
b->readpos++;
if(b->readpos >= BUFFER_SIZE)
b->readpos=0;
pthread_cond_signal(&b->notfull);
pthread_mutex_unlock(&b->lock);
return data;
}
#define OVER (-1)
struct prodcons buffer;
void *producer(void *data)
{
int n;
for(n = 0; n < 10000; n++)
{
printf("%d \n", n) ;
put(&buffer, n);
}
put(&buffer, OVER);
return NULL;
}
void *consumer(void * data)
{
int d;
while(1)
{
d = get(&buffer);
if(d == OVER)
break;
printf("%d\n", d);
}
return NULL;
}
int main(void)
{
pthread_t th_a, th_b;
void *retval;
init(&buffer);
pthread_create(&th_a, NULL, producer, 0);
pthread_create(&th_b, NULL, consumer, 0);
pthread_join(th_a, &retval);
pthread_join(th_b, &retval);
return 0;
}
(五)拓展:
(1)拓展一:
1)问题:
一个生产者,一个消费者,一个缓冲区单元;
2)实现:
Semaphore empty=1;
Semaphore full=0;
main(){
producer();
consumer();
}//生产者进程:
producer(){
while(1){
p(empty);
向缓冲区放产品;
v(full);
}
}
//消费者进程:
consumer(){
while(1){
p(full);
从缓冲区取产品;
f(empty);
}
}
3)分析:
因为只有一个缓冲区单元,所以生产者与消费者不可能同时访问缓冲区,所以不需要设置互斥变量保护缓冲区,与生产者,消费者个数无关。
----
(2)拓展二:
1)问题:
多个生产者,一个消费者,一个缓冲区单元:
2)实现:
同上
----
(3)拓展三:
1)问题:
一个生产者,一个消费者,n个缓冲区单元;
2)实现:
同多个生产者,多个消费者,n个缓冲区单元;
3)分析:
如果不设置mutex互斥信号量对缓冲区保护,则是错误的,即使消费者,生产者只有一个。
如:消费者放一个产品,然后消费者再放入一个产品,此时full=2;
如果此时消费者与生产者同时访问缓冲区,由于没有mutex互斥信号量的保护,所以可以同时访问缓冲区。
--
(六)生产者,消费者变形: