首页 > 编程语言 >java并发 共享模型之管程 4.

java并发 共享模型之管程 4.

时间:2024-09-04 10:50:25浏览次数:6  
标签:java log 管程 Object 并发 线程 debug new public

1. wait notify

1.小故事

原理

注:虽然 blocked 和 waiting 状态的线程都在等待,但二者有区别。waiting 状态的线程通常是因为它持有了某个对象的锁,但由于某个条件不满足而被挂起。线程在 waiting 状态中会等待其他线程通过调用 notify() 或 notifyAll() 来通知它条件已经满足,从而继续执行。而 blocked 状态的线程则是因为它试图获取一个已经被其他线程持有的锁(没有条件满足的情况),因此被阻塞。blocked 状态的线程会在锁被释放后有机会被调度。值得注意的是,waiting 状态的线程在条件满足后,会从 waiting 状态转变为 blocked 状态,然后再尝试获取锁以继续执行。

2. API 介绍

obj.wait() 让进入 object 监视器的线程到 waitSet 等待
obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

@Slf4j(topic = "c.Test18")
public class Test18 {
    static final Object lock = new Object();

    public static void main(String[] args) {
        try {
            lock.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

因为没获取此对象的锁 。如下就可以了

@Slf4j(topic = "c.TestWaitNotify")
public class TestWaitNotify {
    final static Object obj = new Object();

    public static void main(String[] args) {

        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行");
                try {
                    obj.wait(); // 让线程在obj上一直等待下去
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("qita1");
            }
        },"t1").start();

        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait(); // 让线程在obj上一直等待下去
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("qita2");
            }
        },"t2").start();

        // 主线程两秒后执行
        sleep(0.5);
        log.debug("唤醒 obj 上其它线程");
        synchronized (obj) {
            obj.notify(); // 唤醒obj上一个线程
//            obj.notifyAll(); // 唤醒obj上所有等待线程
        }
    }
}

notify随机唤醒一个

notifyAll全唤醒,wait()可以带参数,如参数为1s则1s后无notify也会自动向下执行 ,如0.5s后有notify则直接唤醒不等1s。

2. wait notify 的正确姿势

1. sleep(long n) 和 wait(long n) 的区别

        1) sleep 是 Thread 方法,而 wait 是 Object 的方法

        2) sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用

        3) sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁

        4) 它们状态都是 TIMED_WAITING

2. 演化过程

step1

package cn.itcast.n4;

import lombok.extern.slf4j.Slf4j;

import static cn.itcast.n2.util.Sleeper.sleep;

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep1 {
    static final Object room = new Object();
    static boolean hasCigarette = false; // 有没有烟
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    sleep(2);
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以开始干活了");
                }
            }
        }, "小南").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1);
        new Thread(() -> {
            // 这里能不能加 synchronized (room)?
            //synchronized (room) {
                hasCigarette = true;
                log.debug("烟到了噢!");
            //}
        }, "送烟的").start();
    }
}

若送烟的也加 synchronized (room) {}

new Thread(() -> {
            // 这里能不能加 synchronized (room)?
            synchronized (room) {
                hasCigarette = true;
                log.debug("烟到了噢!");
            }
        }, "送烟的").start();

输出 没了小南问有烟没个小南开始干活这两句,因为小南加的是sleep,而sleep 在睡眠的同时,不会释放对象锁的,这两秒内main的送烟就没执行,两秒后小南醒来问了没烟结束线程释放锁,然后送烟的才来执行。

有这么几个问题 
        1.其它干活的线程,都要一直阻塞,效率太低
        2.小南线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
        3.加了 synchronized (room) 后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main 没加synchronized 就好像 main 线程是翻窗户进来的
        4.解决方法,使用 wait - notify 机制

改进step2 

package cn.itcast.testcopy;

import lombok.extern.slf4j.Slf4j;

import static cn.itcast.n2.util.Sleeper.sleep;

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep2 {
    static final Object room = new Object();
    static boolean hasCigarette = false; // 有没有烟
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    try {
                        room.wait(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette){
                    log.debug("可以开始干活了");
                }
            }
        }, "小南").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasCigarette = true;
                log.debug("烟到了噢!");
                room.notify();
            }
        }, "送烟的").start();
    }

}

输出

解决了其它干活的线程阻塞的问题
但如果有其它线程也在等待条件呢?

step3  

package cn.itcast.testcopy;

import lombok.extern.slf4j.Slf4j;

