首页 > 其他分享 >高并发(阻塞队列)

高并发(阻塞队列)

时间:2024-04-19 16:38:01浏览次数:20  
标签:Thread 队列 items 元素 阻塞 并发 线程

阻塞队列是一种常用的并发编程工具,它能够在多线程环境下提供一种安全而高效的数据传输机制

阻塞队列是一种特殊的队列,它具有以下几个特点:

1、阻塞特性:当队列为空时,从队列中获取元素的操作将会被阻塞,直到队列中有新的元素被添加;当队列已满时,向队列中添加元素的操作将会被阻塞,直到队列中有空的位置,这就是等待唤醒机制。

2、线程安全:阻塞队列内部通过锁或其他同步机制来保证多线程环境下的数据一致性。

3、有界性:阻塞队列可以设置容量上限,当队列满时,后续的元素将无法添加。

4、公平性:阻塞队列可以选择公平或非公平的策略来决定线程的获取顺序。公平队列会按照线程的请求顺序进行处理(线程按先来后到顺序排队获取元素),而非公平队列则允许新的线程插队执行(线程竞争)。比如:SynchronousQueue。

主要方法

添加方法

方法 描述 是否阻塞
add方法 往队列尾部添加元素,内部是调用offer方法,如果队列已满,则会抛异常
put方法 往队列尾部添加元素,如果队列已满,则阻塞等待
offer方法 往队列尾部添加元素,如果队列已满,则返回false,不会阻塞

获取方法

方法 描述 是否阻塞
take方法 take方法:移除并返回队列头部的元素,如果队列为空,则阻塞等待
poll方法 移除并返回队列头部的元素,如果队列为空,则返回null,不会阻塞
peek方法 返回队列头部的元素(不移除),如果队列为空,则返回null,不会阻塞

常见阻塞队列

1、ArrayBlockingQueue
基于数组的阻塞队列实现,其内部维护一个定长的数组,用于存储队列元素。线程阻塞的实现是通过ReentrantLock来完成的,数据的插入与取出共用同一个锁,因此ArrayBlockingQueue并不能实现生产、消费同时进行。而且在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

2、LinkedBlockingQueue 由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列(两个独立锁提高并发)
基于单向链表的阻塞队列实现,在初始化LinkedBlockingQueue的时候可以指定大小,也可以不指定,默认类似一个无限大小的容量(Integer.MAX_VALUE),不指队列容量大小也是会有风险的,一旦数据生产速度大于消费速度,系统内存将有可能被消耗殆尽,因此要谨慎操作。另外LinkedBlockingQueue中用于阻塞生产者、消费者的锁是两个(锁分离),因此生产与消费是可以同时进行的。

3、PriorityBlockingQueue 支持优先级排序的无界阻塞队列(compareTo 排序优先)
一个支持优先级排序的无界阻塞队列,进入队列的元素会按照优先级进行排序

4、DelayQueue 使用优先级队列实现的延迟无界阻塞队列(用于缓存失效,定时任务)
DelayQueue是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素

5、SynchronusQueue 不存储元素的阻塞队列,也即单个元素的队列(不存储数据,可用于数据传递)
同步阻塞队列,SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然

6、LinkedTransferQueue
LinkedTransferQueue是基于链表的FIFO无界阻塞队列,它出现在JDK7中,Doug Lea大神说LinkedTransferQueue是一个聪明的队列,它是ConcurrentLinkedQueue、SynchronousQueue(公平模式下)、无界的LinkedBlockingQueues等的超集,LinkedTransferQueue包含了ConcurrentLinkedQueue、SynchronousQueue、LinkedBlockingQueues三种队列的功能

7、LinkedBlockDeque 由链表结构组成的双向阻塞队列

8、ConcurrentLinkedQueue 它是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,是个高性能无锁队列

阻塞队列的原理

常用的阻塞队列,比如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue底层都是采用ReentrantLock锁来实现线程的互斥,而ReentrantLock底层采用了AQS框架实现线程队列的同步,线程的阻塞是调用LockSupport.park实现,唤醒是调用LockSupport.unpark实现

阻塞队列的原理可以通过两个关键组件来解释:锁和条件变量。

阻塞队列使用锁来保护共享资源,控制线程的互斥访问。在队列为空或已满时,线程需要等待相应的条件满足才能继续执行。

  • 条件变量

