一、CountDownLatch
1.1 概述
倒计时门闩, 当执行任务的线程数量到达为0的时候,触发.
- 可以理解为: 一个教室有多个人,直到所有人走光之后, 班长锁门, 这里需要等待所有人都走完.
一种同步辅助,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CountDownLatch使用给定的计数进行初始化。 由于调用了countDown方法, await方法会阻塞直到当前计数达到零,然后释放所有等待线程,并且任何后续的await调用都会立即返回。 这是一种一次性现象——计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。
CountDownLatch是一种通用的同步工具,可用于多种目的。 计数为 1 的CountDownLatch用作简单的开/关锁存器或门:调用await所有线程在门处等待,直到它被调用countDown的线程countDown 。 初始化为N的CountDownLatch可用于使一个线程等待,直到N 个线程完成某个操作,或者某个操作已完成 N 次.
1.2 示例代码
package cn.tcmeta.thread;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* CountDownLatchDemo
*/
public class CountDownLatchDemo {
public static void main(String[] args) {
// 等待的线程数量为5
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new Thread(() ->{
try {
System.out.println(Thread.currentThread().getName() + " --> \t" + " 正在执行!");
TimeUnit.MILLISECONDS.sleep(5000);
System.out.println(Thread.currentThread().getName() + " --> \t" + "执行成功!" );
// 当任务执行完成之后,要进行减1操作
countDownLatch.countDown();
}catch (Exception e){
e.printStackTrace();
}
}, "线程: " + i).start();
}
try {
// 当计数为0时,停止阻塞.相当于打开门闩.执行后续的逻辑
countDownLatch.await();
}catch (Exception e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --> \t" + " 所有任务执行完成!");
}
}
程序执行结果:
1.3 使用场景
1.3.1 并行任务启动后等待所有任务完成
场景:主线程需要启动多个子线程并行处理任务(如批量处理数据),并在所有子线程完成后进行结果汇总
CountDownLatch doneLatch = new CountDownLatch(TASK_COUNT);
for (int i = 0; i < TASK_COUNT; i++) {
new Thread(() -> {
try {
// 执行子任务(如调用外部API、计算等)
} finally {
doneLatch.countDown(); // 任务完成时计数器减1
}
}).start();
}
doneLatch.await(); // 主线程阻塞,直到所有子线程完成
System.out.println("所有任务处理完毕,开始汇总结果");
1.3.2 服务启动时依赖资源初始化
场景:微服务启动时需要加载多个独立资源(如数据库连接、缓存预热、配置文件加载),主线程需等待所有资源初始化完成后才启动服务。
CountDownLatch initLatch = new CountDownLatch(3); // 3个依赖资源
// 线程1:初始化数据库
new Thread(() -> { initDatabase(); initLatch.countDown(); }).start();
// 线程2:预热缓存
new Thread(() -> { warmUpCache(); initLatch.countDown(); }).start();
// 线程3:加载配置文件
new Thread(() -> { loadConfig(); initLatch.countDown(); }).start();
initLatch.await(); // 等待所有资源初始化
startServer(); // 启动服务
1.3.3 并发性能测试
模拟高并发请求,需要所有请求线程在同一时刻发起操作(如压力测试接口)。
CountDownLatch startLatch = new CountDownLatch(1); // 发令枪
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT); // 结束标识
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
try {
startLatch.await(); // 所有线程在此等待
callTestAPI(); // 同时发起请求
} finally {
endLatch.countDown();
}
}).start();
}
Thread.sleep(500); // 确保所有线程就绪
startLatch.countDown(); // 统一放行
endLatch.await(); // 等待所有请求完成
generateReport(); // 生成测试报告
1.3.4 多阶段任务协作
游戏服务器中,玩家需全部准备就绪后游戏才开始,且需等待所有玩家完成第一关卡后再进入下一关。
// 等待所有玩家准备
CountDownLatch readyLatch = new CountDownLatch(PLAYER_COUNT);
players.forEach(player ->
player.prepare(() -> readyLatch.countDown())
);
readyLatch.await();
startGame();
// 等待所有玩家完成第一关
CountDownLatch level1Latch = new CountDownLatch(PLAYER_COUNT);
players.forEach(player ->
player.finishLevel1(() -> level1Latch.countDown())
);
level1Latch.await();
startLevel2();
1.3.5 分布式任务分片同步
[!tip]
分布式计算中,主节点将任务分发给多个工作节点,需等待所有节点返回结果后再合并。
List<WorkerNode> nodes = getWorkerNodes();
CountDownLatch resultLatch = new CountDownLatch(nodes.size());
for (WorkerNode node : nodes) {
node.executeTask(dataShard, () -> {
// 返回分片处理结果
resultLatch.countDown();
});
}
resultLatch.await();
mergeResults(); // 合并所有分片结果
1.4 关键注意事项
- 计数器不可重置:CountDownLatch的计数器归零后无法重置,如需重复使用,改用CyclicBarrier。
- 异常处理:确保countDown()在finally块中调用,避免线程异常导致主线程永久阻塞。
- 超时控制:使用await(long timeout, TimeUnit unit)防止死锁。
- 资源释放:等待线程被中断时,需妥善处理资源(如关闭连接)。
二、CyclicBarrier
2.1 概述
一种同步辅助工具,「它允许一组线程全部等待彼此到达公共屏障点」。 「CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程必须偶尔相互等待。 屏障被称为循环的,因为它可以在等待线程被释放后重新使用.」
「CyclicBarrier支持可选的Runnable命令,该命令在每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程被释放之前。 此屏障操作对于在任何一方继续之前更新共享状态很有用。」
与CountDownLatch类似,但是它不是计数相减, 而是计数相加操作;功能类似;
- 执行一个线程, 加一个数,当执行的线程数达到指定的线程数的时候,触发一个操作;
- 集齐七个龙珠,可以召唤神龙, 或者理解成: 同学都到了, 咱们再上课
2.2 基本示例代码
package cn.tcmeta.thread;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
/**
* @author laoren
* CyclicBarrier类似于CountDownLatch, 不同的是这个是做加法,相当于一个屏障.
* 可以理解为: 集齐七颗龙珠,就可以召唤神龙了. 它的计数是相加,达到某个值之后, 触发后续操作;
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 构造方法说明:
* 参数说明:
* parities: 在屏障到达之前必须调用 await 的线程数
* barrierAction: 到达屏障点触发的动作.
* public CyclicBarrier(int parties, Runnable barrierAction) {
* if (parties <= 0) throw new IllegalArgumentException();
* this.parties = parties;
* this.count = parties;
* this.barrierCommand = barrierAction;
* }
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("已经集齐了七颗龙珠,神龙现世!"));
for (int i = 0; i < 7; i++) {
int count = i;
new Thread(() ->{
System.out.println(Thread.currentThread().getName() + " --> \t" + " 已经集齐了 " + count + " 龙珠了!");
try {
TimeUnit.MILLISECONDS.sleep(2000);
// 未到达屏障点
cyclicBarrier.await();
}catch (Exception e){
e.printStackTrace();
}
}, "线程 - " + i).start();
}
}
}
2.3 使用场景
2.3.1 并行计算分阶段处理
分布式计算中,每个线程处理数据分片,需等待所有线程完成当前阶段才能进入下一阶段(如机器学习模型的迭代训练)
class DataProcessor {
final int THREAD_COUNT = 4;
final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () ->
System.out.println("当前阶段完成,开始下一阶段")
);
void process() {
ExecutorService exec = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
exec.submit(() -> {
for (int phase = 1; phase <= 3; phase++) { // 3个处理阶段
processPhase(phase); // 当前阶段计算
barrier.await(); // 等待其他线程
}
});
}
exec.shutdown();
}
void processPhase(int phase) {
// 模拟分片计算
System.out.println(Thread.currentThread().getName()
+ " 完成阶段" + phase);
}
}
2.3.2 多玩家游戏同步
游戏关卡中,所有玩家必须完成当前关卡才能同时进入下一关
package cn.tcmeta.thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class GameServer {
final int PLAYERS = 3;
final CyclicBarrier levelBarrier = new CyclicBarrier(PLAYERS, () ->
System.out.println("所有玩家通关!开始新关卡")
);
void startGame() {
for (int i = 1; i <= PLAYERS; i++) {
new Thread(() -> {
while (true) {
// completeLevel(); // 玩家完成当前关卡
try {
levelBarrier.await(); // 等待其他玩家
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}, "玩家"+i).start();
}
}
}
2.3.3 性能测试压力生成
需要精确控制所有压测线程在同一毫秒发起请求
package cn.tcmeta.thread;
import java.util.concurrent.CyclicBarrier;
class StressTester {
final int THREADS = 100;
final CyclicBarrier startBarrier = new CyclicBarrier(THREADS);
void test() throws Exception {
for (int i = 0; i < THREADS; i++) {
new Thread(() -> {
awaitBarrier(); // 等待发令枪
sendRequest(); // 发送API请求
}).start();
}
}
private void sendRequest() {
System.out.println("Sending request...");
}
void awaitBarrier() {
try {
startBarrier.await(); // 所有线程在此同步
} catch (Exception e) { /*...*/ }
}
public static void main(String[] args) throws Exception {
new StressTester().test();
}
}
2.3.4 遗传算法迭代
并行遗传算法中,每代种群需完成评估后才能进行交叉变异
class GeneticAlgorithm {
final int POPULATION_SIZE = 50;
final CyclicBarrier generationBarrier = new CyclicBarrier(POPULATION_SIZE);
void evolve() {
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < POPULATION_SIZE; i++) {
pool.execute(() -> {
for (int gen = 0; gen < 1000; gen++) {
evaluateFitness(); // 评估个体适应度
generationBarrier.await();
crossover(); // 等待后执行交叉操作
}
});
}
pool.shutdown();
}
}
2.3.5 金融交易对账系统
银行每日需等待所有分行上传数据后执行全局对账
class ReconciliationSystem {
final int BRANCHES = 20;
final CyclicBarrier dailyBarrier = new CyclicBarrier(BRANCHES, this::runReconciliation);
void startDailyJob() {
for (String branch : getBranches()) {
new Thread(() -> {
uploadData(branch); // 分行上传数据
dailyBarrier.await(); // 等待其他分行
}).start();
}
}
void runReconciliation() {
System.out.println("开始全局对账...");
}
}
2.4 CyclicBarrier核心特性
2.5 最佳实践
最佳实践:
- 「在循环体内使用 CyclicBarrier 处理分阶段任务」
- 「栅栏动作中避免长时间阻塞(会延迟线程释放)」
- 「配合 ExecutorService 管理线程生命周期」
- 「使用 isBroken() 检测屏障状态并处理异常」
三、Semaphore
3.1 概述
3.1.1 核心概念
信号量, 信号灯; 计数信号量。 从概念上讲,信号量维护一组许可; 可以做限流使用;
- 可以控制同时运行的线程数, 「线程可以有好多个线程,但是它可以控制同时执行的线程个数」
- 类似于抢车位的概念或者是厕所抢坑的例子; - 当前有7辆车, 「七个操作线程」,车位有3个「信号量是3」
3.1.2 构造方法
// 可用的初始许可证数量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 数量, 是否为公平锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
3.1.3 重点方法
// 1. acquire(), 获取操作. 当一个线程调用`acquire`操作时, 它要么通过成功获取信号量「信号量加1操作」, 要么一直等待下去;下到有线程释放信息号或者超时.
// 2. release(), 释放操作,实际上会将信号量的值加1操作,然后唤醒等待的线程;
3.2 示例代码
3.2.1 共享资源互斥
package cn.tcmeta.thread;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @author laoren
*/
public class SemaphoreDemo {
public static void main(String[] args) {
// 信号量,初始化为1, 表示只有一个线程可以操作.
Semaphore semaphore = new Semaphore(1);
new Thread(() ->{
// 获取锁操作
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " --> \t" + " 拿到了锁哦...");
// 模拟一下业务操作
try {
TimeUnit.MILLISECONDS.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --> \t" + " 完成了业务哦..");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 释放锁
semaphore.release();
}
}, "王麻子").start();
new Thread(() ->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " --> \t" + " 拿到了锁哦...");
// 模拟一下业务操作
try {
TimeUnit.MILLISECONDS.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --> \t" + " 完成了业务哦..");
}catch(Exception e){
e.printStackTrace();
}finally {
semaphore.release();
}
}, "肥七").start();
}
}
3.2.2 并发线程数控制
场景:
- 目前有三个坑位「信号量数量为3」
- 有10个人抢这三个坑住「线程数量」
package cn.tcmeta.thread;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo2 {
public static final int PERMITS = 3;
public static final int PERSON_COUNT = 10;
public static void main(String[] args) {
// 定义信号量
Semaphore semaphore = new Semaphore(PERMITS);
for (int i = 0; i < PERSON_COUNT; i++) {
new Thread(() ->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " --> \t" + " 抢到了坑位了!");
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 1);
System.out.println(Thread.currentThread().getName() + " --> \t" + " 啊啊啊,,舒服...释放坑位 ~~~");
}catch (Exception e){
e.printStackTrace();
}
}catch(Exception e){
e.printStackTrace();
}finally {
semaphore.release();
}
}, "线程: " + i).start();
}
}
}
3.3 使用场景
Semaphore(信号量)是Java中用于控制并发线程访问资源数量的同步工具。
它维护了一组许可证(permits),线程在访问资源前需要获取许可证,访问后释放许可证.
3.3.1 数据库连接池管理
限制同时使用的数据库连接数量,防止资源耗尽
public class ConnectionPool {
private final Semaphore semaphore;
private final List<Connection> connections = new ArrayList<>();
public ConnectionPool(int poolSize) {
semaphore = new Semaphore(poolSize, true); // 公平模式
for (int i = 0; i < poolSize; i++) {
connections.add(createConnection());
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 获取许可证
return getAvailableConnection();
}
public void releaseConnection(Connection conn) {
returnConnection(conn);
semaphore.release(); // 释放许可证
}
private synchronized Connection getAvailableConnection() {
return connections.remove(0);
}
private synchronized void returnConnection(Connection conn) {
connections.add(conn);
}
private Connection createConnection() {
// 创建数据库连接
return mock(Connection.class);
}
}
3.3.2 限流器
控制API接口每秒最大请求数量
public class RateLimiter {
private final Semaphore semaphore;
private final int maxPermits;
private final ScheduledExecutorService scheduler;
public RateLimiter(int permitsPerSecond) {
this.maxPermits = permitsPerSecond;
semaphore = new Semaphore(permitsPerSecond);
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
int available = semaphore.availablePermits();
if (available < maxPermits) {
semaphore.release(maxPermits - available);
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void acquire() throws InterruptedException {
semaphore.acquire();
}
}
3.3.3 停车场管理系统
模拟停车场车位管理
public class CarPark {
private final Semaphore parkingSlots;
private final int totalSlots;
private final Set<String> parkedCars = new HashSet<>();
public CarPark(int totalSlots) {
this.totalSlots = totalSlots;
this.parkingSlots = new Semaphore(totalSlots, true); // 公平模式
}
public boolean parkCar(String carId) {
if (parkingSlots.tryAcquire()) {
synchronized (this) {
parkedCars.add(carId);
}
System.out.println(carId + " parked. Available slots: " + parkingSlots.availablePermits());
return true;
}
System.out.println(carId + " failed to park. No available slots.");
return false;
}
public void leaveCar(String carId) {
synchronized (this) {
if (parkedCars.remove(carId)) {
parkingSlots.release();
System.out.println(carId + " left. Available slots: " + parkingSlots.availablePermits());
}
}
}
}
3.3.4 打印机池管理
多线程共享有限打印机资源
public class PrinterPool {
private final Semaphore semaphore;
private final List<Printer> printers = new ArrayList<>();
public PrinterPool(int printerCount) {
semaphore = new Semaphore(printerCount);
for (int i = 0; i < printerCount; i++) {
printers.add(new Printer("Printer-" + (i+1)));
}
}
public void printDocument(String document) throws InterruptedException {
semaphore.acquire();
Printer printer = null;
try {
printer = getAvailablePrinter();
printer.print(document);
} finally {
if (printer != null) {
returnPrinter(printer);
}
semaphore.release();
}
}
private synchronized Printer getAvailablePrinter() {
return printers.remove(0);
}
private synchronized void returnPrinter(Printer printer) {
printers.add(printer);
}
}
3.3.5服务调用限流
限制对第三方服务的并发调用数量
public class ExternalServiceInvoker {
private final Semaphore semaphore;
private final int maxConcurrentCalls;
public ExternalServiceInvoker(int maxConcurrentCalls) {
this.maxConcurrentCalls = maxConcurrentCalls;
this.semaphore = new Semaphore(maxConcurrentCalls);
}
public String invokeService(String request) {
if (!semaphore.tryAcquire()) {
throw new ServiceOverloadException("Too many concurrent requests");
}
try {
return callExternalService(request);
} finally {
semaphore.release();
}
}
private String callExternalService(String request) {
// 调用外部服务
return "Response for: " + request;
}
}
3.3.6 生产者-消费者模型(有界缓冲区)
使用Semaphore实现生产者-消费者模式
public class BoundedBuffer<E> {
private final Semaphore availableItems;
private final Semaphore availableSpaces;
private final Queue<E> buffer = new LinkedList<>();
private final int capacity;
public BoundedBuffer(int capacity) {
this.capacity = capacity;
this.availableItems = new Semaphore(0);
this.availableSpaces = new Semaphore(capacity);
}
public void put(E item) throws InterruptedException {
availableSpaces.acquire(); // 等待空槽
synchronized (this) {
buffer.add(item);
}
availableItems.release(); // 增加可用项目
}
public E take() throws InterruptedException {
availableItems.acquire(); // 等待可用项目
E item;
synchronized (this) {
item = buffer.poll();
}
availableSpaces.release(); // 增加空槽
return item;
}
}
3.4 Semaphore关键特性总结
3.5 使用建议
- 资源保护:当需要保护有限资源时使用Semaphore
- 公平性考虑:在资源竞争激烈时使用公平模式防止线程饥饿
- 异常处理:确保在finally块中释放许可证,避免许可证泄漏
- 许可证数量:合理设置许可证数量,过多失去限制意义,过少影响性能
- 替代方案:对于简单计数场景,考虑使用CountDownLatch或CyclicBarrier
四、LockSupport
用于创建锁和其他同步类的基本线程阻塞基元。
4.1 重要方法
4.1.1 使用线程进入阻塞状态
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
4.1.2 唤醒某个线程
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
4.2 示例代码
需求:
- 开启一条线程, 循环输出10个数;
- 当打印到第五个数时,线程阻塞住.
- 10s之后再继续打印其值;
package cn.tcmeta.thread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* @author laoren
*/
public class LockSupportDemo {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread().getName() + " --> \t" + " 当前值是: " + i);
if (i == 5) {
System.out.println(Thread.currentThread().getName() + " --> \t" + " 被阻塞了..等一会继续干活....");
LockSupport.park(); // 阻塞当前线程
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "print-info-thread: ");
t1.start(); // 启动打印的线程
try {
// 10秒钟睡眼
TimeUnit.SECONDS.sleep(10);
LockSupport.unpark(t1); // 唤醒t1线程,继续干活
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.3 使用场景
LockSupport 是 Java 并发包中一个强大的线程阻塞工具,提供了比传统 wait/notify 更灵活、更底层的线程控制能力。
4.3.1 构建高级同步工具(如 AQS)
在实现自定义锁或同步器时,使用 LockSupport 作为底层阻塞机制
public class SimpleReentrantLock {
private volatile Thread owner = null;
private int lockCount = 0;
private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
public void lock() {
Thread current = Thread.currentThread();
if (owner == current) {
lockCount++;
return;
}
waiters.add(current);
while (owner != null || !waiters.peek().equals(current) ||
!compareAndSetOwner(null, current)) {
LockSupport.park(this); // 阻塞当前线程
}
waiters.remove();
lockCount = 1;
}
public void unlock() {
if (owner != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
if (--lockCount == 0) {
owner = null;
// 唤醒队首等待线程
Thread next = waiters.peek();
if (next != null) {
LockSupport.unpark(next);
}
}
}
private boolean compareAndSetOwner(Thread expect, Thread update) {
// 原子更新owner
return UNSAFE.compareAndSwapObject(this, ownerOffset, expect, update);
}
// Unsafe 相关初始化代码省略...
}
4.3.2 中断敏感的阻塞操作
实现可响应中断的等待机制,避免 Thread.sleep() 的局限性
public class InterruptibleTask {
private volatile boolean completed = false;
public void doTask() {
Thread worker = new Thread(() -> {
// 模拟长时间任务
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
completed = true;
LockSupport.unpark(Thread.currentThread()); // 通知完成
});
worker.start();
// 等待任务完成,可响应中断
while (!completed) {
LockSupport.park(this);
if (Thread.interrupted()) {
System.out.println("Task waiting interrupted!");
worker.interrupt();
break;
}
}
}
}
4.3.3 定时阻塞与精确唤醒
需要精确控制线程阻塞时间的场景(如定时任务调度)
public class PrecisionScheduler {
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
public void scheduleAt(Runnable task, long deadlineNanos) {
Thread worker = new Thread(() -> {
long now;
while ((now = System.nanoTime()) < deadlineNanos) {
long nanosToWait = deadlineNanos - now;
if (nanosToWait > 0) {
LockSupport.parkNanos(nanosToWait);
}
}
task.run();
});
worker.start();
}
// 取消任务(提前唤醒)
public void cancel(Thread worker) {
LockSupport.unpark(worker);
}
}
4.3.4 线程协作与屏障
实现自定义的线程协作机制(替代 CountDownLatch 或 CyclicBarrier)
public class CustomBarrier {
private final int parties;
private final AtomicInteger count = new AtomicInteger();
private final Thread[] waiters;
public CustomBarrier(int parties) {
this.parties = parties;
this.waiters = new Thread[parties];
}
public void await() {
int index = count.getAndIncrement();
if (index < parties - 1) {
waiters[index] = Thread.currentThread();
LockSupport.park(this); // 阻塞直到所有线程到达
} else {
// 最后一个线程唤醒所有等待线程
for (int i = 0; i < parties - 1; i++) {
LockSupport.unpark(waiters[i]);
}
count.set(0); // 重置屏障
}
}
}
4.3.5 高性能消息队列
实现无锁或低争用的生产者-消费者模型
public class LockSupportQueue<T> {
private static class Node<T> {
T item;
volatile Node<T> next;
}
private volatile Node<T> head;
private volatile Node<T> tail;
private volatile Thread consumer;
public LockSupportQueue() {
Node<T> dummy = new Node<>();
head = dummy;
tail = dummy;
}
public void put(T item) {
Node<T> node = new Node<>();
node.item = item;
Node<T> t = tail;
tail = node;
t.next = node;
// 唤醒等待的消费者
if (consumer != null) {
LockSupport.unpark(consumer);
consumer = null;
}
}
public T take() throws InterruptedException {
Node<T> h = head;
Node<T> first = h.next;
if (first != null) {
head = first;
return first.item;
}
// 队列为空,注册自己为消费者并阻塞
consumer = Thread.currentThread();
while (first == null) {
LockSupport.park();
if (Thread.interrupted()) {
throw new InterruptedException();
}
h = head;
first = h.next;
}
head = first;
return first.item;
}
}
4.3.6 死锁检测与恢复
实现带超时的资源获取,避免死锁
public class DeadlockAvoider {
private final Lock resourceLock = new ReentrantLock();
public boolean tryLockWithTimeout(long timeout, TimeUnit unit) {
Thread current = Thread.currentThread();
final long deadline = System.nanoTime() + unit.toNanos(timeout);
// 尝试获取锁
if (resourceLock.tryLock()) {
return true;
}
// 设置超时线程
Thread timeoutThread = new Thread(() -> {
try {
Thread.sleep(unit.toMillis(timeout));
LockSupport.unpark(current); // 超时唤醒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
timeoutThread.start();
// 阻塞直到获取锁或超时
while (System.nanoTime() < deadline) {
if (resourceLock.tryLock()) {
timeoutThread.interrupt();
return true;
}
LockSupport.parkUntil(deadline);
}
return false;
}
}
4.5 LockSupport核心优势
4.6 使用注意事项
- 许可不可累加:多次调用 unpark 不会累积许可(最多只有一个有效许可)
- 中断处理:调用 park 后线程被中断会立即返回,但不会清除中断状态
- 虚假唤醒:和 Object.wait() 类似,park 也可能出现虚假唤醒
- 同步配合:通常需要与其他同步机制(如 volatile 变量)配合使用
- 避免滥用:在普通业务代码中优先使用高级并发工具类
4.7 LockSupport vs Object.wait/notify
LockSupport 是构建 Java 并发框架的基石(如 AQS),在开发高性能并发工具时尤其有用。对于日常业务开发,优先考虑使用基于 LockSupport 构建的高级并发工具(如 ReentrantLock, CountDownLatch 等)。
五、Exchanger
5.1 概述
线程可以配对和交换元素对的同步点。 每个线程在进入exchange方法时呈现一些对象,与伙伴线程匹配,并在返回时接收其伙伴的对象。 Exchanger 可以被视为SynchronousQueue的双向形式。 交换器可能在遗传算法和管道设计等应用中很有用。
5.2 基本示例
package cn.tcmeta.thread;
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
// 初始化Exchanger对象
public static final Exchanger<String> EXCHANGER = new Exchanger<>();
public static void main(String[] args) {
new Thread(() ->{
String s1 = "它们都老了吗?它们在哪里呀,幸运的是我,曾陪他们开放............";
System.out.println(Thread.currentThread().getName() + " --> \t" + "交换之前: " + s1);
try {
s1 = EXCHANGER.exchange(s1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " --> \t" + " 交换之后: " + s1);
}, "那些花: ").start();
new Thread(() ->{
String s1 = "明天你是否会想起,你昨天写的日记............";
System.out.println(Thread.currentThread().getName() + " --> \t" + "交换之前: " + s1);
try {
s1 = EXCHANGER.exchange(s1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " --> \t" + " 交换之后: " + s1);
}, "同桌的你: ").start();
}
}
5.3 使用场景
Exchanger 是 Java 并发包中一个独特的同步工具,用于两个线程之间交换数据。它提供了一个同步点,在这个点上两个线程可以交换彼此的对象。
5.3.1 生产者-消费者缓冲区交换
双缓冲技术中,生产者和消费者交换缓冲区引用,避免数据复制开销
public class DoubleBufferProcessor {
private final Exchanger<Buffer> exchanger = new Exchanger<>();
private final ExecutorService executor = Executors.newFixedThreadPool(2);
static class Buffer {
byte[] data = new byte[1024];
int size;
void fill(InputStream in) throws IOException {
size = in.read(data);
}
void process() {
// 处理缓冲区数据
System.out.println("Processing " + size + " bytes");
}
}
public void start(InputStream input) {
executor.submit(() -> producerTask(input));
executor.submit(() -> consumerTask());
}
private void producerTask(InputStream input) {
Buffer current = new Buffer();
try {
while (true) {
current.fill(input); // 填充缓冲区
current = exchanger.exchange(current); // 交换缓冲区
}
} catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void consumerTask() {
Buffer current = new Buffer();
try {
while (true) {
current = exchanger.exchange(current); // 交换缓冲区
current.process(); // 处理数据
current.size = 0; // 清空缓冲区
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
5.3.2 遗传算法交叉操作
在并行遗传算法中,两个线程交换染色体进行交叉操作
public class GeneticAlgorithm {
private final Exchanger<Chromosome> exchanger = new Exchanger<>();
private final int populationSize;
public GeneticAlgorithm(int populationSize) {
this.populationSize = populationSize;
}
public void evolve() {
ExecutorService pool = Executors.newFixedThreadPool(populationSize);
for (int i = 0; i < populationSize; i += 2) {
pool.submit(new Individual(i));
pool.submit(new Individual(i + 1));
}
}
class Individual implements Runnable {
private Chromosome chromosome;
Individual(int id) {
this.chromosome = new Chromosome(id);
}
@Override
public void run() {
try {
chromosome.evaluateFitness();
// 与配对的个体交换染色体
Chromosome partner = exchanger.exchange(chromosome);
// 执行交叉操作
chromosome = chromosome.crossover(partner);
chromosome.mutate();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Chromosome {
// 染色体实现
Chromosome crossover(Chromosome other) { return this; }
void evaluateFitness() {}
void mutate() {}
}
}
5.3.3 网络协议握手交换
实现自定义协议中双方交换密钥或初始化向量
public class SecureChannel {
private final Exchanger<byte[]> keyExchanger = new Exchanger<>();
public void establishChannel(Socket socket1, Socket socket2) {
new Thread(() -> partyTask(socket1)).start();
new Thread(() -> partyTask(socket2)).start();
}
private void partyTask(Socket socket) {
try {
// 生成随机密钥
byte[] myKey = generateSessionKey();
// 发送公钥给对方
OutputStream out = socket.getOutputStream();
out.write(myKey);
out.flush();
// 接收对方的公钥
InputStream in = socket.getInputStream();
byte[] theirKey = new byte[myKey.length];
in.read(theirKey);
// 交换密钥并计算共享密钥
byte[] sharedKey = keyExchanger.exchange(combineKeys(myKey, theirKey));
// 使用共享密钥开始通信
startEncryptedCommunication(socket, sharedKey);
} catch (IOException | InterruptedException e) {
handleError(e);
}
}
}
5.3.4 游戏玩家物品交易
在线游戏中两个玩家安全交换物品
public class PlayerTradingSystem {
private final Exchanger<TradeOffer> exchanger = new Exchanger<>();
public void trade(Player player1, Player player2) {
new Thread(() -> playerTradeTask(player1, player2)).start();
new Thread(() -> playerTradeTask(player2, player1)).start();
}
private void playerTradeTask(Player player, Player partner) {
try {
// 玩家选择要交易的物品
TradeOffer myOffer = player.createTradeOffer();
// 显示交易界面
player.showTradeInterface(partner);
// 等待双方确认
if (player.confirmTrade()) {
// 交换交易内容
TradeOffer theirOffer = exchanger.exchange(myOffer);
// 执行交易
player.completeTrade(theirOffer);
} else {
player.cancelTrade();
}
} catch (InterruptedException e) {
player.cancelTrade();
Thread.currentThread().interrupt();
}
}
static class TradeOffer {
List<Item> items;
int gold;
}
}
5.3.5 测试工具中的请求-响应交换
在性能测试工具中,一个线程发送请求,另一个线程验证响应
public class RequestResponseValidator {
private final Exchanger<HttpResponse> exchanger = new Exchanger<>();
public void testEndpoint(String url) {
new Thread(() -> requesterTask(url)).start();
new Thread(() -> validatorTask()).start();
}
private void requesterTask(String url) {
try {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
// 交换响应给验证器
exchanger.exchange(response);
} catch (IOException | InterruptedException | URISyntaxException e) {
handleError(e);
}
}
private void validatorTask() {
try {
// 获取响应并验证
HttpResponse<?> response = exchanger.exchange(null);
validateResponse(response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
5.3.6 数据流水线阶段交换
在数据处理流水线中,两个相邻阶段交换处理单元
public class DataPipeline {
private final Exchanger<DataBatch> exchanger = new Exchanger<>();
public void processData(DataSource source) {
new Thread(() -> stage1(source)).start();
new Thread(() -> stage2()).start();
}
private void stage1(DataSource source) {
DataBatch batch = new DataBatch();
try {
while (source.hasMore()) {
// 提取数据
batch.extract(source);
// 预处理
batch.preprocess();
// 交换给下一阶段
batch = exchanger.exchange(batch);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void stage2() {
DataBatch batch = new DataBatch();
try {
while (true) {
// 获取处理批次
batch = exchanger.exchange(batch);
// 分析数据
batch.analyze();
// 存储结果
batch.storeResults();
// 清空批次
batch.clear();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
5.4 Exchanger核心特性总结
5.5 使用注意事项
- 仅限两个线程:Exchanger 严格用于两个线程间的交换,多线程行为未定义
- 死锁风险:如果一个线程未调用 exchange,另一个线程会永久阻塞
- 对象复用:交换后对象会被对方线程修改,需要适当同步或使用不可变对象
- 性能考虑:适合中低频交换场景,高频交换可能成为瓶颈
- 超时设置:生产环境中应始终使用带超时的 exchange 方法
- 资源清理:确保线程中断时正确处理资源释放
5.6 典型应用场景对比
Exchanger 是 Java 并发工具包中的一颗"隐藏宝石",特别适合需要精确控制两个线程间数据交换的场景。在正确的场景下使用 Exchanger 可以简化设计并提高性能,但需要注意其严格的线程配对要求。
六、没有了
学习快乐!!!