首页 > 其他分享 >生产者-消费者模式:多线程并发协作的经典案例

生产者-消费者模式:多线程并发协作的经典案例

时间:2024-12-08 22:59:00浏览次数:11  
标签:Thread lock 并发 协作 线程 static 多线程 public wait

生产者-消费者模式是多线程并发编程中一个非常经典的模式,它通过解耦生产者和消费者的关系,使得两者可以独立工作,从而提高系统的并发性和可扩展性。本文将详细介绍生产者-消费者模式的概念、实现方式以及应用场景。

1 生产者-消费者模式概述

生产者-消费者模式包含两类线程:

  • 生产者线程:负责生产数据,并将数据放入共享数据区。
  • 消费者线程:负责从共享数据区中取出数据并进行消费。

为了解耦生产者和消费者的关系,通常会使用一个共享的数据区域(如队列)作为缓冲区。生产者将数据放入缓冲区,消费者从缓冲区中取出数据,两者之间不需要直接通信。

共享数据区域需要具备以下功能:

  • 当缓冲区已满时,阻塞生产者线程,防止其继续生产数据。
  • 当缓冲区为空时,阻塞消费者线程,防止其继续消费数据。

2 wait/notify 的消息通知机制

wait/notify 是 Java 中用于线程间通信的经典机制,通过 Object 类提供的 waitnotify/notifyAll 方法,可以实现线程的等待和唤醒。下面将详细介绍 wait/notify 机制的使用方法、注意事项以及常见问题的解决方案。

2.1 wait 方法

wait 方法用于将当前线程置入休眠状态,直到其他线程调用 notifynotifyAll 方法唤醒它。

  • 调用条件wait 方法必须在同步方法或同步块中调用,即线程必须持有对象的监视器锁(monitor lock)。
  • 释放锁:调用 wait 方法后,当前线程会释放锁,并进入等待状态。
  • 异常:如果线程在调用 wait 方法时没有持有锁,则会抛出 IllegalMonitorStateException 异常。

示例代码:

synchronized (lockObject) {
    try {
        lockObject.wait();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

2.2 notify 方法

notify 方法用于唤醒一个正在等待该对象监视器锁的线程。

  • 调用条件notify 方法也必须在同步方法或同步块中调用,线程必须持有对象的监视器锁。
  • 唤醒线程notify 方法会从等待队列中随机选择一个线程进行唤醒,使其从 wait 方法处退出,并进入同步队列,等待获取锁。
  • 释放锁:调用 notify 后,当前线程不会立即释放锁,而是在退出同步块后才会释放锁。

示例代码:

synchronized (lockObject) {
    lockObject.notify();
}

2.3 notifyAll 方法

notifyAll 方法与 notify 方法类似,但它会唤醒所有正在等待该对象监视器锁的线程。

  • 唤醒所有线程notifyAll 方法会将所有等待队列中的线程移入同步队列,等待获取锁。

示例代码:

synchronized (lockObject) {
    lockObject.notifyAll();
}

2.4 wait/notify 机制的常见问题及解决方案

2.4.1 notify 早期通知

问题描述:如果 notify 方法在 wait 方法之前被调用,可能会导致通知遗漏,使得等待的线程一直处于阻塞状态。

示例代码:

public class EarlyNotify {
    private static String lockObject = "";

    public static void main(String[] args) {
        WaitThread waitThread = new WaitThread(lockObject);
        NotifyThread notifyThread = new NotifyThread(lockObject);
        notifyThread.start();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        waitThread.start();
    }

    static class WaitThread extends Thread {
        private String lock;

        public WaitThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    System.out.println(Thread.currentThread().getName() + "  进去代码块");
                    System.out.println(Thread.currentThread().getName() + "  开始wait");
                    lock.wait();
                    System.out.println(Thread.currentThread().getName() + "   结束wait");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class NotifyThread extends Thread {
        private String lock;

        public NotifyThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "  进去代码块");
                System.out.println(Thread.currentThread().getName() + "  开始notify");
                lock.notify();
                System.out.println(Thread.currentThread().getName() + "   结束开始notify");
            }
        }
    }
}

解决方案:添加一个状态标志,在 wait 方法调用前判断状态是否已经改变。

优化后的代码:

