一、Semaphore、Exchanger、CountDownLatch、CyclicBarrier、Phaser
JDK中提供了⼀些线程通信⼯具类以供开发者使⽤。这样的话我们在遇到⼀些常⻅的应⽤场景时就可以使⽤这些⼯具类,⽽不⽤⾃⼰再重复造轮⼦了,这些工具类都在java.util.concurrent包下,如下
Semaphore:限制线程的数量
Exchanger:两个线程交换数据
CountDownLatch:线程等待直到计数器减为0时开始工作
CyclicBarrier:作用跟CountDownLatch类似,但是可以重复使用
Phaser:增强的CyclicBarrier
1、Semaphore
锁作为线程通信的一种方式,本质上锁的目的是保护一种受限资源,保证同一时刻只有一个线程能访问(ReentrantLock),或者只有一个线程能写入(ReadWriteLock)。
还有一种受限资源,它需要保证同一时刻最多有N个线程能访问,比如同一时刻最多创建100个数据库连接,最多允许10个用户下载等。
这种限制数量的锁,如果用Lock数组来实现,就太麻烦了。这种情况就可以使用Semaphore。
Semaphore 信号量维护了一个许可集,每次使用时执行acquire()从Semaphore获取许可,如果没有则会阻塞,每次使用完执行release()释放许可。
使用场景:Semaphore对用于对资源的控制,比如数据连接有限,使用Semaphore限制访问数据库的线程数。
案例:最多允许3个线程同时访问:
public class SemaphoreDemo {
static class MyThread implements Runnable {
private int value;
private Semaphore semaphore;
public MyThread(int value, Semaphore semaphore) {
this.value = value;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); // 获取permit
System.out.println("当前线程" + value+"还剩资源数" + semaphore.availablePermits()+"还有" + semaphore.getQueueLength());
// 睡眠随机时间,打乱释放顺序
Random random = new Random();
Thread.sleep(random.nextInt(1000));
semaphore.release(); // 释放permit
System.out.println(String.format("线程%d释放了资源", value));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(new MyThread(i, semaphore)).start();
}
}
}
Semaphore
本质上就是一个信号计数器,用于限制同一时间的最大访问数量。
2、Exchanger
Exchanger 用于两个线程间的数据交换,它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
使用场景:两个线程相互等待处理结果并进行数据传递。
public void latch() throws InterruptedException, IOException {
int count = 5;
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService executorService = Executors.newFixedThreadPool(count);
for (int x=0;x<count;x++){
executorService.execute(new Worker(x,exchanger));
}
System.in.read();
}
class Worker extends Thread {
Integer start;
Exchanger<String> exchanger; public Worker(Integer start, Exchanger<String> exchanger) {
this.start = start;
this.exchanger = exchanger;
} @Override
public void run() throws IllegalArgumentException {
try {
System.out.println(Thread.currentThread().getName() + " 准备执行");
TimeUnit.SECONDS.sleep(start);
System.out.println(Thread.currentThread().getName() + " 等待交换");
String value = exchanger.exchange(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " 交换得到数据为:"+value);
} catch (InterruptedException e) {
e.printStackTrace();
} }
}
输出
pool-1-thread-1 准备执行
pool-1-thread-1 等待交换
pool-1-thread-3 准备执行
pool-1-thread-2 准备执行
pool-1-thread-5 准备执行
pool-1-thread-4 准备执行
pool-1-thread-2 等待交换
pool-1-thread-1 交换得到数据为:pool-1-thread-2
pool-1-thread-2 交换得到数据为:pool-1-thread-1
pool-1-thread-3 等待交换
pool-1-thread-4 等待交换
pool-1-thread-4 交换得到数据为:pool-1-thread-3
pool-1-thread-3 交换得到数据为:pool-1-thread-4
pool-1-thread-5 等待交换
Exchanger必须成对出现,否则会像上面代码执行结果那样,pool-1-thread-5一直阻塞等待与其交换数据的线程,为了避免这一现象,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。
3、CountDownLatch、CyclicBarrier
CountDownLatch的作用很简单,就是一个或者一组线程在开始执行操作之前,必须要等到其他线程执行完才可以。我们举一个例子来说明,在考试的时候,老师必须要等到所有人交了试卷才可以走。此时老师就相当于等待线程,而学生就好比是执行的线程。
注意:java中还有一个同步工具类叫做CyclicBarrier,他的作用和CountDownLatch类似。同样是等待其他线程都完成了,才可以进行下一步操作,我们再举一个例子,在打王者的时候,在开局前所有人都必须要加载到100%才可以进入,否则所有玩家都相互等待。
CountDownLatch和CyclicBarrier都是juc下的并发工具类,二者功能在处理某些事情下看着很相似:都是阻塞线程,但是如果细品和查看源码的话会发现二者之间还是有区别的:
CountDownLatch主要是阻塞主线程,等待多线程执行完成之后再执行主线程await之后的代码片段,侧重点是主线程等待子线程(多线程)完成之后被唤醒。
CyclicBarrier主要是每个多线程在某一刻阻塞,然后各个多线程之间相互等待,直到最后一个多线程被阻塞,然后冲破栅栏,各自执行自己await()之后的代码段(另一个写法是用两个参数的构造去共完成另一个Runnable任务),侧重点是多线程之间的相互等待。
不同点:
CountDownLatch是主线程被多线程阻塞,直到多线程执行完成才被唤醒继续执行,所以更关注主线程等待多线程执行完成再继续执行的场景;
CyclicBarrier是多线程各自被阻塞在栅栏前,是多线程之间的相互等待,直到全部的多线程全部执行完成,然后并发的去共同做某件事,比如:赛跑比赛的时候,需要等所有运动员都准备完成之后,才能开始进行比赛。
另外,CountDownLatch是一次性,而CyclicBarrier是可重复利用的(查看源码可以发现当最后一道栅栏被冲破之后,如果还需要用到的话会重新new Generation栅栏对象)
CountDownLatch 提供了一些方法:
await() :使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒。
await(long timeout, TimeUnit unit) :带超时时间的await()。
countDown():使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。
getCount():获得latch的数值。
下面代码演示2个等待线程通过CountDownLatch去等待3个工作线程完成操作:
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 让2个线程去等待3个三个工作线程执行完成
CountDownLatch c = new CountDownLatch(3);
// 2 个等待线程
WaitThread waitThread1 = new WaitThread("wait-thread-1", c);
WaitThread waitThread2 = new WaitThread("wait-thread-2", c);
// 3个工作线程
Worker worker1 = new Worker("worker-thread-1", c);
Worker worker2 = new Worker("worker-thread-2", c);
Worker worker3 = new Worker("worker-thread-3", c);
// 启动所有线程
waitThread1.start();
waitThread2.start();
Thread.sleep(1000);
worker1.start();
worker2.start();
worker3.start();
}
}
/**
* 等待线程
*/
class WaitThread extends Thread {
private String name;
private CountDownLatch c;
public WaitThread(String name, CountDownLatch c) {
this.name = name;
this.c = c;
}
@Override
public void run() {
try {
// 等待
System.out.println(this.name + " wait...");
c.await();
System.out.println(this.name + " continue running...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 工作线程
*/
class Worker extends Thread {
private String name;
private CountDownLatch c;
public Worker(String name, CountDownLatch c) {
this.name = name;
this.c = c;
}
@Override
public void run() {
System.out.println(this.name + " is running...");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.name + " is end.");
c.countDown();
}
}
运行结果
wait-thread-1 wait...
wait-thread-2 wait...
worker-thread-3 is running...
worker-thread-2 is running...
worker-thread-1 is running...
worker-thread-1 is end.
worker-thread-3 is end.
worker-thread-2 is end.
wait-thread-1 continue running...
wait-thread-2 continue running...
Process finished with exit code 0
二、并发容器集合
1、介绍
在java.util包下提供了⼀些同步容器类,其中Vector和HashTable是线程安全的容器类,但是这些容器实现同步的⽅式是通过对⽅法加锁(sychronized)⽅式实现的, 这样读写均需要锁操作,导致性能低下,即使是Vector这样线程安全的类,在⾯对多线程下的复合操作的时候也是需要通过客户端加锁的⽅式保证原⼦性
案例:
public class TestVector {
private Vector<String> vector;
//⽅法⼀
public Object getLast(Vector vector) {
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
//⽅法⼆
public void deleteLast(Vector vector) {
int lastIndex = vector.size() - 1;
vector.remove(lastIndex);
}
//⽅法三
public Object getLastSysnchronized(Vector vector) {
synchronized(vector){
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
}
//⽅法四
public void deleteLastSysnchronized(Vector vector) {
synchronized (vector){
int lastIndex = vector.size() - 1;
vector.remove(lastIndex);
}
}
}
如果⽅法⼀和⽅法⼆为⼀个组合的话。那么当⽅法⼀获取到了 vector 的size之 后,⽅法⼆已经执⾏完毕,这样就导致程序的错误。
如果⽅法三与⽅法四组合的话。通过锁机制保证了在 vector 上的操作的原⼦性。
针对queue、List
、Map
、Set
、Deque
等,java.util.concurrent包
提供了对应的并发集合类
interface | non-thread-safe | thread-safe |
List | ArrayList | CopyOnWriteArrayList |
Map | HashMap | ConcurrentHashMap |
Set | HashSet / TreeSet | CopyOnWriteArraySet |
Queue | ArrayDeque / LinkedList | ArrayBlockingQueue / LinkedBlockingQueue |
Deque | ArrayDeque / LinkedList | LinkedBlockingDeque |
整体架构
2、并发map
HashTable是线程安全的, 因为里面加了很多synchronized, 每次put操作, 都会锁整个Map
hashmap因为设计到Map的resize扩容, 多线程环境下, 同时进入扩容操作, 会出现问题
SynchronizedHashMap相当于有锁版的HashMap, 锁的粒度比HashTable小了一些
ConcurrentMap接⼝继承了Map接⼝,在Map接⼝的基础上⼜定义了四个⽅法:
public interface ConcurrentMap<K, V> extends Map<K, V> {
//插⼊元素,与原有put⽅法不同的是,putIfAbsent⽅法中如果插⼊的key相同,
则不替换原有的value值
V putIfAbsent(K key, V value);
//移除元素,与原有remove⽅法不同的是,新remove⽅法中增加了对value的判断,
如果要删除的key-value不能与Map中原有的key-value对应上,则不会删除该元素
boolean remove(Object key, Object value);
//替换元素,增加了对value值的判断,如果key-oldValue能与Map中原有的
key-value对应上,才进⾏替换操作
boolean replace(K key, V oldValue, V newValue);
//替换元素,与上⾯的replace不同的是,此replace不会对Map中原有的keyvalue进⾏⽐较,如果key存在则直接替换
V replace(K key, V value);
}
ConcurrentHashMap同HashMap⼀样也是基于散列表的map,但是它提供了⼀种 与HashTable完全不同的加锁策略提供更⾼效的并发性和伸缩性。
ConcurrentHashMap提供了⼀种粒度更细的加锁机制来实现在多线程下更⾼的性 能,这种机制叫分段锁(Lock Striping)。
提供的优点是:在并发环境下将实现更⾼的吞吐量,⽽在单线程环境下只损失⾮常 ⼩的性能。 可以这样理解分段锁,就是将数据分段,对每⼀段数据分配⼀把锁。当⼀个线程占 ⽤锁访问其中⼀个段数据的时候,其他段的数据也能被其他线程访问。 有些⽅法需要跨段,⽐如size()、isEmpty()、containsValue(),它们可能需要锁定 整个表⽽⽽不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,⼜按顺序释 放所有段的锁。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment 是⼀种可重⼊锁ReentrantLock,HashEntry则⽤于存储键值对数据。 ⼀个ConcurrentHashMap⾥包含⼀个Segment数组,Segment的结构和HashMap 类似,是⼀种数组和链表结构, ⼀个Segment⾥包含⼀个HashEntry数组,每个 HashEntry是⼀个链表结构(同HashMap⼀样,它也会在⻓度达到8的时候转化为 红⿊树)的元素, 每个Segment守护者⼀个HashEntry数组⾥的元素,当对 HashEntry数组的数据进⾏修改时,必须⾸先获得它对应的Segment锁。
在使用并发集合与使用非线程安全的集合类完全相同。首先以ConcurrentHashMap
为例:
Map<String, String> map = new ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");
因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:
Map<String, String> map = new HashMap<>();
改为:
Map<String, String> map = new ConcurrentHashMap<>();
就可以了。
java.util.Collections
工具类还提供了一个旧的线程安全集合转换器,可以这么用:
Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);
但是它实际上是用一个包装类包装了非线程安全的Map
,然后对所有读写方法都用synchronized
加锁,这样获得的线程安全集合的性能比java.util.concurrent
集合要低很多,所以不推荐使用。
ConcurrentNavigableMap接⼝继承了NavigableMap接⼝,这个接⼝提供了针对给 定搜索⽬标返回最接近匹配项的导航⽅法。
ConcurrentNavigableMap接⼝的主要实现类是ConcurrentSkipListMap类。从名字 上来看,它的底层使⽤的是跳表(SkipList)的数据结构。
关于跳表的数据结构这 ⾥不做太多介绍,它是⼀种”空间换时间“的数据结构,可以使⽤CAS来保证并发安 全性。
3、并发list
获取线程安全的List我们可以通过Vector、Collections.synchronizedList()方法和CopyOnWriteArrayList三种方式
读少写多的情况下,推荐使用Collections.synchronizedList()的方式
读多写少的情况下,推荐使用CopyOnWriteArrayList方
什么是CopyOnWrite机制, CopyOnWrite是计算机设计领域中的⼀种优化策略,也是⼀种在并发场景下常⽤的 设计思想——写⼊时复制思想。
那什么是写⼊时复制思想呢?就是当有多个调⽤者同时去请求⼀个资源数据的时 候,有⼀个调⽤者出于某些原因需要对当前的数据源进⾏修改,这个时候系统将会 复制⼀个当前数据源的副本给调⽤者修改。
CopyOnWrite容器即写时复制的容器,当我们往⼀个容器中添加元素的时候,不直接 往容器中添加,⽽是将当前容器进⾏copy,复制出来⼀个新的容器,然后向新容器 中添加我们需要的元素,最后将原容器的引⽤指向新容器。
这样做的好处在于,我们可以在并发的场景下对容器进⾏"读操作"⽽不需要"加 锁",从⽽达到读写分离的⽬的。从JDK 1.5 开始Java并发包⾥提供了两个使⽤ CopyOnWrite机制实现的并发容器 ,分别是CopyOnWriteArrayList和 CopyOnWriteArraySet
优点: CopyOnWriteArrayList经常被⽤于“读多写少”的并发场景,是因为 CopyOnWriteArrayList⽆需任何同步措施,⼤⼤增强了读的性能。在Java中遍历线 程⾮安全的List(如:ArrayList和 LinkedList)的时候,若中途有别的线程对List容器 进⾏修改,那么会抛出ConcurrentModificationException异常。 CopyOnWriteArrayList由于其"读写分离",遍历和修改操作分别作⽤在不同的List容 器,所以在使⽤迭代器遍历的时候,则不会抛出异常。
缺点: 第⼀个缺点是CopyOnWriteArrayList每次执⾏写操作都会将原容器进⾏拷⻉ 了⼀份,数据量⼤的时候,内存会存在较⼤的压⼒,可能会引起频繁Full GC(ZGC因为没有使⽤Full GC)。⽐如这些对象占⽤的内存⽐较⼤200M左右, 那么再写⼊100M数据进去,内存就会多占⽤300M。 第⼆个缺点是CopyOnWriteArrayList由于实现的原因,写和读分别作⽤在不同新⽼ 容器上,在写操作执⾏过程中,读不会阻塞,但读取到的却是⽼容器的数据。
现在我们来看⼀下CopyOnWriteArrayList的add操作源码,它的逻辑很清晰,就是 先把原容器进⾏copy,然后在新的副本上进⾏“写操作”,最后再切换引⽤,在此过 程中是加了锁的。
public boolean add(E e) {
// ReentrantLock加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 拷⻉原容器,⻓度为原容器⻓度加⼀
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 在新副本上执⾏添加操作
newElements[len] = e;
// 将原容器引⽤指向新副本
setArray(newElements);
return true;
} finally {
// 解锁
lock.unlock();
}
}
我们再来看⼀下remove操作的源码,remove的逻辑是将要remove元素之外的其他 元素拷⻉到新的副本中,然后切换引⽤,再将原容器的引⽤指向新的副本中,因为 remove操作也是“写操作”所以也是要加锁的。
public E remove(int index) {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
// 如果要删除的是列表末端数据,拷⻉前len-1个数据到新副本上,再切换引⽤
setArray(Arrays.copyOf(elements, len - 1));
else {
// 否则,将除要删除元素之外的其他元素拷⻉到新副本中,并切换引⽤
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
// 解锁
lock.unlock();
}
}
我们再来看看CopyOnWriteArrayList效率最⾼的读操作的源码
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}
由上可⻅“读操作”是没有加锁,直接读取。
4、并发set
Set的三个子类分别是:HaseSet、TreeSet、LinkedHashSet.这三个都是线程不安全的,底层其实都是Map的,Map是key-value键值对出现的。
Set集合怎么实现线程安全?
方案一:
和list一样,使用Colletcions这个工具类syn方法类创建个线程安全的set.
Set<String> synSet = Collections.synchronizedSet(new HashSet<>());
方案二:
使用JUC包里面的CopyOnWriteArraySet
Set<String> copySet = new CopyOnWriteArraySet<>();
另外JDK提供了ConcurrentSkipListSet,是线程安全的有序的集合。底层是使⽤ ConcurrentSkipListMap实现。
1.CopyOnWriteArraySet:无序,添加时内存占有率高,弱一致性。
2.ConcurrentSkipListSet:有序,需要维护索引,弱一致性。
5、并发queue
我们假设⼀种场景,⽣产者⼀直⽣产资源,消费者⼀直消费资源,资源存储在⼀个 缓冲池中,⽣产者将⽣产的资源存进缓冲池中,消费者从缓冲池中拿到资源进⾏消 费,这就是⼤名鼎鼎的⽣产者-消费者模式。
该模式能够简化开发过程,⼀⽅⾯消除了⽣产者类与消费者类之间的代码依赖性, 另⼀⽅⾯将⽣产数据的过程与使⽤数据的过程解耦简化负载。 我们⾃⼰coding实现这个模式的时候,因为需要让多个线程操作共享变量(即资 源),所以很容易引发线程安全问题,造成重复消费和死锁,尤其是⽣产者和消费 者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒⽣产者;当 缓冲池满了,我们需要阻塞⽣产者,唤醒消费者,这些个等待-唤醒逻辑都需要⾃ ⼰实现。(这块不明⽩的同学,可以看最下⽅结语部分的链接)
这么容易出错的事情,JDK当然帮我们做啦,这就是阻塞队列(BlockingQueue), 你只管往⾥⾯存、取就⾏,⽽不⽤担⼼多线程环境下存、取共享变量的线程安全问题。
BlockingQueue是Java util.concurrent包下重要的数据结构,区别于普通的队 列,BlockingQueue提供了线程安全的队列访问⽅式,并发包下很多⾼级同 步类的实现都是基于BlockingQueue实现的。
BlockingQueue⼀般⽤于⽣产者-消费者模式,⽣产者是往队列⾥添加元素的线程, 消费者是从队列⾥拿元素的线程。BlockingQueue就是存放元素的容器。
BlockingQueue阻塞队列提供了四组不同的⽅法⽤于插⼊、移除、检查元素:
- 抛出异常:如果试图的操作⽆法⽴即执⾏,抛异常。当阻塞队列满时候,再往 队列⾥插⼊元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空 时,从队列⾥获取元素时会抛出NoSuchElementException异常 。
- 返回特殊值:如果试图的操作⽆法⽴即执⾏,返回⼀个特殊值,通常是true / false。
- ⼀直阻塞:如果试图的操作⽆法⽴即执⾏,则⼀直阻塞或者响应中断。
- 超时退出:如果试图的操作⽆法⽴即执⾏,该⽅法调⽤将会发⽣阻塞,直到能 够执⾏,但等待时间不会超过给定值。返回⼀个特定值以告知该操作是否成 功,通常是 true / false。
注意:
- 不能往阻塞队列中插⼊null,会抛出空指针异常。
- 可以访问阻塞队列中的任意元素,调⽤remove(o)可以将队列之中的特定对象 移除,但并不⾼效,尽量避免使⽤
三、Atomic
Java的java.util.concurrent
包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic
包。
我们以AtomicInteger
为例,它提供的主要操作有:
- 增加值并返回新值:
int addAndGet(int delta)
- 加1后返回新值:
int incrementAndGet()
- 获取当前值:
int get()
- 用CAS方式设置:
int compareAndSet(int expect, int update)
Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。
如果我们自己通过CAS编写incrementAndGet()
,它大概长这样:
public int incrementAndGet(AtomicInteger var) {
int prev, next;
do {
prev = var.get();
next = prev + 1;
} while ( ! var.compareAndSet(prev, next));
return next;
}
CAS是指,在这个操作中,如果AtomicInteger
的当前值是prev
,那么就更新为next
,返回true
。如果AtomicInteger
的当前值不是prev
,就什么也不干,返回false
。通过CAS操作并配合do ... while
循环,即使其他线程修改了AtomicInteger
的值,最终的结果也是正确的。
我们利用AtomicLong
可以编写一个多线程安全的全局唯一ID生成器:
class IdGenerator {
AtomicLong var = new AtomicLong(0);
public long getNextId() {
return var.incrementAndGet();
}
}
通常情况下,我们并不需要直接用do ... while
循环调用compareAndSet
实现复杂的并发操作,而是用incrementAndGet()
这样的封装好的方法,因此,使用起来非常简单。
在高度竞争的情况下,还可以使用Java 8提供的LongAdder
和LongAccumulator
。