针对并发编程,Java提供了很多并发工具类供我们使用,下面我们详细介绍一下。
Semaphore
Semaphore,现在普遍翻译为“信号量”,以前也曾被翻译成“信号灯”,因为类似现实生活里的红绿灯,车辆能不能通行,要看是不是绿灯。同样,在编程世界里,线程能不能执行,也要看信号量是不是允许。信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down()和up()。如下图所示:这三个方法详细的语义具体如下所示。
- init():设置计数器的初始值。
- down():计数器的值减1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行。
- up():计数器的值加1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
如何使用信号量
我们用一个累加器的例子来说明信号量的使用吧。在累加器的例子里面,count+=1操作是个临界区,只允许一个线程执行,也就是说要保证互斥。那这种情况用信号量怎么控制呢? 其实很简单,就像我们用互斥锁一样,只需要在进入临界区之前执行一下down()操作,退出临界区之前执行一下up()操作就可以了。下面是Java代码的示例,acquire()就是信号量里的down()操作,release()就是信号量里的up()操作。static int count; //初始化信号量 static final Semaphore s = new Semaphore( 1 ); //用信号量保证互斥 static void addOne() { s.acquire(); try { count+= 1 ; } finally { s.release(); } }
下面再来分析一下,信号量是如何保证互斥的。假设两个线程T1和T2同时访问addOne()方法,当它们同时调用acquire()的时候,由于acquire()是一个原子操作,所以只能有一个线程(假设T1)把信号量里的计数器减为0,另外一个线程(T2)则是将计数器减为-1。对于线程T1,信号量里面的计数器的值是0,大于等于0,所以线程T1会继续执行;对于线程T2,信号量里面的计数器的值是-1,小于0,按照信号量模型里对down()操作的描述,线程T2将被阻塞。所以此时只有线程T1会进入临界区执行count+=1;。
当线程T1执行release()操作,也就是up()操作的时候,信号量里计数器的值是-1,加1之后的值是0,小于等于0,按照信号量模型里对up()操作的描述,此时等待队列中的T2将会被唤醒。于是T2在T1执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。快速实现一个限流器
上面的例子,我们用信号量实现了一个最简单的互斥锁功能。估计你会觉得奇怪,既然有Java SDK里面提供了Lock,为啥还要提供一个Semaphore ?其实实现一个互斥锁,仅仅是 Semaphore的部分功能,Semaphore还有一个功能是Lock不容易实现的,那就是:Semaphore可以允许多个线程访问一个临界区。 信号量的计数器,在上面的例子中,我们设置成了1,这个1表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数N,就能完美解决对象池的限流问题了。下面就是对象池的示例代码。class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
在上面的示例代码中,我们用一个List来保存对象实例,用Semaphore实现限流器。关键的代码是ObjPool里面的exec()方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用acquire()方法(与之匹配的是在finally里面调用release()方法),假设对象池的大小是10,信号量的计数器初始化为10,那么前10个线程调用acquire()方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程。
ReadWriteLock
在开发场景中,有一种非常普遍的并发场景:读多写少场景。实际工作中,为了优化性能,我们经常会使用缓存,例如缓存元数据、缓存基础数据等,这就是一种典型的读多写少应用场景。缓存之所以能提升性能,一个重要的条件就是缓存的数据一定是读多写少的,例如元数据和基础数据基本上不会发生变化(写少),但是使用它们的地方却很多(读多)。 针对读多写少这种并发场景,Java SDK并发包提供了读写锁——ReadWriteLock,非常容易使用,并且性能很好。ReentrantReadWriteLock特点
ReentrantReadWriteLock类是ReadWriteLock接口的实现,支持与之类似的语法。该类具有以下属性: 1.获取顺序 这个类不会强行指定访问锁的读写顺序,但是它支持一个可选的公平策略: 非公平模式(默认)- 在非公平模式下,进入读锁和写锁的顺序取决于可重入约束,是不确定的。
- 在非公平模式下,连续争用可能会导致一个或者多个读线程或写线程进入无限等待状态。
- 不过,通常非公平锁会比公平锁有更高的吞吐量。
- 在公平模式下,进入读锁和写锁的顺序使用一种近乎顺序的策略。
- 当前持有的锁被释放时,等待时间最长的单个写线程会被授予写锁,或者如果有一组读线程比所有的写线程等待的时间都长,则这组读线程将被授予锁。
- 如果有写线程持有锁或者有写线程正在等待锁,试图去获取一个公平的读锁(不可重入)的读线程将被阻塞。
- 这个读线程不会获得锁,直到当前等待的所有写线程获取并释放了锁。
- 当然,如果写线程放弃了等待,使得等待队列中只剩下一个或者多个等待时间最长的读线程,并且当前读锁可用,则这些读线程将会被授予锁。
- 除非当前的读锁和写锁都是可用的,写线程尝试去获取一个公平的写锁(不可重入)才不会被阻塞。
快速实现一个缓存
用ReadWriteLock快速实现一个通用的缓存工具类。 在下面的代码中,我们声明了一个Cache<K, V>类,其中类型参数K代表缓存里key的类型,V代表缓存里value的类型。缓存的数据保存在Cache类内部的HashMap里面,HashMap不是线程安全的,这里我们使用读写锁ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是ReentrantReadWriteLock,通过名字你应该就能判断出来,它是支持可重入的。下面我们通过rwl创建了一把读锁和一把写锁。 Cache这个工具类,我们提供了两个方法,一个是读缓存方法get(),另一个是写缓存方法put()。读缓存需要用到读锁,读锁的使用和前面我们介绍的Lock的使用是相同的,都是try{}finally{}这个编程范式。写缓存则需要用到写锁,写锁的使用和读锁是类似的。这样看来,读写锁的使用还是非常简单的。class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 写缓存
V put(K key, V value) {
w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
}
}
读写锁的升级与降级
我们思考一个问题,我们可以先获取读锁,然后再升级为写锁吗?这种操作还有个专业的名字,叫锁的升级。可惜ReadWriteLock并不支持这种升级。如果读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。 不过,虽然锁的升级是不允许的,但是锁的降级却是允许的。以下代码来源自ReentrantReadWriteLock的官方示例,略做了改动。你会发现在代码①处,获取读锁的时候线程还是持有写锁的,这种锁的降级是支持的。class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
//写锁
final Lock w = rwl.writeLock();
void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}
finally {r.unlock();}
}
}
StampedLock
StampedLock支持的三种锁模式 我们先来看看在使用上StampedLock和ReadWriteLock有哪些区别。 ReadWriteLock支持两种模式:一种是读锁,一种是写锁。而StampedLock支持三种模式,分别是:写锁、悲观读锁和乐观读。其中,写锁、悲观读锁的语义和ReadWriteLock的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock里的写锁和悲观读锁加锁成功之后,都会返回一个stamp;然后解锁的时候,需要传入这个stamp。相关的示例代码如下。final StampedLock sl =
new StampedLock();
// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try {
//省略业务相关代码
} finally {
sl.unlockRead(stamp);
}
// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try {
//省略业务相关代码
} finally {
sl.unlockWrite(stamp);
}
StampedLock的性能之所以比ReadWriteLock还要好,其关键是StampedLock支持乐观读的方式。ReadWriteLock支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而StampedLock提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。
注意这里,我们用的是“乐观读”这个词,而不是“乐观读锁”,乐观读这个操作是无锁的,所以相比较ReadWriteLock的读锁,乐观读的性能更好一些。
StampedLock的乐观读和数据库的乐观锁有异曲同工之妙。你会发现数据库里的乐观锁,查询的时候需要把 version 字段(也可以是其他能够标识更新前预期值的字段)查出来,更新的时候要利用 version 字段做验证。这个 version 字段就类似于StampedLock里面的stamp。这样对比着看,相信你会更容易理解StampedLock里乐观读的用法。
StampedLock使用注意事项
对于读多写少的场景StampedLock性能很好,简单的应用场景基本上可以替代ReadWriteLock,但是StampedLock的功能仅仅是ReadWriteLock的子集,在使用的时候,还是有几个地方需要注意一下。
StampedLock在命名上并没有增加Reentrant,想必你已经猜测到StampedLock应该是不可重入的。事实上,的确是这样的,StampedLock不支持重入。这个是在使用中必须要特别注意的。另外,StampedLock的悲观读锁、写锁都不支持条件变量,这个也需要注意。还有一点需要特别注意,那就是:如果线程阻塞在StampedLock的readLock()或者writeLock()上时,此时调用该阻塞线程的interrupt()方法,会导致CPU飙升。所以,使用StampedLock一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁readLockInterruptibly()和写锁writeLockInterruptibly()。这个规则一定要记清楚。
CountDownLatch
CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。
CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。
CountDownLatch常用方法
CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。
await();//阻塞当前线程,将当前线程加入阻塞队列。
await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,
countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
使用示例
假设我们有这么一个需求,需要 1查询未对账订单和 2查询派件单,然后根据这两笔单子的结果去 3执行对账操作。要是串行操作的话,那执行流程是 1-》2-》3.为了提高执行效率,1,2我们可以并行操作,等待1,2都执行结束后,再根据1,2的执行结果执行步骤3的对账操作。根据前面对CountDownLatch的介绍,我们可以这样操作:
// 创建2个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为2
CountDownLatch latch =
new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
CyclicBarrier
CyclicBarrier 是 Java 中的一种同步工具,它可以让多个线程在一个屏障点处等待,直到所有线程都到达该点后,才能继续执行。CyclicBarrier 可以用于协调多个线程的执行,以便它们可以在某个点上同步执行。CyclicBarrier 还支持一个可选的回调函数,在所有的线程都到达屏障点后,会调起指定的回调函数。 CyclicBarrier 还支持一个更高级的用法,即可以在等待线程到达屏障点时,执行一些额外的操作。可以通过 await 方法的返回值来实现这一点,如下所示:int index = barrier.await();
if (index == 0) {
// 执行额外的操作
}
在这个例子中,await 方法的返回值表示线程在等待队列中的位置,如果返回值为 0,则表示当前线程是最后一个到达屏障点的线程,可以执行一些额外的操作,比如说做一些数据清理之类的收尾工作。
非常值得一提的是,CyclicBarrier的计数器有自动重置的功能,当减到0的时候,会自动重置你设置的初始值。这个功能用起来实在是太方便了。
参考上面使用CountDownLatch进行对账的案例。假设我们需要循环查询未对账订单和派送单然后执行对账操作,我们可以这样操作:
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
线程T1负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减1,同时等待计数器变成0;线程T2负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减1,同时等待计数器变成0;当T1和T2都调用 barrier.await() 的时候,计数器会减到0,此时T1和T2就可以执行下一条语句了,同时会调用barrier的回调函数来执行对账操作。