首页 > 其他分享 >生产者消费者问题

生产者消费者问题

时间:2024-01-26 11:01:28浏览次数:30  
标签:消费者 生产者 问题 amount 线程 new public

1. 概述

  • 生产者-消费者问题(Producer-Consumer Problem)也称有限缓冲问题(Bounded-BufferProblem),是一个多线程同步问题的经典案例。
  • 生产者一消费者问题描述了两类访问共享缓冲区的线程(即所谓的生产者消费者)在实际运行时会发生的问题。
    • 生产者线程的主要功能是生成一定量的数据放到缓冲区中,然后重复此过程。
    • 消费者线程的主要功能是从缓冲区提取(或消耗)数据
  • 生产者一消费者问题关键是:
    1. 保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中为空时消耗数据。
    2. 保证在生产者加入过程、消费者消耗过程中,不会产生错误的数据和行为。
  • 生产者一消费者问题不仅仅是一个多线程同步问题的经典案例,而且业内已经将解决该问题的方案,抽象成为了一种设计模式生产者一消费者模式生产者一消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。

1.1 生产者一消费者模式

  • 生产者一消费者模式中,通常由两类线程,即生产者线程(若干个)和消费者线程(若干个)。生产者线程向数据缓冲区(DataBuffer)加入数据,消费者线程则从DataBuffer消耗数据。生产者和消费者、内存缓冲区之间的关系如图。

image

  • 生产者一消费者模式中,至少有以下关键点:
    1. 生产者与生产者之间、消费者与消费者之间,对数据缓冲区的操作是并发进行的。
    2. 数据缓冲区是有容量上限的。数据缓冲区满后,生产者不能再加入数据:DataBufer空时消费者不能再取出数据。
    3. 数据缓冲区是线程安全的。在并发操作数据区的过程中,不能出现数据不一致情况;或者在多个线程并发更改共享数据后,不会造成出现脏数据的情况。
    4. 生产者或者消费者线程在空闲时,需要尽可能阻塞而不是执行无效的空操作,尽量节约CPU资源。

1.2 一个线程不安全的实现版本

public class NotSafeDataBuffer<T> {
    public static final int MAX_AMOUNT = 10;
    private List<T> dataList = new LinkedList<>();
    
    //保存数量
    private AtomicInteger amount = new AtomicInteger(0);
    
    /**
     * 向数据区增加一个元素
     */
    public void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            Print.tcfo("队列已经满了!");
            return;
        }
        dataList.add(element);
        System.out.println(element + "");
        amount.incrementAndGet();
        
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
    }
    
    /**
     * 从数据区取出一个元素
     */
    public T fetch() throws Exception {
        if (amount.get() <= 0) {
            System.out.println("队列已经空了!");
            return null;
        }
        T element = dataList.remove(0);
        System.out.println(element + "");
        amount.decrementAndGet();
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}

DataBuffer类型的实例属性dataList保存具体数据元素,实例属性amount保存元素的数量。DataBuffer类型有两个实例方法,实例方法add()用于向数据区增加元素,实例方法fetch()用于从数据区消耗元素。
add()实例方法中,加入元素之前首先会对amount是否达到上限进行判断,如果数据区满了则不能加入数据;在fetch()实例方法中,消耗元素前首先会对amount是否大于零进行判断,如果数据区空了,就不能取出数据。

1.3 生产者、消费者的逻辑与动作解耦

  • 生产者一消费者模式有多个不同版本的实现,这些版本的区别在于数据缓冲区(DataBuffer)类以及相应的生产、消费动作(Action)不同,而生产者类、消费者类的执行逻辑是相同的。
  • 分离变与不变是软件设计的一个基本原则。现在将生产者类、消费者类与具体的生产、消费Action解耦,从而使得生产者类、消费者类的代码在后续可以复用。生产者、消费者逻辑与对应Action解耦后的类结构图。

image

分离变与不变原则的背后蕴藏着丰富的软件工程思想,例如信息的分装与隐藏、系统的模块化、使用分层构架等。其中,是指易变的代码或者模块,不变就是指系统中不易变化的部分。

在解耦后的生产者一消费者模式:

  • 不变的部分为生产者(Producer)类、消费者(Consumer)类,后续可以直接复用,不需要修改代码;
  • 变化的部分为数据缓冲区(DataBufer)类以及相应的生产和消费动作,

2. 通用的 Producer 类实现

public class Producer implements Runnable {
    // 生产的时间间隔,产一次等待的时间,默认为200ms
    public static final int PRODUCE_GAP = 200;
    
    //总次数
    static final AtomicInteger TURN = new AtomicInteger(0);
    
    //生产者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);
    
    //生产者名称
    String name = null;
    
    //生产的动作
    Callable action = null;
    
    int gap = PRODUCE_GAP;
    