import static cn.itcast.n2.util.Sleeper.sleep;

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep3 {
    static final Object room = new Object();
    static boolean hasCigarette = false; // 有没有烟
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以开始干活了");
                }
            }
        }, "小南").start();

        new Thread(() -> {
            synchronized (room) {
                Thread thread = Thread.currentThread();
                log.debug("外卖送到没?[{}]", hasTakeout);
                if (!hasTakeout) {
                    log.debug("没外卖,先歇会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("外卖送到没?[{}]", hasTakeout);
                if (hasTakeout) {
                    log.debug("可以开始干活了");
                } else {
                    log.debug("没干成活...");
                }
            }
        }, "小女").start();
        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("外卖到了噢!");
                room.notify();
            }
        }, "送烟的").start();
    }

}

        notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】
        解决方法,改为 notifyAll 

step4 

new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("外卖到了噢!");
                room.notifyAll();
            }
        }, "送烟的").start();

还是有些小问题 
        用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了
        解决方法,用 while + wait,当条件不成立,再次 wait

step 5

 

 总结

synchronized(lock) {
    while(条件不成立) {
        lock.wait();
    }
    // 干活
}
//另一个线程
synchronized(lock) {
    lock.notifyAll();
}

3. 同步模式之保护性暂停

1. 定义

        即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点
        有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
        如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
        JDK 中,join 的实现、Future 的实现,采用的就是此模式
        因为要等待另一方的结果,因此归类到同步模式

 2.实现

正常版

@Slf4j(topic = "c.TestGuardedObject")
public class TestGuardedObject {
    public static void main(String[] args) {
        GuardedObject guardedObject=new GuardedObject();
        new Thread(() -> {
            //等待结果
            log.debug("等待结果");
            List<String> list = (List<String>) guardedObject.get();
            log.debug("结果大小:{}",list.size());
        },"t1").start();

        new Thread(()->{
            log.debug("执行下载");
            try {
                List<String> list = Downloader.download();
                guardedObject.complete(list);
            } catch (IOException e) {
                e.printStackTrace();
            }
        },"t2").start();
    }
}

class GuardedObject {
    private Object response;
    private final Object lock = new Object();