public class EarlyNotify {
    private static String lockObject = "";
    private static boolean isWait = true;

    public static void main(String[] args) {
        WaitThread waitThread = new WaitThread(lockObject);
        NotifyThread notifyThread = new NotifyThread(lockObject);
        notifyThread.start();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        waitThread.start();
    }

    static class WaitThread extends Thread {
        private String lock;

        public WaitThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    while (isWait) {
                        System.out.println(Thread.currentThread().getName() + "  进去代码块");
                        System.out.println(Thread.currentThread().getName() + "  开始wait");
                        lock.wait();
                        System.out.println(Thread.currentThread().getName() + "   结束wait");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class NotifyThread extends Thread {
        private String lock;

        public NotifyThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "  进去代码块");
                System.out.println(Thread.currentThread().getName() + "  开始notify");
                lock.notifyAll();
                isWait = false;
                System.out.println(Thread.currentThread().getName() + "   结束开始notify");
            }
        }
    }
}

2.4.2 等待条件发生变化

问题描述:如果线程在等待时接收到通知,但之后等待的条件发生了变化,可能会导致程序出错。

示例代码:

public class ConditionChange {
    private static List<String> lockObject = new ArrayList();

    public static void main(String[] args) {
        Consumer consumer1 = new Consumer(lockObject);
        Consumer consumer2 = new Consumer(lockObject);
        Productor productor = new Productor(lockObject);
        consumer1.start();
        consumer2.start();
        productor.start();
    }

    static class Consumer extends Thread {
        private List<String> lock;

        public Consumer(List lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (lock.isEmpty()) {
                        System.out.println(Thread.currentThread().getName() + " list为空");
                        System.out.println(Thread.currentThread().getName() + " 调用wait方法");
                        lock.wait();
                        System.out.println(Thread.currentThread().getName() + "  wait方法结束");
                    }
                    String element = lock.remove(0);
                    System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Productor extends Thread {
        private List<String> lock;

        public Productor(List lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " 开始添加元素");
                lock.add(Thread.currentThread().getName());
                lock.notifyAll();
            }
        }
    }
}

解决方案:在 wait 方法退出后再次检查等待条件。

优化后的代码:

public class ConditionChange {
    private static List<String> lockObject = new ArrayList();

    public static void main(String[] args) {
        Consumer consumer1 = new Consumer(lockObject);
        Consumer consumer2 = new Consumer(lockObject);
        Productor productor = new Productor(lockObject);
        consumer1.start();
        consumer2.start();
        productor.start();
    }

    static class Consumer extends Thread {
        private List<String> lock;

        public Consumer(List lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    while (lock.isEmpty()) {
                        System.out.println(Thread.currentThread().getName() + " list为空");
                        System.out.println(Thread.currentThread().getName() + " 调用wait方法");
                        lock.wait();
                        System.out.println(Thread.currentThread().getName() + "  wait方法结束");
                    }
                    String element = lock.remove(0);
                    System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Productor extends Thread {
        private List<String> lock;

        public Productor(List lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " 开始添加元素");
                lock.add(Thread.currentThread().getName());
                lock.notifyAll();
            }
        }
    }
}

2.4.3 “假死”状态

问题描述:在多消费者和多生产者的情况下,使用 notify 方法可能会导致“假死”状态,即所有线程都处于等待状态,无法被唤醒。

原因分析:如果多个生产者线程调用了 wait 方法阻塞等待,其中一个生产者线程获取到对象锁后使用 notify 方法唤醒其他线程,如果唤醒的仍然是生产者线程,就会导致所有生产者线程都处于等待状态。

解决方案:将 notify 方法替换成 notifyAll 方法。

总结:在使用 wait/notify 机制时,应遵循以下原则:

  • 永远在 while 循环中对条件进行判断,而不是在 if 语句中进行 wait 条件的判断。
  • 使用 notifyAll 而不是 notify

基本的使用范式如下:

synchronized (sharedObject) {
    while (condition) {
        sharedObject.wait();
        // (Releases lock, and reacquires on wakeup)
    }
    // do action based upon condition e.g. take or put into queue
}

3 实现生产者-消费者模式的三种方式

生产者-消费者模式可以通过以下三种方式实现:

3.1 使用 Objectwait/notify 机制

