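I. Launch modes
1. Standalone
- Resource request: the Driver requests Executor resources from the Master.
- Executor launch: the Master schedules Workers to launch Executors.
- Registration: Executors register directly with the Driver.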
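The Master's scheduling step can be illustrated with a simplified allocator. This sketch assumes each Executor needs a fixed number of cores and that `spreadOut` plays the role of `spark.deploy.spreadOut` (round-robin across Workers versus filling one Worker first). `WorkerSlot` and `StandaloneAllocSketch` are hypothetical names, and the logic is a simplification, not Spark's `Master` code.

```scala
// Hypothetical types; this is not Spark's Master scheduling code.
final case class WorkerSlot(id: String, freeCores: Int)

object StandaloneAllocSketch {
  /** Returns how many executors each worker should launch (simplified). */
  def assign(workers: Seq[WorkerSlot], coresPerExecutor: Int,
             coresWanted: Int, spreadOut: Boolean): Map[String, Int] = {
    val free = workers.map(_.freeCores).toArray
    val execs = Array.fill(workers.size)(0)
    var remaining = coresWanted
    def canLaunch(i: Int): Boolean =
      free(i) >= coresPerExecutor && remaining >= coresPerExecutor
    var progress = true
    while (progress) {
      progress = false
      for (i <- workers.indices) {
        // spreadOut: at most one executor per worker per round (round-robin);
        // otherwise keep filling the current worker before moving on
        var keepGoing = true
        while (keepGoing && canLaunch(i)) {
          free(i) -= coresPerExecutor
          remaining -= coresPerExecutor
          execs(i) += 1
          progress = true
          keepGoing = !spreadOut
        }
      }
    }
    workers.map(_.id).zip(execs.toSeq).toMap
  }
}
```

With three 8-core Workers, 2-core Executors and 8 cores requested, spreading out yields 2/1/1 Executors per Worker, while consolidating places all 4 on the first Worker.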
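2. YARN
1. The client (the Driver process in yarn-client mode) submits the application to the YARN ResourceManager (RM) and requests a container for the ApplicationMaster (AM).
2. The RM picks a NodeManager (NM) to launch the AM. In yarn-client mode the AM (ExecutorLauncher) is only a resource proxy and does not run user code; in yarn-cluster mode the AM also hosts the Driver.
3. The AM registers with the RM.
4. The AM requests Executor containers from the RM according to the configured resource needs.
5. The RM allocates containers on multiple NMs.
6. The AM has each NM start a CoarseGrainedExecutorBackend process in its container.
7. Registration: each Executor registers with the Driver (inside the AM in yarn-cluster mode, on the client machine in yarn-client mode).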
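The seven steps, and the one thing that changes between yarn-client and yarn-cluster (where the Driver lives), can be modeled as an ordered list of events. This is a hypothetical illustration: the step strings and the names `YarnLaunchSketch`, `YarnClient` and `YarnCluster` are made up for the example.

```scala
// Hypothetical model of the YARN launch sequence; not a Spark or YARN API.
sealed trait YarnDeployMode
case object YarnClient extends YarnDeployMode
case object YarnCluster extends YarnDeployMode

object YarnLaunchSketch {
  /** Where the Driver (the endpoint Executors register with) runs. */
  def driverHost(mode: YarnDeployMode): String = mode match {
    case YarnClient  => "client machine"
    case YarnCluster => "ApplicationMaster container"
  }

  /** The launch steps as readable strings, in order. */
  def steps(mode: YarnDeployMode, numExecutors: Int): Seq[String] = {
    val amRole = mode match {
      case YarnClient  => "AM (ExecutorLauncher, resource proxy only)"
      case YarnCluster => "AM (ApplicationMaster hosting the Driver)"
    }
    val setup = Seq(
      "client -> RM: submit application, request AM container",
      s"RM -> NM: launch $amRole",
      "AM -> RM: register",
      s"AM -> RM: request $numExecutors executor containers")
    // Steps 5 and 6: one container and one executor backend per NM
    val launch = (1 to numExecutors).flatMap { i =>
      Seq(s"RM: allocate container on NM$i",
          s"AM -> NM$i: start CoarseGrainedExecutorBackend")
    }
    // Step 7: the registration target depends on the deploy mode
    val register = (1 to numExecutors).map { i =>
      s"executor $i -> Driver on ${driverHost(mode)}: RegisterExecutor"
    }
    setup ++ launch ++ register
  }
}
```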
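II. Core components for task execution on the Executor
- Driver-side components
  - CoarseGrainedSchedulerBackend: communicates with the Executors
  - TaskSchedulerImpl: core task scheduling logic
  - DAGScheduler: DAG scheduling and Stage management
  - BlockManagerMaster: coordinator for the BlockManagers
  - MapOutputTrackerMaster: tracks shuffle map output
- Executor-side components
  - CoarseGrainedExecutorBackend: the Executor's RPC endpoint
  - Executor: the task execution engine
  - TaskRunner: wraps the execution of a single task as a runnable for a pool thread
  - BlockManager: manages local data blocks
  - ShuffleManager: controls shuffle reads and writes
  - ExecutorSource: metrics monitoring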
III. Core flow of task execution on the Executor
1. Task reception and initialization
- CoarseGrainedExecutorBackend receives the task (`LaunchTask` message):
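```scala
case LaunchTask(data) =>
  // `executor` is only created after RegisteredExecutor has been received
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // Decode the serialized TaskDescription sent by the Driver
    val taskDesc = TaskDescription.decode(data.value)
    logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}")
    executor.launchTask(this, taskDesc)
  }
```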
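Spark's `TaskDescription.encode`/`decode` write and read a fixed sequence of fields through a `DataOutputStream`. The sketch below imitates that idea with only a handful of fields; `MiniTaskDesc` and `TaskDescCodec` are hypothetical, and the layout (including the length prefix on the task bytes) is this sketch's own choice, not Spark's wire format.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

// Hypothetical, trimmed-down stand-in for TaskDescription (a few fields only).
final case class MiniTaskDesc(taskId: Long, attemptNumber: Int, name: String,
                              partitionId: Int, serializedTask: Array[Byte])

object TaskDescCodec {
  def encode(d: MiniTaskDesc): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    // Fixed field order: the decoder must read in exactly the same order
    out.writeLong(d.taskId)
    out.writeInt(d.attemptNumber)
    out.writeUTF(d.name)
    out.writeInt(d.partitionId)
    // Length-prefix the opaque task bytes so the decoder knows how much to read
    out.writeInt(d.serializedTask.length)
    out.write(d.serializedTask)
    out.flush()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  def decode(buf: ByteBuffer): MiniTaskDesc = {
    val arr = new Array[Byte](buf.remaining())
    buf.duplicate().get(arr) // copy without moving the caller's buffer position
    val in = new DataInputStream(new ByteArrayInputStream(arr))
    val taskId = in.readLong()
    val attemptNumber = in.readInt()
    val name = in.readUTF()
    val partitionId = in.readInt()
    val task = new Array[Byte](in.readInt())
    in.readFully(task)
    MiniTaskDesc(taskId, attemptNumber, name, partitionId, task)
  }
}
```

Keeping the task body as opaque bytes lets the backend decode the small header cheaply and defer deserializing the `Task` itself to `TaskRunner.run`.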
- Executor launches the task (`Executor.launchTask`):
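```scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val taskId = taskDescription.taskId
  // Wrap the task in a TaskRunner and track it so that it can be killed later
  val tr = createTaskRunner(context, taskDescription)
  runningTasks.put(taskId, tr)
  // A kill request may have arrived before the launch: apply it now
  // (killMark = (interruptThread, reason))
  val killMark = killMarks.get(taskId)
  if (killMark != null) {
    tr.kill(killMark._1, killMark._2)
    killMarks.remove(taskId)
  }
  // Hand the runner to the Executor's task thread pool
  threadPool.execute(tr)
  if (decommissioned) {
    log.error(s"Launching a task while in decommissioned state.")
  }
}
```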
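The kill-mark check in `launchTask` handles a kill request that reaches the Executor before the task itself was launched. A minimal sketch of that bookkeeping, assuming a plain cached thread pool and string results; `MiniRunner` and `MiniExecutor` are hypothetical, and the sketch ignores the race between a concurrent kill and launch that real code has to handle.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical classes, not Spark's TaskRunner/Executor.
final class MiniRunner(val taskId: Long, body: () => String) extends Runnable {
  @volatile private var killReason: Option[String] = None
  val result = new AtomicReference[String]()
  val done = new CountDownLatch(1)

  def kill(reason: String): Unit = killReason = Some(reason)

  override def run(): Unit =
    try {
      // A runner killed before it started never executes the body
      result.set(killReason match {
        case Some(r) => s"KILLED: $r"
        case None    => body()
      })
    } finally done.countDown()
}

final class MiniExecutor {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, MiniRunner]()
  private val killMarks = new ConcurrentHashMap[Long, String]()

  def killTask(taskId: Long, reason: String): Unit = {
    val tr = runningTasks.get(taskId)
    if (tr != null) tr.kill(reason)
    else killMarks.put(taskId, reason) // remember a kill for a not-yet-launched task
  }

  def launchTask(taskId: Long, body: () => String): MiniRunner = {
    val tr = new MiniRunner(taskId, body)
    runningTasks.put(taskId, tr)
    val mark = killMarks.remove(taskId) // apply a kill that arrived early
    if (mark != null) tr.kill(mark)
    threadPool.execute(tr)
    tr
  }

  def shutdown(): Unit = threadPool.shutdown()
}
```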
2. Task execution
- TaskRunner.run (excerpt):
```scala
// 1. Fetch the task's dependencies (files, jars, archives) and update the class loader
updateDependencies(
  taskDescription.artifacts.files,
  taskDescription.artifacts.jars,
  taskDescription.artifacts.archives,
  isolatedSession)
// 2. Deserialize the task
task = ser.deserialize[Task[Any]](
  taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
// 3. Memory management: one TaskMemoryManager per task
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
task.setTaskMemoryManager(taskMemoryManager)
// 4. Run the task; the second block always runs afterwards, like `finally`
val value = Utils.tryWithSafeFinally {
  val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = taskDescription.attemptNumber,
    metricsSystem = env.metricsSystem,
    cpus = taskDescription.cpus,
    resources = resources,
    plugins = plugins)
  threwException = false
  res
} {
  // Release any block locks the task still holds
  val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
  // Free any managed memory the task did not release itself
  val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
  // Leftover memory after a successful run is a leak: fail or warn depending on config
  if (freedMemory > 0 && !threwException) {
    val errMsg = log"Managed memory leak detected; size = " +
      log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"
    if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
      throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
    } else {
      logWarning(errMsg)
    }
  }
  // Likewise for block locks left unreleased after a successful run
  if (releasedLocks.nonEmpty && !threwException) {
    val errMsg =
      log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +
      log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +
      log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})"
    if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
      throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
    } else {
      logInfo(errMsg)
    }
  }
}
// ... (serialization of `value` into `serializedResult` omitted)
// 5. Report the final status and the serialized result to the Driver
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
```
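The `tryWithSafeFinally` pattern guarantees that cleanup runs whether or not the task threw, without letting a cleanup failure mask the task's own exception. The sketch below reproduces that idea with a hypothetical `ToyTaskMemoryManager` that only tracks allocation sizes; as in the excerpt, a leak is reported only when the task finished normally. `SafeFinally` and `ToyTaskRunner` are illustrative names, not Spark classes.

```scala
import scala.collection.mutable

object SafeFinally {
  // Same idea as Utils.tryWithSafeFinally: if both the block and the cleanup
  // throw, keep the block's exception and attach the cleanup's as suppressed.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try block
    catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try finallyBlock
      catch {
        case t: Throwable if original != null && (original ne t) =>
          original.addSuppressed(t)
      }
    }
  }
}

// Hypothetical memory manager: remembers how many bytes each allocation holds.
final class ToyTaskMemoryManager {
  private val allocations = mutable.Map.empty[Int, Long]
  private var nextId = 0
  def allocate(bytes: Long): Int = { nextId += 1; allocations(nextId) = bytes; nextId }
  def free(id: Int): Unit = allocations.remove(id)
  /** Frees everything still allocated and returns the number of bytes freed. */
  def cleanUpAllAllocatedMemory(): Long = {
    val leaked = allocations.values.sum
    allocations.clear()
    leaked
  }
}

object ToyTaskRunner {
  /** Runs `task`; returns its result plus a leak warning, if any. */
  def run(task: ToyTaskMemoryManager => Int): (Int, Option[String]) = {
    val tmm = new ToyTaskMemoryManager
    var threwException = true
    var warning: Option[String] = None
    val value = SafeFinally.tryWithSafeFinally {
      val res = task(tmm)
      threwException = false
      res
    } {
      val freed = tmm.cleanUpAllAllocatedMemory()
      // Only a task that finished normally is blamed for leftover memory
      if (freed > 0 && !threwException)
        warning = Some(s"Managed memory leak detected; size = $freed bytes")
    }
    (value, warning)
  }
}
```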
- Task.run (excerpt):
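```scala
// Requires hadoop.caller.context.enabled = true.
// Writes the Spark IDs into the HDFS audit log as the caller context, which helps
// troubleshooting, e.g. tracing which Spark task caused a sudden surge of small files.
new CallerContext(
  "TASK",
  SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
  appId,
  appAttemptId,
  jobId,
  Option(stageId),
  Option(stageAttemptId),
  Option(taskAttemptId),
  Option(attemptNumber)).setCurrentContext()
// Run the task, firing the TaskContext completion/failure listeners
context.runTaskWithListeners(this)
```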
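To see why the caller context helps, consider packing the IDs into one short tag that later appears in the HDFS audit log and can be parsed back to find the offending stage and task. The tag layout, the 128-character limit (assumed to match Hadoop's default `hadoop.caller.context.max.size`) and the `CallerTagSketch` helper are assumptions for illustration; check Spark's `CallerContext` for the exact format it emits.

```scala
// Hypothetical helper; the tag layout is illustrative, not Spark's guaranteed format.
object CallerTagSketch {
  def build(from: String, appId: String, jobId: Int, stageId: Int,
            stageAttempt: Int, taskId: Long, attempt: Int, maxLen: Int = 128): String = {
    val tag = s"SPARK_${from}_${appId}_JId_${jobId}_SId_${stageId}_${stageAttempt}_TId_${taskId}_$attempt"
    // The audit-log field has a size limit, so overly long tags are truncated
    if (tag.length > maxLen) tag.take(maxLen) else tag
  }

  /** Extracts (stageId, taskId) from a caller field found in the audit log. */
  def stageAndTask(tag: String): Option[(Int, Long)] = {
    val Pattern = """.*_SId_(\d+)_\d+_TId_(\d+)_\d+""".r
    tag match {
      case Pattern(s, t) => Some((s.toInt, t.toLong))
      case _             => None
    }
  }
}
```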
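3. Shuffle handling
- ShuffleMapTask prepares shuffle data (map-side output) for the downstream Stage and produces a MapStatus, which records where the output lives and how large each output partition is.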
```scala
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// Deserialize the RDD and its ShuffleDependency from the broadcast task binary
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
val rdd = rddAndDep._1
val dep = rddAndDep._2
// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
  partitionId
} else {
  context.taskAttemptId()
}
// Compute this partition, write it as shuffle output, and return the MapStatus
dep.shuffleWriterProcessor.write(
  rdd.iterator(partition, context),
  dep,
  mapId,
  partitionId,
  context)
```
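The map-side write boils down to three ideas: pick a `mapId`, route each record to a reduce partition, and report per-reducer sizes in a `MapStatus`. This toy model assumes hash partitioning (the non-negative modulo that `HashPartitioner` uses) and counts records instead of bytes; `ToyShuffleWrite` and `ToyMapStatus` are hypothetical and skip sorting, spilling and file output.

```scala
// Hypothetical types; Spark's MapStatus stores compressed block sizes instead.
final case class ToyMapStatus(mapId: Long, sizesByReducer: Vector[Long])

object ToyShuffleWrite {
  // Non-negative modulo, so negative hash codes still land in [0, numReducers)
  def partitionFor(key: Any, numReducers: Int): Int = {
    val mod = key.hashCode % numReducers
    if (mod < 0) mod + numReducers else mod
  }

  // Mirrors the mapId choice in the excerpt: partitionId under the old
  // fetch protocol, otherwise the unique task attempt id.
  def mapId(useOldFetchProtocol: Boolean, partitionId: Int, taskAttemptId: Long): Long =
    if (useOldFetchProtocol) partitionId.toLong else taskAttemptId

  /** Buckets records by reducer and returns the buckets plus a size summary. */
  def write[K, V](records: Iterator[(K, V)], numReducers: Int,
                  mapId: Long): (Map[Int, Vector[(K, V)]], ToyMapStatus) = {
    val buckets = records.toVector.groupBy { case (k, _) => partitionFor(k, numReducers) }
    val sizes = Vector.tabulate(numReducers)(r => buckets.get(r).map(_.size.toLong).getOrElse(0L))
    (buckets, ToyMapStatus(mapId, sizes))
  }
}
```

Reducers later use the size summary to decide what to fetch from which map output, which is why the `MapStatus` is reported back to the Driver.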
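- ResultTask applies the action's function to its partition; the return value becomes the task result sent back to the Driver.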
```scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  func(context, rdd.iterator(partition, context))
}
```
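The ResultTask contract is small: the Driver ships one function `(TaskContext, Iterator[T]) => U`, each task applies it to its own partition, and the Driver combines the per-partition results. A toy, single-process illustration; `ToyTaskContext` and `ToyResultJob.runJob` are hypothetical stand-ins for Spark's `TaskContext` and `SparkContext.runJob`.

```scala
// Hypothetical stand-ins; no Spark classes are used.
final case class ToyTaskContext(partitionId: Int, attemptNumber: Int)

object ToyResultJob {
  /** Runs `func` once per partition, like one ResultTask per partition. */
  def runJob[T, U](partitions: Seq[Seq[T]],
                   func: (ToyTaskContext, Iterator[T]) => U): Seq[U] =
    partitions.zipWithIndex.map { case (data, pid) =>
      // Equivalent of func(context, rdd.iterator(partition, context))
      func(ToyTaskContext(pid, attemptNumber = 0), data.iterator)
    }
}
```

Spark's `RDD.count()` follows this shape: each task returns the size of its partition and the Driver sums the returned values.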
IV. Core communication messages

| Message | Direction | Content |
|---|---|---|
| RegisterExecutor | Executor → Driver | executorId, hostname, cores, etc. |
| RegisteredExecutor | Driver → Executor | Registration acknowledged |
| LaunchTask | Driver → Executor | Serialized TaskDescription |
| StatusUpdate | Executor → Driver | taskId, state, result data |
| KillTask | Driver → Executor | Kill the specified task |
| StopExecutor | Driver → Executor | Shut down the Executor |
| Heartbeat | Executor → Driver (HeartbeatReceiver) | Liveness signal plus task metrics |
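The table maps naturally onto a sealed message hierarchy that each endpoint pattern-matches on. A simplified sketch, assuming far fewer fields than Spark's real `CoarseGrainedClusterMessages` and leaving out `Heartbeat`, which goes to the Driver's `HeartbeatReceiver` rather than the scheduler backend; `ToyExecutorEndpoint` and its replies are illustrative only.

```scala
// Hypothetical, simplified message ADT; not Spark's actual definitions.
sealed trait Msg
final case class RegisterExecutor(executorId: String, hostname: String, cores: Int) extends Msg
case object RegisteredExecutor extends Msg
final case class LaunchTask(taskId: Long) extends Msg
final case class StatusUpdate(executorId: String, taskId: Long, state: String) extends Msg
final case class KillTask(taskId: Long, reason: String) extends Msg
case object StopExecutor extends Msg

object ToyExecutorEndpoint {
  /** Executor-side reaction to Driver messages; returns any reply to send back. */
  def receive(executorId: String, msg: Msg): Option[Msg] = msg match {
    case RegisteredExecutor => None // would create the Executor; no reply
    case LaunchTask(id)     => Some(StatusUpdate(executorId, id, "RUNNING"))
    case KillTask(id, _)    => Some(StatusUpdate(executorId, id, "KILLED"))
    case StopExecutor       => None // would shut the Executor down
    // Executor -> Driver messages should never arrive at the Executor
    case _: RegisterExecutor | _: StatusUpdate =>
      throw new IllegalArgumentException(s"unexpected Driver->Executor message: $msg")
  }
}
```

Because the trait is sealed, the compiler checks that every message type is handled, which is one reason message protocols like this one are usually modeled as sealed hierarchies in Scala.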
V. Executor thread model

```text
Executor JVM process
├── CoarseGrainedExecutorBackend (RPC endpoint, Netty transport)
├── Task thread pool (cached thread pool)
│   ├── TaskRunner 1
│   ├── TaskRunner 2
│   └── ...
├── BlockManager
│   ├── MemoryStore (on-heap/off-heap)
│   └── DiskStore
└── ShuffleManager (SortShuffleManager)
    ├── SortShuffleWriter
    ├── UnsafeShuffleWriter
    └── BypassMergeSortShuffleWriter
```
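The task thread pool is a cached pool: it creates threads on demand, reuses idle ones, and does not cap the thread count itself; concurrency is bounded by how many tasks the Driver assigns to the Executor's cores. A minimal Java-concurrency sketch of such a pool with daemon threads named `<prefix>-N`; `TaskLaunchPool` is hypothetical. Spark's Executor builds a comparable pool from `Executors.newCachedThreadPool` with daemon threads named `Executor task launch worker-N` (additionally running tasks on `UninterruptibleThread`), so treat this as an approximation.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper, not Spark's thread pool construction.
object TaskLaunchPool {
  def newDaemonCachedPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // idle workers must not keep the JVM alive
        t
      }
    }
    // Threads are created on demand and reused; idle ones expire after 60 s
    Executors.newCachedThreadPool(factory)
  }
}
```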