在并发编程中,线程间的协调是一个常见的挑战。Java 并发包 java.util.concurrent
提供了多种工具来帮助开发者解决这一问题。其中,CountDownLatch
、CyclicBarrier
和 Semaphore
是三个非常有用的同步辅助类,它们各自具有独特的功能和适用场景。本文将详细介绍这三个类的工作原理,并通过实际案例演示如何在项目中应用它们。
CountDownLatch 概述
定义与用途
CountDownLatch
是一个同步辅助类,它允许一个或多个线程等待其他线程完成一组操作。它的核心是一个计数器,当计数器的值达到零时,所有等待的线程会被释放并继续执行。典型的应用场景包括:
- 等待一组线程完成初始化工作。
- 在主线程中等待子线程完成任务后进行汇总处理。
关键方法
CountDownLatch(int count)
:构造函数,设置初始计数值。await()
:使当前线程等待直到计数器归零,或者被中断。countDown()
:递减计数器的值;如果计数器归零,则唤醒所有等待的线程。
示例代码 - 简单使用
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int numberOfThreads = 3;
CountDownLatch latch = new CountDownLatch(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
new Thread(new Worker(latch)).start();
}
// 主线程等待所有子线程完成
latch.await();
System.out.println("All threads have finished their work.");
}
static class Worker implements Runnable {
private final CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
// 模拟工作
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " has completed its task.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 工作完成后减少计数
latch.countDown();
}
}
}
}
CyclicBarrier 概述
定义与用途
CyclicBarrier
也是一个同步辅助类,但它允许多个线程相互等待,直到所有参与的线程都到达了一个屏障点(barrier)。与 CountDownLatch
不同的是,CyclicBarrier
可以重用,即一旦所有线程都到达了屏障点,计数器会自动复位,以便下一轮使用。适用于以下情况:
- 多个线程需要同时开始下一个阶段的任务。
- 需要周期性地同步多个线程的状态。
关键方法
CyclicBarrier(int parties)
:构造函数,指定参与的线程数量。await()
:使当前线程等待,直到所有线程都调用了此方法。reset()
:重置屏障,使得所有正在等待的线程抛出BrokenBarrierException
,并且可以重新开始新一轮的同步。
示例代码 - 循环使用
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int numberOfThreads = 3;
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, new Runnable() {
@Override
public void run() {
System.out.println("All threads reached the barrier, starting next phase...");
}
});
for (int i = 0; i < numberOfThreads; i++) {
new Thread(new Runner(barrier)).start();
}
}
static class Runner implements Runnable {
private final CyclicBarrier barrier;
public Runner(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
// 模拟工作
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " is ready to reach the barrier.");
// 等待其他线程
barrier.await();
// 模拟下一阶段的工作
System.out.println(Thread.currentThread().getName() + " has started the next phase.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
Semaphore 概述
定义与用途
Semaphore
是一种基于许可的同步机制,用于控制对共享资源的访问。它维护了一组许可证,每个许可证代表一次对资源的占用权。当一个线程请求访问资源时,它必须先获取一个许可证;如果许可证不足,则该线程将被阻塞,直到有可用的许可证为止。通常用于限制对有限资源(如数据库连接池)的同时访问数量。
关键方法
Semaphore(int permits)
:构造函数,指定初始的许可证数量。acquire()
:获取一个许可证,如果没有可用的许可证则阻塞当前线程。release()
:释放一个许可证,增加可用的许可证数量。tryAcquire()
:尝试获取一个许可证,但不会阻塞;如果成功返回true
,否则返回false
。
示例代码 - 数据库连接池模拟
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private static final int MAX_CONNECTIONS = 5;
private static final Semaphore semaphore = new Semaphore(MAX_CONNECTIONS);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new DatabaseConnection()).start();
}
}
static class DatabaseConnection implements Runnable {
@Override
public void run() {
try {
// 尝试获取一个数据库连接
if (semaphore.tryAcquire()) {
System.out.println(Thread.currentThread().getName() + " acquired a connection.");
// 模拟使用数据库连接
Thread.sleep((long) (Math.random() * 2000));
// 使用完毕后释放连接
semaphore.release();
System.out.println(Thread.currentThread().getName() + " released a connection.");
} else {
System.out.println(Thread.currentThread().getName() + " could not acquire a connection.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
总结与比较
特性/类名 | CountDownLatch | CyclicBarrier | Semaphore |
---|---|---|---|
是否可重用 | 否 | 是 | 是 |
主要用途 | 等待一组线程完成特定事件 | 等待一组线程到达某个同步点 | 控制对共享资源的并发访问量 |
构造参数 | 初始计数 | 参与的线程数及可选的屏障动作 | 初始许可证数量 |
关键方法 | await() , countDown() |
await() , reset() |
acquire() , release() , tryAcquire() |
适用场景 | 初始化、汇总等 | 周期性同步、分阶段执行 | 资源池管理、限流 |
实战建议
- 选择合适的工具:根据具体需求选择最适合的同步辅助类,不要混用或滥用。
- 考虑性能影响:虽然这些类提供了强大的同步功能,但也可能带来额外的开销,特别是在高并发环境下。因此,在设计时应评估其对系统性能的影响。
- 异常处理:始终确保正确处理可能出现的异常情况,如
InterruptedException
或BrokenBarrierException
,以保证程序的健壮性。 - 文档化同步逻辑:复杂的同步逻辑可能会增加代码的理解难度,因此应当详细记录各个同步点的作用及其交互方式。
结语
感谢您的阅读!如果您对 JUC 包中的其他组件或并发编程话题有任何疑问或见解,欢迎继续探讨。