首页 > 编程语言 >Java并发工具类深度解析

Java并发工具类深度解析

时间:2024-11-27 18:32:54浏览次数:6  
标签:executorService Java System util 并发 线程 println 解析 out

目录


Java提供了丰富的并发工具类,这些工具类大大简化了多线程编程的复杂性,提高了并发程序的性能和可靠性。本文将详细介绍几个常用的并发工具类,包括ConcurrentHashMap、AtomicInteger、Semaphore、CyclicBarrier、CountDownLatch和BlockingQueue。

1. ConcurrentHashMap

ConcurrentHashMap是线程安全的HashMap实现,专门为并发环境设计。

1.1 原理

ConcurrentHashMap在JDK 1.8中的实现原理如下:

  1. 使用分段锁机制
  2. 使用CAS + synchronized来保证并发安全
  3. 允许并发读取,读操作不需要加锁
  4. 使用红黑树优化链表结构,提高检索效率

更详细的内容可以参考:Java中的Map集合:从HashMap到ConcurrentHashMap-CSDN博客

1.2 示例

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentHashMapExample {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 并发添加元素
        for (int i = 0; i < 100; i++) {
            final String key = "Key" + i;
            executorService.submit(() -> map.put(key, 1));
        }

        // 并发更新元素
        for (int i = 0; i < 100; i++) {
            final String key = "Key" + i;
            executorService.submit(
                    //如果键 key 存在于 map 中,则将对应的值加1。
                    () -> map.computeIfPresent(key, (k, v) -> v + 1)
            );
        }

        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);// 等待线程池关闭

        System.out.println("Map size: " + map.size());//Map size: 100
        System.out.println("Key50: " + map.get("Key50"));// Key50: 2

        // 原子操作示例
        map.putIfAbsent("czf", 100);//100不存在,则添加,否则不添加
        map.replace("czf", 100, 200);
        System.out.println("czf value: " + map.get("czf"));// czf value: 200
        // 更新失败
        map.replace("czf", 100, 300);
        System.out.println("czf value: " + map.get("czf"));// czf value: 200
    }
}

在这个示例中,我们展示了ConcurrentHashMap的以下特性:

  1. 并发添加和更新元素
  2. 使用computeIfPresent进行原子更新
  3. 使用putIfAbsent和replace进行原子操作
  4. 使用forEach进行并行遍历

ConcurrentHashMap保证了在高并发环境下的线程安全性,同时提供了优秀的性能。

2. AtomicInteger

java.util.concurrent.atomic包下的原子类,提供原子操作的整数。这里以AtomicInteger为例

2.1 原理

AtomicInteger主要基于以下原理:

  1. 使用volatile关键字保证可见性
  2. 使用CAS(Compare and Swap)操作保证原子性
  3. 内部维护一个volatile的整型value

什么是CAS? CAS操作是一种无锁算法,它通过硬件指令来保证操作的原子性。CAS操作包含三个操作数:

  • 内存位置V
  • 旧的预期值A
  • 新值B。

在修改时当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。

2.2 CAS操作图解

2.3 代码示例

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AtomicIntegerExample {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger atomicInt = new AtomicInteger(0);
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 并发递增
        for (int i = 0; i < 100; i++) {
            // 使用lambda表达式 atomicInt进行递增
            executorService.submit(atomicInt::incrementAndGet);
        }

        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);

        System.out.println("最终值: " + atomicInt.get());// 最终值: 100

        // 演示其他原子操作
        System.out.println("添加并取值: " + atomicInt.addAndGet(10));// 添加并取值: 110
        // 比较(是不是指定的旧值)并更新
        System.out.println("第一次更新是否成功: " + atomicInt.compareAndSet(110, 200));// 第一次更新是否成功: true
        System.out.println("第一次更新后的值: " + atomicInt.get());// 第一次更新后的值: 200
        System.out.println("第二次更新是否成功: " + atomicInt.compareAndSet(110, 200));// 第二次更新是否成功: false
        System.out.println("第二次更新后的值: " + atomicInt.get());// 第二次更新后的值: 200

        // 演示getAndUpdate操作 先返回旧值再更新
        int result = atomicInt.getAndUpdate(n -> n * 2);
        System.out.println("先取值再更新: 旧值=" + result + ", 新值=" + atomicInt.get());// 先取值再更新: 旧值=200, 新值=400

        // 演示updateAndGet操作 先更新再返回旧值
        result = atomicInt.updateAndGet(n -> n + 100);
        System.out.println("先更新再取值: new=" + result);// 先更新再取值: new=500
    }
}

