首页 > 其他分享 >多线程系列(十五) -常用并发工具类详解

多线程系列(十五) -常用并发工具类详解

时间:2024-03-07 13:55:22浏览次数:29  
标签:执行 name Thread 并发 thread 详解 线程 多线程 public

一、摘要

在前几篇文章中,我们讲到了线程、线程池、BlockingQueue 等核心组件,其实 JDK 给开发者还提供了比synchronized更加高级的线程同步组件,比如 CountDownLatch、CyclicBarrier、Semaphore、Exchanger 等并发工具类。

下面我们一起来了解一下这些常用的并发工具类!

二、常用并发工具类

2.1、CountDownLatch

CountDownLatch是 JDK5 之后加入的一种并发流程控制工具类,它允许一个或多个线程一直等待,直到其他线程运行完成后再执行。

它的工作原理主要是通过一个计数器来实现,初始化的时候需要指定线程的数量;每当一个线程完成了自己的任务,计数器的值就相应得减 1;当计数器到达 0 时,表示所有的线程都已经执行完毕,处于等待的线程就可以恢复继续执行任务。

根据CountDownLatch的工作原理,它的应用场景一般可以划分为两种:

  • 场景一:某个线程需要在其他 n 个线程执行完毕后,再继续执行
  • 场景二:多个工作线程等待某个线程的命令,同时执行同一个任务

下面我们先来看下两个简单的示例。

示例1:某个线程等待 n 个工作线程

比如某项任务,先采用多线程去执行,最后需要在主线程中进行汇总处理,这个时候CountDownLatch就可以发挥作用了,具体应用如下!

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        // 采用 10 个工作线程去执行任务
        final int threadCount = 10;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 执行具体任务
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",执行完毕!");
                    // 计数器减 1
                    countDownLatch.countDown();
                }
            }).start();
        }

        // 阻塞等待 10 个工作线程执行完毕
        countDownLatch.await();
        System.out.println("所有任务线程已执行完毕,准备进行结果汇总");
    }
}

运行结果如下:

thread name:Thread-0,执行完毕!
thread name:Thread-2,执行完毕!
thread name:Thread-1,执行完毕!
thread name:Thread-3,执行完毕!
thread name:Thread-4,执行完毕!
thread name:Thread-5,执行完毕!
thread name:Thread-6,执行完毕!
thread name:Thread-7,执行完毕!
thread name:Thread-8,执行完毕!
thread name:Thread-9,执行完毕!
所有任务线程执行完毕,准备进行结果汇总
示例2:n 个工作线程等待某个线程

比如田径赛跑,10 个同学准备开跑,但是需要等工作人员发出枪声才允许开跑,使用CountDownLatch可以实现这一功能,具体应用如下!

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        // 使用一个计数器
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final int threadCount = 10;
        // 采用 10 个工作线程去执行任务
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 阻塞等待计数器为 0
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 发起某个服务请求,省略
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",开始执行!");

                }
            }).start();
        }

        Thread.sleep(1000);
        System.out.println("thread name:" +  Thread.currentThread().getName() + " 准备开始!");
        // 将计数器减 1,运行完成后为 0
        countDownLatch.countDown();
    }
}

运行结果如下:

thread name:main 准备开始!
thread name:Thread-0,开始执行!
thread name:Thread-1,开始执行!
thread name:Thread-2,开始执行!
thread name:Thread-3,开始执行!
thread name:Thread-5,开始执行!
thread name:Thread-6,开始执行!
thread name:Thread-8,开始执行!
thread name:Thread-7,开始执行!
thread name:Thread-4,开始执行!
thread name:Thread-9,开始执行!

从上面的示例可以很清晰的看到,CountDownLatch类似于一个倒计数器,当计数器为 0 的时候,调用await()方法的线程会被解除等待状态,然后继续执行。

CountDownLatch类的主要方法,有以下几个:

  • public CountDownLatch(int count):核心构造方法,初始化的时候需要指定线程数
  • countDown():每调用一次,计数器值 -1,直到 count 被减为 0,表示所有线程全部执行完毕
  • await():等待计数器变为 0,即等待所有异步线程执行完毕,否则一直阻塞
  • await(long timeout, TimeUnit unit):支持指定时间内的等待,避免永久阻塞,await()的一个重载方法

