首页 > 编程语言 >【并发编程】线程间的通信

【并发编程】线程间的通信

时间:2023-01-19 11:00:19浏览次数:42  
标签:Thread 编程 System 并发 线程 new public out


文章目录

  • ​​1.wait、notify、notifyAll​​
  • ​​2.生产者消费者模型​​
  • ​​3.管道流进行线程间的通信​​
  • ​​4.Thread.join()方法​​
  • ​​5.Condition详解​​

1.wait、notify、notifyAll

  • 在多线程环境下,有时候一个线程的执行,依赖于另一个线程的某种状态的改变,这时就可以使用wait和notify或者notifyAll。
  • wait和sleep的区别:wait会释放持有的锁,但是sleep不会,sleep知识让线程在指定的时间内,不去抢占cpu的资源。
  • wait notify在使用的时候必须放在同步代码块里,必须拥有当前对象的锁,不能获取A对象的锁,去唤醒B对象。
  • notify随机唤醒一个等待的线程,notifyAll唤醒所有在该对象上等待的线程。
public class WaitDemo {

private static boolean flag = false;

private static Object object = new Object();

public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (object) {
if (!flag) {
try {
System.out.println("flag is false");
System.out.println(object+"进入等待状态");
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("flag is true");
}).start();

Thread.sleep(2000L);
new Thread(() -> {
synchronized (object) {
flag = true;
object.notify();
System.out.println(object+"被唤醒");
}
}).start();
}

}

【并发编程】线程间的通信_数据

2.生产者消费者模型

(1)生产者消费者模型图

【并发编程】线程间的通信_数据_02

(2)编码实战

  • 中间商Broker代码
