首页 > 编程语言 >Java面试要点111 - Java BlockingQueue实现原理

Java面试要点111 - Java BlockingQueue实现原理

时间:2025-01-01 18:55:36浏览次数:7  
标签:Java Thread 队列 private final 111 new public BlockingQueue

在这里插入图片描述

文章目录

引言

BlockingQueue是Java并发包中的重要组件,它在生产者-消费者场景中发挥着关键作用。作为线程安全的队列实现,BlockingQueue不仅提供了普通队列操作,还支持阻塞操作,使其成为并发编程中不可或缺的工具。

一、BlockingQueue基本概念

BlockingQueue在Java中是一个接口,它扩展了Queue接口,提供了阻塞的插入和获取操作。当队列满时,插入操作将被阻塞;当队列空时,获取操作将被阻塞。这种特性使其特别适合于生产者-消费者模式的实现。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueBasics {
    public static void demonstrateBasicOperations() {
        // 创建一个容量为3的ArrayBlockingQueue
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        
        try {
            // 添加元素的不同方式
            queue.put("First");  // 阻塞式添加
            queue.offer("Second");  // 非阻塞式添加
            boolean success = queue.offer("Third", 1, TimeUnit.SECONDS);  // 限时添加
            
            System.out.println("队列大小: " + queue.size());
            
            // 获取元素的不同方式
            String item1 = queue.take();  // 阻塞式获取
            String item2 = queue.poll();  // 非阻塞式获取
            String item3 = queue.poll(1, TimeUnit.SECONDS);  // 限时获取
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

二、主要实现类解析

2.1 ArrayBlockingQueue实现原理

ArrayBlockingQueue是一个由数组支持的有界阻塞队列。它使用ReentrantLock来保证线程安全,使用Condition来实现线程等待和唤醒机制。

public class ArrayBlockingQueueAnalysis {
    static class CustomArrayBlockingQueue<E> {
        private final Object[] items;
        private final ReentrantLock lock;
        private final Condition notEmpty;
        private final Condition notFull;
        private int takeIndex;
        private int putIndex;
        private int count;
        
        public CustomArrayBlockingQueue(int capacity) {
            this.items = new Object[capacity];
            this.lock = new ReentrantLock(true);  // 使用公平锁
            this.notEmpty = lock.newCondition();
            this.notFull = lock.newCondition();
        }
        
        public void put(E e) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) {
                    // 队列满时等待
                    notFull.await();
                }
                items[putIndex] = e;
                putIndex = (putIndex + 1) % items.length;
                count++;
                // 通知等待的消费者
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
        
        @SuppressWarnings("unchecked")
        public E take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    // 队列空时等待
                    notEmpty.await();
                }
                E item = (E) items[takeIndex];
                items[takeIndex] = null;
                takeIndex = (takeIndex + 1) % items.length;
                count--;
                // 通知等待的生产者
                notFull.signal();
                return item;
            } finally {
                lock.unlock();
            }
        }
    }
}

2.2 LinkedBlockingQueue实现原理

LinkedBlockingQueue是基于链表的可选有界阻塞队列。它使用两个ReentrantLock来分别控制头尾节点的访问,从而提高并发性能。

public class LinkedBlockingQueueAnalysis {
    static class CustomLinkedBlockingQueue<E> {
        static class Node<E> {
            E item;
            Node<E> next;
            Node(E item) { this.item = item; }
        }
        
        private final ReentrantLock takeLock;
        private final ReentrantLock putLock;
        private final Condition notEmpty;
        private final Condition notFull;
        private final int capacity;
        private Node<E> head;
        private Node<E> last;
        private AtomicInteger count;
        
        public CustomLinkedBlockingQueue(int capacity) {
            this.capacity = capacity;
            this.count = new AtomicInteger(0);
            this.takeLock = new ReentrantLock();
            this.putLock = new ReentrantLock();
            this.notEmpty = takeLock.newCondition();
            this.notFull = putLock.newCondition();
            this.head = this.last = new Node<>(null);
        }
        
