Spark Executor decommission 原理分析

发布于:2024-03-10 ⋅ 阅读:(58) ⋅ 点赞:(0)

背景

Spark Executor decommission 最初是 AWS 为了 Spot 节点和 Spot Block 节点配置的, Spot 节点仅有正常节点几分之一的价格,但是可能在任何时间收回,收回的时候仅给 120 秒的缓冲期。
Spot Block 比正常价格便宜 30%~45%,但是稳定性比 Spot 节点高,回收时缓冲期为最高可以达6小时).

参考文档:
Using AWS Spot Instances and Spot Blocks in QDS Clusters
New – EC2 Spot Blocks for Defined-Duration Workloads

其他场景没有 Spot 和 Spot Block 节点,Spark 和 cluster manager(YARN 和 Mesos) 结合。如 YARN 框架 Graceful Decommission of YARN Nodes 说明配置了 node 的优雅退出过程。包含以下3步:

  1. 先添加要 decommission 的 node 到 /etc/hadoop/conf/yarn.exclude
  2. 然后执行 yarn rmadmin -refreshNodes -g 100 --client,-g 是 gracefull,100 是等待时间100秒,NodeManager 节点开始 decommission 过程。
  3. Application Master 收到 decommission 的 executors,给 executor 发送 Decommission 的请求。Executor 进行 Decommission 操作。
    Application Master 的日志:
24/03/05 16:55:55 INFO YarnClusterSchedulerBackend: Decommission executors: 2
24/03/05 16:55:55 INFO YarnClusterSchedulerBackend: Notify executor 2 to decommission.
24/03/05 16:55:55 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(2, core-8ce527b-1, 24639, None)) as being decommissioning.
24/03/05 16:55:55 INFO ShuffleStatus: Updating map output for 1843 to BlockManagerId(3, core-8ce527b-2, 21569, None)
24/03/05 16:55:55 INFO ShuffleStatus: Updating map output for 1825 to BlockManagerId(1, core-8ce527b-2, 29453, None)

另一个触发 Decommission 的流程开启 Dynamic Allocation 后,Shuffle Tracking 查找没有任务的 executors,开始触发 decommission 的过程。

Spark Decommission 的功能

  1. Executor 停止运行新的 Task
  2. 把 cached RDD blocks 和 shuffle blocks 传输到其他的 Executor 和 Remote Storage。更新 driver 端的 shuffle status。
  3. Reducer 获取新的 shuffle status,读取 shuffle 数据。

增加的参数

 --conf spark.storage.decommission.enabled=true \
 --conf spark.storage.decommission.shuffleBlocks.enabled=true \
 --conf spark.storage.decommission.rddBlocks.enabled=true \
 --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10s \
 --conf spark.storage.decommission.fallbackStorage.path=bos://bmr-rd-wh/houzhizhen/decommission-fallbackStorage-path/ \

执行流程

1. SparkContext 中,向 block manager masger 注册 fallback 的地址。

如果设置了 spark.storage.decommission.fallbackStorage.path, 则注册以下地址。

val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)

代表 block 可以上传到指定的地址。

2. 当 Executor 接收到 Decommission 请求时,向 Driver 调用 getPeers 获取 Decommission 的目标地址列表

getPeers 返回所有的执行器列表,加上 fallback 地址,去掉正在 decommission 的 Executors。

3. Executor 把 shuffle block 上传到 getPeers 返回的地址

  • 3.1 先拿到 block manager 所有的 block,放到 shufflesToMigrate 队列。
  • 3.2 采用多线程的架构,每个 peer 一个线程,每个线程做以下工作
    1): 从 shufflesToMigrate 取一个 block
    2): 上传到 peer,如果 peer 是 fallback,调用 upload,如果 peer 是 executor, 则调用 uploadBlockSync。
    3): 向 driver updateBlockInfo, 如果 peer 是 fallback 则在 upload 里,上传完就调用 updateBlockInfo。如果 peer 是 executor, 则由 executor 成功接收到 block 后调用。