条件变量是锁的一个补充,在某些特定的条件下,线程会进入等待状态。当条件满足时,其他线程会通过调用条件变量的唤醒方法(比如signal()或signalAll())来通知等待的线程进行下一步操作。
当一个线程试图从空的阻塞队列中获取元素时,它会获取队列的锁,并检查队列是否为空。如果为空,这个线程将进入等待状态,直到其他线程向队列中插入元素并通过条件变量唤醒它。当一个线程试图向已满的阻塞队列插入元素时,它会获取队列的锁,并检查队列是否已满。如果已满,这个线程将进入等待状态,直到其他线程从队列中获取元素并通过条件变量唤醒它。
接下来我们看下阻塞队列的获取元素和插入元素的核心代码:
ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue的带阻塞的插入和获取方法都是基于ReentrantLock锁+条件变量的等待和通知来实现的。

ArrayBlockingQueue代码:

/**
 * 插入元素,带阻塞
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    // 这里使用的是ReentrantLock锁
    final ReentrantLock lock = this.lock;
    // 获取锁并支持响应中断,注意:获取锁的过程中不响应中断,是在获取到锁后根据当前线程的中断标识来处理。
    lock.lockInterruptibly();
    try {
        // 元素大小等于数组长度时阻塞,说明放满了,生产者需要暂停,阻塞在条件变量上,等待被唤醒
        while (count == items.length)
            notFull.await();
        // 放入元素到数组指定的下标处
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}
 
/**
 * 插入元素,唤醒等待获取元素的线程
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
	// 放入元素后,通知消费线程继续获取元素
    notEmpty.signal();
}
 
/**
 * 获取元素,带阻塞
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 数组无元素时阻塞,阻塞在条件变量上,等待被唤醒
        // 元素大小等于0时阻塞,说明数组被取空了,消费者需要暂停,阻塞在条件变量上,等待被唤醒
        while (count == 0)
            notEmpty.await();
        // 移除元素并返回
        return dequeue();
    } finally {
        lock.unlock();
    }
}
 
 
/**
 * 移除元素并返回
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // 数组时循环使用的,取元素的index到达数组长度时,下次需要从第0个位置
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 移除元素后,通知消费者线程可以继续放入元素
    notFull.signal();
    return x;
}

常用的使用场景

  1. 生产者-消费者模型:阻塞队列能够很好地平衡生产者和消费者之间的速度差异,既能保护消费者不会消费到空数据,也能保护生产者不会造成队列溢出,能够有效地解耦生产者和消费者,提高系统的稳定性和吞吐量。
  2. 线程池:在线程池中,阻塞队列可以作为任务缓冲区,将待执行的任务放入队列中,由线程池中的工作线程按照一定的策略进行执行。
  3. 同步工具:阻塞队列还可以作为一种同步工具,在多线程环境下实现线程之间的协作。
  4. 数据缓冲:阻塞队列可以用作数据缓冲区,当生产者的速度大于消费者的速度时,数据可以先存储在队列中,等待消费者处理
  5. 事件驱动编程:阻塞队列可以用于事件驱动的编程模型,当事件发生时,将事件对象放入队列中,由消费者进行处理

使用demo

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
 
public class BlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为10的ArrayBlockingQueue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
//        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
//        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10);
        // 创建生产者线程
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i <= 5; i++) {
                    // 将数据放入队列
                    queue.put(i);
                    System.out.println(Thread.currentThread().getName() + "Produced: " + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 创建消费者线程
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i <= 5; i++) {
                    // 从队列中取出数据
                    int num = queue.take();
                    System.out.println(Thread.currentThread().getName() + "Consumed: " + num);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 启动生产者和消费者线程
        producerThread.start();
        consumerThread.start();
        // 等待线程执行完毕
        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行输出:

Thread-0Produced: 0
Thread-1Consumed: 0
Thread-0Produced: 1
Thread-1Consumed: 1
Thread-0Produced: 2
Thread-0Produced: 3
Thread-1Consumed: 2
Thread-0Produced: 4
Thread-0Produced: 5
Thread-1Consumed: 3
Thread-1Consumed: 4
Thread-1Consumed: 5

总结

通过了解阻塞队列的原理和使用场景,我们可以更好地应对多线程编程中的并发问题,提高代码的可维护性和可扩展性。阻塞队列作为一种常见的并发编程工具,能够帮助我们实现高效的数据传输和线程协作,为我们的应用程序提供更好的性能和可靠性保障

参考博客:https://www.cnblogs.com/zimug/p/14831660.html

标签:Thread,队列,items,元素,阻塞,并发,线程
From: https://www.cnblogs.com/luojw/p/18146234

相关文章

  • ROS笔记[2]-获取OpenMV数据并发布到ROS消息
    摘要Orangepi(香橙派)通过USB-CDC获取OpenMV数据并使用Python发布到ROS的/openmv主题,实现打印"helloros"字符串.关键信息python3.8ROS1:NoeticUbuntu20.04Orangepi3B原理简介OpenMV的USB-CDC虚拟串口(VCP)通信[https://blog.csdn.net/qq_34440409/article/details/1......
  • 队列
    队列目录队列LinkedBlockingQueue阻塞队列SynchronousQueue交换队列DelayQueue延时队列ArrayBlockingQueue有界循环队列常见面试题LinkedBlockingQueue阻塞队列新增操作add队列满的时候抛出异常offer队列满的时候返回false查看并删除remove队列空的时候抛出异常......
  • 高并发(AQS)
    AQS抽象的队列同步器框架,主要通过程序来构建锁和同步器AQS的全称为AbstractQueuedSynchronizer,翻译过来的意思就是抽象队列同步器,它和Java的Synchronized作用和一样,用来同步加锁;特性对比ReentrantLockSynchronized锁实现依赖AQS监视器Monitor模式灵活性支......
  • Java并发(二十五)----异步模式之生产者/消费者
    1.定义要点与Java并发(二十二)----同步模式之保护性暂停中的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应这样的好处是消费队列可以用来平衡生产和消费的线程资源生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据......
  • 高并发(锁)
    锁是用于控制多个线程对共享资源的访问的机制,防止出现程序对共享资源的竞态关系线程安全在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况线程的竞态条件竞态条件(racecondition)竞态......
  • 第十六章——处理锁、阻塞和死锁(3)——使用SQLServer Profiler侦测死锁
    前言:作为DBA,可能经常会遇到有同事或者客户反映经常发生死锁,影响了系统的使用。此时,你需要尽快侦测和处理这类问题。死锁是当两个或者以上的事务互相阻塞引起的。在这种情况下两个事务会无限期地等待对方释放资源以便操作。下面是死锁的示意图: 本文将使用SQLServerProfi......
  • 基于K8s+Docker+Openresty+Lua+SpringCloudAlibaba的高并发秒杀系统——与京东淘宝同
    ​介绍基于K8s+Docker+Openresty+Lua+SpringCloudAlibaba的高并发高性能商品秒杀系统,本系统实测单台(16核32G主频2.2GHz)openresty(nginx)的QPS可高达6w并发,如果您需要应对100w的并发,则需要100w/6w=17台openresty服务器,17台服务器同时接收并处理这100w的并发流量呢?当然是商业......
  • 测试Netty高并发工具
    测试Netty应用程序的高并发性能工具JMeterJMeter:ApacheJMeter是一个功能强大的用于性能测试的工具,可以模拟大量用户对Netty服务器的并发请求。你可以创建各种测试计划来模拟不同负载条件下的性能表现。wrkwrk:wrk是一个现代的HTTP基准测试工具,它可以轻松地对Netty服务器进......
  • 我们如何实现最基础的并发?
    OS的目标是在1保持控制权下2高性能的并发。因此现在我们有两大问题需要解决:1.如何高性能地并发?2.如何保持OS对计算机的控制权?(我们姑且只讨论在单CPU的机器上,运行微内核OS) 这篇博客中,我们先来回答这个问题:我们如何实现最基础的并发?       首先我们来回答一下什......
  • QPS才算高并发
    QPS才算高并发高并发场景QPS等专业指标揭秘大全与调优实战  合集-三“高”架构设计与调优(1) 1.高并发场景QPS等专业指标揭秘大全与调优实战04-14收起 高并发场景QPS等专业指标揭秘大全与调优实战最近经常有小伙伴问及高并发场景下QPS的一些问题,特意结合......