Spark 运行流程核心组件(一)作业提交

发布于:2025-08-13 ⋅ 阅读:(17) ⋅ 点赞:(0)

1、Job启动流程

在这里插入图片描述

1、Client触发 SparkContext 初始化

2、SparkContextMaster 注册应用

3、Master 调度 Worker 启动 Executor

4、Worker 进程启动 Executor

5、DAGScheduler 将作业分解为 Stage

6、TaskScheduler 分配 TaskExecutor

2、核心组件

组件 职责
SparkContext 应用入口,协调各组件,管理应用生命周期。
DAGScheduler 将 Job 拆分为 Stage,构建 DAG,提交 TaskSet 给 TaskScheduler。
TaskScheduler 调度 Task 到 Executor,处理故障重试。
CoarseGrainedSchedulerBackend 与集群管理器交互,申请资源,管理 Executor。
ExternalClusterManager 抽象层,适配不同集群(Standalone/YARN/Mesos)。
Master & Worker Standalone 模式下管理集群资源(Master 分配资源,Worker 启动 Executor)。
Executor 在 Worker 上运行,执行 Task,管理内存/磁盘。
CoarseGrainedExecutorBackend Executor 的通信代理,接收 Task,返回状态/结果。
Task 计算单元(ShuffleMapTask / ResultTask)。
ShuffleManager 管理 Shuffle 数据读写(如 SortShuffleManager)。

3、工作流程

1、SparkContext

负责资源申请、任务提交、与集群管理器通信。

调用runJob方法,将RDD操作传递给DAGScheduler

2、DAGScheduler

将Job拆分为Stage(DAG),处理Shuffle依赖,提交TaskSet给TaskScheduler。

1、DAGSchedulerEvent

/* 作业生命周期事件 */
JobSubmitted //新作业提交时触发
JobCancelled //单个作业被取消
JobGroupCancelled //作业组整体取消
JobTagCancelled //按标签批量取消作业
AllJobsCancelled //取消所有运行中的作业

/* 阶段执行事件 */
MapStageSubmitted //Shuffle Map阶段提交
StageCancelled //单个阶段取消
StageFailed //阶段执行失败
ResubmitFailedStages //自动重试失败阶段 ,默认4次

/* 任务调度事件 */
TaskSetFailed //整个任务集失败,默认4次
SpeculativeTaskSubmitted //启动推测执行任务
UnschedulableTaskSetAdded //任务集进入待调度队列
UnschedulableTaskSetRemoved //任务集离开待调度队列

/* Shuffle 优化事件 */
RegisterMergeStatuses //注册Shuffle合并状态
ShuffleMergeFinalized //Shuffle合并完成
ShufflePushCompleted //Shuffle数据推送完成

/* 资源管理事件 */
ExecutorAdded //新Executor注册成功
ExecutorLost //Executor异常丢失
WorkerRemoved //工作节点移除

/* 执行过程事件 */
BeginEvent //任务集开始执行 
GettingResultEvent //驱动程序主动获取任务结果
CompletionEvent //作业/阶段完成

2、stage拆分流程

*ResultStage (执行作的最后一个阶段)、ShuffleMapStage (shuffle映射输出文件)*

  1. 用户行动操作触发submitJob,发送JobSubmitted事件。
  2. handleJobSubmitted处理事件,调用createResultStage创建ResultStage。
  3. createResultStage调用getOrCreateParentStages获取父Stage,父Stage的创建会递归进行。
  4. 在创建父Stage的过程中,遇到宽依赖则创建ShuffleMapStage,并递归创建其父Stage。
  5. 当所有父Stage都创建完成后,回到handleJobSubmitted,调用submitStage提交ResultStage。
  6. submitStage检查父Stage是否完成,如果有未完成的父Stage,则递归提交父Stage;否则,提交当前Stage(调用submitMissingTasks)。
  7. submitMissingTasks为Stage创建任务(ShuffleMapTask或ResultTask),并提交给TaskScheduler执行。

3、宽窄依赖切分

private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
    if (stage == target) {
      return true
    }
    // DFS遍历RDD依赖树
    val visitedRdds = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += stage.rdd
    def visit(rdd: RDD[_]): Unit = {
      if (!visitedRdds(rdd)) {
        visitedRdds += rdd
        for (dep <- rdd.dependencies) {
          dep match {
            // 宽依赖:创建新的ShuffleMapStage
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                waitingForVisit.prepend(mapStage.rdd)
              }  // Otherwise there's no need to follow the dependency back
            // 窄依赖:继续回溯
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.prepend(narrowDep.rdd)
          }
        }
      }
    }
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.remove(0))
    }
    visitedRdds.contains(target.rdd)
  }

3、TaskScheduler

接收TaskSet,按调度策略(FIFO/FAIR)将Task分配给Executor。

1、执行流程

1、DAGScheduler 调用 taskScheduler.submitTasks() 后,任务进入 TaskScheduler 调度阶段

2、任务提交submitTasks

// TaskSetManager管理任务集
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 添加到调度池
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 触发资源分配
backend.reviveOffers()

3、资源分配 (Driver)

// CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {
  driverEndpoint.send(ReviveOffers)  // 向DriverEndpoint发送消息
}

// DriverEndpoint处理
case ReviveOffers =>
  makeOffers()  // 触发资源分配

4、资源分配核心

private def makeOffers(): Unit = {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // 1. 获取所有可用Executor资源
    val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }
    val workOffers = activeExecutors.map {
      case (id, executorData) => buildWorkerOffer(id, executorData)
    }.toIndexedSeq
    // 2. 调用任务调度器分配任务
    scheduler.resourceOffers(workOffers, true)
  }
  // 3. 启动分配的任务
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

5、任务启动

// CoarseGrainedSchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    // 1. 序列化任务
    val serializedTask = TaskDescription.encode(task)
    // 2. 检查任务大小
    if (serializedTask.limit() >= maxRpcMessageSize) {
      Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
            s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      // Do resources allocation here. The allocated resources will get released after the task
      // finishes.
      executorData.freeCores -= task.cpus
      task.resources.foreach { case (rName, addressAmounts) =>
        executorData.resourcesInfo(rName).acquire(addressAmounts)
      }
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")
      // 发送任务到Executor
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

网站公告

今日签到

点亮在社区的每一天
去签到