从以上的分析可以得出,当计数器为 1 的时候,即由一个线程来通知其他线程,效果等同于对象的wait()notifyAll();当计时器大于 1 的时候,可以实现多个工作线程完成任务后通知一个或者多个等待线程继续工作,CountDownLatch可以看成是一种进阶版的等待/通知机制,在实际中应用比较多见。

2.2、CyclicBarrier

CyclicBarrier从字面上很容易理解,表示可循环使用的屏障,它真正的作用是让一组线程到达一个屏障时被阻塞,直到满足要求的线程数都到达屏障时,屏障才会解除,此时所有被屏障阻塞的线程就可以继续执行。

下面我们还是先看一个简单的示例,以便于更好的理解这个工具类。

public class CyclicBarrierTest {

    public static void main(String[] args) {
        // 设定参与线程的个数为 5
        int threadCount = 5;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有的线程都已经准备就绪...");
            }
        });
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",已达到屏障!");
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",阻塞解除,继续执行!");
                }
            }).start();
        }
    }
}

输出结果:

thread name:Thread-0,已达到屏障!
thread name:Thread-1,已达到屏障!
thread name:Thread-2,已达到屏障!
thread name:Thread-3,已达到屏障!
thread name:Thread-4,已达到屏障!
所有的线程都已经准备就绪...
thread name:Thread-4,阻塞解除,继续执行!
thread name:Thread-0,阻塞解除,继续执行!
thread name:Thread-3,阻塞解除,继续执行!
thread name:Thread-1,阻塞解除,继续执行!
thread name:Thread-2,阻塞解除,继续执行!

从上面的示例可以很清晰的看到,CyclicBarrier中设定的线程数相当于一个屏障,当所有的线程数达到时,此时屏障就会解除,线程继续执行剩下的逻辑。

CyclicBarrier类的主要方法,有以下几个:

  • public CyclicBarrier(int parties):构造方法,parties参数表示参与线程的个数
  • public CyclicBarrier(int parties, Runnable barrierAction):核心构造方法,barrierAction参数表示线程到达屏障时的回调方法
  • public void await():核心方法,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞,直到屏障解除,继续执行剩下的逻辑

从以上的示例中,可以看到CyclicBarrierCountDownLatch有很多的相似之处,都能够实现线程之间的等待,但是它们的侧重点不同:

  • CountDownLatch一般用于一个或多个线程,等待其他的线程执行完任务后再执行
  • CyclicBarrier一般用于一组线程等待至某个状态,当状态解除之后,这一组线程再继续执行
  • CyclicBarrier中的计数器可以反复使用,而CountDownLatch用完之后只能重新初始化

2.3、Semaphore

Semaphore通常我们把它称之为信号计数器,它可以保证同一时刻最多有 N 个线程能访问某个资源,比如同一时刻最多允许 10 个用户访问某个服务,同一时刻最多创建 100 个数据库连接等等。

Semaphore可以用于控制并发的线程数,实际应用场景非常的广,比如流量控制、服务限流等等。

下面我们看一个简单的示例。

public class SemaphoreTest {

    public static void main(String[] args) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // 同一时刻仅允许最多3个线程获取许可
        final Semaphore semaphore = new Semaphore(3);
        // 初始化 5 个线程生成
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 如果超过了许可数量,其他线程将在此等待
                        semaphore.acquire();
                        System.out.println(format.format(new Date()) +  " thread name:" +  Thread.currentThread().getName() + " 获取许可,开始执行任务");
                        // 假设执行某项任务的耗时
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // 使用完后释放许可
                        semaphore.release();
                    }
                }
            }).start();
        }
    }
}

输出结果:

2023-11-22 17:32:01 thread name:Thread-0 获取许可,开始执行任务
2023-11-22 17:32:01 thread name:Thread-1 获取许可,开始执行任务
2023-11-22 17:32:01 thread name:Thread-2 获取许可,开始执行任务
2023-11-22 17:32:03 thread name:Thread-4 获取许可,开始执行任务
2023-11-22 17:32:03 thread name:Thread-3 获取许可,开始执行任务

