Spark Execution Flow Core Components (3): Task Execution

Published: 2025-08-19

I. Launch Modes

1. Standalone

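  1. Resource request: the Driver asks the Master for Executor resources
  2. Executor startup: the Master schedules Workers to launch Executors
  3. Registration: each Executor registers directly with the Driver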
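
The three steps above can be modelled in a few lines of plain Scala. This is only an in-memory sketch: the classes `Driver`, `Master`, `Worker`, and `ExecutorInfo` are hypothetical stand-ins rather than Spark's real classes, and the round-robin placement is an assumption made for illustration.

```scala
import scala.collection.mutable

// In-memory model of the standalone handshake.
// Every name below is hypothetical; none of this is Spark's actual API.
object StandaloneHandshakeSketch {
  final case class ExecutorInfo(executorId: String, workerId: String, cores: Int)

  class Driver {
    val registered = mutable.ArrayBuffer.empty[ExecutorInfo]
    // Step 3: executors register with the driver directly, not via the master.
    def registerExecutor(info: ExecutorInfo): Unit = registered += info
  }

  class Worker(val id: String) {
    // Step 2: the worker starts an executor, which then calls back to the driver.
    def launchExecutor(executorId: String, cores: Int, driver: Driver): Unit =
      driver.registerExecutor(ExecutorInfo(executorId, id, cores))
  }

  class Master(workers: Seq[Worker]) {
    // Steps 1-2: accept the driver's request and place executors on workers.
    // Assumption: simple round-robin placement.
    def requestExecutors(count: Int, coresEach: Int, driver: Driver): Unit =
      (0 until count).foreach { i =>
        val worker = workers(i % workers.size)
        worker.launchExecutor(s"exec-$i", coresEach, driver)
      }
  }

  def run(): Seq[ExecutorInfo] = {
    val driver = new Driver
    val master = new Master(Seq(new Worker("worker-a"), new Worker("worker-b")))
    master.requestExecutors(count = 3, coresEach = 2, driver = driver)
    driver.registered.toSeq
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The point of the model is the last hop: once an Executor is up, it talks to the Driver without going back through the Master.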

2. YARN

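  1. The client (which is the Driver itself in yarn-client mode) asks the YARN ResourceManager (RM) for an ApplicationMaster (AM) container
  2. The RM picks a NodeManager (NM) to launch the AM (in yarn-client mode the AM is only a resource broker and does not run user code)
  3. The AM registers with the RM
  4. The AM requests Executor containers from the RM according to the application's resource requirements
  5. The RM allocates containers on multiple NMs
  6. Each NM launches an ExecutorBackend process
  7. Registration: each Executor registers with the Driver (inside the AM in yarn-cluster mode, in the client process in yarn-client mode)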

II. Core Components of Executor-Side Task Execution

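  1. Driver-side components
    • CoarseGrainedSchedulerBackend: communicates with Executors
    • TaskSchedulerImpl: core task-scheduling logic
    • DAGScheduler: DAG scheduling and Stage management
    • BlockManagerMaster: coordinator for the block managers
    • MapOutputTrackerMaster: tracks Shuffle map outputs
  2. Executor-side components
    • CoarseGrainedExecutorBackend: the Executor's communication endpoint
    • Executor: task execution engine
    • TaskRunner: Runnable wrapper that executes one task on a pool thread
    • BlockManager: local data block management
    • ShuffleManager: controls Shuffle reads and writes
    • ExecutorSource: metrics reporting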

III. Core Flow of Task Execution on the Executor

1. Task Receipt and Initialization

  • CoarseGrainedExecutorBackend receives the task
case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}")
        executor.launchTask(this, taskDesc)
      }
  • Executor launches the task
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val taskId = taskDescription.taskId
    val tr = createTaskRunner(context, taskDescription)
    runningTasks.put(taskId, tr)
    val killMark = killMarks.get(taskId)
    if (killMark != null) {
      tr.kill(killMark._1, killMark._2)
      killMarks.remove(taskId)
    }
    threadPool.execute(tr)
    if (decommissioned) {
      log.error(s"Launching a task while in decommissioned state.")
    }
  }
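
The interplay between `runningTasks`, `killMarks`, and the thread pool is easier to see in isolation. The following is a minimal sketch in plain Scala, not Spark code: `Runner` and `MiniExecutor` are hypothetical stand-ins for `TaskRunner` and `Executor`, and a pre-killed runner here simply skips its body, whereas the real TaskRunner still runs and reports the killed state itself.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object LaunchTaskSketch {
  // Hypothetical stand-in for TaskRunner: it only records whether it ran.
  class Runner(val taskId: Long) extends Runnable {
    @volatile var killedReason: Option[String] = None
    @volatile var ran = false
    def kill(reason: String): Unit = killedReason = Some(reason)
    // Simplification: a runner killed before starting skips its body.
    override def run(): Unit = if (killedReason.isEmpty) ran = true
  }

  class MiniExecutor {
    val runningTasks = new ConcurrentHashMap[Long, Runner]()
    val killMarks = new ConcurrentHashMap[Long, String]()
    private val threadPool = Executors.newCachedThreadPool()

    def launchTask(taskId: Long): Runner = {
      val tr = new Runner(taskId)
      runningTasks.put(taskId, tr)
      // A kill request may arrive before the task itself; apply it now.
      val killMark = killMarks.get(taskId)
      if (killMark != null) {
        tr.kill(killMark)
        killMarks.remove(taskId)
      }
      threadPool.execute(tr)
      tr
    }

    def shutdown(): Unit = {
      threadPool.shutdown()
      threadPool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }

  // Returns (did task 1 run?, did the pre-killed task 2 run?).
  def run(): (Boolean, Boolean) = {
    val executor = new MiniExecutor
    executor.killMarks.put(2L, "stage cancelled")
    val normal = executor.launchTask(1L)
    val preKilled = executor.launchTask(2L)
    executor.shutdown()
    (normal.ran, preKilled.ran)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Registering the runner before checking the kill mark matters: a kill that arrives after `put` can find the runner in `runningTasks`, and one that arrived earlier is picked up from `killMarks`.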

2. Task Execution

  • TaskRunner.run
// 1. Class loading and dependency management
updateDependencies(
          taskDescription.artifacts.files,
          taskDescription.artifacts.jars,
          taskDescription.artifacts.archives,
          isolatedSession)
// 2. Deserialize the task
task = ser.deserialize[Task[Any]](
          taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
// 3. Memory management
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
task.setTaskMemoryManager(taskMemoryManager)

// 4. Run the task
val value = Utils.tryWithSafeFinally {
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem,
            cpus = taskDescription.cpus,
            resources = resources,
            plugins = plugins)
          threwException = false
          res
        } {
          // Release all block locks still held by this task
          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
          // Free any memory the task left allocated
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

          if (freedMemory > 0 && !threwException) {
            val errMsg = log"Managed memory leak detected; size = " +
              log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"
            if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
              throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
            } else {
              logWarning(errMsg)
            }
          }

          if (releasedLocks.nonEmpty && !threwException) {
            val errMsg =
              log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +
                log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +
                log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})"
            if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
              throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
            } else {
              logInfo(errMsg)
            }
          }
        }

// 5. Report the final status to the Driver
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
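
Step 4 relies on two ideas: cleanup that always runs without hiding the task's own exception, and a leak check that only fires when the task body succeeded. The sketch below re-implements both in plain Scala under simplifying assumptions. It is not Spark's `Utils.tryWithSafeFinally` (logging is omitted), and `leakDetected` is a hypothetical helper that only mirrors the `freedMemory > 0 && !threwException` condition.

```scala
object SafeFinallySketch {
  // Simplified take on the idea behind tryWithSafeFinally: if both the body
  // and the cleanup fail, keep the body's exception as the primary one and
  // attach the cleanup failure to it as a suppressed exception.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        case t: Throwable if original != null && (original ne t) =>
          original.addSuppressed(t)
          throw original
      }
    }
  }

  // Hypothetical helper: leftover memory counts as a leak only when the
  // task body itself completed without throwing.
  def leakDetected(freedBytes: Long, threwException: Boolean): Boolean =
    freedBytes > 0 && !threwException

  // Both the body and the cleanup throw; the body's exception must win.
  def demo(): (String, Int) =
    try {
      tryWithSafeFinally[Int](throw new IllegalStateException("task failed")) {
        throw new RuntimeException("cleanup failed")
      }
      ("no exception", 0)
    } catch {
      case e: IllegalStateException => (e.getMessage, e.getSuppressed.length)
    }

  def main(args: Array[String]): Unit = println(demo())
}
```

With a plain `try`/`finally`, the cleanup exception would replace the task's exception and the real failure cause would be lost.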
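  • Task.run
// With hadoop.caller.context.enabled = true, the caller context is written
// to the HDFS audit log, which helps with troubleshooting,
// e.g. tracing a sudden surge of small files back to the Spark task that created them.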
new CallerContext(
      "TASK",
      SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
      appId,
      appAttemptId,
      jobId,
      Option(stageId),
      Option(stageAttemptId),
      Option(taskAttemptId),
      Option(attemptNumber)).setCurrentContext()

// Run the task
context.runTaskWithListeners(this)

3. Shuffle Handling

  • ShuffleMapTask

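Prepares Shuffle data (the map-side output) for the downstream Stage and produces a MapStatus, which records where the output is located and how large each block is.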

val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// Deserialize the RDD and the ShuffleDependency from the broadcast variable
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L

val rdd = rddAndDep._1
val dep = rddAndDep._2
// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
  partitionId
} else {
  context.taskAttemptId()
}
dep.shuffleWriterProcessor.write(
  rdd.iterator(partition, context),
  dep,
  mapId,
  partitionId,
  context)
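
To make the map-side output and its status record more concrete, here is a small plain-Scala sketch. `MiniMapStatus`, `chooseMapId`, and `writeMapOutput` are hypothetical names, the hash partitioning is only in the spirit of a hash partitioner, and the per-record size estimate (UTF-8 key bytes plus 4 bytes for the value) is an assumption, not how Spark measures blocks.

```scala
object ShuffleMapSketch {
  // Hypothetical, much-reduced counterpart of MapStatus: where the output
  // lives and how many bytes each reduce partition will have to fetch.
  final case class MiniMapStatus(location: String, mapId: Long, sizes: Vector[Long])

  // Mirrors the mapId choice in the excerpt above: the partition id under
  // the old fetch protocol, otherwise the task attempt id.
  def chooseMapId(useOldFetchProtocol: Boolean, partitionId: Int, taskAttemptId: Long): Long =
    if (useOldFetchProtocol) partitionId.toLong else taskAttemptId

  def writeMapOutput(
      records: Iterator[(String, Int)],
      numReducers: Int,
      location: String,
      mapId: Long): MiniMapStatus = {
    val sizes = Array.fill(numReducers)(0L)
    records.foreach { case (key, _) =>
      // Non-negative hash partitioning of the key.
      val reducer = Math.floorMod(key.hashCode, numReducers)
      // Assumption: size = UTF-8 key bytes + 4 bytes for the Int value.
      sizes(reducer) += key.getBytes("UTF-8").length + 4
    }
    MiniMapStatus(location, mapId, sizes.toVector)
  }

  def main(args: Array[String]): Unit = {
    val mapId = chooseMapId(useOldFetchProtocol = false, partitionId = 0, taskAttemptId = 42L)
    val records = Iterator("a" -> 1, "bb" -> 2, "a" -> 3)
    println(writeMapOutput(records, 2, "host-1:7337", mapId))
  }
}
```

The per-reducer sizes are what lets the reduce side decide what to fetch and from where before it reads any data.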
  • ResultTask
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  func(context, rdd.iterator(partition, context))
}
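
The last line of `runTask` is the whole point of a ResultTask: apply a function to one partition's iterator and hand the result back. A minimal sketch of that shape, assuming hypothetical names (`MiniContext`, `runTask`, `runJob`) and plain in-memory partitions instead of an RDD:

```scala
object ResultTaskSketch {
  // Hypothetical stand-in for TaskContext; only the partition id is modelled.
  final case class MiniContext(partitionId: Int)

  // One "result task": apply func to a single partition's iterator.
  def runTask[T, U](ctx: MiniContext, partition: Seq[T])(
      func: (MiniContext, Iterator[T]) => U): U =
    func(ctx, partition.iterator)

  // Driver-side view: one task per partition, results collected in order.
  def runJob[T, U](partitions: Seq[Seq[T]])(
      func: (MiniContext, Iterator[T]) => U): Seq[U] =
    partitions.zipWithIndex.map { case (p, i) => runTask(MiniContext(i), p)(func) }

  // A count expressed this way: each task counts its partition,
  // and the per-partition counts are summed afterwards.
  def count(partitions: Seq[Seq[Int]]): Long =
    runJob(partitions)((_, it) => it.size.toLong).sum

  def main(args: Array[String]): Unit =
    println(count(Seq(Seq(1, 2, 3), Seq(), Seq(4))))
}
```

Only the small per-partition result travels back, not the partition's data, which is why result size rather than input size determines what the status update carries.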

IV. Core Communication Mechanism

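| Message | Direction | Content |
|---|---|---|
| RegisterExecutor | Executor→Driver | executorId, hostname, cores, log URLs, resources |
| RegisteredExecutor | Driver→Executor | Confirmation that registration succeeded |
| LaunchTask | Driver→Executor | Serialized TaskDescription |
| StatusUpdate | Executor→Driver | taskId, state, result data |
| KillTask | Driver→Executor | Terminate the specified task |
| StopExecutor | Driver→Executor | Instruction to shut down the Executor |
| Heartbeat | Executor→Driver | Heartbeat plus metrics data |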
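
A message protocol like this maps naturally onto a sealed trait with pattern matching. The sketch below is illustrative only: the case classes are hypothetical, trimmed-down versions carrying far fewer fields than Spark's real messages, and `direction` exists just to encode the Direction column.

```scala
object ExecutorMessagesSketch {
  // Hypothetical, trimmed-down message types; the real classes carry more fields.
  sealed trait Message
  final case class RegisterExecutor(executorId: String) extends Message
  case object RegisteredExecutor extends Message
  final case class LaunchTask(taskId: Long, payload: Array[Byte]) extends Message
  final case class StatusUpdate(taskId: Long, state: String, result: Array[Byte]) extends Message
  final case class KillTask(taskId: Long, reason: String) extends Message
  case object StopExecutor extends Message
  final case class Heartbeat(executorId: String) extends Message

  sealed trait Direction
  case object ExecutorToDriver extends Direction
  case object DriverToExecutor extends Direction

  // Encodes the Direction column; the sealed trait lets the compiler
  // check that every message type is covered.
  def direction(msg: Message): Direction = msg match {
    case _: RegisterExecutor | _: StatusUpdate | _: Heartbeat => ExecutorToDriver
    case RegisteredExecutor | _: LaunchTask | _: KillTask | StopExecutor => DriverToExecutor
  }

  def main(args: Array[String]): Unit = {
    val messages = Seq(RegisterExecutor("exec-0"), RegisteredExecutor, Heartbeat("exec-0"))
    messages.foreach(m => println(s"$m -> ${direction(m)}"))
  }
}
```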

V. Executor Thread Model

Executor JVM Process
├── CoarseGrainedExecutorBackend (netty)
├── ThreadPool (CachedThreadPool)
│   ├── TaskRunner 1
│   ├── TaskRunner 2
│   └── ... 
├── BlockManager
│   ├── MemoryStore (on-heap/off-heap)
│   └── DiskStore
└── ShuffleManager
    ├── SortShuffleWriter
    └── UnsafeShuffleWriter
