Java并发基础
并发:多个任务在同一时间段内交替执行
并行:多个任务在同一时刻同时执行
Java线程
创建方式
继承Thread和实现接口
继承Thread类并重写run
,之后调用start
方法启动线程,注意:调用run只是普通的方法调用,不会新开线程。例如如下匿名类继承Thread。
Thread thread = new Thread() {
@Override
public void run() {
// 线程执行的逻辑
System.out.println("Thread is running");
}
};
thread.start(); // 启动线程
这种方法局限在于不能再次调用start
再次执行该方法。 // TODO why,start的原理?
实现Runnable或者实现Callable接口
两者区别:前者没有返回值,
后者可以使用FutureTask<>
包裹Callable
,然后传递给Thread
执行或者Executor框架中的submit()方法提交Callable任务,并返回一个Future对象,通过该对象获得返回值。
实现Runnable例子,注意和上面的区别,是把匿名实现类当参数传给Thread。如果想多次执行该方法,再创建一个Thread传进去,调用start即可。 // TODO,一个线程能不能多次执行我给的方法?
Thread thread = new Thread(() -> {
Thread.sleep(10);
IntStream.range(0, 10).forEach(System.out::println);
});
thread.start();
原理:Thread的start
方法会调用run
,内部如下,target
是Runnable
类型的
@Override
public void run() {
if (target != null) {
target.run();
}
}
实现Callable
- 使用
FutureTask
和Thread
FutureTask<String> task = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "HEllo";
}
});
new Thread(task).start();
System.out.println(task.get());
- 使用
Future
和Executor
线程池。把Callable
对象提交给线程池后,会返回一个Future
,调用get
时如果没有执行完,当前线程会阻塞,和FutureTask
一样。如果执行时间短,最好用while(!task.isDone())
来忙等待,这样可以避免发生阻塞,省去多余的线程上下文切换时间。
// SingleThreadExecutor仅有的一个线程来执行任务
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> task = executorService.submit(new Callable<String>() {
@Override
public String call() throws InterruptedException {
Thread.sleep(500);
return "HEllo";
}
});
System.out.println(task.get());
executorService.shutdown();//关闭线程池
用线程池创建
创建例子上面给出过了,实现接口加Future
接收结果(如果有)。有5钟常见线程池,后面讲。
如果任务抛出过异常,Future.get()
时候也会抛出异常。
注:submit()
方法用于提交带返回值的任务,通过Future
对象可以获取任务的执行结果;execute()
方法用于提交不带返回值的任务,有异常也只能在任务里面做好捕获处理。
线程池中调用这两个方法之后,内部会将传入的Runnable
(我们的任务),封装成一个Worker
对象放入Hashset.
详细见[后文](# 复用线程的原理)
LifeCycle和基本方法
这两个放一起是因为生命周期有很多地方的转换是因为调用了某些线程方法。
注:图中几点说明
- Waiting和TimedWaiting区别:后者有最长时间,时间到线程会被唤醒,前者不会
- 进入Blocked的机会一般只有等待锁的时候
- 两个wait状态调用
notify
之后有两个箭头,一个时runnable
,一个blocked
,大概是因为被唤醒后又进入了等锁状态。这两条线似乎是多余的。
常用方法
Object的wait notify
调用该对象的wait
释放该对象的锁,notify
和notifyAll
唤醒所有等待该锁的线程
注:获取锁使用sychronized
Thread的interrupt stop
interrupt
: 其他线程调用该线程的此方法,中断标志位置为true
,若该线程处于阻塞或者等待状态,会抛出InterruptedException
,可以自行捕获异常处理,若处于正常运行状态,不会抛出异常。可见,此方法不会直接导致线程退出,接收到信号之后是否退出取决于线程中的逻辑,也可以不退出继续执行。线程可以使用Thread.interruptd
重置标记位为false
.
注意:有的阻塞状态不会响应中断,比如synchronized
stop
:强制中断,不建议使用。因为可能当前线程有重要任务或者资源在使用。
其他
sleep
setDaemon
join
线程池
概念 ref
- 核心线程数(Core Pool Size):表示核心线程的数量。所谓核心线程,是在"大多数情况下(未开启allowCoreThreadTimeOut参数)"都不会被销毁的线程(即使它们一直处于空闲状态);
- 最大线程数(Max Pool Size):当任务堆积,且池中没有空闲的核心线程来处理任务时,线程池会创建一些临时的线程来处理堆积的任务。这些临时的线程会在空闲一定时间后被销毁掉, 最大的临时线程数量为MaxPoolSize - CorePoolSize;
- 存活时间(Keep Alive Time):分为具体的数值和单位,如60s。表示临时线程空闲存活时间(在开启allowCoreThreadTimeOut参数后也表示核心线程的空闲存活时间)。 当临时线程空闲时间超过该值之后,就会被销毁掉;
- 工作队列(Work Queue):存放挤压的任务;
- 拒绝策略(Reject Handler):当线程池的线程数量已经达到上限(Max Pool Size),全部都处于非空闲的状态,且工作队列已满无法再堆积任务时, 会按照预先设定的方式拒绝新的任务
注:核心线程和非核心线程没有本质区别,主要是后者一个空闲一定时间后被销毁。
注注:线程池allowCoreThreadTimeOut(boolean)
如果设置true,核心线程空闲一定时也会被销毁。
加入新任务的流程
此流程对应线程池中execute()
- 先判断核心线程是否已满,否,直接执行
- 是,看工作队列是否已满,否,加入队列等待
- 是,查看能否创建临时线程,否,拒绝策略
- 是,创建临时线程并执行
- 是,查看能否创建临时线程,否,拒绝策略
- 是,看工作队列是否已满,否,加入队列等待
总结:核心线程->工作队列->临时线程
以上概念有两个需要仔细理解:工作队列,拒绝策略。
工作队列
在java队列的文章中具体学习
拒绝策略
工作的线程达到上限,并且队列也满,会对新来的执行拒绝策略。
默认提供四种。当然,也可以自己实现RejectedExecutionHandler
接口自定义策略。
-
AbortPolicy:默认的拒绝策略。会抛出RejectedExecutionException异常。
-
DiscardPolicy:静默拒绝策略。同AbortPolicy类似,但不会抛出异常。
-
CallerRunsPolicy:交由调用者所在线程来执行任务。源码
// 判断线程池是否关闭 if (!e.isShutdown()) { // 在线程池没有关闭的情况下,直接执行任务,注意:当前执行的线程是调用线程 r.run(); }
-
DiscardOldestPolicy:丢弃掉工作队列中等待时间最长的任务,也就是当前队列中的第一个任务,并将新的任务添加到队列尾部。
if (!e.isShutdown()) { e.getQueue().poll(); // 取出对头头部的数据 e.execute(r); // 提交新任务,注意,新任务会自然放入到队列尾部 }
测试,自己创建(后面讲如何创建)一个最大线程数为1,队列容量为1的线程池,然后提交3个任务。分别指定不同的策略看看执行结果。
// 交给调用者线程执行
RejectedExecutionHandler policy = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
policy);
Runnable runnable = () -> {
try {
System.out.println("任务由" + Thread.currentThread().getName() + "执行 then sleep");
Thread.sleep(1000 * 5); // 模拟任务一直占用线程
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
for (int i = 0; i < 3; ++i) {
executor.submit(runnable);
}
这个例子的输出就是
任务由main执行 then sleep
任务由pool-1-thread-1执行 then sleep
任务由pool-1-thread-1执行 then sleep
创建线程池
构造函数如下,对应上面5个概念,存活时间为值和单位。
另外注意ThreadFactory
是一个接口,只有一个Thread newThread(Runnable r);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
也可以使用Executors
类预制的几个线程池,通过他们创建方法可以大概看出他们的特性。
但他们中有的存在一定问题,阿里《Java开发手册》中提到要手动用
ThreadPoolExecutor
创建比较好
FixedThreadPool
无界队列(LinkedBlockingQueue)。没有指定拒绝策略,则使用默认的AbortPolicy策略。
因为队列无限制,所以不会抛出RejectedExecutionException
,但是会出现OutOfMemoryError
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
// 调用
Executors.newFixedThreadPool();
CachedThreadPool
同步队列(SynchronousQueue),本质上无存储空间,存和取都会阻塞操作者,底层实现是先自旋,超过一定次数再加锁。
总线程数为INTMAX,抛出的东西同上
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
SingleThreadExecutor
只有一个线程,无界队列
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
ScheduledThreadPoolExecutor
继承ThreadExecutor
可执行定时任务的线程池。其中ScheduledThreadPoolExecutor.DelayedWorkQueue()
是私有内部类。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 10L, TimeUnit.MILLISECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}
该线程池两个核心方法
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
两者都是隔一段时间执行一次,区别:
- 前者的间隔时间是从每一次执行开始计算,如果时间到了,但是上一次还没执行完,则等待结束后再执行下一次。
- 后者间隔时间从一次执行结束后开始计算。
**WorkStealingPool **
是ForkJoin
框架的一部分。
特点是每个线程都有自己的任务队列,当某个线程完成了自己队列中的任务后,它可以从其他线程的队列中偷取任务来执行。
选用线程池
// TODO 根据不同场景选用不同的线程池
关闭线程池
线程池的所有状态
shutdown()会等待线程池和队列任务都完成,shutdownNow()会抛弃队列任务,尝试终止正在执行的任务,具体是调用线程的interrupt()
但具体是否终止,取决于线程本身。
复用线程的原理
线程池最大优点就是复用线程。原理是将Thread
和Runnable
封装进一个Worker
对象,这个对象也是实现了Runnable
接口,他的run
方法内部是一个死循环,该worker
中的线程负责执行这个run方法,并且不断从队列中读取任务并执行,达到线程复用的目的。
简单来说就是让一个线程死循环,在循环中读取队列任务并执行。
// 省略无关代码
public void execute(Runnable command) {
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 添加新的worker
return;
}
if (isRunning(c) && workQueue.offer(command)) {
}
}
解释 : 一开始会尝试addWorker
新建线程,如果失败就放入队列。内部如下,创建后会直接开始新建的线程
private boolean addWorker(Runnable firstTask, boolean core) {
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
// ...
if (workerAdded) {
t.start(); // 开启线程,这个线程相当于 new Thread(worker).start(); 只不过是Thread工厂创建的
}
}
}
Worker的run
实际调final void runWorker(Worker w)
,在该循环中会不断用getTask()
从队列获取任务然后执行
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
while (task != null || (task = getTask()) != null) {
// ....
task.run();
}
}
Java锁
锁的底层到底是什么
Java中每个对象都隐含关联一个监视器
ObjectMonitor
,监视器通过cpp
实现内置在JVM中,监视器地址记录在对象的MarkWord
上,synchronized
通过ObjectMonitor
实现对象的锁操作。
涉及知识:java对象的内存模型
MarkWord是什么
JVM在内存中将对象划为三部分:对象头、实例数据和填充数据。对象头分为MarkWord
和类型指针两部分。MarkWord
存储对象自身的运行数据,如哈希值、GC分代年龄等。对象在不同状态下,MarkWord中存储的信息不同,例如下图,最左边是状态,右边是该状态对应的存储信息。注意重量级锁定的状态中,有指向重量级锁的指针,即ObjectMonitor
对象的地址
ObjectMonitor是什么
内部有如下记录
ObjectMonitor() {
_header = NULL;
_count = 0; //记录个数
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL; //持有Monitor的线程
_WaitSet = NULL; //处于wait状态的线程
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ; //处于等待锁block状态的线程
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}
注意,以上都是cpp在JVM中的实现
一个通俗易懂的图
每个等待锁的线程都会被封装成ObjectWaiter对象,ObjectWaiter首先会进入 Entry Set等着,当线程获取到对象的
monitor
后把monitor
中的owner
变量设置为当前线程,同时monitor
中的计数器count
加1,若线程调用wait()
方法,将释放当前持有的monitor
,owner
变量恢复为null
,count
自减1,同时该线程进入 WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor
并复位变量的值,以便其他线程进入获取对象的monitor
。
到这里,大致理解了锁底层就是一个ObjectMonitor
对象,需要访问对象的线程排成队列,这个队列协调不同的线程按一定顺序规则访问对象,而协调的操作由JVM来做。
多线程共享数据时必然会出现竞争问题,于是就要使用锁来控制数据访问。
注:后续讲到的各种锁并不是非此即彼的关系,只是站在不同角度分类有不同的名字。
锁优化策略
有什么点可优化
互斥同步涉及的挂起/唤醒线程都涉及内核态转换,消耗大。jvm会根据同步块代码和锁的访问情况对锁进行不同程度的优化或者升级(轻到重),尽量避免系统的同步或者切换线程的操作。
-
锁消除和锁粗化都是在运行时的一些优化方案,比如我们某段代码虽然加了锁,但是在运行时根本不可能出现各个线程之间资源争夺的情况,这种情况下,完全不需要任何加锁机制,所以锁会被消除。锁粗化则是我们代码中频繁地出现互斥同步操作,比如在一个循环内部加锁,这样明显是非常消耗性能的,所以JVM一旦检测到这种操作,会将整个同步范围进行扩展。
-
偏向锁:目的是消除数据在无竞争情况下的同步原语。线程获取偏向锁后,持有锁的线程以后每次进入相应同步块时,都不需要再进行任何同步操作。偏向锁不会主动释放,只有当其他线程尝试获取锁时,才会检查持有线程是否可以释放锁。如可以释放则替换为新线程ID,不可释放则升级为轻量级锁。
-
轻量级锁:进入同步代码块前,JVM首先查看同步对象的
MarkWord
看看有没有被占用,如果没有,在当前线程栈帧当中建立一个锁记录Lock Record
,用于存储同步对象的MarkWord
的拷贝(why? 用于后面比较);然后虚拟机使用CAS
操作将对象的LockRecord
指针指向当前线程的栈帧(也就是尝试获取锁)。如果更新失败,则检查对象当前的MarkWord
和之前拷贝是否一致,如果不一致,说明别的线程抢先一步,就进入自旋并在自旋达到一定次数(自适应自旋锁)后升级为重量级锁。自旋的同时如果有第三个线程尝试获取锁,也会直接升级到重量级锁。为什么要用轻量级锁而不是重量级锁?
因为重量级锁请求失败会将线程挂起,而线程上下文切换涉及OS内核态转换,有一定耗时。所以如果线程很少,每个线程占用锁时间不长,就直接自旋一直占着CPU,等锁释放。
-
重量级锁:如果加锁作用于代码块,会调用
monitorenter
和monitorexit
这种耗时高的操作(一般比之前的自旋和CAS高)来进行同步。作用于方法则会给方法加上ACC_SYNCHRONIZED
标记,原理类似。
锁API
锁API和上面提到的锁不同,这些锁是Java API层面实现的锁机制。
Lock && Condition
Lock有如下方法,不同于synchronized
,Lock更像是一把真的锁,可以lock和unlock
public interface Lock {
//获取锁,拿不到锁会阻塞,等待其他线程释放锁,获取到锁后返回
void lock();
//同上,但是等待过程中会响应中断
void lockInterruptibly() throws InterruptedException;
//尝试获取锁,但是不会阻塞,如果能获取到会返回true,不能返回false
boolean tryLock();
//尝试获取锁,但是可以限定超时时间,如果超出时间还没拿到锁返回false,否则返回true,可以响应中断
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//暂时可以理解为替代传统的Object的wait()、notify()等操作的工具
Condition newCondition();
}
Condition:
public interface Condition {
//与调用锁对象的wait方法一样,会进入到等待状态,但是这里需要调用Condition的signal或signalAll方法进行唤醒(感觉就是和普通对象的wait和notify是对应的)同时,等待状态下是可以响应中断的
void await() throws InterruptedException;
//同上,但不响应中断(看名字都能猜到)
void awaitUninterruptibly();
//等待指定时间,如果在指定时间(纳秒)内被唤醒,会返回剩余时间,如果超时,会返回0或负数,可以响应中断
long awaitNanos(long nanosTimeout) throws InterruptedException;
//等待指定时间(可以指定时间单位),如果等待时间内被唤醒,返回true,否则返回false,可以响应中断
boolean await(long time, TimeUnit unit) throws InterruptedException;
//可以指定一个明确的时间点,如果在时间点之前被唤醒,返回true,否则返回false,可以响应中断
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个处于等待状态的线程,注意还得获得锁才能接着运行
void signal();
//同上,但是是唤醒所有等待线程
void signalAll();
}
注意signal
和Object的notify
一样,要有持有锁才能调用,而且需要是同一个Condition
对象,每次用Lock new的是不一样的。
Lock的实现类
ReentrantLock
可重入锁。同一个线程可以多次加锁和释放锁。完全释放后其他线程才能加锁。初始化的时候可以指定其是否为公平锁。即线程按照请求顺序获取锁。非公平锁则是每到一个新线程申请锁,则直接尝试,失败后再放入等待队列。
用例
Lock testLock = new ReentrantLock();
Condition condition = testLock.newCondition();
new Thread(() -> {
testLock.lock(); //和synchronized一样,必须持有锁的情况下才能使用await
try {
condition.await(); //进入等待状态
} catch (InterruptedException e) {
e.printStackTrace();
}
testLock.unlock();
}).start();
Thread.sleep(100); //防止线程2先跑
new Thread(() -> {
testLock.lock();
condition.signal(); //唤醒线程1,但是此时线程1还必须要拿到锁才能继续运行
testLock.unlock(); //这里释放锁之后,线程1就可以拿到锁继续运行了
}).start();
**ReentrantReadWriteLock **
读写锁。实现ReadWriteLock
没实现Lock
public interface ReadWriteLock {
//获取读锁
Lock readLock();
//获取写锁
Lock writeLock();
}
规则:如果只加了读锁,别的线程还可以继续加读锁,不可加写锁,如果有一个加了写锁,其他线程什么锁都不能加,当然,他自己依然可以加读锁,并且可以加完之后把写锁释放,这叫锁降级。
Lock底层原理:AQS队列同步器
GPT :
Java中的同步器主要有两种实现:基于内置的synchronized关键字的隐式监视器锁(monitor lock)和基于显示的同步器(如AbstractQueuedSynchronizer,简称AQS)。
AQS内部不使用底层提供的monitor
,而是自身维护队列,当线程调用lock()
方法来申请锁,便会触发AQS的一系列判断,如果不符合条件,比如已经有线程申请了。那就将该线程阻塞(这个是要用底层API的),放入等待队列。另外,线程之间竞争锁的时候也要用底层CAS操作。
我们可以通过AQS实现自己的锁,只需要实现不多的几个方法即可。
其他基于AQS实现的"锁"
CyclicBarrier && CountDownLatch
简单说前者用于同步(多个线程),后者用于等待(一个线程等其他线程)
前者可以在多个线程中调用其await
方法,用于等待其他线程也都调用这个函数。
后者可以在多个线程中调用其countDown
方法,然后在等待线程中调用await
方法,当计数器为0,该线程继续执行。
Samphore
使用aquire
和release
方法,原理类似信号量机制。
Fork/Join并发框架
方便大任务分成小任务并发执行。主要由以下几个类。
ForkJoinPool
:实现ExecutorService
是线程池。不会为每个任务分配一个线程,会分开放到队列中让不同线程执行,创建线程池可以指定LIFO还是FIFO。ForkJoinTask
:实现Future
,核心方法:fork
和join
,子类有RecursiveAction
(有返回值)和RecusiveTask
(无返回值)ForkJoinWorkerThread
:继承Thread,是线程池中的工作线程,拥有自己的工作队列WorkQueue
:保存当前线程的工作。当前线程完成工作会窃取其他线程的队列中的工作来执行
用例:
- 和普通线程池一样,这里也要提交任务,只不过任务是继承了他规定的
ForkJoinTask
的类 - 在任务内部可以调用
fork
来递归产生子任务,join
等待子任务返回 - 注意,接收
pool
submit返回值用的是ForkJoinTask
,对应Future,而任务内部接收的join
返回值就是目标类型long
。 - 窃取其他线程工作的时候是从队尾窃取,减少和原线程的冲突。
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> rootTask = forkJoinPool.submit(new SumForkJoinTask(1L, 10_0000_0000L));
System.out.println("计算结果:" + rootTask.get());
}
}
class SumForkJoinTask extends RecursiveTask<Long> {
private final Long min;
private final Long max;
private Long threshold = 1000L;
public SumForkJoinTask(Long min, Long max) {
this.min = min;
this.max = max;
}
// 继承对应的task,重写compute方法
@Override
protected Long compute() {
// 小于阈值时直接计算
if ((max - min) <= threshold) {
long sum = 0;
for (long i = min; i < max; i++) {
sum = sum + i;
}
return sum;
}
// 拆分成小任务
long middle = (max + min) >>> 1;
SumForkJoinTask leftTask = new SumForkJoinTask(min, middle);
leftTask.fork();
SumForkJoinTask rightTask = new SumForkJoinTask(middle, max);
rightTask.fork();
// 汇总结果
return leftTask.join() + rightTask.join();
}
}
流
简化代码,并行流还能提高计算效率。支持如下操作
- 中间操作
filter
map
distinct
skip
sort
sorted(cmp)
....
- 终端操作
count
findFirst
forEach forEachOrdered
(按顺序)reduce(根据指定的计算模型计算Stream中的每一个值,参数1初始值,参数2计算模型)
max min
allMatch
.....
中间操作用于将一个流转成另一个流,执行终端操作时才会真正计算。
分为并行和串行流。
创建流的几种方式
Arrays.asList(1,2,3).stream(); //集合或者数组
Stream.of(1, 2, 3); // of方法
Stream.iterate(0, n->n + 2); //iterate方法
Files.lines(Paths.get("D://a.txt"));//用文件创建
串行
Arrays.asList(1, 2, 3).stream().filter(n -> n > 1).count();
并行
Arrays.asList(1, 2, 3).stream().parallel().filter(n -> n > 1).count();
并行流适合CPU密集型计算,太简单的计算,和cpu上下文切换耗时比起来比较亏。而且并行流用的是全局线程池,可能会影响到别的线程执行。
可以用所谓的线程池隔离技术,其实就是另外用ForkJoinPool
来提交任务并执行。例如
int sum = forkJoinPool.submit(()->Arrays.asList(1, 3, 4).parallelStream().reduce(0, Integer::sum)).get();
标签:Java,Thread,队列,并发,任务,线程,new,执行
From: https://www.cnblogs.com/BayMax0-0/p/17780964.html