生产者消费者模式(Java实现)
-
定义
在⼀个⽣产环境中,⽣产者和消费者在同⼀时间段内共享同⼀块缓冲区,⽣产者负责向缓冲区添加数 据,消费者负责从缓冲区取出数据
-
使用
资源类:
/** * 资源类 */ public class Resource { private int id; public Resource(int id) { this.id = id; } @Override public String toString() { return "Resource{" + "id=" + id + '}'; } }
容器类
/** * 容器类 */ public class Contanier { //30个位置 private Resource[] resources = new Resource[30]; private static final Integer mutex = 0; private int index = 0; public Contanier(Resource[] resources) { this.resources = resources; } public Contanier() { } public synchronized boolean push(Resource resource){ //此处相当于P(empty) 不用减1因为当等于29个时候时候会有个index++操作 如果是==length-1那会少存一个 while (index==resources.length){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //此处相当于P(mutex) synchronized(Contanier.mutex){ resources[index] = resource; System.out.println(Thread.currentThread().getName()+"生产了一个资源"+resources[index]); //此处相当于V(full) index++; this.notify(); return true; //代码执行完 就是V(mutex) } } public synchronized boolean pop(){ //此处相当于P(full) while (index==0){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //此处相当于P(mutex) synchronized (Contanier.mutex){ index--; System.out.println(Thread.currentThread().getName()+"输出了一个资源"+resources[index]); resources[index] = null; //此处相当于V(empty) this.notify(); return true; //代码执行完 就是V(mutex) } } }
生产者类
import java.util.concurrent.TimeUnit; /** * 生产者 */ public class Producer { //生产者需要容器的 private Contanier contanier; public Producer(Contanier contanier) { this.contanier = contanier; } public boolean product(){ for (int i = 0; i < 30; i++) { //生产一个资源类 Resource resource = new Resource(i); //生产一个资源是需要时间的 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //生产完放入容器类中 this.contanier.push(resource); } return true; } }
消费者类
import java.util.concurrent.TimeUnit; /** * 消费者 */ public class Consumer { //消费者也需要一个容器 private Contanier contanier ; public Consumer(Contanier contanier) { this.contanier = contanier; } public boolean consume (){ for (int i = 0; i < 30; i++) { this.contanier.pop(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } return true; } }
测试类
/** * 测试类 */ public class Main { public static void main(String[] args) { Contanier contanier = new Contanier(); //生产者消费者用的是同一个容器 所以传同一个容器对象进去 Producer producer = new Producer(contanier); Consumer consumer = new Consumer(contanier); //一个线程生产 一个线程消耗 new Thread(()->{ producer.product(); },"生产者线程").start(); new Thread(()->{ consumer.consume(); },"消费者线程").start(); } }