public class Broker {

//当前库存数
private static int num;

//规定最大库存数量
private static final int TOTAL = 20;

/**
* 生产者生产产品存入库存
*/
public synchronized void put(){
//先判断库存有没有满
if(num < TOTAL){
//库存没有满时,生产者生产
System.out.println("---库存新增一个,当前库存为:"+ ++num);
//唤醒消费者消费
notifyAll();
}else{
try {
//库存满时,生产这进入等待状态
System.out.println("***库存已满,生产者等待生产");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 消费者消费库存
*/
public synchronized void take(){
//先判断是否有库存
if(num>0){
System.out.println("---库存减少1个,当前库存为:"+ --num);
//唤醒生产者
notifyAll();
}else{
try {
System.out.println("***暂无库存,消费者等待消费");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
  • 生产者Producer代码
public class Producer implements Runnable {

private Broker broker;

public Producer(Broker broker) {
this.broker = broker;
}

@Override
public void run() {
while (true) {
System.out.println("###生产者生产一件商品");
broker.put();
}
}
}
  • 消费者Consumer代码
public class Consumer implements Runnable {

private Broker broker;

public Consumer(Broker broker) {
this.broker = broker;
}

@Override
public void run() {
while (true) {
System.out.println("###消费者消费一件商品");
broker.take();
}
}
}
  • 测试代码
public static void main(String[] args) {

//创建中间商
Broker broker = new Broker();

//生产者线程
for (int i = 0; i < 5; i++) {
new Thread(new Producer(broker)).start();
}

//消费者线程
for (int i = 0; i < 5; i++) {
new Thread(new Consumer(broker)).start();
}
}

【并发编程】线程间的通信_java_03

3.管道流进行线程间的通信

  • 管道流进行通信其实就是以内存为媒介,一个线程去往里面存数据,一个线程去里面取数据,用于线程间的通信。
  • 主要有两类
  • 面向字节:【PipedOutputStream、PipedInputStream】
  • 面向字符:【PipedReader、PipedWriter】

(1)字节管道流

  • 编写线程ByteStreamReader类
public class ByteStreamReader implements Runnable {

private PipedInputStream pipedInputStream;

public ByteStreamReader(PipedInputStream pipedInputStream) {
this.pipedInputStream = pipedInputStream;
}

@Override
public void run() {
try {
if(pipedInputStream != null){
//读取内存中中的数据
String str = new BufferedReader(new InputStreamReader(pipedInputStream)).lines().collect(Collectors.joining("\n"));
System.out.println("当前线程:"+Thread.currentThread().getName()+"读取内存中的数据:"+str);
}
pipedInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
  • 测试代码
public static void main(String[] args) throws IOException {
//创建管道输入流
PipedInputStream pipedInputStream = new PipedInputStream();
//创建管道输出流
PipedOutputStream pipedOutputStream = new PipedOutputStream();
//输入流与输出流建立连接
pipedOutputStream.connect(pipedInputStream);
//启动线程,将输入流作为参数传输进去
new Thread(new ByteStreamReader(pipedInputStream)).start();
//创建字符输入流
BufferedReader bufferedReader = null;

System.out.print("当前线程:"+Thread.currentThread().getName()+"向内存中写入数据:");
//将控制台输入的内容转化成流
bufferedReader = new BufferedReader(new InputStreamReader(System.in));
//写入内存
pipedOutputStream.write(bufferedReader.readLine().getBytes());

pipedOutputStream.close();

if(bufferedReader != null){
bufferedReader.close();
}
}

【并发编程】线程间的通信_加锁_04

  • 注意:不要在同一个线程中使用PipInputStream和PipOutputStream,会造成死锁。

4.Thread.join()方法

(1)join()方法简介

  • join()方法一共三个方法重载
public final void join() throws InterruptedException;

public final synchronized void join(long millis) throws InterruptedException;

public final synchronized void join(long millis, int nanos) throws InterruptedException;
  • 三个重载最终都掉用一个参数的版本。
  • join()和join(0)是等价的,表示会一直等下去,join(非0)表示等待一段时间。
  • 使用场景:线程A执行到一半,需要一个数据,这个数据需要线程B去执行修改,只有B修改完成之后,A才能继续操作。

(2)join的使用

public class JoinDemo {

public static int num = 0;

public void add() {
num++;
}

public static void main(String[] args) {
JoinDemo joinDemo = new JoinDemo();

Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ":开始执行");
System.out.println(Thread.currentThread().getName() + ":执行num+1");
joinDemo.add();
System.out.println(Thread.currentThread().getName() + ":结束执行");
}, "线程1");

new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + ":开始执行");
thread.start();
/**join方法控制让线程2中的线程1先执行完成以后在执行线程2后面的操作*/
thread.join();
if (num == 1) {
System.out.println(Thread.currentThread().getName() + ":拿到的num为:" + num);
}
System.out.println(Thread.currentThread().getName() + ":结束执行");
} catch (InterruptedException e) {
e.printStackTrace();
}

}, "线程2").start();
}
}
  • 没有加上join()方法的运行结果

【并发编程】线程间的通信_加锁_05

  • 加上join()方法的运行结果

【并发编程】线程间的通信_后端_06

5.Condition详解

(1)Condition简介

  • 在线程Thread类中线程之间通信是通过object类的wait()和notify()方式实现的。而ReentrantLock也有类似于wait()和notify()功能。前者是java底层级别后者是语言级别的具有更高的可控制性和扩展性。
  • 二者的区别:
  • Condition能够支持不响应式中断,而通过使用Object方式不支持。
  • Condition能偶支持多个等待队列(new多个Condition对像),而Object方式只能支持一个。
  • Condition能够支持超时时间的设置,而Object不支持。

(2)案例实战

  • 简单案例
public class ConditionDemo implements Runnable{
private static Lock lock = new ReentrantLock();

private static Condition condition = lock.newCondition();

@Override
public void run() {
try{
lock.lock();
condition.await();
System.out.println("Thread is going on");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new ConditionDemo());
//启动线程
thread.start();
//睡眠2s
Thread.sleep(2000);
//加锁,因为condition在调用await()方法时,会释放锁资源,所以要重新加锁
lock.lock();
//唤醒
condition.signal();
//解锁
lock.unlock();
}
}

【并发编程】线程间的通信_加锁_07

新建的线程thread调用start()方法后执行run()方法,此时掉用lock.lock()方法进行加锁,此时线程获得锁,继续执行condition.await()方法,这个时候线程会释放刚才获得的锁资源,将线程加入到condition维护的等待队列中,等调用condition.signal()方法后,会唤醒condition等待对类中的一个线程加入到AQS对列中去,直至唤醒的线程重新获取所资源后才能继续向下执行。

  • 生产者消费者模型
public class ConditionDemo {

private int queueSize=10;

//定义优先队列,大小初始化为10
private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);

//定义ReentrantLock,Condition要配合锁使用
private Lock lock = new ReentrantLock();

//定义生产者的Condition对象
private Condition producer = lock.newCondition();

//定义消费者的Condition对象
private Condition consumer = lock.newCondition();

class Consumer extends Thread{

volatile boolean flag = true;

private void consume(){
//循环调用
while(flag){
//加锁
lock.lock();
try{
/**
* 如果队列是空就让消费者停止消费,进入等待状态,循环等待,
* 保证不会在有消费者线程去执行await()方法
*/
while(queue.isEmpty()){
try{
System.out.println("队列空,等待数据");
consumer.await();
} catch (InterruptedException e) {
//发生异常结束方法执行
flag=false;
}
}
//队列弹出一个元素
queue.poll();
//唤醒生产者
producer.signal();
System.out.println("从队列中取走一个元素,队列剩余"+queue.size()+"个元素");
}finally {
//最后一定要进行解锁操作
lock.unlock();
}
}
}

@Override
public void run() {
consume();
}
}

class Producer extends Thread{

volatile boolean flag = true;

private void produce(){
//循环调用
while(flag){
//加锁
lock.lock();
try{
/**
* 判断队列是否已满,如果队列的大小等于规定好的队列长度
* 就让生产者进行等待
*/
while(queue.size() == queueSize){
try {
System.out.println("队列满,等待有空余空间");
producer.await();
}catch (InterruptedException e){
//发生异常结束方法执行
flag=false;
}
}
//生产一个元素
queue.offer(1); //每次插入一个元素
//唤醒消费者
consumer.signal();
System.out.println("向队列中插入一个元素,队列剩余"+queue.size()+"个元素");
}finally {
lock.unlock();
}
}
}

@Override
public void run() {
produce();
}
}

public static void main(String[] args) {
ConditionDemo conditionDemo = new ConditionDemo();
Producer producer = conditionDemo.new Producer();
Consumer consumer = conditionDemo.new Consumer();
producer.start();
consumer.start();
producer.interrupt();
consumer.interrupt();
}
}

【并发编程】线程间的通信_加锁_08


标签:Thread,编程,System,并发,线程,new,public,out
From: https://blog.51cto.com/u_15646271/6019971

相关文章

  • Spring-webflux 响应式编程
    热爱可抵漫长岁月文章目录​​1.前言​​​​2.Spring-webflux简介​​​​3.什么是“响应式”​​​​4.Spring-webflux的响应式API​​​​5.SpringMVC还是WebFlu......
  • 读编程与类型系统笔记11_高级类型及其他
    1. 范畴论1.1. 范畴论是数学的一个分支,研究的是由对象及这些对象之间的箭头组成的结构1.2. 函子和单子的概念来自范畴论1.3. Haskell是一种编程语言,从范畴论中汲取......
  • 线程池处理爬虫电影票房排行榜
    需求:爬取1996-2023年电影票房排行榜首先,我们先爬取一年的数据,然后通过循环,逐一爬取每一年的数据。通过测试,话费时间32秒,代码如下:importrequestsfromlxmlimportetr......
  • 17种编程语言实现排序算法-冒泡排序
    开源地址https://gitee.com/lblbc/simple-works/tree/master/sort/bubbleSort1.安卓Java版privatevoidsort(int[]array){for(inti=0;i<array.length......
  • 高职学生如何成为编程高手
    高职学生如何成为编程高手不知不觉在高职教学7年了,这7年的教学经验使我感受颇深!高职高专学习软件的学生,很多在入学的时候都有比较大理想:我一定要成为一名编程高手......
  • 提高C#编程水平的50个要点(二)
    26.对需要排序的对象实现IComparable和IComparer接口27.避免使用 ICloneable接口28.避免使用类型转换操作符29.只有当基类加入了与派生类中现有的函数名称相同的函数时,才需......
  • 提高C#编程水平的50个要点(一)
    1.总是用属性 (Property) 来代替可访问的数据成员2.在  readonly 和 const 之间,优先使用 readonly3.在 as 和 强制类型转换之间,优先使用 as 操作符4.使用条件......
  • 【并发编程】线程安全性问题
    文章目录1.什么是线程安全性2.原子性操作3.深入理解synchronized3.4.volatile关键字3.5.happens-before规则3.6.如何避免线程安全性问题1.什么是线程安全性当......
  • 【并发编程】线程的基础知识篇
    文章目录1.进程与线程的区别2.线程的状态相互转换3.创建线程的方式4.线程的挂起和恢复5.线程的中断操作6.线程的优先级7.守护线程1.进程与线程的区别(1)什么是......
  • 场景编程集锦 - 考试打假
    场景描述考试历来都是人才选拔高效而重要的手段,也是彰显社会公平的重要方面。无论是中国古代的科举考试,还是当今的全国高考,或者是出国留学的雅思托福考试,古今中外概莫能外。......