这是一个非常典型的 分片线程池(Striped Thread Pool) 实现,名为 IgniteStripedThreadPoolExecutor
,是 Apache Ignite 自定义的并发执行框架组件。
🧱 一、核心思想:什么是“Striped”线程池?
💡 关键特性:同一个“索引”(index)的任务,永远由同一个线程执行。
这解决了两个问题:
- 性能:避免锁竞争(多个任务操作同一数据时,串行化处理)
- 顺序性:保证特定数据的操作顺序(如 key=A 的消息不乱序)
类比理解:
想象一个快递分拣中心,有 N
个工人(线程),包裹按目的地编号 % N
分配给某个固定工人处理。
- 所有发往“杭州”的包裹 → 都由 3 号工人处理
- 所有发往“北京”的包裹 → 都由 1 号工人处理
这样既并行(多个城市同时处理),又保证了单个城市的顺序。
📦 二、字段解析
private final ExecutorService[] execs;
- 这是一个 线程池数组,每个元素是一个独立的
ExecutorService
- 数组长度 =
concurrentLvl
(并发级别),也就是“条带数” - 每个子线程池大小为 1(后面会看到)
✅ 相当于:创建了
N
个单线程池,组成一个“线程池组”
🔧 三、构造函数详解
public IgniteStripedThreadPoolExecutor(
int concurrentLvl,
String igniteInstanceName,
String threadNamePrefix,
UncaughtExceptionHandler eHnd,
boolean allowCoreThreadTimeOut,
long keepAliveTime)
参数说明:
参数 | 含义 |
---|---|
concurrentLvl |
并发等级 → 决定有多少个“条带”(即多少个子线程池) |
igniteInstanceName |
节点名,用于线程命名 |
threadNamePrefix |
线程名前缀,如 "callback" |
eHnd |
异常处理器,捕获未处理异常 |
allowCoreThreadTimeOut |
是否允许核心线程超时销毁 |
keepAliveTime |
空闲线程等待新任务的最长时间 |
构造逻辑:
execs = new ExecutorService[concurrentLvl];
ThreadFactory factory = new IgniteThreadFactory(...);
for (int i = 0; i < concurrentLvl; i++) {
IgniteThreadPoolExecutor executor = new IgniteThreadPoolExecutor(
1, // corePoolSize
1, // maximumPoolSize
keepAliveTime,
new LinkedBlockingQueue<>(),
factory
);
executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
execs[i] = executor;
}
✅ 每个子线程池都是 单线程执行器(Single-threaded)
🔍 为什么每个条带是单线程?
- 保证 同一个 idx 的任务串行执行
- 避免并发修改共享状态(如缓存、状态机)
- 性能上接近无锁设计(只要哈希分布均匀)
🚀 四、核心方法:execute(Runnable task, int idx)
public void execute(Runnable task, int idx) {
execs[threadId(idx)].execute(task);
}
这是唯一可用的提交任务的方法。
工作流程:
- 根据
idx
计算应该由哪个线程处理 - 提交到对应的子线程池
➤ threadId(idx)
方法:
public int threadId(int idx) {
return idx < execs.length ? idx : idx % execs.length;
}
- 如果
idx
小于条带数 → 直接使用idx
- 否则取模 → 均匀分布到各个线程
✅ 这是一个 哈希映射策略,将任意整数
idx
映射到[0, N)
范围内
⚠️ 五、禁用的方法(重要!)
这个类 故意禁用了标准 ExecutorService
的所有通用提交方法:
@Override public void execute(Runnable cmd) {
throw new UnsupportedOperationException();
}
@Override public <T> Future<T> submit(Callable<T> task) {
throw new UnsupportedOperationException();
}
// ... 其他 submit/invoke 方法也都抛异常
❓ 为什么?
因为:
- 没有
idx
→ 无法决定哪个线程执行 - 必须显式指定
idx
才能路由任务 - 强制用户遵守“按索引分片”的编程模型
✅ 这是一种 设计约束:你必须知道你的任务属于哪个“条带”
🔄 六、生命周期管理方法
这些方法对所有子线程池进行统一操作:
方法 | 行为 |
---|---|
shutdown() |
所有子线程池调用 shutdown() |
shutdownNow() |
所有子线程池尝试中断,并收集未执行任务 |
isShutdown() |
所有都 shutdown 才返回 true |
isTerminated() |
所有都终止才返回 true |
awaitTermination() |
等待所有子线程池结束 |
✅ 符合
ExecutorService
接口规范,整体作为一个单元关闭
🎯 七、典型使用场景(在 Ignite 中)
这类线程池主要用于:
1. 异步回调执行(callbackExecSvc
)
callbackExecSvc = new IgniteStripedThreadPoolExecutor(
cfg.getAsyncCallbackPoolSize(),
"callback",
oomeHnd,
false,
0
);
- 每个缓存键(key)的监听器回调 → 按 key.hashCode() % N 分配线程
- 保证同一个 key 的事件不乱序
2. 数据流处理(DataStreamer)
- 数据按 key 分片写入,每个分片由固定线程处理
3. 消息处理管道
- 消息带有一个“会话ID”或“分区ID”,相同 ID 的消息必须顺序处理
📊 八、优缺点总结
优点 | 缺点 |
---|---|
✅ 高并发 + 保序 | ❌ 必须提前知道“分片键”(idx) |
✅ 减少锁竞争(每个线程只处理自己的任务) | ❌ 线程间负载可能不均(热 key 问题) |
✅ 简单高效,接近无锁设计 | ❌ 不支持 submit() 返回 Future (无法获取结果) |
✅ 易于调试(知道哪个线程在处理哪类任务) | ❌ 不能用于通用任务调度 |
🧩 九、和 JDK 原生类的对比
类型 | 特点 |
---|---|
Executors.newFixedThreadPool(N) |
所有任务随机分配给 N 个线程,无顺序保证 |
Executors.newSingleThreadExecutor() |
所有任务串行执行,性能低 |
ForkJoinPool.commonPool() |
工作窃取,适合分治任务 |
IgniteStripedThreadPoolExecutor |
分片并行 + 局部串行,兼顾吞吐与顺序 |
🔁 它填补了“完全并行”和“完全串行”之间的空白
✅ 十、一句话总结
IgniteStripedThreadPoolExecutor
是一种 基于索引分片的任务调度器,它通过将任务绑定到固定的线程上来实现 局部串行 + 全局并行,特别适用于需要 顺序处理但又追求高吞吐 的场景(如事件回调、消息队列、数据流等)。
💡 使用建议
// 示例:按缓存 key 分发回调
int idx = key.hashCode();
stripedExecutor.execute(() -> {
// 处理某个 key 的事件
}, idx);
- 选择合适的
concurrentLvl
(通常为 CPU 核心数或稍大) - 确保
idx
分布均匀,避免“热点线程” - 不要用它执行长时间阻塞任务(会影响该条带的所有任务)
如果你想实现类似功能,也可以基于此模式封装自己的 StripedExecutor
。