Task的线程 和 MailboxProcessor
的绑定
executingThread
是 Task
类(StreamTask
的父类)在构造时创建的物理线程。MailboxProcessor
是 StreamTask
用来处理异步事件和驱动其主要处理逻辑(processInput
)的核心组件。它们之间的绑定关系如下:
Task
作为Runnable
:Task
类实现了Runnable
接口,其run()
方法是executingThread
的入口点。// ... public class Task implements Runnable, TaskSlotPayload { // ... private final Thread executingThread; // 在构造函数中创建 public Task(/*...*/) { // ... this.executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); // this 指向 Task 实例 } @Override public void run() { // 这是 executingThread 执行的入口 try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { doRun(); // 调用实际的工作方法 } finally { terminationFuture.complete(executionState); } } private void doRun() { // ... // 对于 StreamTask,这里会调用到 StreamTask 的 invoke() 方法 invokable.invoke(); // invokable 就是 StreamTask 实例 // ... } // ... }
StreamTask创建了TaskMailboxImpl,传递给MailboxProcessor,因此是MailboxProcessor的执行线程。
protected StreamTask(
Environment environment,
@Nullable TimerService timerService,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
StreamTaskActionExecutor actionExecutor)
throws Exception {
this(
environment,
timerService,
uncaughtExceptionHandler,
actionExecutor,
new TaskMailboxImpl(Thread.currentThread()));
}
this.mailboxProcessor =
new MailboxProcessor(
this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
StreamTask.invoke()
和 MailboxProcessor
:
- 当
executingThread
启动并执行到StreamTask.invoke()
时,StreamTask
会使用其内部的MailboxProcessor
来驱动其核心事件循环。StreamTask.java
// ... @Override public final void invoke() throws Exception { // ... (初始化,如 restoreInternal()) ... // let the task do its work getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart(); runMailboxLoop(); // <--- 关键调用 // ... (清理工作,如 afterInvoke()) ... } public void runMailboxLoop() throws Exception { mailboxProcessor.runMailboxLoop(); // 将控制权交给 MailboxProcessor } // ...
mailboxProcessor.runMailboxLoop()
是一个阻塞调用(从executingThread
的视角看)。这个方法会在executingThread
上运行一个循环,不断地从邮箱 (Mailbox) 中取出邮件 (Mail) 并执行它们,或者在没有邮件时执行默认操作 (通常是StreamTask.processInput()
,用于处理输入数据和调用算子)。
Mailbox 的线程模型:
MailboxProcessor
被设计为在其“拥有者”线程(即executingThread
)上执行其核心循环和邮件处理。TaskMailbox
(被MailboxProcessor
使用) 内部有检查,确保其关键方法(如take
,put
的某些变体,以及邮件的执行)是在预期的邮箱线程(即executingThread
)上调用的。MailboxProcessor.java
public void runMailboxLoop() throws Exception { // ... final TaskMailbox localMailbox = mailbox; checkState( localMailbox.isMailboxThread(), // 确保当前线程是邮箱线程 "Method must be executed by declared mailbox thread!"); // ... while (isNextLoopPossible()) { processMail(localMailbox, false); // 处理邮件,在 executingThread 上执行 if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction(mailboxController); // 执行默认动作,在 executingThread 上执行 } } }
MailboxDefaultAction
通常包装了StreamTask.processInput()
,所以数据处理和算子调用也是在executingThread
上发生的。- 其他线程(例如网络线程接收到数据后,或者定时器线程触发定时器)想要与
StreamTask
交互时,它们不会直接调用StreamTask
的方法,而是向其Mailbox
中放入一个“邮件”(一个Runnable
或Callable
)。MailboxProcessor
会在executingThread
上从邮箱中取出这个邮件并执行它。
总结线程与 Mailbox 的绑定:
Task
构造时创建executingThread
,并将Task
自身作为Runnable
传递给该线程。executingThread
启动后,执行Task.run()
->Task.doRun()
->StreamTask.invoke()
。- 在
StreamTask.invoke()
中,调用mailboxProcessor.runMailboxLoop()
。 mailboxProcessor.runMailboxLoop()
在executingThread
上运行,它负责从邮箱中拉取任务并执行,或者执行默认的数据处理逻辑 (processInput
)。- 所有提交到该
StreamTask
邮箱的异步操作最终都会在executingThread
上被MailboxProcessor
串行化执行。
因此,executingThread
成为了 MailboxProcessor
的“工作线程”。MailboxProcessor
确保了 StreamTask
的核心逻辑(包括状态访问、算子调用等)都在这个单一的 executingThread
上顺序执行,从而简化了并发控制。
MailboxProcessor的功能
MailboxProcessor
是 Flink 中任务(Task)执行模型的核心组件,它实现了基于邮箱(Mailbox)的单线程执行模式。其主要能力包括:
管理邮箱 (TaskMailbox
):
- 持有一个
TaskMailbox
实例,用于存储需要串行执行的各种动作(Mail
)。这些动作可以是来自外部的请求(如 Checkpoint 触发、Timer 回调)或内部控制命令。
// ... existing code ...
public class MailboxProcessor implements Closeable {
// ... existing code ...
/**
* The mailbox data-structure that manages request for special actions, like timers,
* checkpoints, ...
*/
protected final TaskMailbox mailbox;
// ... existing code ...
执行默认动作 (MailboxDefaultAction
):
- 在邮箱为空时,会循环执行一个预定义的“默认动作”。在
StreamTask
的上下文中,这个默认动作通常是处理输入数据(processInput
)。
this.mailboxProcessor =
new MailboxProcessor(
this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
// ... existing code ...
/**
* Action that is repeatedly executed if no action request is in the mailbox. Typically record
* processing.
*/
protected final MailboxDefaultAction mailboxDefaultAction;
// ... existing code ...
单线程执行循环 (runMailboxLoop
):
- 核心方法
runMailboxLoop()
驱动整个执行逻辑。它会不断检查邮箱中是否有新的Mail
,如果有则执行它们;如果没有,则执行默认动作。 - 这种机制保证了默认动作(如数据处理)和邮箱中的其他动作(如 Checkpoint、Timer 事件)之间是单线程顺序执行的,避免了并发冲突。
// ... existing code ...
public void runMailboxLoop() throws Exception {
suspended = !mailboxLoopRunning;
final TaskMailbox localMailbox = mailbox;
checkState(
localMailbox.isMailboxThread(),
"Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController mailboxController = new MailboxController(this);
while (isNextLoopPossible()) {
// The blocking `processMail` call will not return until default action is available.
processMail(localMailbox, false);
if (isNextLoopPossible()) {
mailboxDefaultAction.runDefaultAction(
mailboxController); // lock is acquired inside default action as needed
}
}
}
// ... existing code ...
提供邮箱执行器 (MailboxExecutor
):
- 通过
getMainMailboxExecutor()
和getMailboxExecutor(int priority)
方法,向外部提供MailboxExecutor
。这使得其他组件(如 TimerService、CheckpointCoordinator)可以将它们的动作提交到邮箱中,由MailboxProcessor
在其单线程循环中统一调度执行。
// ... existing code ...
public MailboxExecutor getMainMailboxExecutor() {
return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);
}
/**
* Returns an executor service facade to submit actions to the mailbox.
*
* @param priority the priority of the {@link MailboxExecutor}.
*/
public MailboxExecutor getMailboxExecutor(int priority) {
return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
}
// ... existing code ...
生命周期管理:
- 实现了
Closeable
接口,并有prepareClose()
和close()
方法,对应TaskMailbox
的quiesce()
和close()
。这确保了在任务结束时,邮箱能被正确关闭,并处理(如取消)剩余的Mail
。
// ... existing code ...
/** Lifecycle method to close the mailbox for action submission. */
public void prepareClose() {
mailbox.quiesce();
}
/**
* Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all
* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the
* mailbox.
*/
@Override
public void close() {
List<Mail> droppedMails = mailbox.close();
// ... existing code ...
}
挂起与恢复:
MailboxProcessor
的执行循环可以被挂起 (suspend()
) 和恢复(通过再次调用runMailboxLoop()
或相关控制逻辑)。默认动作也可以通过MailboxController
暂时挂起。
// ... existing code ...
/** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */
public void suspend() {
sendPoisonMail(() -> suspended = true);
}
// ... existing code ...
以及 MailboxController
中的 suspendDefaultAction()
。
异常处理:
reportThrowable(Throwable throwable)
方法允许将其他线程中发生的异常报告给邮箱线程,并在邮箱线程中重新抛出,从而中断任务执行。
// ... existing code ...
public void reportThrowable(Throwable throwable) {
sendControlMail(
() -> {
if (throwable instanceof Exception) {
throw (Exception) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw WrappingRuntimeException.wrapIfNecessary(throwable);
}
},
"Report throwable %s",
throwable);
}
// ... existing code ...
度量指标控制 (MailboxMetricsController
):
- 包含一个
MailboxMetricsController
用于控制和访问邮箱相关的度量指标,如邮箱延迟、处理的邮件数量等。
// ... existing code ...
private final MailboxMetricsController mailboxMetricsControl;
// ... existing code ...
@VisibleForTesting
public MailboxMetricsController getMailboxMetricsControl() {
return this.mailboxMetricsControl;
}
// ... existing code ...
MailboxProcessor 与 StreamTask 的互动
MailboxProcessor
为 StreamTask
提供了一个强大的、基于邮箱的单线程执行引擎。StreamTask
委托 MailboxProcessor
来驱动其核心的数据处理循环,并通过 MailboxExecutor
将所有需要与任务主线程同步的异步操作(如 Timer、Checkpoint 事件)统一提交到邮箱中进行调度。这种设计确保了任务内部操作的串行化,简化了并发控制,并提高了系统的稳定性和可维护性。
StreamTask
是 Flink 流处理任务的基类,它使用 MailboxProcessor
来管理其核心执行逻辑。
创建和持有
MailboxProcessor
:StreamTask
在其构造函数中创建并持有一个MailboxProcessor
实例。MailboxDefaultAction
通常被设置为StreamTask::processInput
,这意味着当邮箱为空时,StreamTask
会执行其数据处理逻辑。StreamTaskActionExecutor
也被传递给MailboxProcessor
。
驱动执行循环:
StreamTask
的invoke()
方法是任务的执行入口。在其核心逻辑中,它会调用mailboxProcessor.runMailboxLoop()
来启动邮箱处理循环。这个循环会一直运行,直到任务完成或被取消。- 代码见
StreamTask.invoke()
:StreamTask.java
// ... existing code ... public final void invoke() throws Exception { // ... initialization ... try { // ... // Run mailbox until all gates will be recovered. mailboxProcessor.runMailboxLoop(); // 启动邮箱循环 // ... } finally { // ... cleanup ... // let mailbox execution reject all new letters from this point mailboxProcessor.prepareClose(); // ... mailboxProcessor.close(); } } // ... existing code ...
提交异步动作:
StreamTask
及其相关的组件(如TimerService
、SubtaskCheckpointCoordinator
)需要执行一些异步操作,例如触发 Timer、执行 Checkpoint、响应外部事件等。这些操作需要确保在任务的主线程中执行,以避免并发问题。StreamTask
通过从mailboxProcessor
获取的MailboxExecutor
来提交这些异步操作。这些操作会被封装成Mail
放入邮箱,由MailboxProcessor
在其循环中按顺序执行。- 例如,
ProcessingTimeService
的实现会使用MailboxExecutor
来调度 Timer 的触发:StreamOperatorFactoryUtil.java
// ... existing code ... public static <OUT, OP extends StreamOperator<OUT>> OP createOperator( // ... MailboxExecutor mailboxExecutor = // Obtained via containingTask.getMailboxExecutorFactory() containingTask .getMailboxExecutorFactory() .createExecutor(configuration.getChainIndex()); // ... final ProcessingTimeService processingTimeService; if (operatorFactory instanceof ProcessingTimeServiceAware) { processingTimeService = ((ProcessingTimeServiceAware) operatorFactory) .createProcessingTimeService(mailboxExecutor); } else { processingTimeService = processingTimeServiceFactory.get(); } // ... existing code ...
ProcessingTimeServiceImpl
内部会使用这个mailboxExecutor
来execute
或schedule
定时任务。
控制流程与状态:
StreamTask
的processInput
方法(作为MailboxDefaultAction
)可以通过MailboxDefaultAction.Controller
与MailboxProcessor
交互。例如,当输入数据处理完毕或遇到反压时,它可以调用controller.suspendDefaultAction()
来暂时挂起默认动作的执行,让MailboxProcessor
优先处理邮箱中的其他Mail
。- 代码见
StreamTask.processInput()
:// ... existing code ... protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { DataInputStatus status = inputProcessor.processInput(); switch (status) { // ... case END_OF_INPUT: // Suspend the mailbox processor, it would be resumed in afterInvoke and finished // after all records processed by the downstream tasks. We also suspend the default // actions to avoid repeat executing the empty default operation (namely process // records). controller.suspendDefaultAction(); // 通过 Controller 控制 MailboxProcessor mailboxProcessor.suspend(); return; } // ... } // ... existing code ...
生命周期同步:
StreamTask
在其生命周期的不同阶段(如cancelTask
,afterInvoke
)会调用MailboxProcessor
的相应方法(如prepareClose
,close
,allActionsCompleted
)来同步状态和清理资源。- 例如,在任务正常结束或需要最终 Checkpoint 完成后,会调用
mailboxProcessor.allActionsCompleted()
:StreamTask.java
// ... existing code ... FutureUtils.waitForAll(terminationConditions) .thenRun(mailboxProcessor::allActionsCompleted); // Resumes the mailbox processor. The mailbox processor would be completed // after all records are processed by the downstream tasks. mailboxProcessor.runMailboxLoop(); // ... existing code ...
TaskMailboxImpl
虽然这个类的核心结构是“一个锁(ReentrantLock
)加一个队列(Deque<Mail>
)”,但它的实现中包含了一些针对 Flink Task 执行模型的特定优化和设计,使其不仅仅是一个简单的线程安全队列。
@ThreadSafe
public class TaskMailboxImpl implements TaskMailbox {
/** Lock for all concurrent ops. */
private final ReentrantLock lock = new ReentrantLock();
/** Internal queue of mails. */
@GuardedBy("lock")
private final Deque<Mail> queue = new ArrayDeque<>();
/** Condition that is triggered when the mailbox is no longer empty. */
@GuardedBy("lock")
private final Condition notEmpty = lock.newCondition();
/** The state of the mailbox in the lifecycle of open, quiesced, and closed. */
@GuardedBy("lock")
private State state = OPEN;
/** Reference to the thread that executes the mailbox mails. */
@Nonnull private final Thread taskMailboxThread;
/**
* The current batch of mails. A new batch can be created with {@link #tryBuildBatch()} and
* consumed with {@link #tryTakeFromBatch()}.
*/
private final Deque<Mail> batch = new ArrayDeque<>();
/**
* Performance optimization where hasNewMail == !queue.isEmpty(). Will not reflect the state of
* {@link #batch}.
*/
private volatile boolean hasNewMail = false;
/**
* Performance optimization where there is new urgent mail. When there is no urgent mail in the
* batch, it should be checked every time mail is taken, including taking mail from batch queue.
*/
private volatile boolean hasNewUrgentMail = false;
public TaskMailboxImpl(@Nonnull final Thread taskMailboxThread) {
this.taskMailboxThread = taskMailboxThread;
}
@VisibleForTesting
public TaskMailboxImpl() {
this(Thread.currentThread());
}
核心结构:
lock: ReentrantLock
: 这是实现线程安全的核心。所有对内部队列queue
和状态state
的并发访问都由此锁保护。queue: Deque<Mail>
:- 实际存储待处理
Mail
对象的双端队列。Mail
对象封装了需要执行的动作(通常是一个Runnable
)及其优先级。 - 使用
ArrayDeque
作为底层实现。
- 实际存储待处理
notEmpty: Condition
:- 与
lock
关联的条件变量。当邮箱从空变为非空时(即有新的Mail
被放入),会通过notEmpty.signal()
或notEmpty.signalAll()
来唤醒可能正在等待获取Mail
的线程(主要是邮箱处理线程)。 - 在
take()
方法中,如果队列为空,线程会调用notEmpty.await()
等待。
- 与
state: State
(enum:OPEN
,QUIESCED
,CLOSED
):- 表示邮箱的生命周期状态:
OPEN
: 邮箱正常工作,可以接收和发送邮件。QUIESCED
: 邮箱处于静默状态,不再接受新的邮件(put
操作会失败),但仍然可以取出已有的邮件。通常在任务准备关闭时进入此状态。CLOSED
: 邮箱已关闭,不能进行任何操作。所有未处理的邮件会被清空。
- 状态转换由
quiesce()
和close()
方法控制,并且这些操作也受lock
保护。
- 表示邮箱的生命周期状态:
taskMailboxThread: Thread
:- 一个非常重要的字段,它存储了被指定为“邮箱线程”的线程引用。
- 很多操作(如
take
,tryTake
,hasMail
,createBatch
,tryTakeFromBatch
,quiesce
,close
)都强制要求调用者必须是这个taskMailboxThread
,通过checkIsMailboxThread()
进行检查。这是因为 Flink 的 Task 执行模型是单线程的,MailboxProcessor
会在其专用的线程中处理邮箱中的邮件和默认动作。
batch: Deque<Mail>
:- 这是一个性能优化的设计。
MailboxProcessor
在其主循环中,会先调用createBatch()
将主队列queue
中的所有邮件一次性转移到这个batch
队列中。然后,MailboxProcessor
会优先从batch
中通过tryTakeFromBatch()
获取邮件进行处理。 - 目的: 减少锁的竞争。
createBatch()
在持有锁的情况下将一批邮件转移出来,之后MailboxProcessor
处理batch
中的邮件时就不再需要频繁获取锁去访问主队列queue
。这对于高吞吐量的场景非常重要。 batch
的操作也仅限于taskMailboxThread
。
- 这是一个性能优化的设计。
hasNewMail: volatile boolean
:- 这是另一个性能优化。它大致反映了主队列
queue
是否为空 (!queue.isEmpty()
)。 volatile
关键字确保了不同线程对它的可见性。- 目的: 允许邮箱线程在不获取锁的情况下快速检查是否有新邮件。例如,在
hasMail()
和tryTake()
方法中,会先检查batch
,然后检查hasNewMail
,只有当hasNewMail
为true
时,才尝试获取锁并检查主队列queue
。 - 当有新邮件通过
put()
或putFirst()
(从非邮箱线程调用时)添加到queue
时,hasNewMail
会被设置为true
。当邮件从queue
中被取出或通过createBatch()
转移到batch
时,hasNewMail
会被更新。
- 这是另一个性能优化。它大致反映了主队列
特别需要注意的点:
单消费者(邮箱线程)设计:
- 尽管
put()
和putFirst()
方法允许从任何线程添加邮件(是线程安全的),但所有取邮件的操作(take
,tryTake
,createBatch
,tryTakeFromBatch
)以及生命周期管理方法(quiesce
,close
)都必须由taskMailboxThread
调用。这是 Flink Mailbox 模型的核心设计,确保了任务逻辑的单线程执行。
- 尽管
批处理优化 (
batch
队列):- 理解
batch
队列的作用对于分析性能至关重要。它不是一个独立的邮箱,而是主队列queue
的一个临时缓存,用于减少锁争用。MailboxProcessor
会周期性地将queue
中的内容“批发”到batch
中。
- 理解
hasNewMail
优化:hasNewMail
变量提供了一种轻量级的检查机制,避免了邮箱线程在主队列可能为空时仍频繁获取锁。优先级处理 (
takeOrNull
方法):takeOrNull(Deque<Mail> queue, int priority)
方法实现了从队列中根据优先级取出邮件的逻辑。它会遍历队列,找到第一个优先级大于或等于指定priority
的邮件并返回。这意味着高优先级的邮件(如控制命令、Checkpoint barrier)可以被优先处理。
putFirst()
的特殊行为:putFirst(@Nonnull Mail mail)
方法很有意思:- 如果调用者是
taskMailboxThread
,邮件会直接被添加到batch
队列的头部。这是因为邮箱线程是当前批次邮件的消费者,将邮件直接放入批处理队列的头部可以使其被更快处理,而无需等待下一轮createBatch
。 - 如果调用者不是
taskMailboxThread
,邮件会被添加到主队列queue
的头部,并通过notEmpty.signal()
唤醒邮箱线程。
- 如果调用者是
生命周期管理 (
state
,quiesce()
,close()
):- 邮箱的生命周期状态转换是严格控制的,并且与任务的生命周期紧密相关。
quiesce()
: 使邮箱不再接受新邮件,但允许处理完已有的邮件。close()
: 彻底关闭邮箱,清空所有邮件,并唤醒所有可能在等待的线程(通过notEmpty.signalAll()
),通常是为了让它们感知到关闭状态并退出。
锁的粒度和使用:
ReentrantLock
用于保护对共享数据(queue
,state
)的访问。Condition
(notEmpty
) 用于实现生产者-消费者模式中的等待和通知机制。lock.lockInterruptibly()
在take()
方法中使用,允许等待的邮箱线程响应中断。
runExclusively(Runnable runnable)
:- 提供了一种机制,允许以独占方式在邮箱的锁保护下执行一段代码。这对于需要原子地执行多个邮箱操作(例如,检查状态然后根据状态放入邮件)的场景非常有用,可以避免竞态条件。
总而言之,TaskMailboxImpl
虽然基于简单的锁和队列,但通过引入批处理、hasNewMail
标志、严格的线程模型以及精细的生命周期管理,为 Flink 的 MailboxProcessor
提供了一个高效且功能完备的邮件调度机制。这些设计都是为了在保证单线程执行模型的前提下,最大化吞吐量并减少不必要的同步开销。
Mail 类分析
Mail
类是 Apache Flink 流处理运行时任务邮箱机制中的一个核心组件。它代表一个可执行的任务单元,绑定到特定的操作符链中,可以被下游邮箱处理器选择执行。
主要属性
mailOptions
: 邮件选项,用于配置邮件的行为,如是否可延迟执行。runnable
: 要执行的操作,是一个ThrowingRunnable
类型的实例,可以抛出异常。priority
: 邮件的优先级。优先级并不直接决定执行顺序,而是用于避免上下游操作符之间的活锁或死锁问题。descriptionFormat
和descriptionArgs
: 用于调试和错误报告的邮件描述信息。actionExecutor
: 用于执行runnable
的执行器。
Mail
类提供了三个构造函数,允许灵活地创建邮件对象:
- 最简单的构造函数只需要
runnable
、priority
和描述信息。 - 可以指定
MailboxExecutor.MailOptions
来配置邮件选项。 - 可以指定
StreamTaskActionExecutor
来控制操作的执行方式。
核心方法
getMailOptions()
: 获取邮件选项。getPriority()
: 获取邮件的优先级。如果邮件是可延迟的,则返回最小优先级。tryCancel()
: 尝试取消邮件的执行。toString()
: 返回邮件的描述信息。run()
: 执行邮件中的操作。
Mail
类在 Flink 的流处理任务中扮演着重要角色。它允许将任务分解为小的、可执行的单元,并通过邮箱机制进行调度和执行。这种设计有助于提高任务的并发性和响应性,同时避免复杂的同步问题。
在实际使用中,可以通过创建 Mail
对象来封装需要执行的操作,并将其提交到邮箱中等待执行。通过设置不同的优先级和选项,可以控制操作的执行顺序和行为。
MailboxExecutorImpl
MailboxExecutorImpl
实现了 flink.api.common.operators.MailboxExecutor
接口,它充当了向 Flink Task 的邮箱(TaskMailbox
)提交执行单元(Runnable
或 Callable
)的一个入口或门面。它的核心目标是允许其他组件将代码片段(封装为 Mail
对象)放入邮箱,这些代码片段最终会由 MailboxProcessor
在其专用的单线程中执行。
核心成员变量:
mailbox: TaskMailbox
: 这是实际存储待执行邮件的邮箱实例。MailboxExecutorImpl
将通过它来提交新的邮件。MailboxExecutorImpl.java
// ... existing code ... /** The mailbox that manages the submitted runnable objects. */ @Nonnull private final TaskMailbox mailbox; // ... existing code ...
priority: int
: 与此MailboxExecutorImpl
实例关联的邮件的默认优先级。当通过这个执行器提交任务时,任务会带上这个优先级。// ... existing code ... private final int priority; // ... existing code ...
actionExecutor: StreamTaskActionExecutor
: 这是一个执行器,用于实际运行封装在Mail
对象中的命令。Mail
对象在被MailboxProcessor
取出后,其run()
方法会使用这个actionExecutor
来执行具体的逻辑。// ... existing code ... private final StreamTaskActionExecutor actionExecutor; // ... existing code ...
mailboxProcessor: MailboxProcessor
(可能为null
): 指向驱动邮箱循环的MailboxProcessor
。主要用于isIdle()
方法的判断。// ... existing code ... private final MailboxProcessor mailboxProcessor; // ... existing code ...
构造函数:
- 提供了两个构造函数,主要的区别在于是否传入
MailboxProcessor
。
它们初始化了执行器的核心组件。// ... existing code ... public MailboxExecutorImpl( @Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) { this(mailbox, priority, actionExecutor, null); } public MailboxExecutorImpl( @Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) { this.mailbox = mailbox; this.priority = priority; this.actionExecutor = Preconditions.checkNotNull(actionExecutor); this.mailboxProcessor = mailboxProcessor; } // ... existing code ...
主要方法分析:
execute
// ... existing code ...
@Override
public void execute(
MailOptions mailOptions,
final ThrowingRunnable<? extends Exception> command,
final String descriptionFormat,
final Object... descriptionArgs) {
try {
mailbox.put(
new Mail(
mailOptions,
command,
priority,
actionExecutor,
descriptionFormat,
descriptionArgs));
} catch (MailboxClosedException mbex) {
throw new RejectedExecutionException(mbex);
}
}
// ... existing code ...
- 这是向邮箱提交任务的核心方法。
- 它接收一个
ThrowingRunnable
作为要执行的命令,以及MailOptions
(用于配置邮件行为,例如是否可延迟)、描述信息等。 - 内部会创建一个新的
Mail
对象,该对象封装了传入的command
、此执行器实例的priority
、actionExecutor
以及描述信息。 - 然后调用
mailbox.put(new Mail(...))
将这个新创建的Mail
对象放入TaskMailbox
中。 - 如果邮箱已经关闭(
MailboxClosedException
),则会抛出RejectedExecutionException
,这是 Executor 服务在无法接受新任务时的标准行为。
yield
此方法设计为由邮箱线程自身调用。
- 它尝试从
mailbox
中获取一个至少具有 此执行器priority
的邮件 (mailbox.take(priority)
)。这是一个阻塞操作,如果当前没有符合条件的邮件,它会等待。 - 一旦获取到
Mail
对象,它会立即在当前线程(即邮箱线程)中执行mail.run()
。 - 目的: 允许当前正在邮箱线程中执行的某个可能耗时较长的操作(例如用户函数)主动暂停,让邮箱中其他待处理的邮件(特别是具有相同或更高优先级的邮件,如 Checkpoint Barrier)有机会执行。这是一种协作式多任务处理机制,对于保证邮箱系统的响应性至关重要。
@Override
public void yield() throws InterruptedException {
Mail mail = mailbox.take(priority);
try {
mail.run();
} catch (Exception ex) {
throw WrappingRuntimeException.wrapIfNecessary(ex);
}
}
tryYield()
:- 与
yield()
类似,但是一个非阻塞版本。 - 它调用
mailbox.tryTake(priority)
尝试获取邮件。 - 如果成功获取到邮件,则执行它并返回
true
。 - 如果没有符合条件的邮件,则立即返回
false
,不会阻塞。 - 需要注意的是,根据
MailboxExecutor
接口的约定和MailOptions.deferrable()
的设计,yield()
和tryYield()
通常不会执行被标记为 "deferrable"(可延迟)的邮件。这是为了在需要快速让出执行权(例如为了处理 Checkpoint)时,避免执行那些可以稍后处理的低优先级或非紧急任务。
- 与
shouldInterrupt()
:- 此方法用于指示当前正在邮箱线程中执行的操作是否应该被中断(例如,一个长时间运行的用户函数)。
- 目前的实现是简单地检查
mailbox.hasMail()
,即只要邮箱中还有任何待处理的邮件,就建议中断。 - 代码中的
TODO: FLINK-35051
注释表明,这是一个待优化的点。理想情况下,只有当邮箱中有时间敏感的邮件(例如与 Checkpoint 相关的邮件)时,才应该建议中断,以避免不必要的性能开销。
isIdle()
:// ... existing code ... public boolean isIdle() { return !mailboxProcessor.isDefaultActionAvailable() && !mailbox.hasMail() && mailbox.getState().isAcceptingMails(); } // ... existing code ...
- 检查关联的
MailboxProcessor
是否处于空闲状态。 - 判断条件为:
MailboxProcessor
的默认操作(通常是processInput
)当前不可用(即被挂起)。TaskMailbox
中没有待处理的邮件。TaskMailbox
的状态仍然是接受邮件的状态(即不是QUIESCED
或CLOSED
)。
- 这个方法需要
mailboxProcessor
成员不为null
。
- 检查关联的
总结与作用
MailboxExecutorImpl
为 Flink 的异步操作和事件驱动模型提供了一个关键的接口。它使得系统中的不同部分(例如 Timer Service、Checkpoint Coordinator,甚至是算子自身)能够安全地将需要在 Task 主执行线程(即邮箱线程)中执行的逻辑提交到邮箱队列。
- 封装提交逻辑: 它将创建
Mail
对象并将其放入TaskMailbox
的细节封装起来,提供了一个更简洁的Executor
风格的 API。 - 支持优先级: 允许为通过特定执行器实例提交的任务指定一个默认优先级。
- 协作式调度 (
yield
/tryYield
): 这是 Mailbox 模型单线程执行模式下实现并发感和响应性的核心机制。它允许长时间运行的任务主动让出控制权,确保高优先级任务(如系统事件)能够及时处理。 - 中断提示 (
shouldInterrupt
): 为长时间运行的用户代码提供了一个检查点,以便在需要时(例如为了执行 Checkpoint)能够优雅地中断。
通过 MailboxExecutorImpl
,Flink 能够确保所有关键的 Task 级别操作(数据处理、状态访问、Checkpoint、Timer 回调等)都在同一个线程中有序执行,从而避免了复杂的并发控制问题,简化了状态管理和一致性保证。
MailboxProcessor
细节分析
MailboxProcessor
封装了基于 Mailbox 的执行模型的完整逻辑。它的核心是一个事件循环 (runMailboxLoop
),该循环持续执行两个主要任务:
- 处理邮箱中的邮件 (Mail): 检查
TaskMailbox
中是否有待处理的邮件(例如 Checkpoint 触发、Timer 事件、用户通过MailboxExecutor
提交的自定义逻辑等),并按优先级顺序执行它们。 - 执行默认动作 (MailboxDefaultAction): 如果邮箱中没有邮件,或者邮件处理完毕后,它会执行一个“默认动作”。在
StreamTask
的上下文中,这个默认动作通常是processInput()
,即处理来自上游的数据。
这种设计确保了 Task 内部所有操作(数据处理、Checkpoint、Timer 等)的单线程执行,从而极大地简化了并发控制和状态管理。
主要结构组件:
mailbox: TaskMailbox
: 这是实际存储和管理Mail
对象的组件。MailboxProcessor
从它那里获取邮件。mailboxDefaultAction: MailboxDefaultAction
: 代表在邮箱空闲时重复执行的默认操作。它通过MailboxDefaultAction.Controller
与MailboxProcessor
交互,例如在没有输入数据时通知MailboxProcessor
暂停调用默认动作。actionExecutor: StreamTaskActionExecutor
: 用于实际执行Mail
中封装的Runnable
。Mail
对象本身不直接执行逻辑,而是委托给这个执行器。控制标志 (Control Flags) - 这些标志必须只能从邮箱线程访问,以避免竞态条件:
mailboxLoopRunning: boolean
: 控制主事件循环是否应该继续运行。当设置为false
时,循环会在当前迭代完成后终止。suspended: boolean
: 控制邮箱处理器是否被临时挂起。如果为true
,runMailboxLoop
会退出,但之后可以被重新调用以恢复。suspendedDefaultAction: DefaultActionSuspension
: 记录当前默认动作是否被挂起。如果非null
,表示默认动作已挂起,MailboxProcessor
不会调用它。
// ... existing code ... /** * Control flag to terminate the mailbox processor. Once it was terminated could not be * restarted again. Must only be accessed from mailbox thread. */ private boolean mailboxLoopRunning; /** * Control flag to temporary suspend the mailbox loop/processor. After suspending the mailbox * processor can be still later resumed. Must only be accessed from mailbox thread. */ private boolean suspended; /** * Remembers a currently active suspension of the default action. Serves as flag to indicate a * suspended default action (suspended if not-null) and to reuse the object as return value in * consecutive suspend attempts. Must only be accessed from mailbox thread. */ private DefaultActionSuspension suspendedDefaultAction; // ... existing code ...
mailboxMetricsControl: MailboxMetricsController
: 用于管理和暴露与邮箱相关的度量指标。
MailboxProcessor
提供了多个构造函数,允许不同程度的定制。核心的构造函数接收 MailboxDefaultAction
、TaskMailbox
、StreamTaskActionExecutor
和 MailboxMetricsController
。
一个常见的用法是传入一个 MailboxDefaultAction
,然后 MailboxProcessor
会使用默认的 TaskMailboxImpl
(与当前线程绑定)和 StreamTaskActionExecutor.IMMEDIATE
。
// ... existing code ...
public MailboxProcessor(
MailboxDefaultAction mailboxDefaultAction,
TaskMailbox mailbox,
StreamTaskActionExecutor actionExecutor,
MailboxMetricsController mailboxMetricsControl) {
this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
this.mailbox = Preconditions.checkNotNull(mailbox);
this.mailboxLoopRunning = true;
this.suspendedDefaultAction = null;
this.mailboxMetricsControl = mailboxMetricsControl;
}
// ... existing code ...
runMailboxLoop()
// ... existing code ...
public void runMailboxLoop() throws Exception {
suspended = !mailboxLoopRunning;
final TaskMailbox localMailbox = mailbox;
checkState(
localMailbox.isMailboxThread(),
"Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController mailboxController = new MailboxController(this);
while (isNextLoopPossible()) {
// The blocking `processMail` call will not return until default action is available.
processMail(localMailbox, false);
if (isNextLoopPossible()) {
mailboxDefaultAction.runDefaultAction(
mailboxController); // lock is acquired inside default action as needed
}
}
}
private boolean isNextLoopPossible() {
// 'Suspended' can be false only when 'mailboxLoopRunning' is true.
return !suspended;
}
// ... existing code ...
- 这是
MailboxProcessor
的心脏。它在一个while (isNextLoopPossible())
循环中运行。 - 前置检查: 确保该方法由指定的邮箱线程执行,并且邮箱处于
OPEN
状态。 - 创建
MailboxController
:MailboxController
是MailboxDefaultAction
与MailboxProcessor
交互的桥梁。 - 循环体:
processMail(localMailbox, false)
: 调用此方法处理邮箱中的邮件。这是一个关键步骤,它会尝试非阻塞地处理一批邮件。如果默认操作被挂起,它可能会阻塞地等待邮件或默认操作变为可用。false
表示不是单步执行。if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction(mailboxController); }
: 如果循环仍然可以继续(例如,没有被挂起或关闭),并且默认动作是可用的,则执行默认动作。
- 设计理念: 注释中提到,
runMailboxLoop
的设计目标是保持热路径(默认动作,邮箱中没有邮件)尽可能快。因此,对控制标志(如mailboxLoopRunning
,suspendedDefaultAction
)的检查通常与mailbox.hasMail()
为true
相关联。这意味着,如果要在邮箱线程内部更改这些标志,必须确保邮箱中至少有一个邮件,以便更改能被及时感知。
processMail
// ... existing code ...
private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
// Doing this check is an optimization to only have a volatile read in the expected hot
// path, locks are only
// acquired after this point.
boolean isBatchAvailable = mailbox.createBatch();
// Take mails in a non-blockingly and execute them.
boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);
if (singleStep) {
return processed;
}
// If the default action is currently not available, we can run a blocking mailbox execution
// until the default action becomes available again.
processed |= processMailsWhenDefaultActionUnavailable();
return processed;
}
private boolean processMailsNonBlocking(boolean singleStep) throws Exception {
long processedMails = 0;
Optional<Mail> maybeMail;
while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
if (processedMails++ == 0) {
maybePauseIdleTimer();
}
runMail(maybeMail.get());
if (singleStep) {
break;
}
}
if (processedMails > 0) {
maybeRestartIdleTimer();
return true;
} else {
return false;
}
}
// ... existing code ...
其中 processMailsNonBlocking
和 processMailsWhenDefaultActionUnavailable
内部会调用 runMail(Mail mail)
来实际执行邮件:
// ... existing code ...
private void runMail(Mail mail) throws Exception {
mailboxMetricsControl.getMailCounter().inc();
mail.run();
// ... existing code ...
- 此方法负责处理邮箱中的邮件。
mailbox.createBatch()
: 首先尝试从主队列创建一批邮件到TaskMailbox
的内部批处理队列。这是一个优化,减少锁竞争。processMailsNonBlocking(singleStep)
: 非阻塞地处理批处理队列中的邮件。如果singleStep
为true
,则只处理一个邮件(用于测试或调试)。processMailsWhenDefaultActionUnavailable()
: 如果默认动作当前不可用(例如,由于反压或没有输入),此方法会尝试从邮箱中获取并处理邮件。它可能会阻塞地等待新邮件的到来,直到默认动作再次可用或循环终止。- 返回
true
如果至少处理了一封邮件。
suspend()
// ... existing code ...
/** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */
public void suspend() {
sendPoisonMail(() -> suspended = true);
}
/** Send mail in first priority for internal needs. */
private void sendPoisonMail(RunnableWithException mail) {
mailbox.runExclusively(
() -> {
// keep state check and poison mail enqueuing atomic, such that no intermediate
// #close may cause a
// MailboxStateException in #sendPriorityMail.
if (mailbox.getState() == TaskMailbox.State.OPEN) {
sendControlMail(mail, "poison mail");
}
});
public void runExclusively(Runnable runnable) {
lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}
// ... existing code ...
- 用于从外部(非邮箱线程)请求挂起邮箱循环。
- 它通过
sendPoisonMail()
向邮箱头部插入一个高优先级的“毒丸”邮件。当这个邮件被处理时,它会将suspended
标志设置为true
,从而导致runMailboxLoop
在下一次检查isNextLoopPossible()
时退出。 - Poison Mail: 是一种特殊控制邮件,用于改变
MailboxProcessor
的内部状态。
allActionsCompleted()
// ... existing code ...
/**
* This method must be called to end the stream task when all actions for the tasks have been
* performed.
*/
public void allActionsCompleted() {
sendPoisonMail(
() -> {
mailboxLoopRunning = false;
suspended = true;
});
}
// ... existing code ...
- 当 Task 的所有动作都已完成,需要终止邮箱循环时调用此方法。
- 与
suspend()
类似,它也通过sendPoisonMail()
发送一个毒丸邮件。该邮件会将mailboxLoopRunning
设置为false
并将suspended
设置为true
,从而彻底停止事件循环。
sendPoisonMail
和 sendControlMail(...)
:
// ... existing code ...
/** Send mail in first priority for internal needs. */
private void sendPoisonMail(RunnableWithException mail) {
mailbox.runExclusively(
() -> {
// keep state check and poison mail enqueuing atomic, such that no intermediate
// #close may cause a
// MailboxStateException in #sendPriorityMail.
if (mailbox.getState() == TaskMailbox.State.OPEN) {
sendControlMail(mail, "poison mail");
}
});
}
/**
* Sends the given <code>mail</code> using {@link TaskMailbox#putFirst(Mail)} . Intended use is
* to control this <code>MailboxProcessor</code>; no interaction with tasks should be performed;
*/
private void sendControlMail(
RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {
mailbox.putFirst(
new Mail(
mail,
Integer.MAX_VALUE /*not used with putFirst*/,
descriptionFormat,
descriptionArgs));
}
// ... existing code ...
sendPoisonMail
: 确保在邮箱OPEN
状态下,通过sendControlMail
发送一个控制邮件。它使用mailbox.runExclusively
来原子地检查状态和入队。sendControlMail
: 将一个具有最高优先级的Mail
对象(通过mailbox.putFirst()
)放入邮箱。这些邮件用于内部控制,如挂起、终止、报告错误等。
生命周期方法 (prepareClose()
, close()
):
// ... existing code ...
/** Lifecycle method to close the mailbox for action submission. */
public void prepareClose() {
mailbox.quiesce();
}
/**
* Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all
* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the
* mailbox.
*/
@Override
public void close() {
List<Mail> droppedMails = mailbox.close();
// ... existing code ...
prepareClose()
: 调用mailbox.quiesce()
。这会使邮箱进入静默状态,不再接受新的邮件,但允许处理已有的邮件。这是关闭过程的第一步。close()
: 调用mailbox.close()
。这会彻底关闭邮箱,清空所有未处理的邮件,并尝试取消仍在邮箱中的RunnableFuture
实例。
与 MailboxDefaultAction
的交互 (通过 MailboxController
):
// ... existing code ...
protected static final class MailboxController implements MailboxDefaultAction.Controller {
private final MailboxProcessor mailboxProcessor;
protected MailboxController(MailboxProcessor mailboxProcessor) {
this.mailboxProcessor = mailboxProcessor;
}
@Override
public void allActionsCompleted() {
mailboxProcessor.allActionsCompleted();
}
@Override
public MailboxDefaultAction.Suspension suspendDefaultAction(
PeriodTimer suspensionPeriodTimer) {
return mailboxProcessor.suspendDefaultAction(suspensionPeriodTimer);
}
// ... existing code ...
}
// ... existing code ...
private final class DefaultActionSuspension implements MailboxDefaultAction.Suspension {
@Nullable private final PeriodTimer suspensionTimer;
public DefaultActionSuspension(@Nullable PeriodTimer suspensionTimer) {
this.suspensionTimer = suspensionTimer;
}
@Override
public void resume() {
if (mailbox.isMailboxThread()) {
resumeInternal();
} else {
try {
sendControlMail(this::resumeInternal, "resume default action");
} catch (MailboxClosedException ex) {
// Ignored
}
}
}
private void resumeInternal() {
// This method must be called from the mailbox thread.
if (mailboxProcessor.suspendedDefaultAction == this) {
mailboxProcessor.suspendedDefaultAction = null;
if (suspensionTimer != null) {
suspensionTimer.markEnd();
}
}
}
}
// ... existing code ...
MailboxController
是一个内部类,实现了MailboxDefaultAction.Controller
接口。MailboxDefaultAction
通过这个Controller
来与MailboxProcessor
通信。suspendDefaultAction()
: 当默认动作(如processInput
)发现当前没有工作可做时(例如,没有输入数据或下游反压),它会调用controller.suspendDefaultAction()
。MailboxProcessor.suspendDefaultAction(@Nullable PeriodTimer suspensionTimer)
:- 此方法(只能由邮箱线程调用)将
suspendedDefaultAction
设置为一个新的DefaultActionSuspension
实例。 DefaultActionSuspension
实现了MailboxDefaultAction.Suspension
接口,其resume()
方法用于恢复默认动作的执行。resume()
可以从任何线程调用,如果不是邮箱线程,它会发送一个控制邮件来确保恢复逻辑在邮箱线程中执行。
- 此方法(只能由邮箱线程调用)将
获取 MailboxExecutor
:
// ... existing code ...
public MailboxExecutor getMainMailboxExecutor() {
return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);
}
/**
* Returns an executor service facade to submit actions to the mailbox.
*
* @param priority the priority of the {@link MailboxExecutor}.
*/
public MailboxExecutor getMailboxExecutor(int priority) {
return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
}
// ... existing code ...
getMainMailboxExecutor()
: 返回一个具有最低优先级的MailboxExecutor
。getMailboxExecutor(int priority)
: 返回一个指定优先级的MailboxExecutor
。这些MailboxExecutor
实例允许其他组件向此MailboxProcessor
的邮箱提交任务。
总结
MailboxProcessor
是 Flink Task 单线程执行模型的核心。它通过一个事件循环来协调处理高优先级的控制/事件邮件和低优先级的默认数据处理动作。这种机制确保了:
- 单线程执行: 所有关键逻辑都在同一个线程中执行,避免了复杂的并发同步。
- 响应性: 高优先级邮件(如 Checkpoint barriers)可以抢占默认动作,保证系统事件的及时处理。
- 可控性: 提供了挂起、恢复、终止事件循环的机制。
- 可扩展性: 通过
MailboxExecutor
允许外部组件向邮箱提交自定义任务。
processInput
processInput
方法是 StreamTask
执行其核心数据处理逻辑的地方。它是作为 MailboxProcessor
的默认动作 (MailboxDefaultAction) 来执行的。这意味着,当 MailboxProcessor
的邮箱中没有更高优先级的“邮件”(如 Checkpoint 触发、Timer 事件等)需要处理时,它就会循环调用这个 processInput
方法。
下面是对 processInput
方法的详细分析:
方法职责与设计理念:
处理输入事件: 其核心职责是从输入源(由
inputProcessor
代表)获取一个事件(通常是一条记录或一组记录),并将其传递给后续的算子链进行处理。非阻塞性: 注释中强调“Implementations should (in general) be non-blocking”。这是非常关键的一点。因为
MailboxProcessor
是单线程执行其邮箱中的邮件和默认动作的,如果processInput
长时间阻塞,将会导致 Checkpoint barriers、Timer 等重要事件无法及时处理,影响任务的正确性和性能。与 MailboxProcessor 协作: 通过
MailboxDefaultAction.Controller controller
参数,processInput
可以与MailboxProcessor
进行交互,例如在没有数据或遇到反压时,通知MailboxProcessor
暂停调用默认动作。
处理输入 (
inputProcessor.processInput()
):
- 方法首先调用
inputProcessor.processInput()
。InputProcessor
负责从上游读取数据、反序列化,并将数据喂给当前 Task 的第一个 Operator。 processInput()
的返回值DataInputStatus
描述了本次输入处理的结果。
根据 DataInputStatus
进行分支处理:
DataInputStatus status = inputProcessor.processInput();
switch (status) {
case MORE_AVAILABLE:
if (taskIsAvailable()) {
return;
}
break;
case NOTHING_AVAILABLE:
break;
case END_OF_RECOVERY:
throw new IllegalStateException("We should not receive this event here.");
case STOPPED:
endData(StopMode.NO_DRAIN);
return;
case END_OF_DATA:
endData(StopMode.DRAIN);
notifyEndOfData();
return;
case END_OF_INPUT:
// Suspend the mailbox processor, it would be resumed in afterInvoke and finished
// after all records processed by the downstream tasks. We also suspend the default
// actions to avoid repeat executing the empty default operation (namely process
// records).
controller.suspendDefaultAction();
mailboxProcessor.suspend();
return;
}
MORE_AVAILABLE
: 表示 inputProcessor
中还有更多数据可以立即处理。
if (taskIsAvailable()) { return; }
: 如果当前任务本身也是可用的(例如,下游没有反压),则直接返回。MailboxProcessor
会很快再次调用processInput
来处理更多数据。NOTHING_AVAILABLE
: 表示inputProcessor
当前没有可用的数据。此时,方法不会立即返回,而是会继续检查是否存在反压等情况,可能需要暂停默认动作的执行。END_OF_RECOVERY
: 这是一个不期望在此处出现的状态,表示任务恢复逻辑可能存在问题,因此抛出IllegalStateException
。STOPPED
: 表示输入流被强制停止(例如任务被取消,且不需要流干数据)。endData(StopMode.NO_DRAIN)
: 通知算子链以非排空模式结束处理。return;
: 结束当前processInput
调用。
END_OF_DATA
: 表示当前输入流的所有数据都已到达(例如,有限流Source结束)。endData(StopMode.DRAIN)
: 通知算子链以排空模式结束处理(处理完所有已缓冲的数据)。notifyEndOfData()
: 通知 TaskManager 当前任务的数据已结束。return;
: 结束当前processInput
调用。
END_OF_INPUT
: 表示该 Task 的所有输入都已经结束。这是一个更强的结束信号。controller.suspendDefaultAction()
: 通知MailboxProcessor
暂停调用processInput
。因为已经没有新的输入了,再继续调用也没有意义。mailboxProcessor.suspend()
: 暂停整个MailboxProcessor
的事件循环。任务此时会等待下游处理完所有数据,并完成最终的 Checkpoint 等操作。return;
: 结束当前processInput
调用。
处理反压和等待逻辑 (当 NOTHING_AVAILABLE
或其他需要等待的情况): 如果 inputProcessor.processInput()
返回 NOTHING_AVAILABLE
,或者虽然有数据但任务本身不可用(例如下游反压),代码会进入等待逻辑:
// 如果前面没有return
TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
PeriodTimer timer;
CompletableFuture<?> resumeFuture;
if (!recordWriter.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
resumeFuture = recordWriter.getAvailableFuture();
} else if (!inputProcessor.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
resumeFuture = inputProcessor.getAvailableFuture();
} else if (changelogWriterAvailabilityProvider != null
&& !changelogWriterAvailabilityProvider.isAvailable()) {
// waiting for changelog availability is reported as busy
timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
} else {
// data availability has changed in the meantime; retry immediately
return;
}
assertNoException(
resumeFuture.thenRun(
new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
: 获取IO相关的度量指标组。PeriodTimer timer; CompletableFuture<?> resumeFuture;
: 声明计时器和用于恢复的 Future。检查输出是否可用 (
!recordWriter.isAvailable()
):如果
recordWriter
(负责将处理结果写到下游)不可用,说明下游存在反压。timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
: 启动一个计时器,用于度量由于下游反压导致的等待时间。resumeFuture = recordWriter.getAvailableFuture();
: 获取一个 Future,当recordWriter
再次可用时,该 Future 会完成。
检查输入处理器是否可用 (
!inputProcessor.isAvailable()
):如果
inputProcessor
本身不可用(例如,等待网络缓冲区的到来)。timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
: 启动一个计时器,用于度量由于上游输入不可用导致的空闲时间。resumeFuture = inputProcessor.getAvailableFuture();
: 获取一个 Future,当inputProcessor
再次可用时,该 Future 会完成。
检查 Changelog Writer 是否可用:
如果使用了 Changelog State Backend,并且其
changelogWriterAvailabilityProvider
表示不可用。timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
: 启动计时器,度量等待 Changelog Writer 的繁忙时间。resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
: 获取 Future,等待其可用。
数据可用性已改变 (
else { return; }
):如果以上等待条件都不满足,说明在
inputProcessor.processInput()
调用之后,数据的可用性可能已经发生了变化(例如,新的数据刚刚到达)。此时直接return
,让MailboxProcessor
立即重试processInput
。
挂起默认动作并等待恢复:
controller.suspendDefaultAction(timer)
: 调用controller
的suspendDefaultAction
方法,并传入之前启动的timer
。这会通知MailboxProcessor
暂时停止调用processInput
。MailboxProcessor
会使用这个timer
来记录挂起的时间(用于监控和度量)。该方法返回一个MailboxDefaultAction.Suspension
对象。resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer))
: 当resumeFuture
完成时(即等待的条件解除,例如下游不再反压或上游数据到达),会执行ResumeWrapper
中的逻辑。ResumeWrapper
会调用Suspension
对象的resume()
方法,这会通知MailboxProcessor
可以重新开始调用processInput
了。同时,timer
也会被停止。assertNoException(...)
: 确保thenRun
中的操作不会抛出未捕获的异常。
Checkpoint
StreamTask
通过其内部的 MailboxProcessor
和相关的 MailboxExecutor
来发送和处理与 Checkpoint 相关的邮件(即需要在 Task 主线程中执行的 Checkpoint 操作)。
以下是 StreamTask
如何发送和处理与 Checkpoint 相关邮件的关键机制分析:
MailboxProcessor
和MailboxExecutor
:- 每个
StreamTask
都有一个MailboxProcessor
实例 (mailboxProcessor
),它负责驱动 Task 的事件循环。 StreamTask
可以通过mailboxProcessor.getMailboxExecutor(priority)
获取一个MailboxExecutor
。这个MailboxExecutor
提供了execute(...)
方法,可以将一个Runnable
(封装了 Checkpoint 相关逻辑)作为Mail
提交到邮箱中。- 这些邮件会被
MailboxProcessor
在其主循环中按优先级取出并执行。
- 每个
SubtaskCheckpointCoordinator
:StreamTask
包含一个SubtaskCheckpointCoordinator
实例 (subtaskCheckpointCoordinator
)。这个协调器负责处理 Task 级别的 Checkpoint 逻辑,例如触发操作符的快照、处理 Barrier 对齐、通知 Checkpoint 完成或中止等。- 很多 Checkpoint 相关的操作会首先由
SubtaskCheckpointCoordinator
发起或处理,然后它可能会通过StreamTask
的MailboxExecutor
将具体的执行步骤提交到邮箱。
actionExecutor
:StreamTask
还有一个StreamTaskActionExecutor
实例 (actionExecutor
)。虽然MailboxExecutor
用于将任务 放入 邮箱,但当Mail
从邮箱中被取出后,其内部的Runnable
通常会通过这个actionExecutor
来实际执行。对于 Checkpoint 相关的操作,这确保了它们在正确的 Task 主线程上下文中运行。
发送 Checkpoint 相关邮件的典型场景和方法:
触发 Checkpoint (
triggerCheckpointAsync
):- 当 JobManager 向 TaskManager 发送触发 Checkpoint 的 RPC 时,
Task
(通常是StreamTask
的父类或其本身)会调用triggerCheckpointAsync
方法。 - 这个方法会将实际的 Checkpoint 执行逻辑封装成一个
Runnable
,并通过mainMailboxExecutor
(一个具有默认优先级的MailboxExecutor
)提交到邮箱。 - 这样做是为了确保 Checkpoint 的所有阶段(例如调用操作符的
snapshotState
)都在 Task 的主线程中执行,从而避免与正常的数据处理流程发生并发冲突。
StreamTask.java
// ... existing code ... @Override public CompletableFuture<Boolean> triggerCheckpointAsync( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { checkForcedFullSnapshotSupport(checkpointOptions); CompletableFuture<Boolean> result = new CompletableFuture<>(); mainMailboxExecutor.execute( () -> { try { // Lock the mailbox to ensure that the checkpoint is not concurrent with other // actions synchronized (mailboxProcessor) { result.complete( triggerUnfinishedChannelsCheckpoint( checkpointMetaData, checkpointOptions)); } } catch (Exception ex) { // Report the failure both via the Future result but also to the mailbox result.completeExceptionally(ex); throw ex; } }, "checkpoint %s with %s", checkpointMetaData, checkpointOptions); return result; } // ... existing code ...
在上面的代码片段中,
mainMailboxExecutor.execute(...)
就是将 Checkpoint 触发逻辑(triggerUnfinishedChannelsCheckpoint
)作为邮件发送到邮箱的关键步骤。- 当 JobManager 向 TaskManager 发送触发 Checkpoint 的 RPC 时,
通知 Checkpoint 完成 (
notifyCheckpointCompleteAsync
):- 当 Task 完成一个 Checkpoint 并收到 JobManager 的确认后,会调用此方法。
- 同样,通知操作符 Checkpoint 完成的逻辑也会被封装并通过
MailboxExecutor
提交到邮箱。
StreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) { return notifyCheckpointOperation( () -> notifyCheckpointComplete(checkpointId), String.format("checkpoint %d completed", checkpointId)); } // ... existing code ...
而
notifyCheckpointOperation
内部会使用MailboxExecutor
:StreamTask.java
// ... existing code ... private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) { CompletableFuture<Void> result = new CompletableFuture<>(); mailboxProcessor .getMailboxExecutor(TaskMailbox.MAX_PRIORITY) .execute( () -> { try { runnable.run(); } catch (Exception ex) { result.completeExceptionally(ex); throw ex; } result.complete(null); }, description); return result; } // ... existing code ...
这里使用了
TaskMailbox.MAX_PRIORITY
,表明这是一个高优先级的操作。通知 Checkpoint 中止 (
notifyCheckpointAbortAsync
):- 当一个 Checkpoint 因为各种原因(超时、错误、被新的 Checkpoint 取代)需要中止时,会调用此方法。
- 中止逻辑,包括清理操作符可能产生的临时状态,也会通过邮件发送到邮箱执行。
StreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointAbortAsync( long checkpointId, long latestCompletedCheckpointId) { return notifyCheckpointOperation( () -> { if (latestCompletedCheckpointId > 0) { notifyCheckpointComplete(latestCompletedCheckpointId); } if (isCurrentSyncSavepoint(checkpointId)) { throw new FlinkRuntimeException("Stop-with-savepoint failed."); } subtaskCheckpointCoordinator.notifyCheckpointAborted( checkpointId, operatorChain, this::isRunning); }, String.format("checkpoint %d aborted", checkpointId)); } // ... existing code ...
同样,它也使用了
notifyCheckpointOperation
方法,将中止逻辑放入邮箱。处理 Barrier 对齐时的 Timer 回调:
- 当使用 Barrier 对齐策略时,如果一个 InputGate 等待某个 Channel 的 Barrier 超时,
SubtaskCheckpointCoordinator
会注册一个 Timer。当这个 Timer 触发时,其回调逻辑(例如取消 Checkpoint 或强制触发 Checkpoint)也会被封装成邮件并通过MailboxExecutor
提交到邮箱执行。 - 在
BarrierAlignmentUtil.createRegisterTimerCallback
中可以看到相关的逻辑,它会返回一个BiConsumer<Long, Long>
,这个 Consumer 内部会使用mainMailboxExecutor
来执行超时处理。
- 当使用 Barrier 对齐策略时,如果一个 InputGate 等待某个 Channel 的 Barrier 超时,
Source Task 的特定行为:
- 例如在
SourceOperatorStreamTask
中,notifyCheckpointAbortAsync
和notifyCheckpointSubsumedAsync
方法会直接使用mainMailboxExecutor
来执行清理 Checkpoint 的逻辑。
SourceOperatorStreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointAbortAsync( long checkpointId, long latestCompletedCheckpointId) { mainMailboxExecutor.execute( () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId); return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId); } @Override public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) { mainMailboxExecutor.execute( () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId); return super.notifyCheckpointSubsumedAsync(checkpointId); // ... existing code ...
- 例如在
总结:
StreamTask
依赖其 MailboxProcessor
和通过它获取的 MailboxExecutor
来确保所有与 Checkpoint 相关的关键操作(触发、通知完成/中止、Barrier 处理等)都在 Task 的主事件循环线程中串行执行。这避免了复杂的并发控制,保证了 Checkpoint 过程与正常数据处理流程的一致性和正确性。当需要执行一个 Checkpoint 相关操作时,通常会将其封装为一个 Runnable
,然后通过 MailboxExecutor.execute()
方法将其作为一封邮件提交到邮箱队列中,等待 MailboxProcessor
的调度执行。