首页 > 编程语言 >Java并发(二十五)----异步模式之生产者/消费者

Java并发(二十五)----异步模式之生产者/消费者

时间:2024-04-17 22:00:43浏览次数:31  
标签:异步 Java 48 10 生产者 queue ---- message TestProducerConsumer

1. 定义

要点

  • Java并发(二十二)----同步模式之保护性暂停中的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应

  • 这样的好处是消费队列可以用来平衡生产和消费的线程资源

  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据

  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据

  • JDK 中各种阻塞队列,采用的就是这种模式

2. 实现

// 线程安全,只有get方法,且没有子类
final class Message {
    private int id;
    private Object message;
​
    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }
​
    public int getId() {
        return id;
    }
​
    public Object getMessage() {
        return message;
    }
}
​
// 消息队列类 Java 线程之间通信
class MessageQueue {
    private LinkedList<Message> queue;
    private int capacity;
​
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
​
    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 从队列头部获取消息并返回
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }
​
    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 将新的消息加在尾部
            queue.addLast(message);
            queue.notifyAll(); 
        }
    }
}

3. 应用

MessageQueue messageQueue = new MessageQueue(2);
// 4 个生产者线程, 下载任务
for (int i = 0; i < 4; i++) {
    int id = i;
    new Thread(() -> {
        try {
            log.debug("download...");
            List<String> response = Downloader.download(); // 模拟生产消息,三秒后返回response
            log.debug("try put message({})", id);
            messageQueue.put(new Message(id, response));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }, "生产者" + i).start();
}
​
// 1 个消费者线程, 处理结果
new Thread(() -> {
    while (true) {
        Message message = messageQueue.take();
        List<String> response = (List<String>) message.getMessage();
        log.debug("take message({}): [{}] lines", message.getId(), response.size());
    }
​
}, "消费者").start();

某次运行结果

10:48:38.070 [生产者3] c.TestProducerConsumer - download...
10:48:38.070 [生产者0] c.TestProducerConsumer - download...
10:48:38.070 [消费者] c.MessageQueue - 没货了, wait
10:48:38.070 [生产者1] c.TestProducerConsumer - download...
10:48:38.070 [生产者2] c.TestProducerConsumer - download...
10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1)
10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2)
10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0)
10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3)
10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines
10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines
10:48:41.240 [消费者] c.MessageQueue - 没货了, wait

 

标签:异步,Java,48,10,生产者,queue,----,message,TestProducerConsumer
From: https://www.cnblogs.com/xiaoyh/p/17161749.html

相关文章

  • Chrome免安装绿色版制作教程
    chrome离线安装包https://downzen.com/en/windows/google-chrome/versions/?page=1方法一:1)下载最新版GoogleChrome离线安装包文件1.Stable版(稳定版、正式版)下载:http://www.google.com/chrome/eula.html?standalone=1注:Stable似乎只有最新稳定版可以从官方下载,尚未找到其它......
  • P9414 「NnOI R1-T3」元组
    P9414「NnOIR1-T3」元组树上背包首先思考题意,每个方案都存在一个唯一的\(x\),所以我们可以枚举\(x\),计算有多少方案使得\(\rmLCA\)为\(x\)。\(x\)上方的点一定不能选,那么就变成了在\(x\)子树内的选点问题。思考后可以发现,要满足题意,就是要满足每个\(son_u\)子树中......
  • xxx,一个神奇的 Python 库
    前几天,我在《技术周刊的转变:如何平衡热爱与现实?》一文里写过国内Python自媒体圈在近几年的两个现象(仅个人观感,无科学数据支撑):Python广告投放出现断崖式萎缩Python大号出现很多改名/转行本文想继续分享我观察到的另一个挺有意思的现象。如果你能从中受到一些启发,进而为自......
  • 打造心灵栖息地:YY日记App——原型设计分享
    YY日记App是一个专为年轻人打造的心灵日记应用,旨在提供一个私密、个性化的日记记录平台。在本篇博客中,我将分享我对YY日记App的原型设计思路和实现过程。一、用户研究  在设计YY日记App的原型之前,我进行了深入的用户研究,明确了目标用户群体为年轻人,他们希望有一个简洁易用、......
  • cpu scheduling
    基本概念多道程序设计的目的将CPU的利用率最大化多个进程同时存在于内存(并发),当一个进程暂不使用cpu时,系统调用另一个进程占用cpu。cpu调度程序wheneverthecpubecomesidle(空闲)theoperatingsystemmustselectoneoftheprocessesinthereadyqueuetobeexecu......
  • 银行功能测试之权限测试
    通常一个用户会有多个角色,也会有有兼职机构。有这么一个需求,一个菜单只允许某个岗位可以查看以及相应的增删改查,而这个岗位只能总行管理员配置,分行管理员是不可以配置的首先分析这个岗位只能总行管理员配置,那么分行管理员是不允许选择这个岗位的,但是同时如果总行管理员给用户赋......
  • 计算自然数 num 的数根
    数根又称数字根,是自然数的一种性质,每个自然数都有一个数根。对于给定的自然数,反复将各个位上的数字相加,直到结果为一位数,则该一位数即为原自然数的数根。代码:publicclassSolution{publicintAddDigits(intnum){while(num>=10){intsum=......
  • 基于RAM的几何变换——平移
    基于RAM的几何变换——平移一、平移的基本概念  平移的概念很好理解,但是在具体操作中可能会涉及到两个问题:平移量有正数也有负数,涉及到Verilog语法中的有符号数处理平移会导致部分像素超出我们的显示范围,对这部分的像素应当做丢弃处理二、MATLAB实现  实现代码和实验......
  • Effective Python:第5条 用辅助函数取代复杂的表达式
    初始代码: 第一次优化:多次使用:  ......
  • excel对比脚本
    对比excel  fromos.pathimportexistsfromxlwingsimportAppimportyamldefcheck_list_tool():ifnotexists('主管.xlsx'):print("缺少主管文件,无法比较")else:#比较横梁ifexists('横梁.xlsx'):do_compare......