    public Producer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        if (this.gap <= 0) {
            this.gap = PRODUCE_GAP;
        }
        name = "生产者-" + PRODUCER_NO.incrementAndGet();
    }
    
    public Producer(Callable action) {
        this.action = action;
        this.gap = PRODUCE_GAP;
        name = "生产者-" + PRODUCER_NO.incrementAndGet();
    }
    
    @Override
    public void run() {
        while (true) {
            try {
                //执行生产动作
                Object out = action.call();
                
                //输出生产的结果
                if (null != out) {
                    System.out.println("第" + TURN.get() + "轮生产:" + out);
                }
                //每一轮生产之后,稍微等待一下
                sleepMilliSeconds(gap);
                
                //增加生产轮次
                TURN.incrementAndGet();
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

3. 通用的 Consumer 类实现

public class Consumer implements Runnable {
    
    //消费的时间间隔,默认等待100毫秒
    public static final int CONSUME_GAP = 100;
    
    //消费总次数
    static final AtomicInteger TURN = new AtomicInteger(0);
    
    //消费者对象编号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);
    
    //消费者名称
    String name;
    
    //消费的动作
    Callable action = null;
    
    //消费一次等待的时间,默认为1000ms
    int gap = CONSUME_GAP;
    
    public Consumer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        name = "消费者-" + CONSUMER_NO.incrementAndGet();
        
    }
    
    public Consumer(Callable action) {
        this.action = action;
        this.gap = gap;
        this.gap = CONSUME_GAP;
        name = "消费者-" + CONSUMER_NO.incrementAndGet();
    }
    
    @Override
    public void run() {
        while (true) {
            //增加消费次数
            TURN.incrementAndGet();
            try {
                //执行消费动作
                Object out = action.call();
                if (null != out) {
                    System.out.println("第" + TURN.get() + "轮消费:" + out);
                }
                //每一轮消费之后,稍微等待一下
                sleepMilliSeconds(gap);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

4. 数据缓冲区实例、生产动作、消费动作的定义

public class NotSafePetStore {
    //共享数据区,实例对象
    private static NotSafeDataBuffer<IGoods> notSafeDataBuffer = new NotSafeDataBuffer();
    
    //生产者执行的动作
    static Callable<IGoods> produceAction = () ->
    {
        //首先生成一个随机的商品
        IGoods goods = Goods.produceOne();
        //将商品加上共享数据区
        try {
            notSafeDataBuffer.add(goods);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };
    //消费者执行的动作
    static Callable<IGoods> consumerAction = () ->
    {
        // 从PetStore获取商品
        IGoods goods = null;
        try {
            goods = notSafeDataBuffer.fetch();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };    
}

5. 生产者一消费者简单实现版本(不安全)

public static void main(String[] args) throws InterruptedException {
    System.setErr(System.out);

    // 同时并发执行的线程数
    final int THREAD_TOTAL = 20;
    //线程池,用于多线程模拟测试
    ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
    for (int i = 0; i < 5; i++) {
        //生产者线程每生产一个商品,间隔500ms
        threadPool.submit(new Producer(produceAction, 500));
        //消费者线程每消费一个商品,间隔1500ms
        threadPool.submit(new Consumer(consumerAction, 1500));
    }
}

image

从以上异常可以看出,在向数据缓冲区进行元素的增加或者提取时,多个线程在并发执行对amountdataList两个成员操作时次序已经混乱,导致了数据不一致和线程安全问题。

6. 生产者一消费者简单实现版本(安全)

public class SafeDataBuffer<T> {
    public static final int MAX_AMOUNT = 10;
    private List<T> dataList = new LinkedList<>();
    
    //保存数量
    private AtomicInteger amount = new AtomicInteger(0);
    
    /**
     * 向数据区增加一个元素
     */
    public synchronized void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            System.out.println("队列已经满了!");
            return;
        }
        dataList.add(element);
       	System.out.println(element + "");
        amount.incrementAndGet();
        
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
    }
    
    /**
     * 从数据区取出一个元素
     */
    public synchronized T fetch() throws Exception {
        if (amount.get() <= 0) {
            Print.tcfo("队列已经空了!");
            return null;
        }
        T element = dataList.remove(0);
        System.out.println(element + "");
        amount.decrementAndGet();
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}


public class SafePetStore {
    //共享数据区,实例对象
    private static SafeDataBuffer<IGoods> notSafeDataBuffer = new SafeDataBuffer();
    
    //生产者执行的动作
    static Callable<IGoods> produceAction = () ->
    {
        //首先生成一个随机的商品
        IGoods goods = Goods.produceOne();
        //将商品加上共享数据区
        try {
            notSafeDataBuffer.add(goods);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };
    //消费者执行的动作
    static Callable<IGoods> consumerAction = () ->
    {
        // 从PetStore获取商品
        IGoods goods = null;
        try {
            goods = notSafeDataBuffer.fetch();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };
    
    
    public static void main(String[] args) throws InterruptedException {
        System.setErr(System.out);
        
        // 同时并发执行的线程数
        final int THREAD_TOTAL = 20;
        //线程池,用于多线程模拟测试
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
        for (int i = 0; i < 5; i++) {
            //生产者线程每生产一个商品,间隔500ms
            threadPool.submit(new Producer(produceAction, 500));
            //消费者线程每消费一个商品,间隔1500ms
            threadPool.submit(new Consumer(consumerAction, 1500));
        }
    }
    
}


虽然线程安全问题顺利解决,但是以上的解决方式使用了SafeDataBuffer的实例的对象锁作为同步锁,这样一来,所有的生产、消费动作在执行过程中都需要抢占同一个同步锁,最终的结果是所有的生产、消费动作都被串行化了。
高效率的生产者一消费者模式,生产、消费动作是肯定不能串行执行,而是需要并行执行的,而且并行化程度越高越好。

标签:消费者,生产者,问题,amount,线程,new,public
From: https://www.cnblogs.com/ccblblog/p/17988873

相关文章

  • [转]使用 mathjs 解决 js 小数精度问题
    原文地址:使用mathjs解决js小数精度问题-公瑾当年-博客园很经典的例子是0.1+0.2!=0.3(实际等于0.30000000000000004)不等的原因机器中采用二进制存储数据,比如,35会被存储为:00100011(2^5+2^1+2^0)。0.375会被存储为:0.011(1/2^2+1/2^3=1/4+1/8=0.375)而对于像......
  • MySQL间隙锁死锁问题
    一、场景还原当时同事A在线上代码中使用了Mybatis-plus的如下方法com.baomidou.mybatisplus.extension.service.IServicesaveOrUpdate(T, com.baomidou.mybatisplus.core.conditions.Wrapper<T>)该方法先执行了update操作,如果更新到就不再执行后续操作,如果没有更新到,才进行主......
  • 事务的隔离级别及脏读,不可重复读,幻读等问题
    事务隔离级别以及对应的问题如上所示。读未提交:在修改数据时在没有提交时就修改了数据库,如果修改回滚则又修改为原值,这样的话在修改与回滚之间读取的数据就是不准确的,会产生脏读现象。脏读现象是读取到未修改的数据,即是数据逻辑上不存在的数据(因为回滚未提交),而下面产生的问题均......
  • # python3 安装Crypto包 出现No module named ‘Crypto‘和No module named ‘Crypto.
    python3安装Crypto包出现Nomodulenamed‘Crypto‘和Nomodulenamed‘Crypto.Util‘问题解决方法1.改成安装pycryptodome然而在python36中无法报错:error:MicrosoftVisualC++14.0orgreaterisrequired"2.改用Anaconda安装指定版本的pycryptodomepipins......
  • 2024年1月Java项目开发指南12:前后端分离项目跨域问题解决
    创建config文件夹,创建WebConfig文件代码如下(可以直接抄)packagecc.xrilang.serversystem.config;importorg.springframework.context.annotation.Configuration;importorg.springframework.web.servlet.config.annotation.CorsRegistry;importorg.springframework.web.se......
  • Kafka 中一些常见的问题
    消息消费的顺序问题消息在被追加到Partition的时候都会分配一个特定的偏移量(offset),Kafka通过偏移量(offset)来保证消息在分区内的顺序性。为了保证Kafka中消息消费的顺序,可以采用以下2种方法:设置1个Topic只对应一个Partition破坏了Kafka的设计初衷,不推荐使用。......
  • AndroidStudio配置问题
    最开始的显示这个错误 解决方法:关闭代理,手动去下载gradle文件,先在这个设置里把代理关闭  然后在settings里面把gradle目录改为这个:然后按照AndroidStudio的提示,去这个网址把7.2的gradle下载下来下载下来后,把7.2的压缩包以及解压后的文件都放入这个随机码的目录下,......
  • 如何提高消费者运营的跨部门协同效率
    当下企业逐渐越来越重视用户的精细化运营。 如何将运营策略在不断的迭代的过程当中,逐渐形成有效策略库?在用户精细化运营的过程中,运营部门可能是会员运营团队或用户运营团队,但同时还需要协同Digital部门、商品部门或者门店运营部门共同将策略落地。在整个用户精细化运营策略落地的......
  • .NET GC的SustainedLowLatency模式引发内存的问题
    最近遇到一个问题,应用的内存占用升上去后一直降不下来,打了dump文件后发现GC的Generation0上有很多空白区间没释放,按道理第0代堆是经常回收的,怎么会有那么多空白区间呢?查阅了相关文档后,发现这是由代码中的System.Runtime.GCSettings.LatencyMode=System.Runtime.GCLatencyMode......
  • fiddler手机抓包遇到的问题
    问题一:SecureClientPipeDirectfailed:System.IO.IOException无法从传输连接中读取数据:远程主机强迫关闭了一个现有的连接。系统:IOS原因:证书未被信任,导致主机拒绝连接解决方法:确认fiddler证书是否在ios受信任列表操作步骤:①浏览器输入:ip:端口(比如10.10.49.15:8888)此时......