        public void put(E e) throws InterruptedException {
            int c = -1;
            final Node<E> node = new Node<>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    notFull.await();
                }
                last.next = node;
                last = node;
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0) {
                signalNotEmpty();
            }
        }
        
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    }
}

三、高级特性应用

3.1 优先级队列实现

PriorityBlockingQueue是一个支持优先级的无界阻塞队列,内部使用堆来维护元素顺序。

public class PriorityQueueExample {
    static class Task implements Comparable<Task> {
        private final int priority;
        private final String name;
        
        public Task(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }
        
        @Override
        public int compareTo(Task other) {
            return Integer.compare(other.priority, this.priority); // 高优先级在前
        }
        
        @Override
        public String toString() {
            return String.format("Task[priority=%d, name='%s']", priority, name);
        }
    }
    
    public static void demonstratePriorityQueue() {
        BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
        
        // 生产者线程
        new Thread(() -> {
            try {
                priorityQueue.put(new Task(2, "中等优先级任务"));
                priorityQueue.put(new Task(1, "低优先级任务"));
                priorityQueue.put(new Task(3, "高优先级任务"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        
        // 消费者线程
        new Thread(() -> {
            try {
                Thread.sleep(100); // 确保所有任务都已添加
                while (!priorityQueue.isEmpty()) {
                    Task task = priorityQueue.take();
                    System.out.println("处理任务: " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

3.2 延迟队列实现

DelayQueue是一个支持延迟获取元素的无界阻塞队列,元素只有在其指定的延迟时间到期后才能被获取。

public class DelayQueueExample {
    static class DelayedTask implements Delayed {
        private final String name;
        private final long startTime;
        
        public DelayedTask(String name, long delayInMillis) {
            this.name = name;
            this.startTime = System.currentTimeMillis() + delayInMillis;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            long diff = startTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), 
                              o.getDelay(TimeUnit.MILLISECONDS));
        }
        
        @Override
        public String toString() {
            return "DelayedTask[name=" + name + "]";
        }
    }
    
    public static void demonstrateDelayQueue() {
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
        
        // 添加延迟任务
        delayQueue.put(new DelayedTask("Task1", 2000));
        delayQueue.put(new DelayedTask("Task2", 1000));
        delayQueue.put(new DelayedTask("Task3", 3000));
        
        // 处理延迟任务
        new Thread(() -> {
            try {
                while (!delayQueue.isEmpty()) {
                    DelayedTask task = delayQueue.take();
                    System.out.println("执行任务: " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

四、实际应用场景

4.1 生产者-消费者模式

BlockingQueue最常见的应用场景是实现生产者-消费者模式,它能够自动处理线程同步问题。

public class ProducerConsumerExample {
    private static final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<>(10);
    
    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    Task task = generateTask();
                    taskQueue.put(task);
                    System.out.println("生产任务: " + task);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        private Task generateTask() {
            return new Task(
                ThreadLocalRandom.current().nextInt(3) + 1,
                "Task-" + System.currentTimeMillis()
            );
        }
    }
    
    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    Task task = taskQueue.take();
                    processTask(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        private void processTask(Task task) {
            System.out.println("消费任务: " + task);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void startProducerConsumer() {
        // 启动生产者和消费者
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(new Producer());
        executor.submit(new Consumer());
        
        // 运行一段时间后关闭
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
    }
}

总结

BlockingQueue作为Java并发包中的核心组件,通过提供线程安全的队列操作和阻塞机制,有效地解决了并发编程中的生产者-消费者问题。在实际应用中,可以根据具体需求选择合适的BlockingQueue实现类,如有界队列ArrayBlockingQueue、可选有界的LinkedBlockingQueue、支持优先级的PriorityBlockingQueue或支持延迟的DelayQueue。通过合理使用BlockingQueue提供的API,可以构建出高效、可靠的并发应用。在使用时需要注意选择合适的队列实现和容量设置,并正确处理中断异常,以确保应用的稳定性和可靠性。

标签:Java,Thread,队列,private,final,111,new,public,BlockingQueue
From: https://blog.csdn.net/weixin_55344375/article/details/144745738

相关文章

  • Java面试要点112 - Java延迟队列DelayQueue技术解析
    文章目录引言一、DelayQueue工作原理二、延迟队列的内部实现三、高级特性与优化3.1优先级控制3.2性能优化四、消息延迟投递系统五、定时任务调度实现六、异常处理与资源管理总结引言DelayQueue是Java并发包中一个专门用于延迟处理的阻塞队列实现,它根据延迟时间......
  • 第一节 Java中的For循环到底多强大?从基础到高级让你彻底搞懂!
    在Java编程中,for循环是一个“平凡又伟大”的存在!......
  • JAVA 分布式锁
    分布式锁JVM自带的synchronized及ReentrantLock锁都是单进程内的,不能跨进程,如下,同时来个两个请求被分配到不同的tomcat,这种锁将失效:REDIS实现分布式锁可以借助REDIS的setnx命令实现:https://blog.csdn.net/T_Y_F_/article/details/144238022注:redis的工作线程为单......
  • Java难绷知识05--Swing中的事件调度线程和资源释放
    Swing中的事件调度线程先了解一下Swing中的单线程模型单线程模型有什么作用虽然大伙认为Swing又丑又落后(但是我编写gui入门真的是从Swing开始)Swing最初设计是单线程模型,这意味着所有与Swing组件交互的代码都应该在同一个线程中执行。单线程模型避免了Swing组件可能因为......
  • 【“C语言高冷,Java正统,python亲民...”】
    1.引言     在编程语言的世界中,每种语言不仅是工具,还带有一定的文化和气质特征。例如,人们将C语言称为“高冷”,因为它以性能和底层控制而闻名;Java被认为“正统”,它的“编写一次,到处运行”理念深入人心;Python则以其简单易用和包容性社区被称为“亲民”。     ......
  • 【Java教程】Day15-16 多线程:线程同步——Java的原子操作类
    在Java中,除了常见的底层锁和并发集合类,java.util.concurrent 包还提供了一组专门用于原子操作的封装类,位于 java.util.concurrent.atomic 包。通过这些类,我们可以在多线程环境下安全地进行无锁操作,避免了传统锁的性能开销。今天我们就来详细了解其中一个常用的类:AtomicInt......
  • 【Java教程】Day14-01 加密与安全:从ASCII到Base64
    ​1.什么是编码?在计算机科学中,编码(Encoding)是将信息从一种格式转换成另一种格式的过程。在我们日常生活中,编码算法广泛应用于文本、文件和网络传输等领域。了解编码的基础知识是学习计算机编程与算法的第一步。1.1ASCII编码ASCII(AmericanStandardCodeforInformationI......
  • 【Java教程】Day11-07 时间与日期:日期与时间API的转换与数据库存储
    Java提供了两个日期与时间处理API:旧的 java.util.Date 和 java.util.Calendar,以及新的 java.time 包。新的API以 Instant、LocalDateTime 等为核心,具有更清晰的设计和更强大的功能。除非你需要与遗留代码进行交互,否则建议使用新的API。在需要将新旧API进行转换时,......
  • java毕业设计基于SpringBoot的高校运动会管理系统
    一、项目介绍开发语言:Java框架:springbootJDK版本:JDK1.8服务器:tomcat7数据库:mysql数据库工具:Navicat11开发软件:eclipse/myeclipse/ideaMaven包:Maven————————————————二、功能介绍后端技术SpringBoot:作为系统的后端框架,SpringBoot提供了快速搭建......
  • [Java/Spring] 深入理解:Spring Web DispatcherServlet
    1概述:SpringWebDispatcherServletDispatcherServlet简介org.springframework.web.servlet.DispatcherServlet是一个Servlet,它接收所有的HTTP请求,并根据请求的信息将其分发给相应的处理器(Handler)进行处理。它是SpringMVC架构模式中的关键部分,将请求处理逻辑与实际的......