首页 > 其他分享 >多线程系列(十二) -生产者和消费者模型

多线程系列(十二) -生产者和消费者模型

时间:2024-03-04 16:11:05浏览次数:27  
标签:消费者 Thread 生产者 十二 value 线程 缓冲区 多线程

一、简介

在 Java 多线程编程中,还有一个非常重要的设计模式,它就是:生产者和消费者模型。

这种模型可以充分发挥 cpu 的多线程特性,通过一些平衡手段能有效的提升系统整体处理数据的速度,减轻系统负载,提高程序的效率和稳定性,同时实现模块之间的解耦。

那什么是生产者和消费者模型呢?

简单的说,生产者和消费者之间不直接进行交互,而是通过一个缓冲区来进行交互,生产者负责生成数据,然后存入缓冲区;消费者则负责处理数据,从缓冲区获取。

大致流程图如下:

对于最简单的生产者和消费者模型,总结下来,大概有以下几个特点:

  • 缓冲区为空的时候,消费者不能消费,会进入休眠状态,直到有新数据进入缓冲区,再次被唤醒
  • 缓冲区填满的时候,生产者不能生产,也会进入休眠状态,直到缓冲区有空间,再次被唤醒

生产者和消费者模型作为一个非常重要的设计模型,它的优点在于:

  • 解耦:生产者和消费者之间不直接进行交互,即使生产者和消费者的代码发生变化,也不会对对方产生影响
  • 消峰:例如在某项工作中,假如 A 操作生产数据的速度很快,B 操作处理速度很慢,那么 A 操作就必须等待 B 操作完成才能结束,反之亦然。如果将 A 操作和B 操作进行解耦,中间插入一个缓冲区,这样 A 操作将生产的数据存入缓冲区,就接受了;B 操作从缓冲区获取数据并进行处理,平衡好 A 操作和 B 操作之间的缓冲区,可以显著提升系统的数据处理能力

生产者和消费者模型的应用场景非常多,例如 Java 的线程池任务执行框架、消息中间件 rabbitMQ 等,因此掌握生产者和消费者模型,对于开发者至关重要。

下面我们通过几个案例,一起来了解一下生产者和消费者设计模型的实践思路。

二、代码实践

2.1、利用 wait / notify 方法实现思路

生产者和消费者模型,最简单的一种技术实践方案就是基于线程的 wait() / notify() 方法,也就是通知和唤醒机制,可以将两个操作实现解耦,具体代码实践如下。

/**
 * 缓冲区容器类
 */
public class Container {

    /**
     * 缓冲区最大容量
     */
    private int capacity = 3;

    /**
     * 缓冲区
     */
    private LinkedList<Integer> list = new LinkedList<Integer>();


    /**
     * 添加数据到缓冲区
     * @param value
     */
    public synchronized void add(Integer value) {
        if(list.size() >= capacity){
            System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
            try {
                // 进入等待状态
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
        list.add(value);

        //唤醒其他所有处于wait()的线程,包括消费者和生产者
        notifyAll();
    }


    /**
     * 从缓冲区获取数据
     */
    public synchronized void get() {
        if(list.size() == 0){
            System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
            try {
                // 进入等待状态
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 从头部获取数据,并移除元素
        Integer val = list.removeFirst();
        System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);

        //唤醒其他所有处于wait()的线程,包括消费者和生产者
        notifyAll();
    }
}
/**
 * 生产者类
 */
public class Producer extends Thread{

    private Container container;

    public Producer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            container.add(i);
        }
    }
}
/**
 * 消费者类
 */
public class Consumer extends Thread{

    private Container container;

    public Consumer(Container container) {
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            container.get();
        }
    }
}
/**
 * 测试类
 */
public class MyThreadTest {

    public static void main(String[] args) {
        Container container = new Container();
        Producer producer = new Producer(container);
        Consumer consumer = new Consumer(container);

        producer.start();
        consumer.start();
    }
}

运行结果如下:

生产者:Thread-0,add:0
生产者:Thread-0,add:1
生产者:Thread-0,add:2
生产者:Thread-0,缓冲区已满,生产者进入waiting...
消费者:Thread-1,value:0
消费者:Thread-1,value:1
消费者:Thread-1,value:2
消费者:Thread-1,缓冲区为空,消费者进入waiting...
生产者:Thread-0,add:3
生产者:Thread-0,add:4
生产者:Thread-0,add:5
消费者:Thread-1,value:3
消费者:Thread-1,value:4
消费者:Thread-1,value:5

从日志上可以很清晰的看到,生产者线程生产一批数据之后,当缓冲区已经满了,会进入等待状态,此时会通知消费者线程;消费者线程处理完数据之后,当缓冲区没有数据时,也会进入等待状态,再次通知生产者线程。

2.2、利用 await / signal 方法实现思路

除此之外,我们还可以利用ReentrantLockCondition类中的 await() / signal() 方法实现生产者和消费者模型。

缓冲区容器类,具体代码实践如下。

