相信大家在实际工作中,都或多或少了解过生产者消费者模型,在一些基于内存进行设计的消息队列模型中,当有新消息写入的时候,消息会被投递到一条内存队列中,然后消费者会自动收到通知进行消费。
通常我们称投递消息的一方为生产者,取出消息的一方为消费者。如果要用伪代码去表示这个流程的话,大概如下所示:
//生产者调用该函数 投递消息
void pushMsg(Msg msg){
msgQueue.push(msg);
}
//消费者调用该函数 取出消息
Msg takeMsg(){
return msgQueue.take();
}
如果队列中没有消息,消费者就会处于等待状态,当生产者将消息投递到队列之后,则会自动返回数据给到消费者。在这种场景下,如果我们细心思考就会发现一个问题:如何让生产者投递完消息之后就会主动通知到消费者呢?
其实这个问题的本质就和我们今天要讲解的线程间通信的案例有关。为什么这么说呢?我们可以将生产者和消费者看作是两个线程的角色,一个负责往内存队列中投递消息,一个负责从内存队列中取出消息,当生产速度等于消费速度的时候,两者的协调关系就如同下图所示:
在 JDK 中,我将常见的负责线程间通信的手段做了些归类,大致如下:
-
wait
-
notify
-
notifyAll
-
condition
这四种方式都有一个共同的特点,它们都必须要在加锁之后才能使用。wait,notify,notifyAll 是配合着 synchronized 关键字去使用的,condition 则是配合着 Lock 去使用的。
在线程间做通信的时候,双方都需要处于一个稳定的状态,类似于一问一答的模式,如果不是这种模式就可能会出现:A 线程在发送给 B 线程某种信号之后,B 线程却在执行其他任务,从而“忽略”了这个信号,所以当两个线程之间进行通信的时候,一定是需要一方处于等待状态,另一方去发送信号。
如果要将两个线程之间的通信模式进行抽象的话,我们可以用下边这张图来描述:
一个线程去通知协调者,然后让协调者去将信息传达给到另一个线程。
下边我们来通过实战案例,更加深入地理解线程间通信的机制。
wait 、 notify 、 notifyAll
下边是一个实战案例,利用多线程的思路去实现交替打印 ABC 三个字符的效果。其实这个案例也有点类似于生产者消费者模型,只不过它所涉及到的角色不止两个,因此实现的思路会比生产者消费者模型要复杂一些。下边我们来看如何通过 wait + notifyAll 的方式去实现,大概的程序代码如下:
package 并发编程05.交替打印ABC;
/**
* 多线程间的通信
*
* @Author idea
* @Date created in 11:20 上午 2022/6/5
*/
public class PrintAbcDemo_1 {
private int signal = 0;
public synchronized void printA() {
while (signal != 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("a");
signal = 1;
notifyAll();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void printB() {
while (signal != 1) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("b");
signal = 2;
notifyAll();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void printC() {
while (signal != 2) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("c");
signal = 0;
notifyAll();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
PrintAbcDemo_1 printAbcDemo_1 = new PrintAbcDemo_1();
Thread printAThread = new Thread(new Runnable() {
@Override
public void run() {
while (true){
printAbcDemo_1.printA();
}
}
});
Thread printBThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
printAbcDemo_1.printB();
}
}
});
Thread printCThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
printAbcDemo_1.printC();
}
}
});
printAThread.start();
printBThread.start();
printCThread.start();
Thread.yield();
}
}
在这段代码中,我们分别构建了三个线程负责打印任务,A 线程,B 线程,C 线程各自负责打印 ABC 字符,每个线程一开始都是处于等待状态,需要当 signal 信号达到自己满足的条件之后,才会完成打印工作,否则就会一直处于等待状态。当打印完毕之后,线程自己就会修改 signal 数值,并且调用 notifyAll 方法去通知其他处于等待状态的线程。
这里有个点要注意下,当线程调用了 wait 方法之后,synchronized 锁会直接晋升到重量级锁的级别,这一点是和其他锁不太相同的点。
这样一段代码虽然能够实现我们想要的功能,但是在性能方面还是存在着一些瑕疵,需要完善,由于notifyAll的底层是会将所有处于等待状态的且属于同一个monitor监管的线程统统都唤醒,所以被唤醒的线程们后续又要参与一次条件竞争,但是实际上我们只需要唤醒一个线程就足够了,因此我们可以通过Condition来优化这个效果。采用了 Condition 之后,具体的代码案例如下所示:
package 并发编程05.交替打印ABC;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Author idea
* @Date created in 11:02 下午 2022/6/6
*/
public class PrintAbcDemo_2 {
private int signal = 0;
Lock lock = new ReentrantLock();
Condition a = lock.newCondition();
Condition b = lock.newCondition();
Condition c = lock.newCondition();
public void printA() {
lock.lock();
while (signal != 0) {
try {
a.await();
} catch (Exception e) {
e.printStackTrace();
}
}
signal++;
System.out.println("a");
b.signal();
lock.unlock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void printB() {
lock.lock();
while (signal != 1) {
try {
b.await();
} catch (Exception e) {
e.printStackTrace();
}
}
signal++;
System.out.println("b");
c.signal();
lock.unlock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void printC() {
lock.lock();
while (signal != 2) {
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
}
signal = 0;
System.out.println("c");
a.signal();
lock.unlock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
PrintAbcDemo_2 printAbcDemo_2 = new PrintAbcDemo_2();
Thread printAThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
printAbcDemo_2.printA();
}
}
});
printAThread.start();
Thread printBThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
printAbcDemo_2.printB();
}
}
});
printBThread.start();
Thread printCThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
printAbcDemo_2.printC();
}
}
});
printCThread.start();
}
}
可以看到,这组代码案例中我使用了 signal 和 signalAll 函数,这两个函数的效果其实有些类似于 notify 和 notifyAll,但是底层的实现原理还是有些出入,下边我们来深入了解下上边我们所说的这些个函数的底层原理。
标签:Java,Thread,signal,通信,线程,void,new,public From: https://www.cnblogs.com/fxh0707/p/17237812.html