    public Object get() {
        synchronized (lock) {
            // 条件不满足则等待
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足则通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

 与join相比这么实现的好处有两个
1. join只能等待线程运行结束,而本例中保护性暂停执行完可以继续干别的事
2.用join等待结果的变量只能设计成全局的,这里可以设计为局部变量。

带超时版

@Slf4j(topic = "c.Test20gaijin")
//增加超时效果
class GuardedObjectgai {
    private Object response;
    private final Object lock = new Object();

    //获取结果
    //timeout表示最多等多久
    public Object get(long timeout) {
        synchronized (lock) {
            //开始时间
            long begin=System.currentTimeMillis();
            //经历的时间
            long passedTime=0;
            // 条件不满足则等待
            while (response == null) {
                //经历的时间大于最大等待时间,退出循环
                if (passedTime>=timeout){
                    break;
                }
                try {
                    lock.wait(timeout);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //求经历的时间
                passedTime=System.currentTimeMillis()-begin;
            }
            return response;
        }
    }

    //产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足则通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

这么设计乍一看没啥问题,但细看其实有问题,如虚假唤醒,如本来要等2s,1s后来了个唤醒,此时应该在等1s,但再进循环timeout还是2s,所以应该改进


@Slf4j(topic = "c.Test20gaijin")
//增加超时效果
class GuardedObjectgai {
    private Object response;
    private final Object lock = new Object();

    //获取结果
    //timeout表示最多等多久
    public Object get(long timeout) {
        synchronized (lock) {
            //开始时间
            long begin=System.currentTimeMillis();
            //经历的时间
            long passedTime=0;
            // 条件不满足则等待
            while (response == null) {
                //这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                //经历的时间大于最大等待时间,退出循环
                if (waitTime<=0){
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //求经历的时间
                passedTime=System.currentTimeMillis()-begin;
            }
            return response;
        }
    }

    //产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足则通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

下面对这段代码进行测试,这次t2就不真正下载了,用sleep模拟
case1  等待两秒睡眠1s后就进行唤醒,理论上唤醒后就结束

@Slf4j(topic = "c.Test20gaijin")
public class Test20gaijin {
    public static void main(String[] args) {
        GuardedObjectgai guardedObject=new GuardedObjectgai();
        new Thread(() -> {
            log.debug("begin");
            Object response = guardedObject.get(2000);
            log.debug("结果是:{}",response);
        },"t1").start();

        //这次不真下载了,模拟一下
        new Thread(()->{
            log.debug("begin");
            Sleeper.sleep(1);
            guardedObject.complete(new Object());
        },"t2").start();
    }
}

运行结果 可见就是等了1s

 case2  等待两秒 睡眠2s后再进行唤醒,理论上2s后就等待结束 只改Sleeper.sleep(3);这一句就行

可见确实是2s。

case3 测试虚假唤醒 只需该这两句

Sleeper.sleep(1);
guardedObject.complete(null);

case4 改动 GuardedObjectgai,wait里不用waittime而是timeout此时应该发生虚假唤醒

package cn.itcast.testcopy;

import cn.itcast.n2copy.util.Sleeper;
import cn.itcast.pattern.Downloader;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;

/**
 * ClassName: Test20gaijin
 * Package: cn.itcast.testcopy
 * Description:
 *
 * @Author: 1043
 * @Create: 2024/9/4 - 9:50
 * @Version: v1.0
 */
@Slf4j(topic = "c.Test20gaijin")
public class Test20gaijin {
    public static void main(String[] args) {
        GuardedObjectgai guardedObject=new GuardedObjectgai();
        new Thread(() -> {
            log.debug("begin");
            Object response = guardedObject.get(2000);
            log.debug("结果是:{}",response);
        },"t1").start();

        //这次不真下载了,模拟一下
        new Thread(()->{
            log.debug("begin");
            Sleeper.sleep(1);
            guardedObject.complete(null);
        },"t2").start();
    }
}

@Slf4j(topic = "c.Test20gaijin")
//增加超时效果
class GuardedObjectgai {
    private Object response;
    private final Object lock = new Object();

    //获取结果
    //timeout表示最多等多久
    public Object get(long timeout) {
        synchronized (lock) {
            //开始时间
            long begin=System.currentTimeMillis();
            //经历的时间
            long passedTime=0;
            // 条件不满足则等待
            while (response == null) {
                //这一轮循环应该等待的时间
                //long waitTime = timeout - passedTime;
                //经历的时间大于最大等待时间,退出循环
                if (timeout<=passedTime){
                    break;
                }
                try {
                    lock.wait(timeout);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //求经历的时间
                passedTime=System.currentTimeMillis()-begin;
            }
            return response;
        }
    }

    //产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足则通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

等了3s可见确实发生了虚假唤醒的问题。 

join原理

源码里的实现跟上面的带超时版基本思路一样,就是变量名不太一样。

 解耦

         图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理.

        这种解耦送信的和居民还存在一一对应的关系,一个线程对应一个消费线程,若想送信与居民不存在这种一一对应应该使用生产者消费者模式。

package cn.itcast.testcopy;

import cn.itcast.n2copy.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;

/**
 * ClassName: Test20jieou
 * Package: cn.itcast.testcopy
 * Description:
 *
 * @Author: 1043
 * @Create: 2024/9/4 - 10:29
 * @Version: v1.0
 */
public class Test20jieou {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        Sleeper.sleep(1);
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id,"内容"+id).start();
        }
    }
}
@Slf4j(topic = "c.People")
class People extends Thread{
    @Override
    public void run() {
        //收信
        GuardedObjectv3 guardedObjectv3=Mailboxes.createGuardedObjectv3();
        log.debug("开始收信 id:{}", guardedObjectv3.getId());
        Object mail = guardedObjectv3.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObjectv3.getId(), mail);
    }
}

@Slf4j(topic = "c.Postman")
class Postman extends Thread{
    private int id;
    private String mail;
    public Postman(int id,String mail){
        this.id=id;
        this.mail=mail;
    }
    @Override
    public void run() {
        //收信
        GuardedObjectv3 guardedObjectv3=Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail);
        guardedObjectv3.complete(mail);
    }
}

// 解耦类

/**
 * createGuardedObjectv3和getIds因为是用Hashtable操作本身就是线程安全所以不加锁
 */
class Mailboxes {
    private static Map<Integer, GuardedObjectv3> boxes = new Hashtable<>();// Hashtable线程安全
    private static int id = 1;
    public static GuardedObjectv3 getGuardedObject(int id) {
        return boxes.remove(id);
    }

    // 产生唯一id
    private static synchronized int generateId() {
        return id++; //++不是原子性操作,所以方法加锁
    }

    public static GuardedObjectv3 createGuardedObjectv3() {
        GuardedObjectv3 gobject = new GuardedObjectv3(generateId());
        boxes.put(gobject.getId(), gobject);
        return gobject;
    }

    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}

@Slf4j(topic = "c.Test20jieou")
// 增加超时效果
class GuardedObjectv3 {
    // 标识 Guarded Object
    private int id;

    public int getId() {
        return id;
    }

    public GuardedObjectv3(int id) {
        this.id = id;
    }

    private Object response;
    private final Object lock = new Object();

    // 获取结果
    // timeout表示最多等多久
    public Object get(long timeout) {
        synchronized (lock) {
            // 开始时间
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            // 条件不满足则等待
            while (response == null) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 经历的时间大于最大等待时间,退出循环
                if (waitTime <= 0) {
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求经历的时间
                passedTime = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }

    // 产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足则通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

标签:java,log,管程,Object,并发,线程,debug,new,public
From: https://blog.csdn.net/m0_56369671/article/details/141830244

相关文章