JUC并发编程
1. 常见的加锁方式
1.1 synchronized关键字
要求:多个线程并行执行,依次实现对数字的+1、-1操作。即一次+1、一次-1,依次执行。
Share类
class Share {
private int number = 0;
public synchronized void incr() throws InterruptedException { // +1
while (number != 0) { //用wait不能用if,避免虚假唤醒
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + " : " + number);
this.notifyAll();
}
public synchronized void decr() throws InterruptedException { // -1
while (number != 1) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + " : " + number);
this.notifyAll();
}
}
创建主类
public class Demo04 {
public static void main(String[] args) {
Share share = new Share();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
share.incr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "cao1").start();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
share.decr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "chen1").start();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
share.incr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "cao2").start();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
share.decr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "chen2").start();
}
}
运行结果
cao1 : 1
chen2 : 0
cao2 : 1
chen1 : 0
cao2 : 1
chen1 : 0
cao2 : 1
chen1 : 0
...
1.2 读写锁
实现卖票
LTicket 类
class LTicket {
private int number = 30;
//默认创建的是非公平锁,new ReentrantLock(true)创建的是公平锁
/**
* 非公平锁:会造成线程饿死,效率高
* 公平锁:阳光普照,效率相对低
*/
private final Lock lock = new ReentrantLock();
public void sale() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出:" + number--);
}
} finally {
lock.unlock();
}
}
}
主类
public class Demo03 {
public static void main(String[] args) {
LTicket lTicket = new LTicket();
Thread cao = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
lTicket.sale();
}
}
}, "cao");
cao.start();
Thread chen = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
lTicket.sale();
}
}
}, "chen");
chen.start();
System.out.println("main");
}
}
运行结果
main
cao卖出:30
cao卖出:29
cao卖出:28
cao卖出:27
cao卖出:26
cao卖出:25
cao卖出:24
...
多个线程实现对number的+1、-1操作,要求+1、-1操作轮流执行。
LShare类
class LShare{
private int number = 0;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
// +1
public void incr() throws InterruptedException {
lock.lock();
try {
while (number != 0){
condition.await();
}
number ++ ;
System.out.println(Thread.currentThread().getName() + " : " + number);
condition.signalAll();
}finally {
lock.unlock();
}
}
// -1
public void decr() throws InterruptedException {
lock.lock();
try {
while (number != 1){
condition.await();
}
number -- ;
System.out.println(Thread.currentThread().getName() + " : " + number);
condition.signalAll();
}finally {
lock.unlock();
}
}
}
主类
public class Demo05 {
public static void main(String[] args) {
LShare lShare = new LShare();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
lShare.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "cao1").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
lShare.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "chen1").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
lShare.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "cao2").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
lShare.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "chen2").start();
}
}
运行结果
cao1 : 1
chen1 : 0
cao2 : 1
chen2 : 0
cao1 : 1
chen1 : 0
cao2 : 1
chen2 : 0
cao1 : 1
chen1 : 0
cao2 : 1
chen2 : 0
...
多个线程并行,依次打印一次AA,两次BB、三次CC
ShareResource类
class ShareResource{
/**
* 1 -> A
* 2 -> B
* 3 -> C
*/
private int flag = 1; // 初始时,为1,唤醒打印AA线程
/**
* 创建锁
*/
private Lock lock = new ReentrantLock();
/**
* 创建condition
*/
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
/**
* 打印
*/
public void print1(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 1){
c1.await();
}
for (int i = 0; i < 1; i++) {
System.out.println(Thread.currentThread().getName() + " : " + i + " 轮数: " + loop);
}
flag = 2;
c2.signal();
}finally {
lock.unlock();
}
}
public void print2(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 2){
c2.await();
}
for (int i = 0; i < 2; i++) {
System.out.println(Thread.currentThread().getName() + " : " + i + " 轮数: " + loop);
}
flag = 3;
c3.signal();
}finally {
lock.unlock();
}
}
public void print3(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 3){
c3.await();
}
for (int i = 0; i < 3; i++) {
System.out.println(Thread.currentThread().getName() + " : " + i + " 轮数: " + loop);
}
flag = 1;
c1.signal();
}finally {
lock.unlock();
}
}
}
主类
public class Demo06 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
shareResource.print1(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, "AA").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
shareResource.print2(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, "BB").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
shareResource.print3(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, "CC").start();
}
}
运行结果
AA : 0 轮数: 0
BB : 0 轮数: 0
BB : 1 轮数: 0
CC : 0 轮数: 0
CC : 1 轮数: 0
CC : 2 轮数: 0
AA : 0 轮数: 1
BB : 0 轮数: 1
BB : 1 轮数: 1
CC : 0 轮数: 1
CC : 1 轮数: 1
CC : 2 轮数: 1
...
注意: synchronized 锁的范围,不同的对象,类;可以将类理解为特殊的对象
对于普通同步方法,锁是当前实例的对象
对于静态同步方法,锁是当前类的Class对象
对于同步方法块,锁是synchronized括号里配置的对象
2. 线程安全的数据结构
// 写时复用技术,List
CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
// Set
CopyOnWriteArraySet<String> strings = new CopyOnWriteArraySet<>();
// Map
ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
3. 可重入锁
synchronized(隐式)和Lock(显式)都是可重入锁(递归锁)
可重入锁(Reentrant Lock),也称为递归锁,是一种支持同一个线程多次获取同一个锁的锁机制。
在并发编程中,当一个线程获得了锁之后,如果再次尝试获取同一个锁时,可重入锁会允许该线程继续获取锁而不会被阻塞。这种机制允许线程在执行过程中多次获取同一个锁,并且在释放锁之前需要相同次数的解锁操作。
可重入锁的主要目的是解决在递归调用或嵌套代码中的锁定问题。当一个线程已经获得了锁,但在持有锁的代码块中又调用了另一个需要同样锁的方法时,如果使用非可重入锁,线程会因为无法再次获得同一个锁而陷入死锁状态。而可重入锁允许线程多次获得同一个锁,避免了死锁问题。
可重入锁的实现通常会记录持有锁的线程和持有锁的次数。当线程再次请求获取锁时,会检查当前线程是否已经持有锁,如果是,则增加持有锁的计数器;否则,线程会被阻塞直到获取到锁。
在Java中,ReentrantLock类是可重入锁的一种实现方式。它提供了与synchronized相似的功能,但具备更高的灵活性和扩展性。通过调用lock()方法获取锁,再调用unlock()方法释放锁,可以实现多次获取和释放同一个锁的操作。
可重入锁的使用可以简化代码逻辑,提高灵活性,并且能够避免死锁问题。然而,需要注意避免过度的锁嵌套,以免引起性能问题或潜在的死锁风险。
Lock lock = new ReentrantLock();
new Thread(() -> {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " -> 外层");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " -> 中层");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " -> 内层");
}finally {
lock.unlock();
}
}finally {
lock.unlock();
}
}finally {
lock.unlock();
}
}, "t2").start();
执行结果
t2 -> 外层
t2 -> 中层
t2 -> 内层
Object object = new Object();
new Thread(() -> {
synchronized (object){
System.out.println(Thread.currentThread().getName() + " -> 外层");
synchronized (object){
System.out.println(Thread.currentThread().getName() + " -> 中层");
synchronized (object){
System.out.println(Thread.currentThread().getName() + " -> 内层");
}
}
}
}, "t1").start();
执行结果
t1 -> 外层
t1 -> 中层
t1 -> 内层
4. 线程死锁
死锁:两个或者两个以上的进程因为互相争夺资源而造成一种互相等待的现象,如果没有外力干涉,将无法继续执行下去。
原因:
系统资源不足
进程运行推进顺序不合适
资源分配不当
命令行查看程序是否是死锁的方法:
jps -l 查看当前阻塞程序的id
jstack id 查看是否为死锁的详细信息
主类
public class Demo09 {
static final Object object1 = new Object();
static final Object object2 = new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (object1){
System.out.println(Thread.currentThread().getName() + "拥有object1,试图获取object2");
try {
Thread.sleep(100); // 该线程休眠一段时间,确保两个线程能同时运行
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (object2){
System.out.println(Thread.currentThread().getName() + "成功获取object2");
}
}
}, "t1").start();
new Thread(() -> {
synchronized (object2){
System.out.println(Thread.currentThread().getName() + "拥有object2,试图获取object1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (object1){
System.out.println(Thread.currentThread().getName() + "成功获取object1");
}
}
}, "t2").start();
}
}
执行结果
t1拥有object1,试图获取object2
t2拥有object2,试图获取object1
此时程序处于死锁状态,并未执行结束
PS D:\Project\java\syncDemo> jps -l
14480 org.jetbrains.jps.cmdline.Launcher
3092 org.jetbrains.idea.maven.server.RemoteMavenServer36
15032 jdk.jcmd/sun.tools.jps.Jps
4280
14700 sync.Demo09
PS D:\Project\java\syncDemo> jstack 14700
2024-05-25 16:21:01
Full thread dump Java HotSpot(TM) 64-Bit Server VM (17.0.10+11-LTS-240 mixed mode, sharing):
Threads class SMR info:
_java_thread_list=0x00000252a08ecc30, length=15, elements={
0x00000252a06a21b0, 0x00000252a06a3030, 0x00000252a06bf1f0, 0x00000252a06c11c0,
...
Found 1 deadlock.
5. 创建线程的方式
- 1.继承Thread类、实现Runnable接口
- 2.通过Callable接口(可以有线程的返回值)
- 3.线程池方式
Runnable和Callable接口的区别
- 1.是否有返回值
- 2.是否抛出异常
- 3.实现方法的名称不同,一个是run,一个是call
找到一个类,即和Runnable有关系,又和Callable有关系
Runnable接口有实现类FutureTask
FutureTask构造可以传递Callable
代码
public class Demo10 {
public static void main(String[] args) throws Exception {
/* FutureTask<Object> futureTask = new FutureTask<>(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println(Thread.currentThread().getName() + " come in callable");
return "hello";
}
});*/
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + " come in callable");
// if (true) {
// throw new Exception("callable 异常");
// }
return "hello";
});
new Thread(futureTask, "cao").start();
// 线程未执行完毕(包括正常结束和异常结束),futureTask.isDone()返回当前任务是否已完成
while (!futureTask.isDone()) {
System.out.println("运行中...");
}
// 第一次调用get获取返回值时,等待线程执行完毕后返回结果
// 第二次调用,直接返回结果
// 如果 futureTask.cancel() 导致线程退出,futureTask.get()报错
System.out.println("运行结果1为:" + futureTask.get());
System.out.println("运行结果2为:" + futureTask.get());
System.out.println("---------------");
}
}
运行结果
运行中...
运行中...
cao come in callable
运行中...
运行中...
运行中...
运行中...
运行中...
运行结果1为:hello
运行结果2为:hello
---------------
6. JUC强大的辅助类
6.1 减少计数CountDownLatch
模拟班级里所有学生走完之后,班长才可以锁门
public class Demo11 {
public static void main(String[] args) throws InterruptedException {
// 创建CountDownLatch对象,设置初始值
CountDownLatch countDownLatch = new CountDownLatch(6);
// 6个同学陆续离开教室之后,班长才可以锁门
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " -> 离开");
// 计数器 -1
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班长锁门");
}
}
运行结果
5 -> 离开
3 -> 离开
0 -> 离开
4 -> 离开
1 -> 离开
2 -> 离开
main班长锁门
6.2 循环栅栏CyclicBarrier
模拟集齐七颗龙珠可以召唤神龙
public class Demo12 {
private static final int NUMBER = 7;
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
// 方式1
// 创建CyclicBarrier对象
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, new Runnable() {
@Override
public void run() {
System.out.println("集齐7颗龙珠可以召唤神龙");
}
});
// 集齐7颗龙珠的过程
for (int i = 0; i < NUMBER; i++) {
new Thread(() -> {
//等待
try {
System.out.println(Thread.currentThread().getName() + " 集齐一颗龙珠");
int await = cyclicBarrier.await(); // 等待集齐所有的龙珠
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}, String.valueOf(i)).start();
}
// 方式2
// 创建CountDownLatch对象,设置初始值
/*
CountDownLatch countDownLatch = new CountDownLatch(7);
// 6个同学陆续离开教室之后,班长才可以锁门
for (int i = 0; i < 7; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "龙珠被获取");
// 计数器 -1
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println("已集齐七颗龙珠,可以召唤神龙");
*/
}
}
执行结果
6 集齐一颗龙珠
0 集齐一颗龙珠
1 集齐一颗龙珠
2 集齐一颗龙珠
3 集齐一颗龙珠
4 集齐一颗龙珠
5 集齐一颗龙珠
集齐7颗龙珠可以召唤神龙
6.3 信号灯Semaphore
模拟6辆车,停在3个停车位
public class Demo13 {
public static void main(String[] args) {
// 创建Semaphore,设置许可数量
Semaphore semaphore = new Semaphore(3);
// 模拟6辆车
for (int i = 0; i < 6; i++) {
new Thread(() -> {
// 抢占车位
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获取到了车位");
// 设置随机停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + "离开了车位");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
执行结果
0获取到了车位
2获取到了车位
1获取到了车位
2离开了车位
5获取到了车位
5离开了车位
4获取到了车位
1离开了车位
3获取到了车位
4离开了车位
0离开了车位
3离开了车位
7. 读写锁
悲观锁和乐观锁:
乐观锁和悲观锁是两种思想,用于解决并发场景下的数据竞争问题。
乐观锁:乐观锁在操作数据时非常乐观,认为别人不会同时修改数据。因此乐观锁不会上锁,只是在执行更新的时候判断一下在此期间别人是否修改了数据:如果别人修改了数据则放弃操作,否则执行操作。
悲观锁:悲观锁在操作数据时比较悲观,认为别人会同时修改数据。因此操作数据时直接把数据锁住,直到操作完成后才会释放锁;上锁期间其他人不能修改数据。
行锁和表锁:
行锁和表锁都是数据库中常见的锁机制,用于控制并发访问和修改数据库中的数据。
行锁是指在事务执行期间,仅对数据表中的某一行(或某几行)加锁,其他行不受影响,这样可以保证同时访问表中不同行数据的并发事务不会互相干扰。行锁适用于需要对表中部分数据进行修改或查询的情况。
表锁是指在事务执行期间,对整张数据表加锁,其他事务无法对该表中的任何行进行修改或查询,这种锁机制可以保证同时访问表中任意数据的并发事务之间不会互相干扰,但会导致并发性降低。表锁适用于需要对表中所有数据进行修改或查询的情况。
读锁和写锁:
读锁(Shared Lock):
- 读锁是一种共享锁,允许多个事务同时获取并保持对同一数据的读取权限。
- 多个事务可以同时获取读锁,彼此之间不会产生冲突。
- 读锁不会阻塞其他事务获取读锁,因为多个事务可以同时持有读锁。
- 读锁与写锁之间会发生冲突,当一个事务持有读锁时,其他事务无法获取写锁。
写锁(Exclusive Lock):
写锁是一种排他锁,只允许一个事务获取对数据的写权限,其他事务无法同时读取或写入该数据。
写锁会阻塞其他事务获取读锁或写锁,因为写锁与任何其他锁都发生冲突。
写锁与写锁之间、写锁与读锁之间都会发生冲突。
写锁的持有者可以读取和修改数据,其他事务无法读取或修改该数据,直到写锁被释放。
读写锁:
- 一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享
synchronized和ReentrantLock都是独占的,每次只能进行一个操作。
ReentrantReadWriteLock中,读读操作可以共享,写操作独占,效率相对较高
缺点:
- 会造成锁饥饿,一直读,没有写操作;
- 读的时候不能进行写,只有读完才可以写,写操作可以进行读(锁的降级,在适当的情况下将锁从写模式降级为读模式)
- 当一个线程获取了写锁,并且又获取了读锁(获取写锁的线程可以获取读锁),那么当该线程释放了写锁时,该线程拥有的锁就会进行降级,变为了读锁,读锁和写锁的获取都是分开的,所以写锁的释放不会影响到读锁的持有
写锁降级为读锁的流程(jdk8):
获取写锁 --> 获取读锁 --> 释放写锁 --> 释放读锁
只有写锁降级为读锁,不存在读锁升级为写锁
只有锁降级,没有锁升级
MyCache类
class MyCache {
// 当一个线程修改了一个volatile变量,新值会立即被写入主内存(RAM),
// 同时其他线程读取该变量时会从主内存中读取最新值,而不是使用它们各自工作内存(CPU缓存)中的副本。
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 写入数据
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在写数据");
TimeUnit.MICROSECONDS.sleep(300);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写数据完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
//读取数据
public Object get(String key) {
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName() + "正在读取数据");
TimeUnit.MICROSECONDS.sleep(300);
result = map.get(key);
System.out.println(Thread.currentThread().getName() + "成功读取数据");
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
return result;
}
}
主类
public class Demo14 {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//创建线程写数据
for (int i = 0; i < 5; i++) {
int num = i;
new Thread(() -> {
myCache.put(String.valueOf(num), String.valueOf(num));
}, String.valueOf(i)).start();
}
//创建线程读数据
for (int i = 0; i < 5; i++) {
int num = i;
new Thread(() -> {
Object object = myCache.get(String.valueOf(num));
System.out.println(Thread.currentThread().getName() + " get value is " + object);
}, String.valueOf(i)).start();
}
}
}
运行结果
0正在写数据
0写数据完成
4正在写数据
4写数据完成
1正在写数据
1写数据完成
2正在写数据
2写数据完成
3正在写数据
3写数据完成
0正在读取数据
1正在读取数据
2正在读取数据
3正在读取数据
4正在读取数据
3成功读取数据
4成功读取数据
2成功读取数据
1成功读取数据
0成功读取数据
0 get value is 0
2 get value is 2
3 get value is 3
1 get value is 1
4 get value is 4
演示读写锁的降级
public class Demo15 {
public static void main(String[] args) {
ReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
// 锁降级
// 1.获取写锁
writeLock.lock();
System.out.println("write");
// 2.获取读锁
readLock.lock();
System.out.println("read");
// 3.释放写锁
writeLock.unlock();
// 4.释放读锁
readLock.unlock();
}
}
运行结果
write
read
8. BlockingQueue阻塞队列
队列为空时,出队操作阻塞
队列满时,入队操作阻塞
public class Demo16 {
public static void main(String[] args) throws InterruptedException {
// 创建阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
// 第一组,抛出异常 (add, remove, element)
boolean cao = blockingQueue.add("cao");
boolean chen = blockingQueue.add("chen");
System.out.println(blockingQueue.element());
//boolean aLong = blockingQueue.add("long");
/** 超出容量,报错
* Exception in thread "main" java.lang.IllegalStateException: Queue full
* at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
* at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329)
* at sync.Demo16.main(Demo16.java:23)
*/
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
/** 元素为空
* Exception in thread "main" java.util.NoSuchElementException
* at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
* at sync.Demo16.main(Demo16.java:33)
*/
// 第二组,特殊值(offer, poll, peek),将立即返回一个值
// 入队,出队,查看队头
// 第三组,阻塞等待(put, take, 不可用)
// 第四组,超时等待(offer, poll, 不可用)
// 如果队列满了,阻塞2秒,再结束
blockingQueue.offer("cao", 2, TimeUnit.SECONDS);
// 阻塞3秒
blockingQueue.poll(3, TimeUnit.SECONDS);
}
}
运行结果
cao
cao
chen
9. ThreadPool线程池
9.1 常见线程池
三种常见分类:
// 一池多线程(不可扩容) ExecutorService threadPool = Executors.newFixedThreadPool(5); // 一池一线程 ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 一吃多线程(可扩容) ExecutorService threadPool = Executors.newCachedThreadPool();
public class Demo17 {
public static void main(String[] args) {
// 一池多线程(不可扩容)
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
/**
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-3 -> 正在处理请求.
* pool-1-thread-3 -> 正在处理请求.
* pool-1-thread-3 -> 正在处理请求.
* pool-1-thread-3 -> 正在处理请求.
* pool-1-thread-3 -> 正在处理请求.
* pool-1-thread-2 -> 正在处理请求.
* pool-1-thread-5 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-4 -> 正在处理请求.
*/
// 一池一线程
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
/**
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
*/
// 一池多线程(可扩容)
ExecutorService threadPool = Executors.newCachedThreadPool();
/**
* pool-1-thread-9 -> 正在处理请求.
* pool-1-thread-4 -> 正在处理请求.
* pool-1-thread-3 -> 正在处理请求.
* pool-1-thread-8 -> 正在处理请求.
* pool-1-thread-5 -> 正在处理请求.
* pool-1-thread-1 -> 正在处理请求.
* pool-1-thread-6 -> 正在处理请求.
* pool-1-thread-2 -> 正在处理请求.
* pool-1-thread-7 -> 正在处理请求.
* pool-1-thread-10 -> 正在处理请求.
*/
// 处理请求
try {
for (int i = 0; i < 10; i++) {
// 可以使用lambda表达式
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " -> 正在处理请求.");
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
9.2 自定义线程池
自定义线程池中7个参数的含义:
int corePoolSize, // 常驻线程数量(核心) int maximumPoolSize, // 最大的线程数量 long keepAliveTime, // 线程存活时间,一段时间不用之后线程数量降低到常驻线程数量 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 阻塞队列,无空闲线程时,将任务暂存此处 ThreadFactory threadFactory, // 用于创建线程 RejectedExecutionHandler handler // 拒绝策略,理解为当前线程池的任务太多了,拒绝服务
线程池中任务的执行流程:先放入常驻线程中,常驻线程满了之后,再放入阻塞队列,阻塞队列也满了,开启新的线程来执行新来的任务,上限为maximumPoolSize。
public class Demo18 {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 处理请求中");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
运行结果
java.util.concurrent.RejectedExecutionException: Task sync.Demo18$$Lambda$16/0x000001803f001a00@2d98a335 rejected from java.util.concurrent.ThreadPoolExecutor@16b98e56[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
at sync.Demo18.main(Demo18.java:21)
pool-1-thread-5 处理请求中
pool-1-thread-1 处理请求中
pool-1-thread-4 处理请求中
pool-1-thread-3 处理请求中
pool-1-thread-2 处理请求中
pool-1-thread-4 处理请求中
pool-1-thread-1 处理请求中
pool-1-thread-5 处理请求中
10. Fork/Join分支合并框架
分组计算 1+2+3+…+100
MyTask类
class MyTask extends RecursiveTask<Integer> {
private static final int VALUE = 10;
private int left;
private int right;
private int result;
public MyTask(int left, int right) {
this.left = left;
this.right = right;
}
@Override
protected Integer compute() {
if (right - left + 1 <= VALUE) {
//相加
for (int i = left; i <= right; i++) {
result += i;
}
} else {
//拆分
int mid = left + right >> 1;
MyTask task1 = new MyTask(left, mid);
MyTask task2 = new MyTask(mid + 1, right);
//分支
task1.fork();
task2.fork();
//合并
result += task1.join() + task2.join();
}
return result;
}
}
主类
public class Demo19 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask task = new MyTask(1, 100);
//创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
try {
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(task);
//获取最终合并之后的结果
Integer result = forkJoinTask.get();
System.out.println("计算结果为:" + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭池对象
forkJoinPool.shutdown();
}
// Integer compute = task.compute();
// System.out.println("计算结果为:" + compute);
}
}
运行结果
计算结果为:5050
11. CompletableFuture异步回调
public class Demo20 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//异步调用,无返回值
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " --> CompletableFuture<Void>");
});
completableFuture1.get();
//异步调用,有返回值
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " --> CompletableFuture<String>");
// int c = 1 / 0;
return "hello cao";
});
String result = completableFuture2.whenComplete((t, u) -> {
// t是返回值,u是异常值
System.out.println("t = " + t + "\nu = " + u);
}).get();
System.out.println("异步回调返回值为:" + result);
}
}
运行结果
ForkJoinPool.commonPool-worker-1 --> CompletableFuture<Void>
ForkJoinPool.commonPool-worker-1 --> CompletableFuture<String>
t = hello cao
u = null
异步回调返回值为:hello cao
标签:JUC,Thread,编程,System,并发,线程,println,new,out
From: https://blog.csdn.net/qq_53982633/article/details/143312291