1. wait notify
1.小故事
原理
注:虽然 blocked
和 waiting
状态的线程都在等待,但二者有区别。waiting
状态的线程通常是因为它持有了某个对象的锁,但由于某个条件不满足而被挂起。线程在 waiting
状态中会等待其他线程通过调用 notify()
或 notifyAll()
来通知它条件已经满足,从而继续执行。而 blocked
状态的线程则是因为它试图获取一个已经被其他线程持有的锁(没有条件满足的情况),因此被阻塞。blocked
状态的线程会在锁被释放后有机会被调度。值得注意的是,waiting
状态的线程在条件满足后,会从 waiting
状态转变为 blocked
状态,然后再尝试获取锁以继续执行。
2. API 介绍
obj.wait() 让进入 object 监视器的线程到 waitSet 等待
obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法
@Slf4j(topic = "c.Test18")
public class Test18 {
static final Object lock = new Object();
public static void main(String[] args) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
因为没获取此对象的锁 。如下就可以了
@Slf4j(topic = "c.TestWaitNotify")
public class TestWaitNotify {
final static Object obj = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (obj) {
log.debug("执行");
try {
obj.wait(); // 让线程在obj上一直等待下去
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("qita1");
}
},"t1").start();
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait(); // 让线程在obj上一直等待下去
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("qita2");
}
},"t2").start();
// 主线程两秒后执行
sleep(0.5);
log.debug("唤醒 obj 上其它线程");
synchronized (obj) {
obj.notify(); // 唤醒obj上一个线程
// obj.notifyAll(); // 唤醒obj上所有等待线程
}
}
}
notify随机唤醒一个
notifyAll全唤醒,wait()可以带参数,如参数为1s则1s后无notify也会自动向下执行 ,如0.5s后有notify则直接唤醒不等1s。
2. wait notify 的正确姿势
1. sleep(long n) 和 wait(long n) 的区别
1) sleep 是 Thread 方法,而 wait 是 Object 的方法
2) sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
3) sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
4) 它们状态都是 TIMED_WAITING
2. 演化过程
step1
package cn.itcast.n4;
import lombok.extern.slf4j.Slf4j;
import static cn.itcast.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep1 {
static final Object room = new Object();
static boolean hasCigarette = false; // 有没有烟
static boolean hasTakeout = false;
public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
sleep(2);
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小南").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
synchronized (room) {
log.debug("可以开始干活了");
}
}, "其它人").start();
}
sleep(1);
new Thread(() -> {
// 这里能不能加 synchronized (room)?
//synchronized (room) {
hasCigarette = true;
log.debug("烟到了噢!");
//}
}, "送烟的").start();
}
}
若送烟的也加 synchronized (room) {}
new Thread(() -> {
// 这里能不能加 synchronized (room)?
synchronized (room) {
hasCigarette = true;
log.debug("烟到了噢!");
}
}, "送烟的").start();
输出 没了小南问有烟没个小南开始干活这两句,因为小南加的是sleep,而sleep 在睡眠的同时,不会释放对象锁的,这两秒内main的送烟就没执行,两秒后小南醒来问了没烟结束线程释放锁,然后送烟的才来执行。
有这么几个问题
1.其它干活的线程,都要一直阻塞,效率太低
2.小南线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
3.加了 synchronized (room) 后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main 没加synchronized 就好像 main 线程是翻窗户进来的
4.解决方法,使用 wait - notify 机制
改进step2
package cn.itcast.testcopy;
import lombok.extern.slf4j.Slf4j;
import static cn.itcast.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep2 {
static final Object room = new Object();
static boolean hasCigarette = false; // 有没有烟
static boolean hasTakeout = false;
public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette){
log.debug("可以开始干活了");
}
}
}, "小南").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
synchronized (room) {
log.debug("可以开始干活了");
}
}, "其它人").start();
}
sleep(1);
new Thread(() -> {
synchronized (room) {
hasCigarette = true;
log.debug("烟到了噢!");
room.notify();
}
}, "送烟的").start();
}
}
输出
解决了其它干活的线程阻塞的问题
但如果有其它线程也在等待条件呢?
step3
package cn.itcast.testcopy;
import lombok.extern.slf4j.Slf4j;
import static cn.itcast.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep3 {
static final Object room = new Object();
static boolean hasCigarette = false; // 有没有烟
static boolean hasTakeout = false;
public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小南").start();
new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外卖送到没?[{}]", hasTakeout);
if (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();
sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notify();
}
}, "送烟的").start();
}
}
notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】
解决方法,改为 notifyAll
step4
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送烟的").start();
还是有些小问题
用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了
解决方法,用 while + wait,当条件不成立,再次 wait
step 5
总结
synchronized(lock) {
while(条件不成立) {
lock.wait();
}
// 干活
}
//另一个线程
synchronized(lock) {
lock.notifyAll();
}
3. 同步模式之保护性暂停
1. 定义
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
JDK 中,join 的实现、Future 的实现,采用的就是此模式
因为要等待另一方的结果,因此归类到同步模式
2.实现
正常版
@Slf4j(topic = "c.TestGuardedObject")
public class TestGuardedObject {
public static void main(String[] args) {
GuardedObject guardedObject=new GuardedObject();
new Thread(() -> {
//等待结果
log.debug("等待结果");
List<String> list = (List<String>) guardedObject.get();
log.debug("结果大小:{}",list.size());
},"t1").start();
new Thread(()->{
log.debug("执行下载");
try {
List<String> list = Downloader.download();
guardedObject.complete(list);
} catch (IOException e) {
e.printStackTrace();
}
},"t2").start();
}
}
class GuardedObject {
private Object response;
private final Object lock = new Object();
public Object get() {
synchronized (lock) {
// 条件不满足则等待
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足则通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
与join相比这么实现的好处有两个
1. join只能等待线程运行结束,而本例中保护性暂停执行完可以继续干别的事
2.用join等待结果的变量只能设计成全局的,这里可以设计为局部变量。
带超时版
@Slf4j(topic = "c.Test20gaijin")
//增加超时效果
class GuardedObjectgai {
private Object response;
private final Object lock = new Object();
//获取结果
//timeout表示最多等多久
public Object get(long timeout) {
synchronized (lock) {
//开始时间
long begin=System.currentTimeMillis();
//经历的时间
long passedTime=0;
// 条件不满足则等待
while (response == null) {
//经历的时间大于最大等待时间,退出循环
if (passedTime>=timeout){
break;
}
try {
lock.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
//求经历的时间
passedTime=System.currentTimeMillis()-begin;
}
return response;
}
}
//产生结果
public void complete(Object response) {
synchronized (lock) {
// 条件满足则通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
这么设计乍一看没啥问题,但细看其实有问题,如虚假唤醒,如本来要等2s,1s后来了个唤醒,此时应该在等1s,但再进循环timeout还是2s,所以应该改进
@Slf4j(topic = "c.Test20gaijin")
//增加超时效果
class GuardedObjectgai {
private Object response;
private final Object lock = new Object();
//获取结果
//timeout表示最多等多久
public Object get(long timeout) {
synchronized (lock) {
//开始时间
long begin=System.currentTimeMillis();
//经历的时间
long passedTime=0;
// 条件不满足则等待
while (response == null) {
//这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
//经历的时间大于最大等待时间,退出循环
if (waitTime<=0){
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//求经历的时间
passedTime=System.currentTimeMillis()-begin;
}
return response;
}
}
//产生结果
public void complete(Object response) {
synchronized (lock) {
// 条件满足则通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
下面对这段代码进行测试,这次t2就不真正下载了,用sleep模拟
case1 等待两秒睡眠1s后就进行唤醒,理论上唤醒后就结束
@Slf4j(topic = "c.Test20gaijin")
public class Test20gaijin {
public static void main(String[] args) {
GuardedObjectgai guardedObject=new GuardedObjectgai();
new Thread(() -> {
log.debug("begin");
Object response = guardedObject.get(2000);
log.debug("结果是:{}",response);
},"t1").start();
//这次不真下载了,模拟一下
new Thread(()->{
log.debug("begin");
Sleeper.sleep(1);
guardedObject.complete(new Object());
},"t2").start();
}
}
运行结果 可见就是等了1s
case2 等待两秒 睡眠2s后再进行唤醒,理论上2s后就等待结束 只改Sleeper.sleep(3);这一句就行
可见确实是2s。
case3 测试虚假唤醒 只需该这两句
Sleeper.sleep(1);
guardedObject.complete(null);
case4 改动 GuardedObjectgai,wait里不用waittime而是timeout此时应该发生虚假唤醒
package cn.itcast.testcopy;
import cn.itcast.n2copy.util.Sleeper;
import cn.itcast.pattern.Downloader;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
/**
* ClassName: Test20gaijin
* Package: cn.itcast.testcopy
* Description:
*
* @Author: 1043
* @Create: 2024/9/4 - 9:50
* @Version: v1.0
*/
@Slf4j(topic = "c.Test20gaijin")
public class Test20gaijin {
public static void main(String[] args) {
GuardedObjectgai guardedObject=new GuardedObjectgai();
new Thread(() -> {
log.debug("begin");
Object response = guardedObject.get(2000);
log.debug("结果是:{}",response);
},"t1").start();
//这次不真下载了,模拟一下
new Thread(()->{
log.debug("begin");
Sleeper.sleep(1);
guardedObject.complete(null);
},"t2").start();
}
}
@Slf4j(topic = "c.Test20gaijin")
//增加超时效果
class GuardedObjectgai {
private Object response;
private final Object lock = new Object();
//获取结果
//timeout表示最多等多久
public Object get(long timeout) {
synchronized (lock) {
//开始时间
long begin=System.currentTimeMillis();
//经历的时间
long passedTime=0;
// 条件不满足则等待
while (response == null) {
//这一轮循环应该等待的时间
//long waitTime = timeout - passedTime;
//经历的时间大于最大等待时间,退出循环
if (timeout<=passedTime){
break;
}
try {
lock.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
//求经历的时间
passedTime=System.currentTimeMillis()-begin;
}
return response;
}
}
//产生结果
public void complete(Object response) {
synchronized (lock) {
// 条件满足则通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
等了3s可见确实发生了虚假唤醒的问题。
join原理
源码里的实现跟上面的带超时版基本思路一样,就是变量名不太一样。
解耦
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理.
这种解耦送信的和居民还存在一一对应的关系,一个线程对应一个消费线程,若想送信与居民不存在这种一一对应应该使用生产者消费者模式。
package cn.itcast.testcopy;
import cn.itcast.n2copy.util.Sleeper;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
/**
* ClassName: Test20jieou
* Package: cn.itcast.testcopy
* Description:
*
* @Author: 1043
* @Create: 2024/9/4 - 10:29
* @Version: v1.0
*/
public class Test20jieou {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new People().start();
}
Sleeper.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id,"内容"+id).start();
}
}
}
@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
//收信
GuardedObjectv3 guardedObjectv3=Mailboxes.createGuardedObjectv3();
log.debug("开始收信 id:{}", guardedObjectv3.getId());
Object mail = guardedObjectv3.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObjectv3.getId(), mail);
}
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread{
private int id;
private String mail;
public Postman(int id,String mail){
this.id=id;
this.mail=mail;
}
@Override
public void run() {
//收信
GuardedObjectv3 guardedObjectv3=Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObjectv3.complete(mail);
}
}
// 解耦类
/**
* createGuardedObjectv3和getIds因为是用Hashtable操作本身就是线程安全所以不加锁
*/
class Mailboxes {
private static Map<Integer, GuardedObjectv3> boxes = new Hashtable<>();// Hashtable线程安全
private static int id = 1;
public static GuardedObjectv3 getGuardedObject(int id) {
return boxes.remove(id);
}
// 产生唯一id
private static synchronized int generateId() {
return id++; //++不是原子性操作,所以方法加锁
}
public static GuardedObjectv3 createGuardedObjectv3() {
GuardedObjectv3 gobject = new GuardedObjectv3(generateId());
boxes.put(gobject.getId(), gobject);
return gobject;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
@Slf4j(topic = "c.Test20jieou")
// 增加超时效果
class GuardedObjectv3 {
// 标识 Guarded Object
private int id;
public int getId() {
return id;
}
public GuardedObjectv3(int id) {
this.id = id;
}
private Object response;
private final Object lock = new Object();
// 获取结果
// timeout表示最多等多久
public Object get(long timeout) {
synchronized (lock) {
// 开始时间
long begin = System.currentTimeMillis();
// 经历的时间
long passedTime = 0;
// 条件不满足则等待
while (response == null) {
// 这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
// 经历的时间大于最大等待时间,退出循环
if (waitTime <= 0) {
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求经历的时间
passedTime = System.currentTimeMillis() - begin;
}
return response;
}
}
// 产生结果
public void complete(Object response) {
synchronized (lock) {
// 条件满足则通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
标签:java,log,管程,Object,并发,线程,debug,new,public
From: https://blog.csdn.net/m0_56369671/article/details/141830244