Object 类提供了 waitnotify/notifyAll 方法,用于线程间的通信。

  • wait 方法:使当前线程进入等待状态,直到其他线程调用 notifynotifyAll 方法唤醒它。调用 wait 方法前,线程必须持有对象的监视器锁,否则会抛出 IllegalMonitorStateException 异常。
  • notify 方法:唤醒一个正在等待该对象监视器锁的线程。如果有多个线程在等待,则随机选择一个唤醒。
  • notifyAll 方法:唤醒所有正在等待该对象监视器锁的线程。

示例代码:

public class ProductorConsumer {
    private static LinkedList<Integer> list = new LinkedList<>();
    private static final int MAX_SIZE = 10;

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor());
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer());
        }
    }

    static class Productor implements Runnable {
        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.size() == MAX_SIZE) {
                            System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                            list.wait();
                        }
                        int data = new Random().nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + data);
                        list.add(data);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.isEmpty()) {
                            System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                            list.wait();
                        }
                        int data = list.removeFirst();
                        System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + data);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

3.2 使用 LockConditionawait/signal 机制

LockCondition 提供了比 Objectwait/notify 更灵活的线程通信机制。Condition 对象可以通过 lock.newCondition() 创建,并提供了 awaitsignal/signalAll 方法。

  • await 方法:使当前线程进入等待状态,直到其他线程调用 signalsignalAll 方法唤醒它。
  • signal 方法:唤醒一个正在等待该 Condition 的线程。
  • signalAll 方法:唤醒所有正在等待该 Condition 的线程。

示例代码:

public class ProductorConsumer {
    private static LinkedList<Integer> list = new LinkedList<>();
    private static final int MAX_SIZE = 10;
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor());
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer());
        }
    }

    static class Productor implements Runnable {
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.size() == MAX_SIZE) {
                        System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                        full.await();
                    }
                    int data = new Random().nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + data);
                    list.add(data);
                    empty.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.isEmpty()) {
                        System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                        empty.await();
                    }
                    int data = list.removeFirst();
                    System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + data);
                    full.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

3.3 使用 BlockingQueue 实现

BlockingQueue 是 Java 并发包中提供的一个接口,它提供了可阻塞的插入和移除操作。BlockingQueue 非常适合用来实现生产者-消费者模型,因为它可以自动处理线程的阻塞和唤醒。

  • put 方法:如果队列已满,则阻塞生产者线程,直到队列有空闲空间。
  • take 方法:如果队列为空,则阻塞消费者线程,直到队列中有数据。

示例代码:

public class ProductorConsumer {
    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor());
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer());
        }
    }

    static class Productor implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int data = new Random().nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + data);
                    queue.put(data);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int data = queue.take();
                    System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + data);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

4 生产者-消费者模式的应用场景

生产者-消费者模式广泛应用于以下场景:

  • 任务执行框架:如 Executor 框架,将任务的提交和执行解耦,提交任务的操作相当于生产者,执行任务的操作相当于消费者。
  • 消息中间件:如 MQ(消息队列),用户下单相当于生产者,处理订单的线程相当于消费者。
  • 任务处理时间较长:如上传附件并处理,用户上传附件相当于生产者,处理附件的线程相当于消费者。

5 生产者-消费者模式的优点

  • 解耦:生产者和消费者之间通过缓冲区进行通信,彼此独立,简化了系统的复杂性。
  • 复用:生产者和消费者可以独立复用和扩展,提高了代码的可维护性。
  • 调整并发数:可以根据生产者和消费者的处理速度调整并发数,优化系统性能。
  • 异步:生产者和消费者各司其职,生产者不需要等待消费者处理完数据,消费者也不需要等待生产者生产数据,提高了系统的响应速度。
  • 支持分布式:生产者和消费者可以通过分布式队列进行通信,支持分布式系统的扩展。

6 小结

生产者-消费者模式是多线程并发编程中的经典模式,通过解耦生产者和消费者的关系,提高了系统的并发性和可扩展性。本文介绍了三种实现生产者-消费者模式的方式,并给出了相应的示例代码。通过理解这些实现方式,可以更好地应用生产者-消费者模式解决实际问题。

7 思维导图

在这里插入图片描述

8 参考链接

从根上理解生产者-消费者模式