从上面的示例可以很清晰的看到,同一时刻前 3 个线程获得了许可优先执行, 2 秒过后许可被释放,剩下的 2 个线程获取释放的许可继续执行。

Semaphore类的主要方法,有以下几个:

  • public Semaphore(int permits):构造方法,permits参数表示同一时间能访问某个资源的线程数量
  • acquire():获取一个许可,在获取到许可之前或者被其他线程调用中断之前,线程将一直处于阻塞状态
  • tryAcquire(long timeout, TimeUnit unit):表示在指定时间内尝试获取一个许可,如果获取成功,返回true;反之false
  • release():释放一个许可,同时唤醒一个获取许可不成功的阻塞线程。

通过permits参数的设定,可以实现限制多个线程同时访问服务的效果,当permits参数为 1 的时候,表示同一时刻只有一个线程能访问服务,相当于一个互斥锁,效果等同于synchronized

使用Semaphore的时候,通常需要先调用acquire()或者tryAcquire()获取许可,然后通过try ... finally模块在finally中释放许可。

例如如下方式,尝试在 3 秒内获取许可,如果没有获取就退出,防止程序一直阻塞。

// 尝试 3 秒内获取许可
if(semaphore.tryAcquire(3, TimeUnit.SECONDS)){
    try {
       // ...业务逻辑
    }  finally {
        // 释放许可
        semaphore.release();
    }
}

2.4、Exchanger

Exchanger从字面上很容易理解表示交换,它主要用途在两个线程之间进行数据交换,注意也只能在两个线程之间进行数据交换。

Exchanger提供了一个exchange()同步交换方法,当两个线程调用exchange()方法时,无论调用时间先后,会互相等待线程到达exchange()方法同步点,此时两个线程进行交换数据,将本线程产出数据传递给对方。

简单的示例如下。

public class ExchangerTest {