4. reducer 从 driver 获取最新的地址

如果正常的 executor 地址,则从 executor 读取,如果是 fall back 地址,则调用 FallBackStorage.read 读取数据块。

代码分析

1. Driver 注册 fall back 的地址

SparkContext 中,向 block manager masger 注册 fallback 的地址。

val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)

2. Driver 处理 decommission executors

在 decommissionExecutors 里,执行以下步骤:

  1. 通知 scheduler 这些 executor 正在 decommission, 当调度器收到 fetch failed 时做一些特除处理。
  2. decommissioningBlockManagerSet 添加这些 executor 的 id. 其他 Executor 再 decommission 时,getPeers 时去掉正在 decommission 的 Executor。
  3. 给每个 executor 发送 DecommissionExecutor 事件。
  4. 如果设置了 spark.executor.decommission.forceKillTimeout, 则启动一个定时任务, 如果 executor 还处于 pendingDecommission, 则发送 kill 事件。

3. Executor 收到 DecommissionExecutor 的处理

  1. 判断 migrationEnabled, 判断逻辑如下:
val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) &&
        (env.conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) ||
          env.conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
  1. env.blockManager.decommissionBlockManager()

  2. executor.decommission()

  3. 开启一个线程,不断检查 decomission 状态,如果 decommission 成功,则 exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)退出 executor。

env.blockManager.decommissionBlockManager() 的处理
 decommissioner = Some(new BlockManagerDecommissioner(conf, this))
 decommissioner.foreach(_.start())
BlockManagerDecommissioner#start

start 启动两个线程,一个为了 rdd block, 另一个为了 shuffle block.

def start(): Unit = {
    logInfo("Starting block migration")
    rddBlockMigrationExecutor.foreach(_.submit(rddBlockMigrationRunnable))
    shuffleBlockMigrationRefreshExecutor.foreach(_.submit(shuffleBlockMigrationRefreshRunnable))
  }
rddBlockMigrationRunnable
  1. bm.getPeers 如果没有活着的 executor,则停止。

  2. val replicateBlocksInfo = bm.getMigratableRDDBlocks()

  3. 对于每个 block,调用 migrateBlock(blockToReplicate: ReplicateBlock): Boolean

BlockManagerDecommissioner#refreshMigratableShuffleBlocks
  1. 调用 getStoredShuffles,放到 shufflesToMigrate 队列
 override def getStoredShuffles(): Seq[ShuffleBlockInfo] = {
    val allBlocks = blockManager.diskBlockManager.getAllBlocks()
    allBlocks.flatMap {
      case ShuffleIndexBlockId(shuffleId, mapId, _) =>
        Some(ShuffleBlockInfo(shuffleId, mapId))
      case _ =>
        None
    }
  }
  1. 对于每个 peer,启动一个线程 ShuffleMigrationRunnable, 执行以下操作
    1): 从 shufflesToMigrate 获取一个 shuffleBlockInfo
    2): 如果定义fallbackStorage 并且当前peer 是 FALLBACK_BLOCK_MANAGER_ID, 执行 _.copy(shuffleBlockInfo, bm); 否则执行步骤 3
    3): 执行 bm.blockTransferService.uploadBlockSync 把 block 上传到其他 Executor.

问题

  1. 数据循环拷贝。如有10个执行器1–10。现在只有 executor 10 有任务运行。刚开始 decommission executor 1,会把 shuffle 数据 decommission 到 executor 2 ~ executor 10 和 fallback. 然后 executor 2 decommission, 从 executor 1 拷贝到 executor 2的数据再拷贝到其他节点。
  2. executor 1 在 decommission 过程中, executor-2 执行 decommission,executor-1 还是会发送数据到 executor-2。

如果直接发往 remote reliable storage, 则问题1和问题2 不存在。https://github.com/apache/spark/pull/45228/files


网站公告

今日签到

点亮在社区的每一天
去签到