标签:Thread,lock,并发,协作,线程,static,多线程,public,wait
From: https://blog.csdn.net/gaosw0521/article/details/144161206

相关文章

  • 记一次线上高并发环境 TCP 握手丢包的故障处理
    背景业务场景需要有客户端通过tcp连接线上环境emqx集群环境,集群规模有5台node节点承载emqx业务,每台节点在业务端口上都有15w左右的tcp连接保持。近期发现与emqx相关的业务功能会出现间歇性的连接等待状态,索性运维同学在内网环境进行网络层的连接测试,确实复现了连接间......
  • 【并发编程】第三章 在线程之间共享数据
    3.1线程间共享数据的问题如果所有共享数据都是只读的,则不会有问题,因为一个线程读取的数据不受另一个线程的影响不变量(invariants):在程序或数据结构的特定状态下始终为真的属性或条件无论代码如何执行,这个不变量都应该始终保持成立。如果不成立,那就可能出现了错误考虑一个......
  • Java多线程与线程池技术详解(四)
    接受失败:“失败是什么?没有什么,只是更走近成功一步;成功是什么?就是走过了所有通向失败的路,只剩下一条路,那就是成功的路。”这句话很好地诠释了如何看待失败的问题,即每一次跌倒都是通往胜利道路上不可或缺的一部分。创造机会:“不要等待机会,而要创造机会。”这句话鼓励人们主动出......
  • .NET线程池技术详解与优化策略:提升高并发应用性能的关键
    在现代应用程序中,线程池是管理和调度线程的一种重要技术。它通过提供一个可重用的线程集合,避免了频繁创建和销毁线程的开销,从而提升了系统的性能和响应能力。.NET平台的线程池是一个高度优化的资源管理机制,它支持高并发应用的开发,尤其是在Web应用和服务中,广泛用于后台任务处理......
  • 7.1 多线程 QThread 与 Qt Concurrent
    7.1多线程QThread与QtConcurrentQt提供了多种方法实现多线程编程,包括低层次的QThread类和高层次的QtConcurrent模块。多线程编程是现代应用程序中提升性能和响应能力的重要手段,特别是对于需要大量计算或I/O操作的场景。一、多线程编程的基础概念在Qt中,多......
  • 【项目设计】->高并发内存池(谷歌开源项目简化版)
    目录​编辑项目介绍内存池池化技术内存池malloc定长内存池的实现定长内存池模拟实现:高并发内存池框架设计高并发内存池的三小只threadcache(主线1开始)设计思路threadcache哈希桶映射对齐规则对齐映射函数的编写ThreadCache类编写 threadcache无锁访问(不熟)c......
  • Mitel MiCollab 企业协作平台 任意文件读取漏洞复现(CVE-2024-41713)
    0x01产品简介MitelMiCollab是加拿大Mitel(敏迪)公司推出的一款企业级协作平台,旨在为企业提供统一、高效、安全的通信与协作解决方案。通过该平台,员工可以在任何时间、任何地点,使用任何设备,实现即时通信、语音通话、视频会议、文件共享等功能,从而提升工作效率和团队协作能力。......
  • 【多线程编程】多线程编程的基本概念
    3.1多线程编程的基本概念3.1.1为何要用多线程前面的绝大多数程序都是单线程程序,如果程序中有多个任务,比如读写文件、更新用户界面、网络连接、打印文档等操作,比如按照先后次序,先完成前面的任务才能执行后面的任务。如果某个任务持续的时间较长,比如读写一个大文件,那么用......
  • 多线程和多进程的区别与相同
    一:进程的定义 进程是操作系统分配资源的基本单位,每个进程之间的资源互不相通,不进行资源共享(除非使用管道或者其他共享资源的手段),每个进程都有独立的PCB(操作系统用于管理进程的数据结构,包含进程的基本信息,如进程ID、状态、优先级、程序计数器、寄存器集合、进程的内存管理信......
  • 并发编程系列---【数组切割并行查询-解决sql的in超过1000的问题】
    1.问题List<List<Object>>splitList=CollUtil.split(dataList,800);List<User>resultList=newArrayList<>();//使用parallelStream输出切割后的结果,每个子列表的大小splitList.parallelStream().forEach(sublist->{List<User>users=userMa......