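I. Introduction
1.1 What Is a Semaphore
Semaphore is a synchronization utility in Java's java.util.concurrent package that limits how many threads can access a resource at the same time. A Semaphore maintains a set of permits: a thread acquires a permit before accessing the resource and releases it when it is done.
Core concepts:
- Permits: the number of threads allowed to access the resource concurrently
- Acquire: a thread requests one or more permits
- Release: a thread returns the permits it holds so that other threads can use them
Diagram note: the diagram shows the basic working principle of a Semaphore. The permit pool holds a fixed number of permits; a thread must obtain a permit before it can access the resource, and once the permits run out, newly arriving threads have to wait.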
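1.2 The Core Idea: The Permit Mechanism
The permit mechanism is the central design idea behind Semaphore. It works much like a parking lot: the number of spaces is fixed, a car may enter only while a space is free, and a waiting car gets in as soon as another one leaves.
Sequence diagram notes:
- The Semaphore is initialized with a permit count (2 in this example)
- Thread 1 and thread 2 acquire permits and access the resource
- Thread 3 waits because no permit is available
- When thread 1 releases its permit, thread 3 can acquire it
- Every thread releases its permit once it has finished with the resource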
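The sequence above can be reproduced with a small sketch. The class name ParkingLotDemo and the helper peakConcurrency are hypothetical names chosen for illustration; the sketch starts three threads against two permits and records the highest number of threads that were inside the guarded section at once.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class ParkingLotDemo {
    /** Runs `threads` workers against `permits` permits and returns the highest concurrency observed. */
    static int peakConcurrency(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger(); // threads currently holding a permit
        AtomicInteger peak = new AtomicInteger();   // highest value of `inside` seen so far
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly();  // wait for a free "parking space"
                try {
                    peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.sleep(50);                // simulate using the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();             // leave, freeing the space
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // With 2 permits and 3 threads the peak can never exceed 2.
        System.out.println("peak concurrency: " + peakConcurrency(2, 3));
    }
}
```

The third thread only gets in after one of the first two has released its permit, which is why the recorded peak stays at or below the permit count.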
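1.3 Why Semaphore Is Needed
In day-to-day development we often need to cap the number of concurrent accesses.
Typical use cases:
- Database connection pools: limit the number of simultaneous database connections
- HTTP connection pools: control the number of HTTP requests in flight
- Task throttling: limit the number of tasks executing at the same time
- Overload protection: keep the system from being overwhelmed by too many requests
- Resource pool management: manage scarce system resources (file handles, memory, and so on)
The problem without a Semaphore: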
// Problem: uncontrolled concurrent access
public class UncontrolledAccess {
    private final ExpensiveResource resource = new ExpensiveResource();
    public void accessResource() {
        // Any number of threads can get here at once, which may lead to:
        // 1. exhausted system resources
        // 2. a sharp drop in performance
        // 3. a system crash
        resource.doWork();
    }
}
The solution with a Semaphore:
// Solution: use a Semaphore to cap concurrency
public class ControlledAccess {
    private final Semaphore semaphore = new Semaphore(5); // at most 5 threads at a time
    private final ExpensiveResource resource = new ExpensiveResource();
    public void accessResource() throws InterruptedException {
        semaphore.acquire(); // acquire a permit
        try {
            resource.doWork(); // access the resource under the concurrency cap
        } finally {
            semaphore.release(); // always give the permit back
        }
    }
}
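II. Core Principles of Semaphore
2.1 Theoretical Background
The semaphore concept was introduced by the computer scientist Edsger Dijkstra in 1965 and is an important tool for resolving resource contention in concurrent programs.
The semaphore model:
P operation (acquire):
if (S > 0) {
    S = S - 1; // success: one fewer resource available
} else {
    block(); // failure: the thread blocks and retries once it is woken
}
V operation (release):
S = S + 1; // one more resource available
wakeup(); // wake a waiting thread
Correspondence in Java: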
- P operation ↔ the acquire() method
- V operation ↔ the release() method
- Value of S ↔ the number of available permits
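To make the P/V model concrete, here is a minimal sketch of a counting semaphore written with an intrinsic lock and wait()/notify(). It is not how java.util.concurrent.Semaphore is implemented; the class CountingSemaphore and its method names p, tryP, v, and value are hypothetical and exist only to mirror the pseudocode.

```java
final class CountingSemaphore {
    private int s; // the counter S from the model

    CountingSemaphore(int initial) {
        this.s = initial;
    }

    /** P operation: wait while no resource is available, then take one. */
    synchronized void p() throws InterruptedException {
        while (s <= 0) {
            wait();   // block(); the loop re-checks S after every wake-up
        }
        s--;
    }

    /** Non-blocking variant of P: takes a resource only if one is available right now. */
    synchronized boolean tryP() {
        if (s > 0) {
            s--;
            return true;
        }
        return false;
    }

    /** V operation: return one resource and wake a waiting thread. */
    synchronized void v() {
        s++;
        notify();     // wakeup()
    }

    synchronized int value() {
        return s;
    }
}
```

The while loop around wait() matters: a woken thread must re-check S because another thread may have taken the resource first.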
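2.2 The AQS-Based Implementation
Semaphore is built on the AbstractQueuedSynchronizer (AQS) framework, which provides synchronization-state management and the infrastructure for blocking and waking threads.
Class diagram notes:
- Semaphore: the public API
- Sync: the internal abstract synchronizer, a subclass of AQS
- FairSync/NonfairSync: the concrete fair and non-fair implementations
- AQS: manages the synchronization state and the thread wait queue
How AQS represents the state (JDK 8 source; later JDKs replace Unsafe with a VarHandle):
// In a Semaphore, the AQS state field holds the number of available permits
// state > 0 : permits are available
// state = 0 : no permits available; arriving threads must wait
// state < 0 : never produced by an acquire (the CAS loop refuses to go below 0);
//             only possible with a negative initial count or via reducePermits()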
protected int getState() { return state; }
protected boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
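As a rough illustration of how little code sits between AQS and a working semaphore, the following sketch builds a stripped-down semaphore directly on AbstractQueuedSynchronizer. TinySemaphore and its inner Sync class are hypothetical names; the two overridden methods follow the same CAS-loop pattern as the non-fair JDK code discussed in this article, but this is a teaching sketch, not the JDK implementation.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class TinySemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        Sync(int permits) {
            setState(permits);               // state == number of available permits
        }

        int permits() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative result means "not enough permits"; otherwise retry until the CAS wins.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) {
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next)) {
                    return true;             // tells AQS to wake queued threads
                }
            }
        }
    }

    private final Sync sync;

    TinySemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);  // queueing and parking are handled by AQS
    }

    boolean tryAcquire() {
        return sync.tryAcquireShared(1) >= 0;
    }

    void release() {
        sync.releaseShared(1);
    }

    int availablePermits() {
        return sync.permits();
    }
}
```

Everything related to the wait queue, parking, and interruption comes from AQS; the subclass only defines what "acquire" and "release" mean in terms of the state field.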
2.3 Fair and Non-Fair Modes
Semaphore supports two permit-acquisition policies: fair and non-fair.
Fair mode (FairSync):
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // Key step: check whether another thread is queued ahead of us
        if (hasQueuedPredecessors())
            return -1; // someone is already waiting, so the current thread may not barge
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
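Non-fair mode (NonfairSync):
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // Note: there is no hasQueuedPredecessors() check, so barging is allowed
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
Comparison of the two modes:
Performance characteristics:
- Fair mode: permits are granted in arrival order, at the cost of lower throughput (every attempt has to check the queue first)
- Non-fair mode: higher throughput, but a queued thread can be overtaken repeatedly and may starve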
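The first-come-first-served behaviour of fair mode can be observed with the sketch below. FairOrderDemo and servedOrder are hypothetical names. The sketch queues threads one at a time on a fair semaphore with no permits, then releases permits one by one and records who was served. Because no new thread arrives while permits are handed out, a non-fair semaphore would typically show the same order here; the two modes differ only when a newcomer can barge ahead of the queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

final class FairOrderDemo {
    /** Queues `n` threads on a fair semaphore and returns the order in which they were served. */
    static List<Integer> servedOrder(int n) {
        Semaphore semaphore = new Semaphore(0, true); // fair mode, no permits yet
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            Thread t = new Thread(() -> {
                semaphore.acquireUninterruptibly();
                order.add(id);
            });
            threads.add(t);
            t.start();
            // Wait until this thread is in the wait queue before starting the next one.
            while (semaphore.getQueueLength() < i + 1) {
                Thread.yield();
            }
        }
        for (int i = 0; i < n; i++) {
            semaphore.release();                       // hand out one permit
            while (order.size() < i + 1) {
                Thread.yield();                        // let the served thread record itself
            }
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(servedOrder(3)); // threads are served in the order they queued
    }
}
```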
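2.4 Acquire and Release Flows
The complete acquire flow: check for interruption, attempt tryAcquireShared(), and on failure enqueue the thread and park it until a release lets it retry.
The complete release flow: add the permits back to the state with a CAS loop (tryReleaseShared()), then wake queued threads (doReleaseShared()).
2.5 A Closer Look at Key Source Fragments
Core acquire logic: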
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// acquireSharedInterruptibly in AQS
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// The non-fair tryAcquireShared implementation
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
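Key points in the source:
1. Interrupt check:
if (Thread.interrupted())
    throw new InterruptedException();
The interrupt status is checked before any attempt to acquire, which is what makes the method interruptible.
2. Fast path:
if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
A quick attempt is made first; the more expensive queueing logic runs only if that attempt fails, which is an important performance optimization.
3. Lock-free CAS update:
if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;
The logic here is subtle:
- If remaining < 0, there are not enough permits, and the negative value is returned to signal failure
- If there are enough permits, a CAS update is attempted; if the CAS fails, the loop retries
- The update therefore stays correct under high contention without taking a lock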
4. Release logic:
public void release() {
sync.releaseShared(1);
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
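Key points of the release logic:
- Overflow check: if (next < current) detects integer overflow and keeps the permit count from exceeding Integer.MAX_VALUE
- Unconditional release: release() does not check whether the current thread holds a permit; this is a deliberate part of Semaphore's design
- CAS retry: the state is updated with a lock-free CAS that retries on failure
5. Wait-queue management: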
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
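This code shows how the AQS queue is put to work:
- Shared-mode node: the node is tagged Node.SHARED, which distinguishes it from exclusive mode
- Propagation: setHeadAndPropagate makes sure that several waiting threads can be woken when several permits are available
- Interrupt response: parkAndCheckInterrupt() checks the interrupt status, which makes the wait interruptible
- Exception safety: the finally block cleans up the node if the acquire ends with an exception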
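The effect of propagation can be seen from the outside with a small experiment. In the sketch below (PropagationDemo and wokenAfterRelease are hypothetical names), several threads block on a semaphore with no permits, a single release(n) call hands out n permits, and the method counts how many waiters got through. The remaining waiters are cancelled with an interrupt, which exercises the cancellation path as well.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class PropagationDemo {
    /** Parks `waiters` threads, releases `released` permits at once, and returns how many threads got a permit. */
    static int wokenAfterRelease(int waiters, int released) {
        Semaphore semaphore = new Semaphore(0);
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                    woken.incrementAndGet();
                } catch (InterruptedException e) {
                    // Cancelled while waiting: this thread never obtained a permit.
                }
            });
            threads[i].start();
        }
        // Wait until every thread is queued, so the release below finds them all waiting.
        while (semaphore.getQueueLength() < waiters) {
            Thread.yield();
        }
        semaphore.release(released);                 // one call, several permits
        int expected = Math.min(waiters, released);
        while (woken.get() < expected) {
            Thread.yield();
        }
        for (Thread t : threads) {
            t.interrupt();                           // cancel the waiters that are still queued
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println("threads served: " + wokenAfterRelease(3, 2));
    }
}
```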
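III. The Core Semaphore API
3.1 Constructors
Semaphore has two constructors, which set the permit count and the fairness policy:
// Constructor 1: permit count only; non-fair by default
public Semaphore(int permits)
// Constructor 2: permit count and fairness policy
public Semaphore(int permits, boolean fair)
Parameters:
- permits: the initial number of permits. The value may be negative, in which case releases must occur before any acquire is granted
- fair: the fairness policy; true selects fair mode, false selects non-fair mode
Usage examples:
// A non-fair semaphore with 5 permits
Semaphore semaphore1 = new Semaphore(5);
// A fair semaphore with 3 permits
Semaphore semaphore2 = new Semaphore(3, true);
// A semaphore with 0 permits (often used to signal a one-off event)
Semaphore semaphore3 = new Semaphore(0);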
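Two of the less obvious initial values are sketched below: a count of 0 used as a one-off signal between threads, and a negative count, which the Semaphore constructor accepts. InitialPermitsDemo, awaitResult, and negativeStart are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Semaphore;

final class InitialPermitsDemo {
    /** Uses Semaphore(0) as a one-off signal from a worker thread to the caller. */
    static String awaitResult() {
        Semaphore done = new Semaphore(0);       // no permits: acquire blocks until a release
        String[] result = new String[1];
        new Thread(() -> {
            result[0] = "ready";                 // do the work...
            done.release();                      // ...then signal completion
        }).start();
        done.acquireUninterruptibly();           // returns only after the worker has signalled
        return result[0];
    }

    /** Starts from a negative count: two releases are needed before one acquire succeeds. */
    static boolean[] negativeStart() {
        Semaphore semaphore = new Semaphore(-1);
        boolean before = semaphore.tryAcquire(); // fails while the count is -1
        semaphore.release(2);                    // count goes from -1 to 1
        boolean after = semaphore.tryAcquire();  // succeeds, count drops to 0
        return new boolean[] { before, after };
    }
}
```

In awaitResult the write to result[0] is visible to the caller because a release() happens-before a successful acquire of that permit.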
Constructor source:
public Semaphore(int permits) {
    sync = new NonfairSync(permits); // non-fair by default
}
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// Sync constructor
Sync(int permits) {
    setState(permits); // store the permit count in the AQS state
}
Design notes:
- Non-fair mode is the default because it performs better and suits most workloads
- The permit count is stored directly in the AQS state, so AQS's atomic operations provide the thread safety
- Fair and non-fair policies are supported through different Sync subclasses
3.2 The acquire() Family
The acquire() methods are the core API for obtaining permits and come in several variants:
// Interruptible acquire
public void acquire() throws InterruptedException
public void acquire(int permits) throws InterruptedException
// Uninterruptible acquire
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
3.2.1 The basic acquire() methods:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
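Method characteristics:
- Interruptible: the thread can be interrupted while it waits
- Blocking: if there are not enough permits, the thread blocks until they become available
- Interrupt handling: an interrupt is reported by throwing InterruptedException
Usage example:
public class ResourceManager {
    private final Semaphore semaphore = new Semaphore(3);
    public void accessResource() throws InterruptedException {
        // Acquire one permit
        semaphore.acquire();
        try {
            System.out.println(Thread.currentThread().getName() + " is accessing the resource");
            Thread.sleep(2000); // simulate using the resource
        } finally {
            semaphore.release(); // always release the permit
        }
    }
    public void batchAccess(int count) throws InterruptedException {
        // Acquire several permits at once
        semaphore.acquire(count);
        try {
            System.out.println("Acquired " + count + " permits");
            // work that needs several permits goes here
        } finally {
            semaphore.release(count); // release them together
        }
    }
}
3.2.2 The uninterruptible acquire methods: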
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
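Characteristics of the uninterruptible methods:
- The thread does not respond to interrupts while waiting
- They suit cases where the thread must obtain the resource before it can continue
- If the thread is interrupted it keeps waiting until it gets a permit; its interrupt status is set when the method returns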
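The difference in interrupt handling can be checked on a single thread. In this sketch (UninterruptibleDemo and its two methods are hypothetical names), the current thread sets its own interrupt flag and then calls each variant on a semaphore that still has a permit.

```java
import java.util.concurrent.Semaphore;

final class UninterruptibleDemo {
    /** Returns true if the interrupt flag is still set after acquireUninterruptibly(). */
    static boolean interruptFlagSurvives() {
        Semaphore semaphore = new Semaphore(1);
        Thread.currentThread().interrupt();       // simulate a pending interrupt
        semaphore.acquireUninterruptibly();       // does not throw; the permit is taken
        try {
            return Thread.interrupted();          // reads and clears the flag
        } finally {
            semaphore.release();
        }
    }

    /** Returns true if acquire() refuses to proceed while an interrupt is pending. */
    static boolean acquireThrowsWhenInterrupted() {
        Semaphore semaphore = new Semaphore(1);
        Thread.currentThread().interrupt();
        try {
            semaphore.acquire();                  // checks the flag before trying to acquire
            semaphore.release();
            return false;
        } catch (InterruptedException e) {
            return true;                          // the flag has been cleared by the throw
        }
    }
}
```

Even though a permit is available, acquire() throws because the interrupt check comes first, while acquireUninterruptibly() takes the permit and leaves the flag for the caller to deal with.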
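Choosing between the two:
// Scenario 1: interruptible acquire, for tasks that can be cancelled
public void cancellableTask() {
    try {
        semaphore.acquire(); // can be cancelled by an interrupt
    } catch (InterruptedException e) {
        System.out.println("Task cancelled");
        Thread.currentThread().interrupt(); // restore the interrupt status
        return; // no permit was acquired, so there is nothing to release
    }
    try {
        doWork();
    } finally {
        semaphore.release(); // reached only after a successful acquire
    }
}
// Scenario 2: uninterruptible acquire, for access to critical resources
public void criticalTask() {
    semaphore.acquireUninterruptibly(); // cannot be interrupted
    try {
        doCriticalWork(); // the critical task must run to completion
    } finally {
        semaphore.release();
    }
}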
3.3 The release() Family
release() returns permits, increasing the number available:
// Release one permit
public void release()
// Release several permits
public void release(int permits)
3.3.1 Source of the basic release() methods:
public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
// releaseShared in AQS
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared(); // wake waiting threads
        return true;
    }
    return false;
}
Important properties of release():
1. No ownership check:
// Semaphore does not check whether the releasing thread ever acquired a permit
public class PermitDemo {
    private static final Semaphore semaphore = new Semaphore(1);
    public static void main(String[] args) throws InterruptedException {
        // Thread A acquires the permit
        Thread threadA = new Thread(() -> {
            try {
                semaphore.acquire();
                System.out.println("Thread A acquired the permit");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Note: Thread A never releases the permit
        });
        // Thread B releases a permit even though it never acquired one
        Thread threadB = new Thread(() -> {
            try {
                Thread.sleep(1000);
                semaphore.release(); // this is allowed!
                System.out.println("Thread B released a permit");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        threadA.start();
        threadB.start();
        threadA.join();
        threadB.join();
    }
}
Pros and cons of this design:
- Pro: it is flexible and allows sophisticated permit-management strategies
- Con: permits are easily leaked or released by mistake
2. The permit count can exceed the initial value:
public class PermitOverflow {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2); // 2 permits initially
        System.out.println("Initial permits: " + semaphore.availablePermits()); // prints 2
        semaphore.release(3); // release 3 permits
        System.out.println("Permits after release: " + semaphore.availablePermits()); // prints 5
        // 5 permits are now available, more than the initial 2
    }
}
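When the count must never grow past its initial value, one option is to wrap the semaphore and reject unmatched releases. The following is a minimal sketch of that idea, not a JDK facility; BoundedSemaphore is a hypothetical class that counts outstanding permits and throws when release() has no matching acquire.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedSemaphore {
    private final Semaphore semaphore;
    private final AtomicInteger held = new AtomicInteger(); // permits currently handed out

    BoundedSemaphore(int permits) {
        this.semaphore = new Semaphore(permits);
    }

    void acquire() throws InterruptedException {
        semaphore.acquire();
        held.incrementAndGet();
    }

    boolean tryAcquire() {
        if (semaphore.tryAcquire()) {
            held.incrementAndGet();
            return true;
        }
        return false;
    }

    /** Releases one permit; rejects a release that has no matching acquire. */
    void release() {
        int current;
        do {
            current = held.get();
            if (current == 0) {
                throw new IllegalStateException("release() without a matching acquire()");
            }
        } while (!held.compareAndSet(current, current - 1));
        semaphore.release();
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }
}
```

The wrapper still does not tie a permit to a particular thread; it only guarantees that the number of releases never exceeds the number of acquires.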
3. Batch release is atomic:
// Releasing several permits in one call
public class BatchRelease {
    private final Semaphore semaphore = new Semaphore(10);
    public void batchOperation() throws InterruptedException {
        int permits = 5;
        semaphore.acquire(permits); // acquire 5 permits in one call
        try {
            // run the operation that needs 5 permits
            processWithMultiplePermits();
        } finally {
            semaphore.release(permits); // return all 5 permits in a single atomic update
        }
    }
    private void processWithMultiplePermits() {
        // simulate an operation that needs several permits
        System.out.println("Running a batch operation with 5 permits");
    }
}
3.4 tryAcquire(): Attempting to Acquire a Permit
The tryAcquire() methods acquire permits without blocking indefinitely:
// Try once, without waiting
public boolean tryAcquire()
public boolean tryAcquire(int permits)
// Try with a timeout
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
3.4.1 Immediate attempt:
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
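Key properties:
- Non-blocking: returns immediately and never waits
- Non-fair: tryAcquire() barges even on a fair Semaphore; use tryAcquire(0, TimeUnit.SECONDS) to honor the fairness setting
- Return value: true if the permit was acquired, false otherwise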
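A short single-threaded sketch of these properties follows; TryAcquireDemo and attempts are hypothetical names. It calls the untimed tryAcquire() twice on a one-permit semaphore, then uses the zero-timeout form, which goes through the fairness-aware path.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class TryAcquireDemo {
    /** Returns the results of three attempts on a fair semaphore with a single permit. */
    static boolean[] attempts() {
        Semaphore semaphore = new Semaphore(1, true);   // fair mode, one permit
        boolean first = semaphore.tryAcquire();         // takes the only permit
        boolean second = semaphore.tryAcquire();        // no permit left, returns at once
        semaphore.release();                            // put the permit back
        boolean timedZero;
        try {
            // Zero timeout: does not wait either, but respects the fairness policy.
            timedZero = semaphore.tryAcquire(0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedZero = false;
        }
        return new boolean[] { first, second, timedZero };
    }
}
```

With no other thread queued, the zero-timeout call succeeds just like the untimed one; the two differ only when threads are already waiting on a fair semaphore.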
Usage example:
public class NonBlockingAccess {
    private final Semaphore semaphore = new Semaphore(3);
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicInteger failureCount = new AtomicInteger(0);
    public void attemptAccess() {
        if (semaphore.tryAcquire()) {
            successCount.incrementAndGet();
            try {
                System.out.println(Thread.currentThread().getName() + " acquired a permit");
                Thread.sleep(1000); // simulate work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                semaphore.release();
            }
        } else {
            failureCount.incrementAndGet();
            System.out.println(Thread.currentThread().getName() + " could not get a permit, falling back");
            fallbackLogic(); // run the fallback path
        }
    }
    private void fallbackLogic() {
        // what to do when no permit is available
        System.out.println("Running the fallback handler");
    }
    public void printStatistics() {
        System.out.println("Acquired: " + successCount.get());
        System.out.println("Rejected: " + failureCount.get());
    }
}
3.4.2 Attempt with a timeout:
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
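Properties of the timed variants:
- Bounded wait: waits up to the given time and returns false on timeout
- Interruptible: the wait can be interrupted
- Configurable wait: the caller sets the maximum wait time, although the actual wake-up depends on thread scheduling and may come slightly later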
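The bounded wait can be measured with a sketch like the one below. TimedAcquireDemo and waitOnExhausted are hypothetical names; the method asks an empty semaphore for a permit with a timeout and reports how long the call took before it gave up.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class TimedAcquireDemo {
    /**
     * Requests a permit from a semaphore that has none and returns the time spent
     * waiting in milliseconds, or -1 if a permit was obtained or the wait was interrupted.
     */
    static long waitOnExhausted(long timeoutMs) {
        Semaphore semaphore = new Semaphore(0);          // nothing to acquire
        long start = System.nanoTime();
        boolean acquired;
        try {
            acquired = semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return acquired ? -1 : elapsedMs;
    }

    public static void main(String[] args) {
        // The reported time is at least the timeout and usually a little more.
        System.out.println("gave up after " + waitOnExhausted(50) + " ms");
    }
}
```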
Where timed acquisition is useful:
public class TimeoutAccess {
    private final Semaphore semaphore = new Semaphore(2);
    // Scenario 1: handling a user request, which must not wait forever
    public boolean processUserRequest() {
        try {
            // wait at most 5 seconds
            if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
                try {
                    return doProcessing();
                } finally {
                    semaphore.release();
                }
            } else {
                System.out.println("System busy, please try again later");
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
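    // Scenario 2: a batch job with a timeout that scales with the batch size
    public boolean processBatch(List<Task> tasks) {
        if (tasks.isEmpty()) {
            return true; // nothing to do
        }
        // Size the request from the pool's total capacity (2, see the field above), not from
        // availablePermits(): that value is only a snapshot, and when it is 0 the call would
        // acquire nothing and "succeed" with an empty sublist.
        int requiredPermits = Math.min(tasks.size(), 2);
        try {
            // derive the timeout from the number of tasks
            long timeout = tasks.size() * 100L; // 100 ms expected per task
            if (semaphore.tryAcquire(requiredPermits, timeout, TimeUnit.MILLISECONDS)) {
                try {
                    return processTasks(tasks.subList(0, requiredPermits));
                } finally {
                    semaphore.release(requiredPermits);
                }
            } else {
                // fall back to partial processing
                return processPartially(tasks);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }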
    private boolean doProcessing() {
        // simulated processing
        return true;
    }
    private boolean processTasks(List<Task> tasks) {
        // process the tasks as a batch
        return true;
    }
    private boolean processPartially(List<Task> tasks) {
        // partial-processing logic
        return true;
    }
    static class Task {
        // task definition
    }
}
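3.5 Other Useful Methods
Besides the core acquire and release methods, Semaphore offers a few helper methods:
3.5.1 State queries:
// Number of permits currently available
public int availablePermits()
// Number of threads waiting for a permit (an estimate)
public final int getQueueLength()
// Whether any thread is waiting for a permit
public final boolean hasQueuedThreads()
// The waiting threads as a collection (for debugging)
protected Collection<Thread> getQueuedThreads()
// Whether the fair policy is in use
public boolean isFair()
Example using the state queries: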
public class SemaphoreMonitor {
    private final Semaphore semaphore;
    private final int totalPermits;
    public SemaphoreMonitor(int permits, boolean fair) {
        this.semaphore = new Semaphore(permits, fair);
        this.totalPermits = permits;
    }
    public void printStatus() {
        System.out.println("=== Semaphore status ===");
        System.out.println("Total permits: " + totalPermits);
        System.out.println("Available permits: " + semaphore.availablePermits());
        System.out.println("Permits in use: " + (totalPermits - semaphore.availablePermits()));
        System.out.println("Queue length: " + semaphore.getQueueLength());
        System.out.println("Threads waiting: " + semaphore.hasQueuedThreads());
        System.out.println("Fair mode: " + semaphore.isFair());
        System.out.println("Utilization: " + String.format("%.1f%%",
            (totalPermits - semaphore.availablePermits()) * 100.0 / totalPermits));
    }
    // Monitoring thread that prints the status periodically
    public void startMonitoring() {
        Thread monitor = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    printStatus();
                    Thread.sleep(5000); // report every 5 seconds
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        monitor.setDaemon(true);
        monitor.start();
    }
}
3.5.2 Bulk operations:
// Take all permits that are currently available
public int drainPermits()
// Reduce the number of available permits
protected void reducePermits(int reduction)
Source of the bulk operations:
public int drainPermits() {
    return sync.drainPermits();
}
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}
A use case for drainPermits:
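public class SystemMaintenance {
    private static final int TOTAL_PERMITS = 10;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);
    // During maintenance, stop processing new requests
    public void startMaintenance() {
        System.out.println("Starting maintenance...");
        // Take every idle permit so that new requests cannot start
        int drained = semaphore.drainPermits();
        System.out.println("Drained " + drained + " permits");
        // Wait for in-flight requests and keep the permits they hand back
        waitForActiveRequestsComplete(TOTAL_PERMITS - drained);
        // Run the maintenance work
        performMaintenance();
    }
    public void endMaintenance() {
        System.out.println("Maintenance finished, resuming service...");
        // The maintenance code holds all permits at this point, so restoring exactly
        // TOTAL_PERMITS cannot push the count above its original value
        semaphore.release(TOTAL_PERMITS);
    }
    private void waitForActiveRequestsComplete(int inFlight) {
        // Blocks until the permits held by in-flight requests have been released
        // (assumes every request takes one permit and releases it when done)
        semaphore.acquireUninterruptibly(inFlight);
    }
    private void performMaintenance() {
        // maintenance work goes here
    }
}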
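Notes on using these methods:
- availablePermits(): returns a snapshot that may already be stale under high concurrency
- getQueueLength(): returns an estimate, intended mainly for monitoring and debugging
- drainPermits(): takes only the permits that are available at that moment; permits held by other threads come back when those threads release them, so use it with care
- reducePermits(): a protected method, intended mainly for subclasses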
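Because reducePermits() is protected, it has to be exposed through a subclass. The sketch below shows one way to do that; ResizableSemaphore, shrink, and grow are hypothetical names. Unlike acquire(), reducePermits() does not block, so the available count can drop below zero while permits are still checked out.

```java
import java.util.concurrent.Semaphore;

final class ResizableSemaphore extends Semaphore {
    private static final long serialVersionUID = 1L;

    ResizableSemaphore(int permits) {
        super(permits);
    }

    /** Lowers the limit without waiting; the available count may become negative. */
    void shrink(int reduction) {
        reducePermits(reduction);
    }

    /** Raises the limit by adding permits. */
    void grow(int extra) {
        release(extra);
    }
}
```

A short usage example: with 5 permits of which 4 are checked out, shrinking by 3 leaves the count at -2, and it recovers as the outstanding permits are returned.

```java
ResizableSemaphore limiter = new ResizableSemaphore(5);
limiter.tryAcquire(4);   // 1 permit left
limiter.shrink(3);       // count is now -2
limiter.release(4);      // outstanding permits come back, count is 2
```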
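IV. Typical Uses of Semaphore
4.1 Connection Pool Management
Connection pools are one of the classic uses of Semaphore. In database and HTTP connection pools, the number of simultaneously open connections has to be limited to avoid exhausting resources.
A basic connection pool skeleton: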
public class DatabaseConnectionPool {
private final Semaphore connectionSemaphore;
private final Queue<Connection> availableConnections;
private final Set<Connection> allConnections;
private final int maxConnections;
public DatabaseConnectionPool(int maxConnections) {
this.maxConnections = maxConnections;
this.connectionSemaphore = new Semaphore(maxConnections);
this.availableConnections = new ConcurrentLinkedQueue<>();
this.allConnections = ConcurrentHashMap.newKeySet();
// 初始化连接池
initializeConnections();
}
public Connection getConnection() throws InterruptedException {
// 获取许可证
connectionSemaphore.acquire();
try {
Connection connection = availableConnections.poll();
if (connection == null || !isValidConnection(connection)) {
connection = createNewConnection();
}
return connection;
} catch (Exception e) {
// 如果获取连接失败,释放许可证
connectionSemaphore.release();
throw e;
}
}
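public void returnConnection(Connection connection) {
    if (connection != null && allConnections.contains(connection)) {
        if (isValidConnection(connection)) {
            availableConnections.offer(connection);
        } else {
            // The connection is dead: stop tracking it and create a replacement
            allConnections.remove(connection);
            try {
                Connection newConnection = createNewConnection();
                availableConnections.offer(newConnection);
            } catch (Exception e) {
                // Log the failure; the permit is still released below
                System.err.println("Failed to create replacement connection: " + e.getMessage());
            }
        }
        // Release the permit so that another thread can get a connection.
        // Note: nothing here detects the same connection being returned twice,
        // which would release an extra permit.
        connectionSemaphore.release();
    }
}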
private void initializeConnections() {
for (int i = 0; i < maxConnections; i++) {
try {
Connection connection = createNewConnection();
availableConnections.offer(connection);
} catch (Exception e) {
System.err.println("Failed to initialize connection: " + e.getMessage());
}
}
}
private Connection createNewConnection() {
// 模拟创建数据库连接
Connection connection = new MockConnection();
allConnections.add(connection);
return connection;
}
private boolean isValidConnection(Connection connection) {
// 检查连接是否有效
try {
return connection != null && !connection.isClosed();
} catch (Exception e) {
return false;
}
}
// 模拟Connection类
private static class MockConnection implements Connection {
private boolean closed = false;
@Override
public boolean isClosed() { return closed; }
@Override
public void close() { closed = true; }
// 其他Connection方法的模拟实现...
}
}
Using the connection pool:
public class DatabaseService {
private final DatabaseConnectionPool connectionPool;
public DatabaseService() {
this.connectionPool = new DatabaseConnectionPool(10); // 最大10个连接
}
public void executeQuery(String sql) {
Connection connection = null;
try {
// 获取连接(可能需要等待)
connection = connectionPool.getConnection();
// 执行数据库操作
executeSQL(connection, sql);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("获取数据库连接被中断");
} catch (Exception e) {
System.err.println("数据库操作失败: " + e.getMessage());
} finally {
// 确保连接被归还
if (connection != null) {
connectionPool.returnConnection(connection);
}
}
}
private void executeSQL(Connection connection, String sql) {
// 模拟SQL执行
System.out.println("执行SQL: " + sql);
try {
Thread.sleep(1000); // 模拟数据库操作耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
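4.2 Rate Limiting and Traffic Control
In a microservice architecture, limiting load is an important way to keep the system stable. A Semaphore caps the number of concurrent calls to an interface; note that it limits concurrency, not requests per second.
A concurrency limiter for an API: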
public class RateLimiter {
private final Semaphore semaphore;
private final int maxConcurrency;
private final long timeoutMs;
// 监控统计
private final AtomicLong totalRequests = new AtomicLong(0);
private final AtomicLong rejectedRequests = new AtomicLong(0);
private final AtomicLong timeoutRequests = new AtomicLong(0);
public RateLimiter(int maxConcurrency, long timeoutMs) {
this.maxConcurrency = maxConcurrency;
this.timeoutMs = timeoutMs;
this.semaphore = new Semaphore(maxConcurrency);
}
/**
* 执行限流保护的操作
*/
public <T> T execute(Supplier<T> operation) throws RateLimitException {
totalRequests.incrementAndGet();
try {
// 尝试在指定时间内获取许可证
if (semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
try {
return operation.get();
} finally {
semaphore.release();
}
} else {
// 获取许可证超时
timeoutRequests.incrementAndGet();
throw new RateLimitException("Request timeout: too many concurrent requests");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
rejectedRequests.incrementAndGet();
throw new RateLimitException("Request interrupted", e);
}
}
/**
* 立即执行,不等待
*/
public <T> Optional<T> tryExecute(Supplier<T> operation) {
totalRequests.incrementAndGet();
if (semaphore.tryAcquire()) {
try {
return Optional.of(operation.get());
} finally {
semaphore.release();
}
} else {
rejectedRequests.incrementAndGet();
return Optional.empty();
}
}
/**
* 获取限流统计信息
*/
public RateLimitStats getStats() {
return new RateLimitStats(
maxConcurrency,
semaphore.availablePermits(),
totalRequests.get(),
rejectedRequests.get(),
timeoutRequests.get(),
semaphore.getQueueLength()
);
}
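// Effective limit after adjustments; -1 until the first adjustment. Guarded by "this".
// maxConcurrency is final and cannot track changes, so getStats() still reports the initial limit.
private int currentLimit = -1;
/**
 * Dynamically adjust the concurrency limit
 */
public synchronized void adjustLimit(int newLimit) {
    // Compute the difference from the limit currently in effect; using the
    // initial value every time would be wrong from the second call onwards
    int oldLimit = (currentLimit < 0) ? maxConcurrency : currentLimit;
    int difference = newLimit - oldLimit;
    if (difference > 0) {
        // add permits
        semaphore.release(difference);
    } else if (difference < 0) {
        // remove permits; this blocks until enough in-flight requests have released theirs
        try {
            semaphore.acquire(-difference);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // the limit stays unchanged
        }
    }
    currentLimit = newLimit;
}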
// 自定义异常
public static class RateLimitException extends Exception {
public RateLimitException(String message) {
super(message);
}
public RateLimitException(String message, Throwable cause) {
super(message, cause);
}
}
// 统计信息类
public static class RateLimitStats {
private final int maxConcurrency;
private final int availablePermits;
private final long totalRequests;
private final long rejectedRequests;
private final long timeoutRequests;
private final int waitingThreads;
public RateLimitStats(int maxConcurrency, int availablePermits,
long totalRequests, long rejectedRequests,
long timeoutRequests, int waitingThreads) {
this.maxConcurrency = maxConcurrency;
this.availablePermits = availablePermits;
this.totalRequests = totalRequests;
this.rejectedRequests = rejectedRequests;
this.timeoutRequests = timeoutRequests;
this.waitingThreads = waitingThreads;
}
public double getSuccessRate() {
return totalRequests == 0 ? 1.0 :
(totalRequests - rejectedRequests - timeoutRequests) * 1.0 / totalRequests;
}
public double getCurrentUtilization() {
return (maxConcurrency - availablePermits) * 1.0 / maxConcurrency;
}
@Override
public String toString() {
return String.format(
"RateLimitStats{maxConcurrency=%d, available=%d, " +
"total=%d, rejected=%d, timeout=%d, waiting=%d, " +
"successRate=%.2f%%, utilization=%.2f%%}",
maxConcurrency, availablePermits, totalRequests,
rejectedRequests, timeoutRequests, waitingThreads,
getSuccessRate() * 100, getCurrentUtilization() * 100
);
}
}
}
Using the limiter:
public class APIController {
private final RateLimiter rateLimiter = new RateLimiter(10, 5000); // 最大10并发,超时5秒
// 使用限流保护的API
public ResponseEntity<String> protectedAPI() {
try {
String result = rateLimiter.execute(() -> {
// 执行实际的业务逻辑
return performBusinessLogic();
});
return ResponseEntity.ok(result);
} catch (RateLimiter.RateLimitException e) {
return ResponseEntity.status(429) // Too Many Requests
.body("Service is busy, please try again later");
}
}
// 立即响应的API(不等待)
public ResponseEntity<String> immediateAPI() {
Optional<String> result = rateLimiter.tryExecute(() -> {
return performBusinessLogic();
});
if (result.isPresent()) {
return ResponseEntity.ok(result.get());
} else {
return ResponseEntity.status(503) // Service Unavailable
.body("Service is temporarily unavailable");
}
}
// 监控接口
public ResponseEntity<RateLimiter.RateLimitStats> getStats() {
return ResponseEntity.ok(rateLimiter.getStats());
}
private String performBusinessLogic() {
// 模拟业务处理
try {
Thread.sleep(1000); // 模拟1秒处理时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Business logic executed successfully";
}
}
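4.3 The Producer-Consumer Pattern
In the producer-consumer pattern, semaphores can bound the buffer: producers block when the buffer is full, and consumers block when it is empty.
A producer-consumer buffer built on Semaphore: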
public class ProducerConsumerBuffer<T> {
private final Queue<T> buffer;
private final Semaphore emptySlots; // 空槽位信号量
private final Semaphore fullSlots; // 满槽位信号量
private final Semaphore mutex; // 互斥访问信号量
private final int capacity;
// 统计信息
private final AtomicLong producedCount = new AtomicLong(0);
private final AtomicLong consumedCount = new AtomicLong(0);
public ProducerConsumerBuffer(int capacity) {
this.capacity = capacity;
this.buffer = new LinkedList<>();
this.emptySlots = new Semaphore(capacity); // 初始时所有槽位都为空
this.fullSlots = new Semaphore(0); // 初始时没有满槽位
this.mutex = new Semaphore(1); // 互斥访问
}
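/**
 * Producer: put an item, waiting for space if necessary
 */
public void put(T item) throws InterruptedException {
    emptySlots.acquire(); // wait for an empty slot
    try {
        mutex.acquire(); // enter the critical section
    } catch (InterruptedException e) {
        emptySlots.release(); // give the slot back, otherwise it is lost for good
        throw e;
    }
    try {
        buffer.offer(item);
        producedCount.incrementAndGet();
        System.out.println("Produced: " + item + ", Buffer size: " + buffer.size());
    } finally {
        mutex.release(); // leave the critical section
    }
    fullSlots.release(); // announce the item only after it is in the buffer
}
/**
 * Producer: try to put an item, giving up after the timeout
 */
public boolean tryPut(T item, long timeout, TimeUnit unit) throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout); // one time budget for both waits
    if (!emptySlots.tryAcquire(timeout, unit)) {
        return false;
    }
    boolean locked = false;
    try {
        locked = mutex.tryAcquire(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
    } finally {
        if (!locked) {
            emptySlots.release(); // timeout or interrupt: return the slot
        }
    }
    if (!locked) {
        return false;
    }
    try {
        buffer.offer(item);
        producedCount.incrementAndGet();
        System.out.println("Produced (timeout): " + item + ", Buffer size: " + buffer.size());
    } finally {
        mutex.release();
    }
    fullSlots.release();
    return true;
}
/**
 * Consumer: take an item, waiting for one if necessary
 */
public T take() throws InterruptedException {
    fullSlots.acquire(); // wait for an item
    try {
        mutex.acquire(); // enter the critical section
    } catch (InterruptedException e) {
        fullSlots.release(); // the item stays available to other consumers
        throw e;
    }
    T item;
    try {
        item = buffer.poll();
        consumedCount.incrementAndGet();
        System.out.println("Consumed: " + item + ", Buffer size: " + buffer.size());
    } finally {
        mutex.release(); // leave the critical section
    }
    emptySlots.release(); // one more empty slot
    return item;
}
/**
 * Consumer: try to take an item, giving up after the timeout
 */
public T tryTake(long timeout, TimeUnit unit) throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout); // one time budget for both waits
    if (!fullSlots.tryAcquire(timeout, unit)) {
        return null;
    }
    boolean locked = false;
    try {
        locked = mutex.tryAcquire(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
    } finally {
        if (!locked) {
            fullSlots.release(); // timeout or interrupt: leave the item for others
        }
    }
    if (!locked) {
        return null;
    }
    T item;
    try {
        item = buffer.poll();
        consumedCount.incrementAndGet();
        System.out.println("Consumed (timeout): " + item + ", Buffer size: " + buffer.size());
    } finally {
        mutex.release();
    }
    emptySlots.release();
    return item;
}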
/**
* 获取缓冲区状态
*/
public BufferStats getStats() {
return new BufferStats(
capacity,
buffer.size(),
emptySlots.availablePermits(),
fullSlots.availablePermits(),
producedCount.get(),
consumedCount.get()
);
}
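/**
 * Clear the buffer
 */
public void clear() throws InterruptedException {
    // Claim every item that is currently visible to consumers. Draining and refilling
    // emptySlots wholesale would miscount producers that already hold a slot but have
    // not inserted their item yet.
    int claimed = fullSlots.drainPermits();
    try {
        mutex.acquire();
    } catch (InterruptedException e) {
        fullSlots.release(claimed); // undo the claim
        throw e;
    }
    try {
        for (int i = 0; i < claimed; i++) {
            buffer.poll();
        }
        System.out.println("Buffer cleared, removed " + claimed + " items");
    } finally {
        mutex.release();
    }
    emptySlots.release(claimed); // the removed items free the same number of slots
}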
// 缓冲区统计信息
public static class BufferStats {
private final int capacity;
private final int currentSize;
private final int emptySlots;
private final int fullSlots;
private final long producedCount;
private final long consumedCount;
public BufferStats(int capacity, int currentSize, int emptySlots,
int fullSlots, long producedCount, long consumedCount) {
this.capacity = capacity;
this.currentSize = currentSize;
this.emptySlots = emptySlots;
this.fullSlots = fullSlots;
this.producedCount = producedCount;
this.consumedCount = consumedCount;
}
public double getUtilization() {
return currentSize * 1.0 / capacity;
}
@Override
public String toString() {
return String.format(
"BufferStats{capacity=%d, size=%d, empty=%d, full=%d, " +
"produced=%d, consumed=%d, utilization=%.1f%%}",
capacity, currentSize, emptySlots, fullSlots,
producedCount, consumedCount, getUtilization() * 100
);
}
}
}
Using the producer-consumer buffer:
public class ProducerConsumerDemo {
private static final ProducerConsumerBuffer<String> buffer =
new ProducerConsumerBuffer<>(5);
public static void main(String[] args) throws InterruptedException {
// 启动多个生产者
for (int i = 0; i < 3; i++) {
final int producerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 10; j++) {
String item = "Producer-" + producerId + "-Item-" + j;
buffer.put(item);
Thread.sleep(1000); // 生产间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-" + i).start();
}
// 启动多个消费者
for (int i = 0; i < 2; i++) {
final int consumerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 15; j++) {
String item = buffer.take();
// 模拟消费处理时间
Thread.sleep(1500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer-" + i).start();
}
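// Monitor thread; a daemon, so it does not keep the JVM alive once the work is done
Thread monitor = new Thread(() -> {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            Thread.sleep(3000);
            System.out.println("=== " + buffer.getStats() + " ===");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}, "Monitor");
monitor.setDaemon(true);
monitor.start();
// The main thread waits for the producers and consumers to finish
Thread.sleep(30000);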
}