这段代码是 Apache Ignite 中的 PoolProcessor
类的 构造函数 和 start()
方法,它的核心职责是:
💡 初始化 Ignite 节点中所有关键的线程池(Thread Pools),为不同类型的任务分配独立的执行资源。
🧱 一、类定位:PoolProcessor
- 作用:Ignite 内核(
GridKernalContext
)的一部分 - 职责:统一管理节点内所有线程池的创建与生命周期
- 重要性:相当于整个 Ignite 节点的“线程调度中枢”,不同任务走不同线程池,避免相互阻塞
🏗️ 二、构造函数分析:PoolProcessor(GridKernalContext ctx)
public PoolProcessor(GridKernalContext ctx) {
super(ctx);
IgnitePluginProcessor plugins = ctx.plugins();
if (plugins != null) {
final IoPool[] executorExtensions = ctx.plugins().extensions(IoPool.class);
if (executorExtensions != null) {
for (IoPool ex : executorExtensions) {
final byte id = ex.id();
// 1. ID 必须 >= 0
if (id < 0) throw new IgniteException("ID is negative");
// 2. ID 不能是保留给系统使用的(如 PUBLIC, SYSTEM 等)
if (GridIoPolicy.isReservedGridIoPolicy(id))
throw new IgniteException("ID in reserved range");
// 3. 不能重复注册同一个 ID
if (extPools[id] != null)
throw new IgniteException("ID already used");
// 安全包装(如果启用了安全模块)
extPools[id] = ctx.security().enabled() ?
new SecurityAwareIoPool(ctx.security(), ex) : ex;
}
}
}
}
🔍 做了什么?
加载插件扩展的 IO 线程池:
- 允许第三方插件通过
IoPool
接口注册自定义线程池 - 每个池有一个唯一的
byte id
作为标识
- 允许第三方插件通过
严格校验
id
合法性:- 非负数
- 不在系统保留范围(如
0~15
已被GridIoPolicy
占用) - 不能重复
存储到
extPools
数组中备用
✅ 这是 Ignite 扩展机制的一部分 —— 插件可以注入自己的执行线程池。
⚙️ 三、start()
方法详解
这是真正 创建所有线程池 的地方。
🌐 总体结构
@Override public void start() throws IgniteCheckedException {
super.start();
// 1. 获取配置
IgniteConfiguration cfg = ctx.config();
// 2. 设置未捕获异常处理器
UncaughtExceptionHandler excHnd = new UncaughtExceptionHandler() { ... };
// 3. 逐个创建各种功能专用线程池
execSvc = createExecutorService(...); // public pool
svcExecSvc = createExecutorService(...); // service pool
sysExecSvc = createExecutorService(...); // system pool
...
}
🧩 四、Ignite 的线程池模型(关键设计思想)
Ignite 采用 “多线程池隔离” 架构,不同任务使用不同线程池,防止:
- 高负载任务阻塞关键系统操作
- 查询任务拖慢故障检测
- P2P 类加载影响数据流
线程池名称 | 配置项 | 用途 | 特点 |
---|---|---|---|
publicThreadPool |
getPublicThreadPoolSize() |
用户缓存操作(put/get) | 默认 4 * CPU |
serviceThreadPool |
getServiceThreadPoolSize() |
用户部署的服务(Service Grid) | 默认 4 * CPU |
systemThreadPool |
getSystemThreadPoolSize() |
核心系统任务: - 故障检测 - 拓扑变更 - 心跳通信 |
默认 4 * CPU |
stripedPool |
getStripedPoolSize() |
高并发异步任务(如消息处理) | 使用 StripedExecutor (基于哈希的任务分片) |
managementThreadPool |
getManagementThreadPoolSize() |
JMX 管理操作 | 默认 4 |
peerClassLoadingThreadPool |
getPeerClassLoadingThreadPoolSize() |
节点间类加载(P2P) | 默认 2 |
dataStreamerThreadPool |
getDataStreamerThreadPoolSize() |
DataStreamer 数据灌入 |
使用 StripedExecutor |
asyncCallbackPool |
getAsyncCallbackPoolSize() |
异步回调执行 | IgniteStripedThreadPoolExecutor |
connectorThreadPool |
getConnectorConfiguration().getThreadPoolSize() |
REST/TCP 客户端连接处理 | 可选 |
utilityCacheThreadPool |
getUtilityCacheThreadPoolSize() |
内部缓存(如元数据) | 默认 4 |
affinityPool |
固定大小 1 | 分区映射(Affinity)计算 | 单线程,避免并发冲突 |
indexingPool (idxExecSvc ) |
CPU 相关 | SQL 索引操作(若开启 INDEXING) | 核心数 ~ 2倍核心数 |
buildIndexThreadPool |
getBuildIndexThreadPoolSize() |
构建索引(后台任务) | 默认 2 |
queryPool |
getQueryThreadPoolSize() |
SQL 查询执行 | 默认 4 * CPU |
schemaPool |
固定大小 2 | SQL 模式变更(CREATE/DROP TABLE) | 防止并发 DDL 冲突 |
rebalanceThreadPool |
getRebalanceThreadPoolSize() |
数据再平衡 | 后面还有创建 |
🔧 五、线程池创建方式说明
1. 普通线程池:createExecutorService(...)
createExecutorService(
"pub",
name,
coreSize, max, keepAlive,
new LinkedBlockingQueue<>(),
GridIoPolicy.PUBLIC_POOL,
oomeHnd
);
- 使用
LinkedBlockingQueue
(无界队列) corePoolSize == maxPoolSize
→ 固定大小线程池allowCoreThreadTimeOut(true)
→ 空闲时核心线程也可销毁- 绑定
GridIoPolicy
(用于网络通信优先级调度)
2. 分片线程池:createStripedExecutor(...)
stripedExecSvc = createStripedExecutor(...);
- 使用
StripedExecutor
:将任务按 key 哈希到固定数量的工作线程 - 保证同一 key 的任务串行执行,提高缓存局部性
- 常用于高并发消息处理、数据流等场景
3. 特殊线程池:IgniteStripedThreadPoolExecutor
callbackExecSvc = new IgniteStripedThreadPoolExecutor(...);
- 更轻量的 striped 实现,用于异步回调
🛡️ 六、异常处理机制
所有线程池的线程都设置了 UncaughtExceptionHandler
:
new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
}
⚠️ 任何线程抛出未捕获异常 → 触发 FailureProcessor → 可能导致节点停止或 JVM 终止
这是 Ignite 故障检测的重要一环。
📏 七、线程池大小验证:validateThreadPoolSize(...)
private void validateThreadPoolSize(int size, String name) {
if (size <= 0)
throw new IgniteException("Illegal " + name + " thread pool size: " + size);
}
- 防止配置错误导致无法启动
- 所有线程求数量都必须 > 0
🧩 八、GridIoPolicy
的作用(关键!)
每个线程池绑定一个 GridIoPolicy
(如 PUBLIC_POOL=1
, SYSTEM_POOL=3
),它用于:
- 网络通信层区分消息类型
- 不同类型的消息由对应的线程池处理
- 实现 消息级别的 QoS(服务质量)控制
例如:
SYSTEM_POOL
消息优先处理(心跳、故障探测)PUBLIC_POOL
处理用户请求- 避免普通请求堆积影响集群稳定性
✅ 九、总结:PoolProcessor
的核心价值
维度 | 说明 |
---|---|
🔐 稳定性 | 关键系统任务独立线程池,不受用户负载影响 |
⚡ 性能 | 多线程并行 + 分片执行(Striped),最大化吞吐 |
🧩 可扩展性 | 支持插件自定义线程池(IoPool ) |
📊 可观测性 | 各类任务资源隔离,便于监控和调优 |
🚨 容错性 | 线程异常 → 触发全局故障处理机制 |
📌 十、生产建议
根据负载调整线程池大小
- 高并发读写:增大
publicThreadPoolSize
- 大量 SQL 查询:增大
queryThreadPoolSize
- 开启持久化时:注意
stripedPool
和systemPool
的压力
- 高并发读写:增大
避免所有线程池共用一个队列
- Ignite 已做到隔离,不要破坏这种设计
监控线程池队列积压
- 使用 JMX 查看各线程池的
queue.size
- 使用 JMX 查看各线程池的
不要随意关闭
allowCoreThreadTimeOut
- 在低负载时节省资源
🎯 最后一句话总结
PoolProcessor
是 Ignite 的“线程资源调度中心”,它通过 精细化的线程池隔离 + 插件扩展能力 + 全局异常捕获,确保了分布式系统在高并发、多任务场景下的 稳定性、性能和可维护性。