多线程进阶 JUC
1. 什么是JUC
三个包:
- java.util.concurrent
- java.util.concurrent.atomic
- java.util.concurrent.locks
另外加上java.util.function
2.线程和进程
Java默认2个线程:main+GC
Java可以开启线程吗?-不行 -> 调用 native start0()
利用c++调用底层资源
并发、并行
并发:多个对象操作同一个资源
并行:多个对象同时执行
/**
* 获取cpu核数
* 密集型/IO密集型
*/
Runtime.getRuntime().availableProcessors();
并发的本质:充分利用cpu的资源
线程有几个状态
enum State
=>
- NEW 新生
- RUNNABLE 运行
- BLOCKED 阻塞
- WAITING 等待
- TIMED_WAITING 超时等待
- TERMINATED 终止
wait和sleep区别
-
来自不同的类
wait : Object
sleep : Thread
-
锁是否释放
wait释放锁 但是sleep不会释放
-
使用的范围不同
wait必须在同步代码块中执行
sleep可以在任何地方调用
-
是否需要捕获异常
wait不需要捕获
sleep需要捕获-超时
3.Lock锁 ⭐
线程是一个单独的资源类 没有附属操作
同步操作:
-
方法上加
synchronized
-
Lock
加锁 实现类有- ReentrantLock
- ReentrantReadWriteLock.ReadLock
- ReentrantReadWriteLock.WriteLock
公平锁与非公平锁:默认非公平锁(执行顺序不遵循FIFO)
使用:
class ObjectDemo{ private Lock lock = new ReentrantLock(); public void operate() { lock.lock(); try { doSomething(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } ... }
-
二者的区别
- synchronized是内置的关键字 Lock是一个类
- synchronized无法判断锁的状态 Lock可以判断
- Synchronized会自动释放锁 Lock必须手动释放
- Synchronized占用锁后其他线程一直等待、而Lock中可以尝试获取不一定要持续等待
- Synchronized可重入锁、非公平、不可中断,Lock可以定制(可重入:线程释放锁之前再去竞争锁不需要等待只需要记录重入次数)
- Synchronized适合少量同步问题 Lock适合大量的同步代码
-
生产者和消费者问题
实现方式:1.Synchronized 2.Lock
-
Synchronized方式 两个以上的线程并发时出现问题
wait()
时可能发生虚假唤醒 等待应该出现在while循环中- if判断改为while判断
-
JUC方式
class Data{ private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment(){ lock.lock(); try{ while(number != 0){ condition.await(); } number++; condition.signalAll(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } public void decrement(){ lock.lock(); try{ while(number == 0){ condition.await(); } number--; condition.signalAll(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } }
-
-
Condition的优势
定向唤醒 精准通知
public class C { public static void main(String[] args) { Data3 data = new Data3(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printC(); } }, "C").start(); } } class Data3 {// 资源类 Lock private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int number = 1; public void printA() { lock.lock(); try { //业务,判断->执行->通知 while (number != 1) { //等待 condition1.await(); } System.out.println(Thread.currentThread().getName() + "--->A"); //唤醒指定的人:B number = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { //业务,判断->执行->通知 while (number != 2) { //等待 condition2.await(); } System.out.println(Thread.currentThread().getName() + "--->B"); //唤醒指定的人:C number = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { //业务,判断->执行->通知 while (number!=3){ //等待 condition3.await(); } System.out.println(Thread.currentThread().getName()+"--->C"); number=1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
4.深入理解锁
- 实例方法上声明Synchronized时占用的锁时当前的对象锁
- 静态方法上声明Synchronized占用的锁是class对象类锁
5.不安全集合类
并发修改问题
List
不安全问题
-
new Vector()
-
工具类转换
Collections.synchronizedList(list_obj)
-
写入时复制(COW 读写分离)
new CopyOnWriteArrayList()
-
实现:->
private transient volatile Object[] array;
-
相比
Vector
的优势:-
Vector 的曾删改查方法都加上了synchronized锁,保证同步的情况下,因为每个方法都要去获得锁,所以性能就会大大下降。
-
CopyOnWriteArrayList 方法只是在增删改方法上增加了ReentrantLock锁,但是他的读方法不加锁,所以在读的方面就要比Vector性能要好,CopyOnWriteArrayList适合读多写少的并发情况,读写分离,在写的时候复制出一个新的数组,完成插入、修改、删除操作,在完成操作后,将这个新的数组赋值给一个array。
-
-
Set
不安全问题
- HashSet底层是HashMap
- add方法实现就是map.put(e, PRESENT);
- 工具类转换
Collections.synchronizedSet(set_obj)
new CopyOnWriteArraySet()
Map
不安全问题、
- 默认初始加载因子
0.75f
,初始容量1 << 4
- 工具类转换
Collections.synchronizedMap(map_obj)
new ConcurrentHashMap()
- HashTable是一个线程安全的类 使用synchronized锁住整张表 效率低下
- ConcurrentHashMap做到读取不加锁(volatile) 写操作时锁的粒度尽可能小-锁分段技术,jdk1.7使用内部段(Segment)实现,每个段其实就是一个小的HashTable;jdk1.8摒弃分段锁,降低了锁的粒度,采用CAS操作结合synchronized块。
6.Callable
- 类似
Runnable
,用于创建新线程 - 有返回值和异常抛出
- 使用 利用适配器
FutureTask
class CallableTest {
public static void main(String[] args) {
FutureTask task = new FutureTask(new MyThread());
new Thread(task, "thread name").start();
String res = (String)task.get(); // 等待结果返回可能产生阻塞
}
}
class MyThread implements Callable<String> {
@Override
public String call() {
return "test for callable.";
}
}
- 结果有缓存
7.常用辅助类
7.1 CountDownLatch
- 减法计数器
- 初始构造传入计数总数num 调用
countDown()
减一 await()
等待计数器归零然后向下执行
7.2 CyclicBarrier
- 循环栅栏 实现一组线程相互等待 所有线程都到达某个“屏障点”时再执行后续操作
- 构造传入parties计数和到达屏障点时执行的操作
await()
等待到达屏障点
7.3 Semaphore
- 信号量(许可证)- 用于限流 | 共享资源互斥使用
- 构造传入线程数量permits
- 占用
acquire()
,如果没有则会等待 - 释放
release()
8.读写锁
ReadWriteLock
实现类 ReentrantReadWriteLock
,只能同时读,不能同时写
readLock().lock()
独占锁 一次只能由一个线程占有writeLock().lock()
共享锁 一次可以由多个线程占用
实现原理:
ReadLock
和WriteLock
表面上是两把锁,实际上是同一把锁的两个视图- state同步状态高16位表示读、低16位表示写(CAS无法同时操作两个变量)
- 即高16位记录读锁占用数量、低16位记录写锁的占用情况
- 当然这里只有16位所以存在上限2^16 如果超过它那么就抛出异常
9.阻塞队列
接口
BlockingQueue<E>
BlockingDeque<E>
实现类
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
-
使用情境:多线程并发处理、线程池
-
apis
add()
队列满了再添加抛出异常IllegalStateException
remove()
队列空了再移除抛出异常NoSuchElementException
offer()
不抛出异常添加 可传入超时时间poll()
不抛出异常去除 可传入超时时间element()
检测队首元素 可能抛出异常peek()
检测队首元素 不抛出异常put()
阻塞添加 等待可添加的时机take()
阻塞取出 等待可取的时机
同步队列
SynchronousQueue
没有容量 只能存放一个元素
take()
put()
10.线程池⭐
池化技术:事先创建好资源,即用即取
- 默认大小:2
- 最大:diy
- 优点:线程复用、并发控制、管理线程
- 降低资源消耗
- 提高响应速度
- 方便管理
线程池三大方法
- 阿里巴巴开发手册:不允许使用Executors创建线程池 通过ThreadPoolExecutor创建 避免资源耗尽
- Executor允许的队列长度为Integer.MAX_VALUE 可能导致OOM
Executors.newSingleThreadExecutor()
单个线程Executors.newFixedThreadPool(n)
创建n个线程的池Executors.newCachedThreadPool()
创建可伸缩的线程池
7大参数
- 上述创建方法其实还是调用的还是
ThreadPoolExecutor
构造的线程池 这个构造需要7个参数
int corePoolSize
核心线程池大小int maximumPoolSize
最大核心线程池大小long keepAliveTime
超时时间(超时释放)TimeUnit unit
超时单位BlockingQueue<Runnable> workQueue
阻塞队列ThreadFactory threadFactory
线程工厂RejectedExecutionHandler handler
拒绝策略
手动创建线程池
ThreadPoolExecutor threadService = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() // 阻塞队列满了 但仍然有需求 - 不处理 抛异常
);
四种拒绝策略
ThreadPoolExecutor.AbortPolicy
满了拒绝 同时抛出异常RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
返回提交处来执行任务ThreadPoolExecutor.DiscardPolicy
队列满了丢弃 且不抛出异常ThreadPoolExecutor.DiscardOldestPolicy
如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务(最早加入的)
线程池大小应该如何定义
- 操作类型
- cpu密集型:获取cpu核数量
Runtime.getRuntime().availableProcessors()
- io密集型:大于io密集线程的任务数量
- cpu密集型:获取cpu核数量
11.四大函数式接口⭐
现代编程:lambda表达式、链式编程、函数式接口、Stream流式计算
- default关键字 接口中默认实现的方法
函数式接口:只有一个方法的接口、简化编程模型,函数式接口可以用lambda简化
- 函数型
Function<T, R>
R apply(T t); - 断定型
Predicate<T>
boolean test(T t); - 消费型
Consumer<T>
void accept(T t); - 供给型
Supplier<T>
T get();
12.流式计算
大数据的计算应该交给流来操作 具体使用查看springcloud笔记第九节
13.ForkJoin分支合并
概念:大数据量中并行执行任务,提高效率,工作窃取
-
利用
ForkJoinPool
执行RecursiveAction
事件(无返回值)|RecursiveTask
任务(有返回值) -
使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入指定的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
-
ForkJoinPool
能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行 -
使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务
工作窃取算法:假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
使用示例:
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
14.异步回调
CompletableFuture.runAsync(()->{})
CompletableFuture.supplyAsync(()->{})
| 获取返回值futureObj.get()
- 异常处理:
futureObj.whenComplete((t,u)->{}).exceptional(e->{}).get()
15.JMM
谈谈对volatile的理解
- 虚拟机提供的轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是JMM:Java内存模型 只是一个抽象的概念
约定:
- 线程解锁前,必须把共享变量立刻刷回主存
- 线程加锁前,必须读取主存中的最新值到工作内存中
- 加锁和解锁是同一把锁
八种指令:
- lock 线程独占某资源
- unlock 线程释放某资源
- read 将变量从主内存传输到工作内存
- load 讲read的变量放入工作内存
- use 将变量传输给执行引擎
- assign 讲执行引擎放入工作内存
- store 将工作内存中的变量传送到主内存
- write 写入主内存
16.volatile
- 保证可见性:多线程之间对共享变量的修改能及时通知,保证数据一致性
- 不保证原子性:对变量的同时操作不是线程安全的
==> 使用原子类解决原子性的问题 AtomicInteger
,高效安全!(再底层涉及unsafe
~)
- 禁止指令重排
- 概念:编译器优化重排代码
- 处理器进行指令重排的时候,考虑的是数据之间的依赖性
- volatile可以避免指令重排-内存屏障(保证特定的执行顺序、保证变量的内存可见性)
17.单例模式
懒汉式在多线程模式下的问题
双重监测锁模式:DCL懒汉式
...
private static volatile DemoClass singleton; // 防止指令重排
public static DemoClass getSingleton() {
if (singleton == null) {
synchronized(DemoClass.class) {
if (singleton == null) {
singleton = new DemoClass(); // 此操作非原子操作
}
}
}
return singleton;
}
...
防止反射破坏单例模式
使用枚举:反射newInstance()
源码中有一条-禁止使用反射创建枚举对象
public enum EnumSingleDemo {
INSTANCE;
public EnumSingleDemo getInstance() {
return INSTANCE;
}
}
18.深入理解CAS
概念:CompareAndSwap ,比较工作内存中的值和主内存中的值,如果值符合期望,就赋值。否则循环(自旋锁)。它是一条CPU的并发原语。
源码关键字:自旋、unsafe类
缺点:循环耗时(如果线程冲突严重会带来极大的cpu开销)、一次只能保证一个共享变量的原子性、ABA问题
ABA问题:
比如有两个线程A、B:
- 一开始都从主内存中拷贝了原值为3;
- A线程执行到
var5=this.getIntVolatile
,即var5=3。此时A线程挂起;- B修改原值为4,B线程执行完毕;
- 然后B觉得修改错了,然后再重新把值修改为3;
- A线程被唤醒,执行
this.compareAndSwapInt()
方法,发现这个时候主内存的值等于快照值3,(但是却不知道B曾经修改过),修改成功。尽管线程A CAS操作成功,但不代表就没有问题。有的需求,比如CAS,只注重头和尾,只要首尾一致就接受。但是有的需求,还看重过程,中间不能发生任何修改。这就引出了
AtomicReference
原子引用。
19.原子引用
AtomicReference
| AtomicStampedReference
AtomicStampedReference
解决ABA问题:这个类维护了一个“版本号”Stamp
,其实有点类似乐观锁的意思。
在进行CAS操作的时候,不仅要比较当前值,还要比较版本号。只有两者都相等,才执行更新操作。
20.可重入锁
某个线程已经获得某个锁,可以再次获取锁而不会出现死锁,AQS会判断是否为同一个线程
21.自旋锁
自旋锁的提出背景
由于在多处理器环境中某些资源的有限性,有时需要互斥访问(mutual exclusion),这时候就需要引入锁的概念,只有获取了锁的线程才能够对资源进行访问,由于多线程的核心是CPU的时间分片,所以同一时刻只能有一个线程获取到锁。那么就面临一个问题,那么没有获取到锁的线程应该怎么办?
通常有两种处理方式:一种是没有获取到锁的线程就一直循环等待判断该资源是否已经释放锁,这种锁叫做自旋锁,它不用将线程阻塞起来(NON-BLOCKING);还有一种处理方式就是把自己阻塞起来,等待重新调度请求,这种叫做互斥锁
。
什么是自旋锁
自旋锁的定义:当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被别人获取(占用),那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。这种采用循环加锁 -> 等待的机制被称为自旋锁(spinlock)
。
原理
自旋锁的原理比较简单,如果持有锁的线程能在短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞状态,它们只需要等一等(自旋),等到持有锁的线程释放锁之后即可获取,这样就避免了用户进程和内核切换的消耗。
22.死锁排查
- 使用
jps -l
定位进程号 - 使用
jstack 进程号
查看堆栈信息(可以检查是否存在死锁)