目录
Java提供了丰富的并发工具类,这些工具类大大简化了多线程编程的复杂性,提高了并发程序的性能和可靠性。本文将详细介绍几个常用的并发工具类,包括ConcurrentHashMap、AtomicInteger、Semaphore、CyclicBarrier、CountDownLatch和BlockingQueue。
1. ConcurrentHashMap
ConcurrentHashMap是线程安全的HashMap实现,专门为并发环境设计。
1.1 原理
ConcurrentHashMap在JDK 1.8中的实现原理如下:
- 使用分段锁机制
- 使用CAS + synchronized来保证并发安全
- 允许并发读取,读操作不需要加锁
- 使用红黑树优化链表结构,提高检索效率
更详细的内容可以参考:Java中的Map集合:从HashMap到ConcurrentHashMap-CSDN博客
1.2 示例
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ConcurrentHashMapExample {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 并发添加元素
for (int i = 0; i < 100; i++) {
final String key = "Key" + i;
executorService.submit(() -> map.put(key, 1));
}
// 并发更新元素
for (int i = 0; i < 100; i++) {
final String key = "Key" + i;
executorService.submit(
//如果键 key 存在于 map 中,则将对应的值加1。
() -> map.computeIfPresent(key, (k, v) -> v + 1)
);
}
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);// 等待线程池关闭
System.out.println("Map size: " + map.size());//Map size: 100
System.out.println("Key50: " + map.get("Key50"));// Key50: 2
// 原子操作示例
map.putIfAbsent("czf", 100);//100不存在,则添加,否则不添加
map.replace("czf", 100, 200);
System.out.println("czf value: " + map.get("czf"));// czf value: 200
// 更新失败
map.replace("czf", 100, 300);
System.out.println("czf value: " + map.get("czf"));// czf value: 200
}
}
在这个示例中,我们展示了ConcurrentHashMap的以下特性:
- 并发添加和更新元素
- 使用computeIfPresent进行原子更新
- 使用putIfAbsent和replace进行原子操作
- 使用forEach进行并行遍历
ConcurrentHashMap保证了在高并发环境下的线程安全性,同时提供了优秀的性能。
2. AtomicInteger
java.util.concurrent.atomic包下的原子类,提供原子操作的整数。这里以AtomicInteger为例
2.1 原理
AtomicInteger主要基于以下原理:
- 使用volatile关键字保证可见性
- 使用CAS(Compare and Swap)操作保证原子性
- 内部维护一个volatile的整型value
什么是CAS? CAS操作是一种无锁算法,它通过硬件指令来保证操作的原子性。CAS操作包含三个操作数:
- 内存位置V
- 旧的预期值A
- 新值B。
在修改时当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。
2.2 CAS操作图解
2.3 代码示例
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class AtomicIntegerExample {
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 并发递增
for (int i = 0; i < 100; i++) {
// 使用lambda表达式 atomicInt进行递增
executorService.submit(atomicInt::incrementAndGet);
}
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("最终值: " + atomicInt.get());// 最终值: 100
// 演示其他原子操作
System.out.println("添加并取值: " + atomicInt.addAndGet(10));// 添加并取值: 110
// 比较(是不是指定的旧值)并更新
System.out.println("第一次更新是否成功: " + atomicInt.compareAndSet(110, 200));// 第一次更新是否成功: true
System.out.println("第一次更新后的值: " + atomicInt.get());// 第一次更新后的值: 200
System.out.println("第二次更新是否成功: " + atomicInt.compareAndSet(110, 200));// 第二次更新是否成功: false
System.out.println("第二次更新后的值: " + atomicInt.get());// 第二次更新后的值: 200
// 演示getAndUpdate操作 先返回旧值再更新
int result = atomicInt.getAndUpdate(n -> n * 2);
System.out.println("先取值再更新: 旧值=" + result + ", 新值=" + atomicInt.get());// 先取值再更新: 旧值=200, 新值=400
// 演示updateAndGet操作 先更新再返回旧值
result = atomicInt.updateAndGet(n -> n + 100);
System.out.println("先更新再取值: new=" + result);// 先更新再取值: new=500
}
}
在这个示例中,我们展示了AtomicInteger的以下特性:
- 并发递增操作
- 原子性的加法操作(addAndGet)
- 比较并设置值(compareAndSet)
- 获取并更新(getAndUpdate)
- 更新并获取(updateAndGet)
AtomicInteger保证了在高并发环境下的线程安全性,同时避免了使用synchronized关键字带来的性能开销。
3. Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量的工具类。
3.1 原理
Semaphore的主要原理如下:
- 维护一个许可证计数器
- 每次acquire()减少一个许可,release()增加一个许可
- 当许可不足时,线程会被阻塞
- 支持公平和非公平两种模式
Semaphore内部也是使用AQS(AbstractQueuedSynchronizer)来实现同步机制(ReentrantLock底层也是AQS实现,更多:Java同步机制深度解析: synchronized vs ReentrantLock-CSDN博客)。在公平模式下,线程按照请求的顺序获取许可,防止线程饥饿(某个线程 一直抢不到资源),但是性能会有所降低;在非公平模式下,允许线程抢占式地获取许可,可能出现线程饥饿。
3.2 Semaphore工作流程
3.3 代码示例
import java.util.concurrent.Semaphore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SemaphoreExample {
public static void main(String[] args) {
// 创建一个只有5个许可的Semaphore
Semaphore semaphore = new Semaphore(5);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
final int threadNum = i;
executorService.submit(() -> {
try {
System.out.println("Thread " + threadNum + " 正在尝试获得许可证");
semaphore.acquire();// 尝试获取许可证,如果获取失败,则阻塞当前线程
System.out.println("Thread " + threadNum + " 获得了许可");
// 模拟一些耗时操作
Thread.sleep(1000);
semaphore.release();// 释放许可证
System.out.println("Thread " + threadNum + "释放了许可证");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
try {
// 等待所有线程执行完毕
executorService.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有线程都已完成");
System.out.println("可用许可证数量: " + semaphore.availablePermits());// 打印可用许可证数量 5
}
}
在这个示例中,我们展示了Semaphore的以下特性:
- 限制并发访问资源的线程数
- 使用acquire()获取许可
- 使用release()释放许可
- 在资源有限的情况下,如何管理多个线程的访问
Semaphore非常适用于限制对某些资源的并发访问数量,例如数据库连接池。
4. CyclicBarrier
CyclicBarrier是一种同步辅助工具,允许一组线程互相等待,直到所有线程都到达某个公共屏障点再继续执行。
4.1 原理
CyclicBarrier的主要原理如下:
-
构造时设置参与的线程数N(),同时有一个计数器进行记录还未到达屏障的线程数
构造器:parties就是传入的初始线程数量
-
每个线程执行完后调用await(),线程会被阻塞,直至所有线程到达屏障
-
当第N个线程调用await()时,所有线程被释放,继续执行
-
CyclicBarrier可以重复使用
-
可以在所有线程到达屏障时执行一个Runnable回调任务
CyclicBarrier内部使用ReentrantLock和Condition来实现线程的等待和唤醒机制。
4.2 CyclicBarrier工作流程
4.3 代码示例
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierExample {
private static class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "正在屏障处等待");
barrier.await();
System.out.println(Thread.currentThread().getName() + "已经越过了障碍");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("所有线程都已到达 barrier!"));
// System.out.println("Starting threads...");
for (int i = 0; i < 3; i++) {
executorService.submit(new Task(barrier));
}
// 等待第一轮完成
Thread.sleep(1000);
System.out.println("重新开始第二轮...");
for (int i = 0; i < 3; i++) {
executorService.submit(new Task(barrier));
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("所有任务都已完成。");
}
}
运行结果:
在这个示例中,我们展示了CyclicBarrier的以下特性:
- 多个线程在屏障点同步
- 当所有线程到达屏障时执行一个动作
- CyclicBarrier的可重用性
CyclicBarrier特别适用于需要分阶段进行的并行计算任务。
5. CountDownLatch
CountDownLatch是一个同步辅助类,允许一个或多个线程等待直到一组操作在其他线程中完成。
5.1 原理
CountDownLatch的主要原理如下:
-
初始化时指定计数值
-
每次调用countDown()方法会将计数值减1
-
await()方法会阻塞调用线程,直到计数值变为0
-
计数值为0后,所有等待的线程被释放,CountDownLatch不能被重置
CountDownLatch内部使用AQS(AbstractQueuedSynchronizer)来实现同步机制。
5.2 CountDownLatch工作流程
5.3 代码示例
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);// 等待所有线程启动信号
CountDownLatch doneSignal = new CountDownLatch(3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int taskId = i;
executorService.submit(() -> {
try {
System.out.println("Task " + taskId + "正在等待启动");
startSignal.await();// 等待启动信号 startSignal = 0
System.out.println("Task " + taskId + "正在启动");
// 模拟任务执行
Thread.sleep((long) (Math.random() * 1000));
System.out.println("Task " + taskId + " is done");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
doneSignal.countDown();
}
});
}
System.out.println("所有任务都已准备好开始");
startSignal.countDown(); // 让所有任务开始执行
System.out.println("等待所有任务完成");
doneSignal.await(); // 等待所有任务完成 doneSignal = 0
System.out.println("所有任务均已完成");
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
}
运行结果:
在这个示例中,我们展示了CountDownLatch的以下特性:
- 使用startSignal控制多个线程同时开始执行
- 使用doneSignal等待多个线程完成执行
- countDown()方法的使用
- await()方法的使用
CountDownLatch非常适用于一个线程等待多个线程完成某些操作的场景。
6. BlockingQueue
BlockingQueue是一个支持两个附加操作的队列。这两个操作是:当队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
6.1 原理
BlockingQueue的主要原理如下:
- 当队列满时,put操作会被阻塞
- 当队列空时,take操作会被阻塞
- 使用锁和条件变量实现线程间的同步
- 支持超时操作
BlockingQueue有多种实现,如ArrayBlockingQueue(基于数组的有界队列)、LinkedBlockingQueue(基于链表的可选有界队列)、PriorityBlockingQueue(带优先级的无界队列)等。
6.2 BlockingQueue工作流程
6.3 代码示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class BlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 生产者
executorService.submit(() -> {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Producer 生产了产品: " + i);
System.out.println("Producer 将产品放到存储区: " + i);
queue.put(i);
Thread.sleep(100); // 模拟生产过程
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者
executorService.submit(() -> {
try {
while (true) {
System.out.println("Consumer 尝试从存储区拿取商品");
int element = queue.take();
System.out.println("Consumer 拿到了商品: " + element);
Thread.sleep(200); // 模拟消费过程
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
}
运行结果(部分):
在这个示例中,我们展示了BlockingQueue的以下特性:
- 使用put()方法添加元素,如果队列满则阻塞
- 使用take()方法获取元素,如果队列空则阻塞
- 生产者-消费者模式的实现
BlockingQueue非常适用于生产者-消费者场景,可以有效地协调生产和消费的速度。
总结与比较
让我们对这些并发工具类进行一个总体的比较和总结:
工具类 | 主要用途 | 特点 | 适用场景 |
---|---|---|---|
ConcurrentHashMap | 线程安全的哈希表 | 高并发、分段锁、读操作无锁 | 需要线程安全的Map,且有高并发读写 |
AtomicInteger | 原子操作的整数 | 无锁、CAS操作、高性能 | 计数器、序列号生成 |
Semaphore | 控制并发访问的数量 | 可限制访问数量、支持公平性 | 资源池限流、数据库连接池 |
CyclicBarrier | 同步屏障 | 可重用、支持执行屏障动作 | 并行迭代算法、分阶段并发任务 |
CountDownLatch | 等待多个线程完成 | 一次性使用、计数器操作 | 主线程等待多个子任务完成 |
BlockingQueue | 线程安全的队列 | 支持阻塞操作、多种实现 | 生产者-消费者模式、任务队列 |
选择建议:
- 如果需要线程安全的Map,优先考虑ConcurrentHashMap,它在高并发情况下性能优于同步的HashMap。
- 需要线程安全的计数器时,使用AtomicInteger比使用synchronized的整数更高效。
- 当需要限制对资源的并发访问量时,Semaphore是很好的选择。
- 对于需要多个线程在同一时间点同步的场景,如并行迭代算法,CyclicBarrier非常适用。
- 当一个线程需要等待多个其他线程完成某些操作时,CountDownLatch是理想的选择。
- 在实现生产者-消费者模式时,BlockingQueue提供了简单而有效的解决方案。
这些并发工具类极大地简化了多线程编程的复杂性,提高了程序的可靠性和性能。在实际应用中,应根据具体需求选择合适的工具类,并正确使用它们的API来确保线程安全和高效的并发操作。同时,深入理解这些工具类的内部实现原理,有助于更好地使用它们,并在必要时进行性能调优。
标签:executorService,Java,System,util,并发,线程,println,解析,out From: https://blog.csdn.net/2303_76892351/article/details/144066362