/**
 * 缓冲区容器类
 */
public class Container {

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    private int capacity = 3;

    private LinkedList<Integer> list = new LinkedList<Integer>();


    /**
     * 添加数据到缓冲区
     * @param value
     */
    public void add(Integer value) {
        boolean flag = false;
        try {
            flag = lock.tryLock(3, TimeUnit.SECONDS);
            if(list.size() >= capacity){
                System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
                // 进入等待状态
                condition.await();
            }
            System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
            list.add(value);

            //唤醒其他所有处于wait()的线程,包括消费者和生产者
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(flag){
                lock.unlock();
            }
        }
    }


    /**
     * 从缓冲区获取数据
     */
    public void get() {
        boolean flag = false;
        try {
            flag = lock.tryLock(3, TimeUnit.SECONDS);
            if(list.size() == 0){
                System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
                // 进入等待状态
                condition.await();
            }
            // 从头部获取数据,并移除元素
            Integer val = list.removeFirst();
            System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);

            //唤醒其他所有处于wait()的线程,包括消费者和生产者
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(flag){
                lock.unlock();
            }
        }
    }
}

生产者、消费者、测试类代码,跟上文一致,运行结果和上文介绍的也是一样。

2.3、多生产者和消费者的实现思路

上面介绍的都是一个生产者线程和一个消费者线程,模型比较简单。实际上,在业务开发中,经常会出现多个生产者线程和多个消费者线程,按照以上的实现思路,会出现什么情况呢?

有可能会出现程序假死现象!下面我们来分析一下案例,假如有两个生产者线程 a1、a2,两个消费者线程 b1、b2,执行过程如下:

  • 1.生产者线程 a1 执行生产数据的操作,发现缓冲区数据已经填满了,然后进入等待阶段,同时向外发起通知,唤醒其它线程
  • 2.因为线程唤醒具有随机性,本应该唤醒消费者线程 b1,结果可能生产者线程 a2 被唤醒,检查缓冲区数据已经填满了,又进入等待阶段,紧接向外发起通知,消费者线程得不到被执行的机会
  • 3.消费者线程 b1、b2,也有可能会出现这个现象,本应该唤醒生产者线程,结果唤醒了消费者线程

遇到这种情况,应该如何解决呢?

因为ReentrantLockCondition的结合,编程具有高度灵活性,我们可以采用这种组合解决多生产者和多消费者中的假死问题。

具体实现逻辑如下:

/**
 * 缓冲区容器类
 */
public class ContainerDemo {

    private Lock lock = new ReentrantLock();
    private Condition producerCondition = lock.newCondition();
    private Condition consumerCondition = lock.newCondition();

    private int capacity = 3;
    private LinkedList<Integer> list = new LinkedList<Integer>();


    /**
     * 添加数据到缓冲区
     * @param value
     */
    public void add(Integer value) {
        boolean flag = false;
        try {
            flag = lock.tryLock(3, TimeUnit.SECONDS);
            if(list.size() >= capacity){
                System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
                // 生产者进入等待状态
                producerCondition.await();
            }
            System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
            list.add(value);

            // 唤醒所有消费者处于wait()的线程
            consumerCondition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(flag){
                lock.unlock();
            }
        }
    }


    /**
     * 从缓冲区获取数据
     */
    public void get() {
        boolean flag = false;
        try {
            flag = lock.tryLock(3, TimeUnit.SECONDS);
            if(list.size() == 0){
                System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
                // 消费者进入等待状态
                consumerCondition.await();
            }
            // 从头部获取数据,并移除元素
            Integer val = list.removeFirst();
            System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);

            // 唤醒所有生产者处于wait()的线程
            producerCondition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(flag){
                lock.unlock();
            }
        }
    }
}
/**
 * 生产者
 */
public class Producer extends Thread{

    private ContainerDemo container;

    private Integer value;

    public Producer(ContainerDemo container, Integer value) {
        this.container = container;
        this.value = value;
    }

    @Override
    public void run() {
        container.add(value);
    }
}
/**
 * 消费者
 */
public class Consumer extends Thread{

    private ContainerDemo container;

    public Consumer(ContainerDemo container) {
        this.container = container;
    }

    @Override
    public void run() {
        container.get();
    }
}

/**
 * 测试类
 */
public class MyThreadTest {

    public static void main(String[] args) {
        ContainerDemo container = new ContainerDemo();

        List<Thread> threadList = new ArrayList<>();
        // 初始化6个生产者线程
        for (int i = 0; i < 6; i++) {
            threadList.add(new Producer(container, i));
        }
        // 初始化6个消费者线程
        for (int i = 0; i < 6; i++) {
            threadList.add(new Consumer(container));
        }

        // 启动线程
        for (Thread thread : threadList) {
            thread.start();
        }
    }
}

运行结果如下:

生产者:Thread-0,add:0
生产者:Thread-1,add:1
生产者:Thread-2,add:2
生产者:Thread-3,缓冲区已满,生产者进入waiting...
生产者:Thread-4,缓冲区已满,生产者进入waiting...
生产者:Thread-5,缓冲区已满,生产者进入waiting...
消费者:Thread-6,value:0
消费者:Thread-7,value:1
生产者:Thread-3,add:3
生产者:Thread-4,add:4
生产者:Thread-5,add:5
消费者:Thread-8,value:2
消费者:Thread-9,value:3
消费者:Thread-10,value:4
消费者:Thread-11,value:5

通过ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用对应的signalAll()方法就可以解决假死现象。

三、小结

最后我们来总结一下,对于生产者和消费者模型,通过合理的编程实现,可以充分充分发挥 cpu 多线程的特性,显著的提升系统处理数据的效率。

对于生产者和消费者模型中的假死现象,可以使用ReentrantLock定义两个Condition,进行交叉唤醒,以解决假死问题。

四、参考

1、https://www.cnblogs.com/xrq730/p/4855663.html

标签:消费者,Thread,生产者,十二,value,线程,缓冲区,多线程
From: https://www.cnblogs.com/dxflqm/p/18051189

相关文章

  • 多线程系列(十一) -浅析并发读写锁StampedLock
    一、摘要在上一篇文章中,我们讲到了使用ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。如果继续深入的分析ReadWriteLock,从锁的角度分析,会发现它有一个潜在的问题:如果有线程正在读数据,写线程准备修改数据的时候,需要等待读线程释放锁后才能获取写锁,简单的说就是,读......
  • 《程序是怎样跑起来的》第十二章观后感
    在《程序是怎样跑起来的》的第十二章中,我们进一步深入到了计算机科学的一个至关重要的领域——软件调试与测试。这一章节的内容对于理解如何确保软件的正确性和稳定性有着举足轻重的作用,它揭示了软件开发过程中不可或缺的调试和测试环节。首先,本章深入探讨了软件调试的基本概念和......
  • python中的多线程及锁介绍
    线程CPU执行调度的最小单位。不能独立存在,依赖进程存在。一个进程至少有一个线程,叫做主线程,另外还有内核线程、用户线程。线程之间共享内存。线程之间的通信效率远高于进程间通信效率,线程之间切换代价也比进程小很多。适用场景Python的多线程适用于IO密集型任务。多任务可以......
  • 多线程限流工具类-Semaphore
    Semaphore介绍Semaphore(信号量)是JAVA多线程中的一个工具类,它可以通过指定参数来控制执行线程数量,一般用于限流访问某个资源时使用。Semaphore使用示例需求场景:用一个核心线程数为6,最大线程数为20的线程池执行任务,但是要求最多只能同时运行3个线程代码:publicclassdemo{......
  • C#多线程
    在C#编程中,多线程是实现高效并发编程的关键技术之一。通过创建多个线程,程序可以同时执行多个任务,从而充分利用多核处理器的计算能力。本文将带你快速回顾C#多线程的基础知识,通过10分钟的学习,你将能够掌握多线程的核心概念,并学会使用C#语言创建和管理线程。一、多线程基础概念在C......
  • 并发编程补充:基于多线程实现并发的套接字通信
    服务端:fromsocketimport*fromthreadingimportThreaddefcommunicate(conn):whileTrue:try:data=conn.recv(1024)ifnotdata:breakconn.send(data.upper())exceptConnectionResetError:......
  • 对于需要实时处理的代码语句 就用定时器中断模式,实现多线程模式,建议不要用查询模式。
    对于需要实时处理的代码语句就用定时器中断模式,实现多线程模式,建议不要用查询模式。 示例代码1:查看代码#include"delay.h"#include"sysInt.h"#include"intrins.h"charSMGDuan[]={0x5B,0x3F,0x5B,0x66, 0x40,0x40, 0x3F,0x3F}; //2024--MMcharsegDuan[]={0x3F,0......
  • pyqt5中多线程爬虫
       设立爬虫Class,继承pyqt5中的Thread函数中使用普通线程  整体代码:importsysimportpandasaspdimportjson,requests,time,threadingfromPyQt5.QtWidgetsimportQMainWindow,QApplication,QVBoxLayout,QMessageBoxfromui.ui_testimportUi_MainWind......
  • c++多线程按行读取同一个每行长度不规则文件
    对于非常大的比如上百G的大文件读取,单线程读是非常非常慢的,需要考虑用多线程读,多个线程读同一个文件时不用加锁的,每个线程打开一个独立的文件句柄多线程读同一个文件实现思路思路1先打开一个文件句柄,获取整个文件大小file_size确定要采用线程读取的部分大小read_size和......
  • c++多线程编程
    c++线程库:<thread>创建线程:需要可调用的函数或者函数对象作为线程入口点例:std::threadthreadname(function_name,args...)在C++中,当使用std::thread创建线程并传递类的成员函数时,需要使用&来获取成员函数的地址,同时还需要传递对象的指针(或引用)作为第一个参数。......