在这个示例中,我们展示了AtomicInteger的以下特性:

  1. 并发递增操作
  2. 原子性的加法操作(addAndGet)
  3. 比较并设置值(compareAndSet)
  4. 获取并更新(getAndUpdate)
  5. 更新并获取(updateAndGet)

AtomicInteger保证了在高并发环境下的线程安全性,同时避免了使用synchronized关键字带来的性能开销。

3. Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量的工具类。

3.1 原理

Semaphore的主要原理如下:

  1. 维护一个许可证计数器
  2. 每次acquire()减少一个许可,release()增加一个许可
  3. 当许可不足时,线程会被阻塞
  4. 支持公平和非公平两种模式

Semaphore内部也是使用AQS(AbstractQueuedSynchronizer)来实现同步机制(ReentrantLock底层也是AQS实现,更多:Java同步机制深度解析: synchronized vs ReentrantLock-CSDN博客)。在公平模式下,线程按照请求的顺序获取许可,防止线程饥饿(某个线程 一直抢不到资源),但是性能会有所降低;在非公平模式下,允许线程抢占式地获取许可,可能出现线程饥饿。

3.2 Semaphore工作流程

3.3 代码示例

import java.util.concurrent.Semaphore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SemaphoreExample {
    public static void main(String[] args) {
        // 创建一个只有5个许可的Semaphore
        Semaphore semaphore = new Semaphore(5);
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            executorService.submit(() -> {
                try {
                    System.out.println("Thread " + threadNum + " 正在尝试获得许可证");
                    semaphore.acquire();// 尝试获取许可证,如果获取失败,则阻塞当前线程
                    System.out.println("Thread " + threadNum + " 获得了许可");
                    
                    // 模拟一些耗时操作
                    Thread.sleep(1000);
                    semaphore.release();// 释放许可证
                    System.out.println("Thread " + threadNum + "释放了许可证");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
        try {
            // 等待所有线程执行完毕
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("所有线程都已完成");
        System.out.println("可用许可证数量: " + semaphore.availablePermits());// 打印可用许可证数量 5
    }
}

在这个示例中,我们展示了Semaphore的以下特性:

  1. 限制并发访问资源的线程数
  2. 使用acquire()获取许可
  3. 使用release()释放许可
  4. 在资源有限的情况下,如何管理多个线程的访问

Semaphore非常适用于限制对某些资源的并发访问数量,例如数据库连接池。

4. CyclicBarrier

CyclicBarrier是一种同步辅助工具,允许一组线程互相等待,直到所有线程都到达某个公共屏障点再继续执行。

4.1 原理

CyclicBarrier的主要原理如下:

  1. 构造时设置参与的线程数N(),同时有一个计数器进行记录还未到达屏障的线程数

    ​ 构造器:parties就是传入的初始线程数量

  2. 每个线程执行完后调用await(),线程会被阻塞,直至所有线程到达屏障

  3. 当第N个线程调用await()时,所有线程被释放,继续执行

  4. CyclicBarrier可以重复使用

  5. 可以在所有线程到达屏障时执行一个Runnable回调任务

CyclicBarrier内部使用ReentrantLock和Condition来实现线程的等待和唤醒机制。

4.2 CyclicBarrier工作流程

4.3 代码示例

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierExample {
    private static class Task implements Runnable {
        private CyclicBarrier barrier;

        public Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "正在屏障处等待");
                barrier.await();
                System.out.println(Thread.currentThread().getName() + "已经越过了障碍");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("所有线程都已到达 barrier!"));

//        System.out.println("Starting threads...");
        for (int i = 0; i < 3; i++) {
            executorService.submit(new Task(barrier));
        }

        // 等待第一轮完成
        Thread.sleep(1000);

        System.out.println("重新开始第二轮...");
        for (int i = 0; i < 3; i++) {
            executorService.submit(new Task(barrier));
        }

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("所有任务都已完成。");
    }
}

运行结果:

在这里插入图片描述

在这个示例中,我们展示了CyclicBarrier的以下特性:

  1. 多个线程在屏障点同步
  2. 当所有线程到达屏障时执行一个动作
  3. CyclicBarrier的可重用性

CyclicBarrier特别适用于需要分阶段进行的并行计算任务。

5. CountDownLatch

CountDownLatch是一个同步辅助类,允许一个或多个线程等待直到一组操作在其他线程中完成。

5.1 原理

CountDownLatch的主要原理如下:

  1. 初始化时指定计数值

  2. 每次调用countDown()方法会将计数值减1

  3. await()方法会阻塞调用线程,直到计数值变为0

  4. 计数值为0后,所有等待的线程被释放,CountDownLatch不能被重置

CountDownLatch内部使用AQS(AbstractQueuedSynchronizer)来实现同步机制。

5.2 CountDownLatch工作流程

5.3 代码示例

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);// 等待所有线程启动信号
        CountDownLatch doneSignal = new CountDownLatch(3);

        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 3; i++) {
            final int taskId = i;
            executorService.submit(() -> {
                try {
                    System.out.println("Task " + taskId + "正在等待启动");
                    startSignal.await();// 等待启动信号 startSignal = 0
                    System.out.println("Task " + taskId + "正在启动");
                    // 模拟任务执行
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println("Task " + taskId + " is done");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    doneSignal.countDown();
                }
            });
        }

        System.out.println("所有任务都已准备好开始");
        startSignal.countDown(); // 让所有任务开始执行

        System.out.println("等待所有任务完成");
        doneSignal.await(); // 等待所有任务完成 doneSignal = 0
        System.out.println("所有任务均已完成");

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}

