在Java的并发编程中,同步器(Synchronizer)是一个非常重要的概念,它用于管理多个线程之间的协作,以确保线程间的正确交互和数据的一致性。Java并发包java.util.concurrent
中提供了多种同步器,这些同步器主要用于实现锁(Locks)和其他并发原语(Concurrency Primitives)。
主要的同步器包括:
Semaphore(信号量):
Semaphore(信号量)是Java中一种非常有用的同步工具,是一种计数器,它用于控制同时访问某个特定资源的线程数量。它可以被用来实现某种形式的资源池,或者限制并发访问某个特定资源的线程数量。Semaphore可以管理一组许可(permits),每个许可代表了一个对共享资源的访问权。
Semaphore的基本工作原理:
初始化:Semaphore在创建时可以指定一个许可的数量,这个数量代表了在任意时刻能够同时访问资源的线程数。
获取许可(acquire):当一个线程想要访问受保护资源时,它必须先获取Semaphore的一个许可。如果Semaphore中还有可用的许可,那么线程就可以继续执行;如果没有可用的许可,线程将会被阻塞,直到有许可被释放为止。
释放许可(release):当线程完成对共享资源的访问后,它应该释放Semaphore中的一个许可,以便其他等待的线程可以获得许可并访问资源。
Semaphore的使用场景非常广泛,比如控制数据库连接的池大小、限制并发访问某个API的线程数量等。
在Java中,Semaphore类位于
java.util.concurrent
包下。示例:
import java.util.concurrent.Semaphore; public class SemaphoreExample { // 创建一个Semaphore,初始许可数量为5 private static final Semaphore semaphore = new Semaphore(5); public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(() -> { try { // 请求许可 semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " 获取了许可"); // 模拟资源访问,这里只是简单地休眠一段时间 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 释放了许可"); // 释放许可 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } }
在这个示例中,我们创建了一个初始许可数量为5的Semaphore。然后,我们启动了10个线程,每个线程都尝试获取许可来访问资源。由于初始许可数量只有5个,因此前5个线程能够立即获取许可并继续执行,而后5个线程将会被阻塞,直到有许可被释放。每个线程在访问完资源后都会释放许可,这样被阻塞的线程就可以获取许可并继续执行了。
CountDownLatch(倒计数锁存器):
CountDownLatch
是Java中一个非常有用的同步器,它属于java.util.concurrent
包。CountDownLatch
是一个同步辅助类,它允许一个或多个线程等待一组事件在其他线程中的发生。在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
CountDownLatch
的工作原理很简单:它初始化时设置一个给定的计数值(count),表示需要等待的事件数量。调用await()
方法的线程会阻塞,直到其他线程调用countDown()
方法使得当前计数值达到零。当计数值达到零时,所有因调用await()
方法而阻塞的线程都将被释放,继续执行。这里有一个简单的使用场景:假设您有一个程序,它需要等待多个背景任务完成后才能继续执行。您可以为每个背景任务创建一个线程,并在
CountDownLatch
上调用countDown()
方法来表示任务完成。主线程则调用await()
方法等待所有任务完成。示例代码:
import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { // 初始化CountDownLatch,计数值为3,表示需要等待3个事件 CountDownLatch latch = new CountDownLatch(3); // 假设我们有三个任务需要并行执行 for (int i = 0; i < 3; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 开始执行"); // 模拟任务执行时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 执行完毕"); // 通知CountDownLatch,一个事件已经完成 latch.countDown(); }).start(); } // 主线程等待所有任务完成 System.out.println("主线程等待所有任务完成..."); latch.await(); // 阻塞当前线程,直到所有任务完成 System.out.println("所有任务已经完成,主线程继续执行"); } }
在这个例子中,主线程会等待三个并行执行的任务全部完成后才继续执行。每个任务在执行完毕后都会调用
countDown()
方法来减少CountDownLatch
的计数值,当计数值减到零时,await()
方法会返回,主线程得以继续执行。
CyclicBarrier(循环屏障):
CyclicBarrier
是Java中java.util.concurrent
包下的一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。在屏障点之前,所有线程都会被阻塞,直到最后一个线程到达屏障点。然后,屏障被打破,所有线程都被释放,并继续执行。
CyclicBarrier
的一个关键特性是它是可重用的。也就是说,当所有线程都通过了屏障点之后,屏障可以被重置,以便下一轮使用。这与CountDownLatch
不同,后者是一次性的,一旦计数值达到零,就不能再被重用了。
CyclicBarrier
通常用于以下场景:
并行计算:当需要将一个大任务分解成多个小任务并行执行,并在所有小任务都完成后才能继续执行后续步骤时。
多线程协调:当需要多个线程在继续执行之前达到某个同步点时。
CyclicBarrier
的构造函数通常接受两个参数:
- parties:必须到达屏障点的线程数量。
- barrierAction:一个可选的
Runnable
命令,当最后一个线程到达屏障点时(即屏障被打破时),将执行该命令。使用示例:
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { int parties = 3; // 需要到达屏障点的线程数量 CyclicBarrier barrier = new CyclicBarrier(parties, () -> { System.out.println("所有线程都到达了屏障点,继续执行后续任务..."); }); for (int i = 0; i < parties; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 准备到达屏障点..."); Thread.sleep(1000); // 模拟任务执行时间 barrier.await(); // 等待其他线程到达屏障点 System.out.println(Thread.currentThread().getName() + " 已经通过屏障点,继续执行..."); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
在这个示例中,我们创建了一个
CyclicBarrier
,并指定了三个线程需要到达屏障点。每个线程在到达屏障点之前都会等待一段时间(模拟任务执行时间),然后调用await()
方法等待其他线程。当所有线程都到达屏障点时,会执行barrierAction
指定的Runnable命令,然后所有线程都被释放,继续执行后续任务。
Exchanger(交换器):
Exchanger
是Java中java.util.concurrent
包下的一个同步器,它用于在两个线程之间进行数据交换。当两个线程都到达某个交换点时,它们会交换各自持有的数据,然后继续执行。Exchanger
可以视为一个同步的交换点,它允许线程在交换数据后继续它们的执行,而无需使用其他同步机制(如锁或信号量)来协调数据交换。
Exchanger
的一个典型应用场景是遗传算法中的两个种群在每次迭代结束时需要交换它们的最佳个体。在这种情况下,两个线程(或更准确地说,两个处理种群进化的任务)可以在达到迭代结束时使用Exchanger
来交换它们各自的最佳个体。使用
Exchanger
时,线程通过调用exchange(V x)
方法参与数据交换,其中V
是交换的数据类型。每个线程都会传递一个数据项到exchange
方法,并阻塞在那里,直到另一个线程也到达交换点并调用exchange
方法。然后,这两个线程会交换它们的数据项,并继续执行。请注意,
Exchanger
要求两个线程都到达交换点才能继续执行,因此它适用于那些需要严格同步数据交换的场景。如果某个线程因某种原因未能到达交换点,那么另一个线程将会无限期地等待下去,除非线程被中断或调用了exchange
方法的超时版本。使用示例:
import java.util.concurrent.Exchanger; public class ExchangerExample { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); Thread thread1 = new Thread(() -> { String data1 = "数据来自线程1"; try { // 线程1等待与线程2交换数据 String data2 = exchanger.exchange(data1); System.out.println("线程1从线程2收到数据:" + data2); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread thread2 = new Thread(() -> { String data2 = "数据来自线程2"; try { // 线程2等待与线程1交换数据 String data1 = exchanger.exchange(data2); System.out.println("线程2从线程1收到数据:" + data1); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); thread2.start(); } }
在这个示例中,我们创建了两个线程
thread1
和thread2
,它们都试图使用Exchanger
来交换数据。每个线程都调用exchange
方法并传递一个字符串数据项,然后等待与另一个线程交换数据。一旦两个线程都到达交换点,它们就会交换数据项并继续执行。
Locks(锁):
在Java的并发编程中,
Locks
(锁)确实是一种重要的同步机制,但它们通常不被直接归类为与CountDownLatch
、CyclicBarrier
或Exchanger
等相同的“同步器”类别,尽管它们在功能上都是为了实现线程间的同步与协调。
Locks
提供了一种比传统的synchronized
方法和语句更灵活的锁定机制。Java中的java.util.concurrent.locks
包提供了几种锁的实现,包括ReentrantLock
、ReadWriteLock
等。这些锁支持更复杂的同步操作,比如尝试非阻塞地获取锁、尝试可中断地获取锁以及尝试带超时的获取锁等。
Locks
的主要优点:
- 灵活性:提供了比
synchronized
方法和语句更广泛的锁定操作。- 可中断的锁获取:允许在等待锁的时候可以被中断。
- 尝试非阻塞地获取锁:如果锁不可用,线程可以立即返回,而不是等待。
- 锁和条件变量的分离:
Lock
接口提供了newCondition()
方法,用于获取与锁相关联的Condition
对象,以支持更灵活的线程间通信。然而,在提及“同步器”时,我们通常会想到那些具有特定同步模式或行为的类,如
CountDownLatch
、CyclicBarrier
和Exchanger
,它们各自解决了不同的同步问题,并提供了相应的功能来简化并发编程。因此,虽然
Locks
在Java并发编程中扮演着至关重要的角色,但它们并不直接归类为与CountDownLatch
、CyclicBarrier
和Exchanger
等相同的“同步器”类别。
同步器的设计原则:
- 封装性:同步器通常封装了底层的同步控制逻辑,对外提供简单的API供用户调用。
- 可扩展性:通过组合不同的同步器,可以实现复杂的并发控制逻辑。
- 灵活性:同步器通常支持多种使用场景,可以根据具体需求选择适合的同步器。
关键的同步器设计原则:
-
最小锁定原则:
- 尽可能减少锁的持有时间和范围,只在必要的代码块上使用锁,以避免不必要的线程阻塞和死锁风险。
- 优先考虑使用细粒度的锁,即只在需要同步的数据或操作上应用锁,而不是对整个对象或方法加锁。
-
避免锁竞争:
- 设计时应考虑减少锁的争用,通过合理的线程分配和任务划分,避免多个线程同时竞争同一个锁。
- 使用读写锁(如
ReentrantReadWriteLock
)等更高级的锁机制,允许多个读线程同时访问数据,而写线程则独占访问。
-
公平性考虑:
- 在需要公平访问资源的场景下,应选择支持公平性的锁实现(如
ReentrantLock
的公平锁模式)。 - 确保线程按照请求锁的顺序获得锁,避免饥饿现象的发生。
- 在需要公平访问资源的场景下,应选择支持公平性的锁实现(如
-
条件变量的正确使用:
- 当使用
Lock
接口时,应合理利用与之关联的Condition
对象进行线程间的通信和协调。 - 通过
Condition
的await()
、signal()
和signalAll()
方法,实现线程间的精确控制和唤醒。
- 当使用
-
锁的释放与获取的一致性:
- 确保在锁的保护区域内发生异常时,能够正确地释放锁,避免死锁的发生。
- 使用
try-finally
语句块来确保锁在finally
块中被释放。
-
避免死锁:
- 在设计并发程序时,应仔细分析锁的使用情况,避免循环等待条件的发生。
- 遵循锁的顺序一致性原则,即所有线程以相同的顺序获取锁。
-
锁的复用与扩展性:
- 在可能的情况下,考虑使用可重用的锁(如
CyclicBarrier
、Semaphore
等),以减少锁对象的创建和销毁开销。 - 设计具有良好扩展性的同步机制,以便在并发级别增加时能够高效地处理更多的线程。
- 在可能的情况下,考虑使用可重用的锁(如
-
性能优化:
- 在满足正确性要求的前提下,尽可能优化同步机制的性能。
- 通过减少锁的粒度、使用无锁或低锁技术(如CAS操作)等方式来提高并发程序的性能。