背景
Spark Executor decommission 最初是 AWS 为了 Spot 节点和 Spot Block 节点配置的, Spot 节点仅有正常节点几分之一的价格,但是可能在任何时间收回,收回的时候仅给 120 秒的缓冲期。
Spot Block 比正常价格便宜 30%~45%,但是稳定性比 Spot 节点高,回收时缓冲期为最高可以达6小时).
参考文档:
Using AWS Spot Instances and Spot Blocks in QDS Clusters
New – EC2 Spot Blocks for Defined-Duration Workloads
其他场景没有 Spot 和 Spot Block 节点,Spark 和 cluster manager(YARN 和 Mesos) 结合。如 YARN 框架 Graceful Decommission of YARN Nodes 说明配置了 node 的优雅退出过程。包含以下3步:
- 先添加要 decommission 的 node 到 /etc/hadoop/conf/yarn.exclude
- 然后执行
yarn rmadmin -refreshNodes -g 100 --client
,-g 是 gracefull,100 是等待时间100秒,NodeManager 节点开始 decommission 过程。 - Application Master 收到 decommission 的 executors,给 executor 发送 Decommission 的请求。Executor 进行 Decommission 操作。
Application Master 的日志:
24/03/05 16:55:55 INFO YarnClusterSchedulerBackend: Decommission executors: 2
24/03/05 16:55:55 INFO YarnClusterSchedulerBackend: Notify executor 2 to decommission.
24/03/05 16:55:55 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(2, core-8ce527b-1, 24639, None)) as being decommissioning.
24/03/05 16:55:55 INFO ShuffleStatus: Updating map output for 1843 to BlockManagerId(3, core-8ce527b-2, 21569, None)
24/03/05 16:55:55 INFO ShuffleStatus: Updating map output for 1825 to BlockManagerId(1, core-8ce527b-2, 29453, None)
另一个触发 Decommission 的流程开启 Dynamic Allocation 后,Shuffle Tracking 查找没有任务的 executors,开始触发 decommission 的过程。
Spark Decommission 的功能
- Executor 停止运行新的 Task
- 把 cached RDD blocks 和 shuffle blocks 传输到其他的 Executor 和 Remote Storage。更新 driver 端的 shuffle status。
- Reducer 获取新的 shuffle status,读取 shuffle 数据。
增加的参数
--conf spark.storage.decommission.enabled=true \
--conf spark.storage.decommission.shuffleBlocks.enabled=true \
--conf spark.storage.decommission.rddBlocks.enabled=true \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10s \
--conf spark.storage.decommission.fallbackStorage.path=bos://bmr-rd-wh/houzhizhen/decommission-fallbackStorage-path/ \
执行流程
1. SparkContext 中,向 block manager masger 注册 fallback 的地址。
如果设置了 spark.storage.decommission.fallbackStorage.path
, 则注册以下地址。
val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
代表 block 可以上传到指定的地址。
2. 当 Executor 接收到 Decommission 请求时,向 Driver 调用 getPeers 获取 Decommission 的目标地址列表
getPeers 返回所有的执行器列表,加上 fallback 地址,去掉正在 decommission 的 Executors。
3. Executor 把 shuffle block 上传到 getPeers 返回的地址
- 3.1 先拿到 block manager 所有的 block,放到 shufflesToMigrate 队列。
- 3.2 采用多线程的架构,每个 peer 一个线程,每个线程做以下工作
1): 从 shufflesToMigrate 取一个 block
2): 上传到 peer,如果 peer 是 fallback,调用 upload,如果 peer 是 executor, 则调用 uploadBlockSync。
3): 向 driver updateBlockInfo, 如果 peer 是 fallback 则在 upload 里,上传完就调用 updateBlockInfo。如果 peer 是 executor, 则由 executor 成功接收到 block 后调用。
4. reducer 从 driver 获取最新的地址
如果正常的 executor 地址,则从 executor 读取,如果是 fall back 地址,则调用 FallBackStorage.read 读取数据块。
代码分析
1. Driver 注册 fall back 的地址
SparkContext 中,向 block manager masger 注册 fallback 的地址。
val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
2. Driver 处理 decommission executors
在 decommissionExecutors 里,执行以下步骤:
- 通知 scheduler 这些 executor 正在 decommission, 当调度器收到 fetch failed 时做一些特除处理。
- decommissioningBlockManagerSet 添加这些 executor 的 id. 其他 Executor 再 decommission 时,getPeers 时去掉正在 decommission 的 Executor。
- 给每个 executor 发送 DecommissionExecutor 事件。
- 如果设置了 spark.executor.decommission.forceKillTimeout, 则启动一个定时任务, 如果 executor 还处于 pendingDecommission, 则发送 kill 事件。
3. Executor 收到 DecommissionExecutor 的处理
- 判断 migrationEnabled, 判断逻辑如下:
val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) &&
(env.conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) ||
env.conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
env.blockManager.decommissionBlockManager()
executor.decommission()
开启一个线程,不断检查 decomission 状态,如果 decommission 成功,则
exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
退出 executor。
env.blockManager.decommissionBlockManager() 的处理
decommissioner = Some(new BlockManagerDecommissioner(conf, this))
decommissioner.foreach(_.start())
BlockManagerDecommissioner#start
start 启动两个线程,一个为了 rdd block, 另一个为了 shuffle block.
def start(): Unit = {
logInfo("Starting block migration")
rddBlockMigrationExecutor.foreach(_.submit(rddBlockMigrationRunnable))
shuffleBlockMigrationRefreshExecutor.foreach(_.submit(shuffleBlockMigrationRefreshRunnable))
}
rddBlockMigrationRunnable
bm.getPeers 如果没有活着的 executor,则停止。
val replicateBlocksInfo = bm.getMigratableRDDBlocks()
对于每个 block,调用
migrateBlock(blockToReplicate: ReplicateBlock): Boolean
BlockManagerDecommissioner#refreshMigratableShuffleBlocks
- 调用 getStoredShuffles,放到 shufflesToMigrate 队列
override def getStoredShuffles(): Seq[ShuffleBlockInfo] = {
val allBlocks = blockManager.diskBlockManager.getAllBlocks()
allBlocks.flatMap {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
Some(ShuffleBlockInfo(shuffleId, mapId))
case _ =>
None
}
}
- 对于每个 peer,启动一个线程 ShuffleMigrationRunnable, 执行以下操作
1): 从 shufflesToMigrate 获取一个 shuffleBlockInfo
2): 如果定义fallbackStorage 并且当前peer 是 FALLBACK_BLOCK_MANAGER_ID, 执行 _.copy(shuffleBlockInfo, bm); 否则执行步骤 3
3): 执行 bm.blockTransferService.uploadBlockSync 把 block 上传到其他 Executor.
问题
- 数据循环拷贝。如有10个执行器1–10。现在只有 executor 10 有任务运行。刚开始 decommission executor 1,会把 shuffle 数据 decommission 到 executor 2 ~ executor 10 和 fallback. 然后 executor 2 decommission, 从 executor 1 拷贝到 executor 2的数据再拷贝到其他节点。
- executor 1 在 decommission 过程中, executor-2 执行 decommission,executor-1 还是会发送数据到 executor-2。
如果直接发往 remote reliable storage, 则问题1和问题2 不存在。https://github.com/apache/spark/pull/45228/files