文章目录
更多相关内容可查看
Semaphore(信号量)
Semaphore介绍
Semaphore(信号量)是Java并发包java.util.concurrent中的一个类,它主要用于控制对多个共享资源的访问。与CountDownLatch和CyclicBarrier等并发工具不同,Semaphore通常用于限制对某个资源池(或称为资源集)的并发访问数量
Semaphore基本概念
- 许可(Permits):Semaphore管理一组虚拟的许可,每个许可代表对一个
资源的访问权限
- 获取许可(Acquire):当一个线程需要访问一个资源时,它必须首先从Semaphore获取一个或多个许可。如果Semaphore中有足够的许可,则获取成功,线程可以继续执行;否则,线程将被阻塞,直到有可用的许可为止。
- 释放许可(Release):当线程完成对资源的访问后,它应该释放先前获取的许可,以便其他线程可以访问该资源。
Semaphore使用场景
- 资源池限制:例如,一个系统可能有10个数据库连接,而多个线程可能同时需要访问这些连接。使用Semaphore可以确保在任何时候,最多只有10个线程可以访问数据库连接。
- 并发限制:有时,你可能希望限制同时执行特定操作的线程数量。例如,你可能希望同时只有5个线程可以访问某个API或执行某个计算密集型任务。
Semaphore示例
下面是一个简单的示例,展示了如何使用Semaphore
来限制对资源池的并发访问:
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private final Semaphore semaphore;
private final int maxConcurrentAccess;
public SemaphoreExample(int maxConcurrentAccess) {
this.maxConcurrentAccess = maxConcurrentAccess;
this.semaphore = new Semaphore(maxConcurrentAccess);
}
public void accessResource() throws InterruptedException {
// 获取一个许可
semaphore.acquire();
try {
// 访问资源的代码...
System.out.println("Accessing resource. Remaining permits: " + semaphore.availablePermits());
// 模拟资源访问的耗时操作
Thread.sleep(1000);
} finally {
// 释放许可
semaphore.release();
}
}
public static void main(String[] args) {
SemaphoreExample example = new SemaphoreExample(3);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
example.accessResource();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
可以看到, 在这个示例中,我们创建了一个Semaphore对象,其初始许可数量为3。然后,我们启动了5个线程来模拟对资源的并发访问。由于Semaphore的限制,在任何时候最多只有3个线程可以访问资源。
CountDownLatch (计数器/闭锁)
CountDownLatch 介绍
CountDownLatch 是 Java 并发工具包 java.util.concurrent 中的一个类,它允许一个或多个线程等待其他线程完成一组操作。CountDownLatch 的主要应用场景是协调多个线程,以便它们能够彼此等待,直到某个条件(一组操作的完成)被满足
CountDownLatch 基本概念
- 计数器(Count):CountDownLatch 包含一个计数器,该计数器在创建 CountDownLatch
对象时被初始化为给定的值。 - 等待(Await):线程可以调用 await() 方法等待,该方法会阻塞线程,直到计数器达到零。
- 计数(Count Down):当一个或多个线程完成了某个任务后,它们可以调用 countDown() 方法来减少计数器的值。
CountDownLatch 使用场景
- 并行计算:你可以启动多个线程进行并行计算,并在所有线程都完成计算后,主线程继续执行后续操作。
- 资源初始化:在应用程序启动时,可能需要加载或初始化一些资源。你可以使用多个线程并行加载资源,并在所有资源都加载完成后,主线程继续执行。
CountDownLatch 基本方法
- CountDownLatch(int count):创建一个 CountDownLatch 实例,并设置计数器的初始值。
- countDown():将计数器的值减一。如果计数器的值变为零,那么所有因调用 await() 方法而等待的线程将被唤醒。
- await():使当前线程等待,直到计数器达到零。如果计数器已经是零,那么该方法立即返回。否则,线程将处于休眠状态,直到计数器变为零。
- await(long timeout, TimeUnit unit):使当前线程等待,直到计数器达到零,或者等待时间超过了指定的超时时间。
- getCount():返回计数器的当前值。
CountDownLatch 示例
- 某一线程在开始运行前等待 n 个线程执行完毕
将CountDownLatch
的计数器初始化为 n (new CountDownLatch(n)
),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()
),当计数器的值变为 0 时,在CountDownLatch 上 await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
@Test
void name() throws InterruptedException {
//需求 : 有T1,T2,T3三个线程, 希望T1,T2,T3 按照顺序执行 join方法
//需求 : T1,T2,T3执行完毕之后, 再执行主线程 CountDownLatch
//线程计数器
CountDownLatch latch = new CountDownLatch(3);
Thread t1 = new Thread(() -> {
try {
Thread.sleep(10000);
log.info("t1线程开始执行--------------");
latch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t2 = new Thread(() -> {
try {
Thread.sleep(20000);
log.info("t2线程开始执行--------------");
latch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t3 = new Thread(() -> {
try {
Thread.sleep(30000);
log.info("t3线程开始执行--------------");
latch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t1.start();
t2.start();
t3.start();
latch.await();
log.info("主线程执行-------");
}
- 实现多个线程开始执行任务的最大并行性
注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch
对象,将其计数器初始化为 1 (new CountDownLatch(1)
),多个线程在开始执行任务前首先coundownlatch.await()
,当主线程调用countDown()
时,计数器变为 0,多个线程同时被唤醒
@Test
void name() throws InterruptedException {
//需求 : 有T1,T2,T3三个线程, 希望T1,T2,T3 按照顺序执行 join方法
//需求 : T1,T2,T3执行完毕之后, 再执行主线程 CountDownLatch
//线程计数器
CountDownLatch latch = new CountDownLatch(1);
//RCountDownLatch latch = redissonClient.getCountDownLatch("countdown");
//latch.trySetCount(3);
Thread t1 = new Thread(() -> {
try {
latch.await();
Thread.sleep(1000);
log.info("t1线程开始执行--------------");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t2 = new Thread(() -> {
try {
latch.await();
Thread.sleep(2000);
log.info("t2线程开始执行--------------");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t3 = new Thread(() -> {
try {
latch.await();
Thread.sleep(3000);
log.info("t3线程开始执行--------------");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t1.start();
t2.start();
t3.start();
log.info("主线程执行-------");
latch.countDown();
Thread.sleep(5000);
}
CyclicBarrier(循环栅栏)
CyclicBarrier介绍
CountDownLatch 是 Java 并发工具包 java.util.concurrent 中的一个类,它允许一个或多个线程等待其他线程完成一组操作。CountDownLatch 的主要应用场景是协调多个线程,以便它们能够彼此等待,直到某个条件(一组操作的完成)被满足。
CyclicBarrier基本概念
- 计数器(Count):CountDownLatch 包含一个计数器,该计数器在创建 CountDownLatch
对象时被初始化为给定的值。 - 等待(Await):线程可以调用 await() 方法等待,该方法会阻塞线程,直到计数器达到零。
- 计数(Count Down):当一个或多个线程完成了某个任务后,它们可以调用 countDown() 方法来减少计数器的值。
CyclicBarrier使用场景
- 并行计算:你可以启动多个线程进行并行计算,并在所有线程都完成计算后,主线程继续执行后续操作。
- 资源初始化:在应用程序启动时,可能需要加载或初始化一些资源。你可以使用多个线程并行加载资源,并在所有资源都加载完成后,主线程继续执行。
CyclicBarrier基本方法
- CountDownLatch(int count):创建一个 CountDownLatch 实例,并设置计数器的初始值。
- countDown():将计数器的值减一。如果计数器的值变为零,那么所有因调用 await() 方法而等待的线程将被唤醒。
- await():使当前线程等待,直到计数器达到零。如果计数器已经是零,那么该方法立即返回。否则,线程将处于休眠状态,直到计数器变为零。
- await(long timeout, TimeUnit unit):使当前线程等待,直到计数器达到零,或者等待时间超过了指定的超时时间。
- getCount():返回计数器的当前值。
CyclicBarrier示例(银行流水)
CyclicBarrier
可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction
用这些线程的计算结果,计算出整个 Excel 的日均银行流水。可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await()
方法之后的方法才被执行。
另外,CyclicBarrier
还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction)
,用于在线程到达屏障时,优先执行 barrierAction
,方便处理更复杂的业务场景。示例代码如下:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class BankTransactionProcessor {
private static final int NUM_THREADS = 5; // 假设有5个线程用于处理不同的sheet
public static void main(String[] args) {
// 创建一个CyclicBarrier,当所有线程处理完各自的sheet后,执行barrierAction
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_THREADS, () -> {
// 这里是barrierAction,当所有sheet都处理完成后,执行这个操作来合并结果
List<Double> dailyAverages = new ArrayList<>();
// 假设dailyAverages是从各个线程中收集的
// 在这里计算整个Excel的日均银行流水
double totalAverage = dailyAverages.stream().mapToDouble(Double::doubleValue).average().orElse(0);
System.out.println("Total daily average bank transaction: " + totalAverage);
});
// 模拟处理Excel sheet的线程
for (int i = 0; i < NUM_THREADS; i++) {
final int sheetIndex = i;
new Thread(() -> {
try {
// 处理单个sheet的逻辑,假设得到每个sheet的日均银行流水
double dailyAverage = processSheet(sheetIndex);
// 这里可以假设有一个方式将dailyAverage加入到dailyAverages列表中
// 但是在这个示例中,我们只是打印出来
System.out.println("Sheet " + sheetIndex + " daily average bank transaction: " + dailyAverage);
// 等待所有sheet处理完毕
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
// 模拟处理单个sheet的逻辑
private static double processSheet(int sheetIndex) {
// 这里应该是读取sheet数据,计算日均银行流水的逻辑
// 为了示例简单,我们直接返回一个模拟值
return sheetIndex * 1000.0; // 假设每个sheet的日均银行流水是递增的
}
}
CyclicBarrier 和 CountDownLatch 的区别
标签:JUC,CountDownLatch,计数器,线程,Semaphore,new,CyclicBarrier From: https://blog.csdn.net/Aaaaaaatwl/article/details/139231836
CountDownLatch
的实现是基于 AQS 的,而CycliBarrier
是基于ReentrantLock
CountDownLatch
是计数器,只能使用一次,而CyclicBarrier
的计数器提供reset
功能,可以多次使用。对于
CountDownLatch
来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于
CyclicBarrier
,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
CountDownLatch
是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier
更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。