    public static void main(String[] args) {
        // 交换同步器
        Exchanger<String> exchanger = new Exchanger<>();

        // 线程1
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String value = "A";
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 原数据:" + value);
                    String newValue = exchanger.exchange(value);
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 交换后的数据:" + newValue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 线程2
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String value = "B";
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 原数据:" + value);
                    String newValue = exchanger.exchange(value);
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 交换后的数据:" + newValue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出结果:

thread name:Thread-0 原数据:A
thread name:Thread-1 原数据:B
thread name:Thread-0 交换后的数据:B
thread name:Thread-1 交换后的数据:A

从上面的示例可以很清晰的看到,当线程Thread-0Thread-1都到达了exchange()方法的同步点时,进行了数据交换。

Exchanger类的主要方法,有以下几个:

  • exchange(V x):等待另一个线程到达此交换点,然后将给定的对象传送给该线程,并接收该线程的对象,除非当前线程被中断,否则一直阻塞等待
  • exchange(V x, long timeout, TimeUnit unit):表示在指定的时间内等待另一个线程到达此交换点,如果超时会自动退出并抛超时异常

如果多个线程调用exchange()方法,数据交换可能会出现混乱,因此实际上Exchanger应用并不多见。

三、小结

本文主要围绕 Java 多线程中常见的并发工具类进行了简单的用例介绍,这些工具类都可以实现线程同步的效果,底层原理实现主要是基于 AQS 队列式同步器来实现,关于 AQS 我们会在后期的文章中再次介绍。

本文篇幅稍有所长,内容难免有所遗漏,欢迎大家留言指出!

四、参考

1.https://www.cnblogs.com/xrq730/p/4869671.html

2.https://zhuanlan.zhihu.com/p/97055716

标签:执行,name,Thread,并发,thread,详解,线程,多线程,public
From: https://www.cnblogs.com/dxflqm/p/18058751

相关文章

  • 详解Python魔法函数,__init__,__str__,__del__
    1、简介Python作为一门灵活而强大的编程语言,提供了许多特殊的方法,被称为魔法函数(Magicmethods)。这些魔法函数以双下划线开头和结尾,能够让我们自定义类的行为,使得Python更加灵活和易用。本文将详细介绍Python中的魔法函数,帮助读者理解其作用和用法。1.1、什么是魔法函数?魔法函......
  • CMD命令大全详解
    1、gpedit.msc-----组策略。2.、sndrec32-------录音机。3、Nslookup-------IP地址侦测器,是一个监测网络中DNS服务器是否能正确实现域名解析的命令行工具。它在WindowsNT/2000/XP中均可使用,但在Windows98中却没有集成这一个工具。【cmd命令大全】一、CMD命令4、......
  • nginx rewrite参数详解
    Nginx的rewrite指令用于重写URL,它有几个参数,这些参数定义了如何匹配和重写请求的URL。以下是rewrite指令的常见参数及其说明:Regex:这是一个正则表达式,用于匹配请求的URI。Nginx将使用这个正则表达式来查找与请求URI相匹配的模式。Replacement:这是重写后的URI,可以包含正则表达式......
  • Unity3D 渲染队列 ZTest与ZWrite详解
    在Unity3D中,渲染队列(RenderingQueue)是一个非常重要的概念,它决定了游戏中各个物体的渲染顺序和优先级。而在渲染队列中,ZTest和ZWrite又是两个关键的参数,它们决定了物体在渲染的过程中如何处理深度测试和深度写入。本文将详细介绍Unity3D中的渲染队列、ZTest和ZWrite的概念,并给出相......
  • Unity3D 立方体纹理与自制天空盒详解
    在Unity3D中,立方体纹理和自制天空盒是常见的技术,它们可以帮助开发者创建出更加真实和引人入胜的游戏场景。本文将详细介绍Unity3D中立方体纹理和自制天空盒的实现方法,希望能帮助读者更好地理解和运用这些技术。对啦!这里有个游戏开发交流小组里面聚集了一帮热爱学习游戏的零基础小......
  • Unity3D 多人战场Animation优化详解
    在多人战场游戏中,动画的优化是非常重要的,因为动画是游戏中的核心元素之一,直接影响玩家的游戏体验。对啦!这里有个游戏开发交流小组里面聚集了一帮热爱学习游戏的零基础小白,也有一些正在从事游戏开发的技术大佬,欢迎你来交流学习。在本文中,我们将详细介绍如何在Unity3D中优化多人战......
  • 面试高频题:Java并发包有些什么?
    面试过Java研发岗位的同学,相信很多都碰到过面试官问这么一道题:Java并发包有些什么?Java并发包,就是java.util.concurrent包下的类和子包。大体分为三类:一:线程池工具 核心类为ThreadPoolExecutor,通过这个类,可以构建出各种线程池。二:原子工具如AtomicInteger、AtomicLong等,通......
  • C++ Qt开发:运用QThread多线程组件
    Qt是一个跨平台C++图形界面开发库,利用Qt可以快速开发跨平台窗体应用程序,在Qt中我们可以通过拖拽的方式将不同组件放到指定的位置,实现图形化开发极大的方便了开发效率,本章将重点介绍如何运用QThread组件实现多线程功能。多线程技术在程序开发中尤为常用,Qt框架中提供了QThread库来......
  • 多线程系列(十四) -一文带你搞懂线程池技术
    一、前言虽然Java对线程的创建、中断、等待、通知、销毁、同步等功能提供了很多的支持,但是从操作系统角度来说,频繁的创建线程和销毁线程,其实是需要大量的时间和资源的。例如,当有多个任务同时需要处理的时候,一个任务对应一个线程来执行,以此来提升任务的执行效率,模型图如下:如......
  • Unity3D 常用得内置函数(Cg与GLSL)详解
    Cg和GLSL是Unity3D中常用的着色器语言,通过使用这两种语言,开发者可以实现各种精美的视觉效果。本文将详细介绍Unity3D中常用的一些内置函数,并给出相应的技术详解和代码实现。对啦!这里有个游戏开发交流小组里面聚集了一帮热爱学习游戏的零基础小白,也有一些正在从事游戏开发的技术大......