运行结果:

在这个示例中,我们展示了CountDownLatch的以下特性:

  1. 使用startSignal控制多个线程同时开始执行
  2. 使用doneSignal等待多个线程完成执行
  3. countDown()方法的使用
  4. await()方法的使用

CountDownLatch非常适用于一个线程等待多个线程完成某些操作的场景。

6. BlockingQueue

BlockingQueue是一个支持两个附加操作的队列。这两个操作是:当队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。

6.1 原理

BlockingQueue的主要原理如下:

  1. 当队列满时,put操作会被阻塞
  2. 当队列空时,take操作会被阻塞
  3. 使用锁和条件变量实现线程间的同步
  4. 支持超时操作

BlockingQueue有多种实现,如ArrayBlockingQueue(基于数组的有界队列)、LinkedBlockingQueue(基于链表的可选有界队列)、PriorityBlockingQueue(带优先级的无界队列)等。

6.2 BlockingQueue工作流程

6.3 代码示例

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 生产者
        executorService.submit(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("Producer 生产了产品: " + i);
                    System.out.println("Producer 将产品放到存储区: " + i);
                    queue.put(i);
                    Thread.sleep(100);  // 模拟生产过程
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者
        executorService.submit(() -> {
            try {
                while (true) {
                    System.out.println("Consumer 尝试从存储区拿取商品");
                    int element = queue.take();
                    System.out.println("Consumer 拿到了商品: " + element);
                    Thread.sleep(200);  // 模拟消费过程
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}

运行结果(部分):

在这个示例中,我们展示了BlockingQueue的以下特性:

  1. 使用put()方法添加元素,如果队列满则阻塞
  2. 使用take()方法获取元素,如果队列空则阻塞
  3. 生产者-消费者模式的实现

BlockingQueue非常适用于生产者-消费者场景,可以有效地协调生产和消费的速度。

总结与比较

让我们对这些并发工具类进行一个总体的比较和总结:

工具类主要用途特点适用场景
ConcurrentHashMap线程安全的哈希表高并发、分段锁、读操作无锁需要线程安全的Map,且有高并发读写
AtomicInteger原子操作的整数无锁、CAS操作、高性能计数器、序列号生成
Semaphore控制并发访问的数量可限制访问数量、支持公平性资源池限流、数据库连接池
CyclicBarrier同步屏障可重用、支持执行屏障动作并行迭代算法、分阶段并发任务
CountDownLatch等待多个线程完成一次性使用、计数器操作主线程等待多个子任务完成
BlockingQueue线程安全的队列支持阻塞操作、多种实现生产者-消费者模式、任务队列

选择建议:

  1. 如果需要线程安全的Map,优先考虑ConcurrentHashMap,它在高并发情况下性能优于同步的HashMap。
  2. 需要线程安全的计数器时,使用AtomicInteger比使用synchronized的整数更高效。
  3. 当需要限制对资源的并发访问量时,Semaphore是很好的选择。
  4. 对于需要多个线程在同一时间点同步的场景,如并行迭代算法,CyclicBarrier非常适用。
  5. 当一个线程需要等待多个其他线程完成某些操作时,CountDownLatch是理想的选择。
  6. 在实现生产者-消费者模式时,BlockingQueue提供了简单而有效的解决方案。

这些并发工具类极大地简化了多线程编程的复杂性,提高了程序的可靠性和性能。在实际应用中,应根据具体需求选择合适的工具类,并正确使用它们的API来确保线程安全和高效的并发操作。同时,深入理解这些工具类的内部实现原理,有助于更好地使用它们,并在必要时进行性能调优。

标签:executorService,Java,System,util,并发,线程,println,解析,out
From: https://blog.csdn.net/2303_76892351/article/details/144066362

相关文章

  • 深入探讨 JavaScript 的事件循环
    深入探讨JavaScript的事件循环......
  • SpringBoot 如何解析配置文件中的list?
    1.情景展示在配置文件当中,我们是可以使用list来设置参数对应的参数值的(也就是:参数值可以是list)。YML文件如上图所示,在配置文件当中(如:Yml),我们是可以直接使用list。其格式就是:下划线➕空格,后面跟数组元素即可,一行就代表一个元素。properties文件如果是在properties文件......
  • java 基础知识汇总(1)
    目录1.什么是面向对象?1.1面向对象的特征1.1.1封装(Encapsulation):1.1.2继承(Inheritance):1.1.3多态(Polymorphism):1.1.4抽象(Abstraction):1.2面向对象与面向过程的区别1.3重载(Overload)与重写(Override)的区别   1.3.1重写(Override)1.3.2重载(Overload)1.4构造......
  • 360 度评估大揭秘:团队报告深度解析
    在360度评估中,团队报告是至关重要的一环。其扉页清晰展示了评估项目名称及“团队报告”字样,例如“360°评估问卷【参考示例】-团队报告”,同时明确团队人数和报告日期。 前言部分阐明,此报告通过汇总被评价人得分,从评价关系、指标或行为的得分进行对比分析,旨在了解团队能力......
  • JAVA开发规范v1.0
    01-中铜国贸JAVA开发规范v1.0一、编程规约(一)命名风格【强制】代码中的命名均不能以下划线或美元符号开始,也不能以下划线或美元符号结束。反例:_name/_name/$Object/name/name$/Object$【强制】代码中的命名严禁使用拼音与英文混合的方式,更不允许直接使用中文的......
  • JAVA台球教练软件源码,多端适配技术
    针对JAVA台球教练软件源码及其多端适配技术的需求,以下是一个详细的解答:一、JAVA台球教练软件源码概述技术栈:后端:采用SpringBoot框架,用于快速构建独立的、生产级别的基于Spring的应用程序。同时,使用MyBatisPlus进行数据持久化操作,以简化CRUD操作并提高开发效率。MySQL或Po......
  • 【java开发】Java中的FileUtils类使用详解
    在Java开发中,文件操作是一个非常常见的需求。ApacheCommonsIO库中的FileUtils类提供了丰富的文件操作功能,极大地简化了文件系统的管理。本文将浅入深出地介绍FileUtils类的使用,并通过实例演示其功能。一、FileUtils简介FileUtils是ApacheCommonsIO库中的一个工具类,它提供了......
  • transformer口语化解析
    Transformer是一种基于自注意力机制的深度神经网络模型,常用于处理序列到序列的任务,例如机器翻译、文本摘要、问答系统等。它由Encoder和Decoder两个主要部分组成,每个部分包含多个相同的Block。Transformer结构图Transformer结构importtorchimporttorch.nn......
  • 【扩展你的思路】JAVA在不修改接口的情况下传递参数的方法
    JAVA在不修改接口的情况下传递参数的方法在进行二次开发或修改已有代码时,有时需要传递额外的参数给某个方法,但又不希望修改原有接口,因为这样可能会影响到其他调用该方法的地方。本文将介绍一种常见且有效的方法,即使用ThreadLocal来传递参数,并探讨其优缺点。背景假设我......
  • 【Spring 全家桶】Spring MVC 快速入门,开始web 更好上手(1) , 万字解析, 建议收藏 ! ! !
    本篇会加入个人的所谓鱼式疯言❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言而是理解过并总结出来通俗易懂的大白话,小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的.......