🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发设计模式实战系列,第十七章信号量(Semaphore),废话不多说直接开始~
目录
1. AQS(AbstractQueuedSynchronizer)的协作
一、核心原理深度拆解
1. 信号量本质模型
┌───────────────┐ ┌───────────────┐
│ Resource │───┬──>│ Semaphore │
│ Pool │ │ │ (计数器+队列) │
└───────────────┘ │ └───────────────┘
│
┌───────────────┐ │
│ Thread │<──┘
│ Request │
└───────────────┘
- 许可证机制:内部维护一个虚拟的许可计数器(permits)
- 双原子操作:
-
acquire()
:许可-1(当>0时立即返回,=0时线程阻塞)release()
:许可+1(唤醒等待队列中的线程)
- 公平性选择:支持FIFO队列或非公平竞争
2. 并发控制三要素
- 资源总量:
new Semaphore(N)
初始化许可数 - 占用规则:
tryAcquire(timeout)
防止死锁 - 释放保证:必须放在finally块中执行
二、生活化类比:停车场管理系统
系统组件 |
现实类比 |
核心行为 |
Semaphore |
剩余车位显示屏 |
显示可用车位数量 |
acquire() |
车辆进入抬杆 |
占用车位(数量-1) |
release() |
车辆离开 |
释放车位(数量+1) |
等待队列 |
入口排队车辆 |
按到达顺序或抢车位 |
- 突发流量:当100辆车同时到达50个车位的停车场时:
-
- 前50辆立即进入
- 后50辆需等待前车离开
三、Java代码实现(生产级Demo)
1. 完整可运行代码
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class SemaphoreDemo {
// 数据库连接池实现
static class ConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<Connection> pool;
public ConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize, true); // 公平模式
this.pool = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
pool.add(new Connection("Conn-" + i));
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 如果没有许可则阻塞
return pool.take();
}
public void releaseConnection(Connection conn) {
pool.offer(conn);
semaphore.release(); // 释放许可
}
}
static class Connection {
private String name;
public Connection(String name) { this.name = name; }
@Override
public String toString() { return name; }
}
// 模拟业务操作
public static void main(String[] args) {
final ConnectionPool pool = new ConnectionPool(3);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Connection conn = pool.getConnection();
System.out.println(Thread.currentThread().getName()
+ " 获取连接: " + conn);
// 模拟业务操作
Thread.sleep(1000);
pool.releaseConnection(conn);
System.out.println(Thread.currentThread().getName()
+ " 释放连接: " + conn);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
2. 关键配置说明
// 创建信号量(公平模式 vs 非公平模式)
new Semaphore(permits, true);
// 超时获取许可(避免死锁)
semaphore.tryAcquire(2, TimeUnit.SECONDS);
// 一次性获取多个许可
semaphore.acquire(3); // 需要3个许可才能继续
四、横向对比表格
1. 并发控制工具对比
工具 |
特点 |
适用场景 |
Semaphore |
控制资源访问数量 |
连接池、限流 |
CountDownLatch |
一次性栅栏 |
多线程任务汇总 |
CyclicBarrier |
可重复使用的栅栏 |
多阶段并行计算 |
ReentrantLock |
独占锁 |
临界区精细控制 |
2. 信号量使用策略对比
策略 |
优点 |
缺点 |
公平模式 |
避免线程饥饿 |
吞吐量较低 |
非公平模式 |
吞吐量高 |
可能造成线程饥饿 |
多许可申请 |
支持复杂资源分配 |
容易导致死锁 |
可中断获取 |
响应线程中断 |
需要处理中断异常 |
五、高级应用技巧
1. 动态调整许可数
// 动态扩容(JDK没有直接方法,需通过包装实现)
class ResizableSemaphore {
private final ReentrantLock lock = new ReentrantLock();
private Semaphore semaphore;
public ResizableSemaphore(int permits) {
this.semaphore = new Semaphore(permits);
}
public void setPermits(int newPermits) {
lock.lock();
try {
int delta = newPermits - semaphore.availablePermits();
if (delta > 0) {
semaphore.release(delta); // 增加许可
} else {
semaphore.reducePermits(-delta); // 减少许可
}
} finally {
lock.unlock();
}
}
}
2. 信号量监控
// 监控关键指标
int availablePermits = semaphore.availablePermits();
int queueLength = semaphore.getQueueLength(); // 等待线程数
3. 与线程池结合
// 使用信号量限制任务提交速率
ExecutorService executor = Executors.newCachedThreadPool();
Semaphore rateLimiter = new Semaphore(10); // 最大10并发
executor.execute(() -> {
rateLimiter.acquire();
try {
// 执行任务...
} finally {
rateLimiter.release();
}
});
好的!我将延续原有结构,从 第六部分 开始扩展信号量(Semaphore)的高级特性和工程实践细节。
六、信号量的底层实现原理
1. AQS(AbstractQueuedSynchronizer)的协作
┌───────────────────┐
│ Semaphore │
│ (Sync继承AQS) │
│ - state=permits │
│ - 共享模式 │
└─────────┬─────────┘
│
┌─────────▼─────────┐
│ NonFairSync │ 或 │ FairSync │
│ - 直接竞争许可 │ │ - FIFO队列 │
└───────────────────┘
- state字段:存储当前可用许可数(volatile修饰)
- 共享模式:与ReentrantLock(独占模式)的核心区别
- 非公平实现:
NonFairSync.tryAcquireShared()
允许插队 - 公平实现:
FairSync.tryAcquireShared()
检查是否有等待队列
2. 关键方法源码片段
// JDK 17中的非公平获取逻辑
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining; // 负数表示获取失败
}
}
}
七、生产环境中的陷阱与解决方案
1. 典型问题清单
问题类型 |
现象 |
解决方案 |
许可泄漏 |
可用许可逐渐减少 |
必须用try-finally块保证释放 |
线程饥饿 |
低优先级线程长期未执行 |
使用公平模式 |
死锁 |
多许可申请顺序不当 |
统一申请/释放顺序 |
响应中断 |
阻塞线程无法响应中断 |
使用acquireInterruptibly() |
2. 调试技巧
// 1. 打印信号量状态
System.out.println("可用许可: " + semaphore.availablePermits());
System.out.println("等待线程: " + semaphore.getQueueLength());
// 2. 使用JMX监控
ManagementFactory.getPlatformMBeanServer()
.registerMBean(semaphore, new ObjectName("java.util.concurrent:type=Semaphore"));
八、与其他模式的组合应用
1. 信号量+线程池(流量整形)
ExecutorService executor = Executors.newCachedThreadPool();
Semaphore limiter = new Semaphore(20); // 最大20并发
void submitTask(Runnable task) {
limiter.acquire();
executor.execute(() -> {
try {
task.run();
} finally {
limiter.release();
}
});
}
2. 信号量+CountDownLatch(阶段控制)
Semaphore semaphore = new Semaphore(5);
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
semaphore.acquire();
try {
// 阶段1:受限资源操作
doPhase1Work();
latch.countDown();
// 阶段2:等待其他线程完成
latch.await();
doPhase2Work();
} finally {
semaphore.release();
}
}).start();
}
九、性能优化指南
1. 许可数计算公式
最大许可数 = (目标TPS × 平均耗时(秒)) / (1 - 冗余系数)
示例:
- 目标TPS=1000,平均耗时=0.1s,冗余系数=0.3
- 许可数 = (1000×0.1)/(1-0.3) ≈ 143
2. 不同场景下的参数建议
场景 |
许可数设置建议 |
公平性选择 |
数据库连接池 |
物理连接数的1.2倍 |
非公平 |
API限流 |
根据SLAB配额设置 |
公平 |
文件IO控制 |
CPU核心数×2 |
非公平 |
十、扩展变体实现
1. 可动态调整的信号量
class DynamicSemaphore {
private final ReentrantLock lock = new ReentrantLock();
private Semaphore semaphore;
public DynamicSemaphore(int permits) {
this.semaphore = new Semaphore(permits);
}
public void addPermits(int delta) {
lock.lock();
try {
if (delta > 0) {
semaphore.release(delta);
} else {
int reduction = -delta;
semaphore.acquire(reduction); // 减少可用许可
}
} finally {
lock.unlock();
}
}
}
2. 超时自动释放信号量
class AutoReleaseSemaphore {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public AutoReleaseSemaphore(int permits) {
this.semaphore = new Semaphore(permits);
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}
public void acquireWithTimeout(long timeout, TimeUnit unit)
throws InterruptedException {
semaphore.acquire();
scheduler.schedule(() -> {
semaphore.release();
System.out.println("自动释放许可");
}, timeout, unit);
}
}
十一、行业应用案例
1. Kafka的吞吐控制
Kafka Producer使用Semaphore实现:
- 未确认请求数限制(max.in.flight.requests.per.connection)
- 内存缓冲区阻塞控制(buffer.memory)
2. Tomcat连接器配置
<!-- 在server.xml中配置信号量式连接限制 -->
<Connector
executor="threadPool"
maxConnections="10000" <!-- 信号量控制 -->
acceptCount="100" <!-- 等待队列 -->
/>