问题描述
一个常见的开窗逻辑(12H 或者 500条):
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import java.time.Duration;
/**
 * Example job that emulates a per-uid "count or timeout" window with a
 * {@link KeyedProcessFunction}: a result is emitted for a uid as soon as 500 events have been
 * counted, or 12 hours (processing time) after the first event of the current batch, whichever
 * comes first.
 */
public class UIDWindowWithProcessFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assume the input stream carries a uid field plus other payload data.
        // NOTE: "..." is a placeholder kept from the example; plug in a real source here.
        DataStream<Event> inputStream = env.addSource(...);
        inputStream
                .keyBy(event -> event.uid) // partition by uid
                .process(new CustomProcessFunction())
                .print();
        env.execute("UID-based Window Processing");
    }

    /**
     * Counts events per key and flushes the count either when it reaches {@link #MAX_COUNT} or
     * when the processing-time timer registered for the first event of the batch fires.
     */
    public static class CustomProcessFunction extends KeyedProcessFunction<String, Event, OutputEvent> {

        /** Number of events after which a batch is flushed immediately. */
        private static final int MAX_COUNT = 500;

        /** Maximum processing-time age of a batch before it is flushed by the timer. */
        private static final long WINDOW_MILLIS = Duration.ofHours(12).toMillis();

        // Keyed state: number of events seen in the current batch.
        private transient ValueState<Integer> countState;
        // Keyed state: timestamp of the pending timeout timer, kept so it can be deleted.
        private transient ValueState<Long> lastTimerState;

        /** Obtains the two keyed state handles from the runtime context. */
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> countDescriptor =
                    new ValueStateDescriptor<>("count", Types.INT);
            countState = getRuntimeContext().getState(countDescriptor);
            ValueStateDescriptor<Long> timerDescriptor =
                    new ValueStateDescriptor<>("timerState", Types.LONG);
            lastTimerState = getRuntimeContext().getState(timerDescriptor);
        }

        /**
         * Increments the counter of the current key, arms the timeout timer on the first event
         * of a batch, and flushes the batch once {@link #MAX_COUNT} events have been counted.
         *
         * @param event the incoming element (only its arrival is counted)
         * @param ctx   context giving access to the current key and the timer service
         * @param out   collector receiving the flushed {@link OutputEvent}
         */
        @Override
        public void processElement(Event event, Context ctx, Collector<OutputEvent> out) throws Exception {
            // Read and bump the counter; absent state means this is the first event of a batch.
            Integer stored = countState.value();
            int currentCount = (stored == null ? 0 : stored) + 1;
            countState.update(currentCount);

            // First event of the batch: arm the timeout timer.
            if (currentCount == 1) {
                // The deadline must be derived from the processing-time clock because the timer
                // is a processing-time timer. ctx.timestamp() is the element's event-time
                // timestamp: it can be null (NPE on unboxing) and lives in a different time
                // domain, so using it here would make the timer fire at an arbitrary moment.
                long timerTime = ctx.timerService().currentProcessingTime() + WINDOW_MILLIS;
                ctx.timerService().registerProcessingTimeTimer(timerTime);
                lastTimerState.update(timerTime);
            }

            // Count threshold reached: emit immediately and reset.
            if (currentCount >= MAX_COUNT) {
                out.collect(new OutputEvent(
                        ctx.getCurrentKey(),
                        currentCount,
                        System.currentTimeMillis()));
                countState.clear();
                // Cancel the timeout timer that belongs to the batch just flushed.
                Long pendingTimer = lastTimerState.value();
                if (pendingTimer != null) {
                    ctx.timerService().deleteProcessingTimeTimer(pendingTimer);
                }
                lastTimerState.clear();
            }
        }

        /**
         * Timeout path: emits whatever has been counted for the current key and resets state.
         *
         * @param timestamp the timestamp of the firing timer
         * @param ctx       timer context giving access to the current key
         * @param out       collector receiving the flushed {@link OutputEvent}
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {
            Integer currentCount = countState.value();
            if (currentCount != null && currentCount > 0) {
                out.collect(new OutputEvent(
                        ctx.getCurrentKey(),
                        currentCount,
                        timestamp));
                countState.clear();
                lastTimerState.clear();
            }
        }
    }

    /** Input event; only the uid is used by this example. */
    public static class Event {
        public String uid;
        // other fields...
    }

    /** Result emitted per flushed batch: the uid, the number of events, and the flush time. */
    public static class OutputEvent {
        public String uid;
        public int count;
        public long timestamp;

        public OutputEvent(String uid, int count, long timestamp) {
            this.uid = uid;
            this.count = count;
            this.timestamp = timestamp;
        }
    }
}
虽然 通过uid进行shuffle,即 keyBy(event -> event.uid)。
但因为Flink的并行度,也就是subtask数量 远少于 uid数量,导致每个subtask会处理多个用户的数据。而实际上每个subtask只有一个 CustomProcessFunction。那状态计数是否会冲突?
// 获取当前计数
Integer currentCount = countState.value();
触发的Timer又是否是只属于一个用户?
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {
// 定时器触发时处理
Integer currentCount = countState.value();
if (currentCount != null && currentCount > 0) {
实际上,状态计数不会在不同 uid 之间冲突,触发的 Timer 也只属于注册它的那个 uid。实现机制在于以下两点:
getRuntimeContext().getState()怎么实现对于key绑定状态
Timer怎么绑定key?
为什么getRuntimeContext().getState()能够获得和key绑定的state?
Subtask会根据是不是keyedProcessFunction 在处理每条数据时,设置currentKey
OneInputStreamTask 通过 StreamTaskNetworkOutput 处理每一条输入数据。StreamTaskNetworkOutput则创建了recordProcessor 。
// Constructor: stores the (null-checked) operator and metric handles, and builds the
// per-record processor once, up front, from the operator via RecordProcessorUtils.
private StreamTaskNetworkOutput(
Input<IN> operator, WatermarkGauge watermarkGauge, Counter numRecordsIn) {
this.operator = checkNotNull(operator);
this.watermarkGauge = checkNotNull(watermarkGauge);
this.numRecordsIn = checkNotNull(numRecordsIn);
// The returned consumer decides whether a key context is set before processElement.
this.recordProcessor = RecordProcessorUtils.getRecordProcessor(operator);
}
RecordProcessorUtils.getRecordProcessor 会根据算子是否处于 KeyedStream(即是否需要 key 上下文)决定是否增加 setKeyContextElement 操作:对于 keyed 算子,返回的 recordProcessor 会先设置当前 key,再调用算子的 processElement。
/**
 * Returns the consumer used to process each record of the given input.
 *
 * <p>Three outcomes are possible: a plain {@code processElement} reference when setting the key
 * context can be skipped, the processor supplied by the input itself when async key-ordered
 * processing is enabled, or a lambda that sets the key context and then processes the record.
 */
public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(
Input<T> input) {
boolean canOmitSetKeyContext;
if (input instanceof AbstractStreamOperator) {
// Delegates the decision to the canOmitSetKeyContext helper (not shown in this excerpt).
canOmitSetKeyContext = canOmitSetKeyContext((AbstractStreamOperator<?>) input, 0);
} else {
// Other inputs: skip only if they are a KeyContextHandler reporting no key context.
canOmitSetKeyContext =
input instanceof KeyContextHandler
&& !((KeyContextHandler) input).hasKeyContext();
}
if (canOmitSetKeyContext) {
// No key context needed: process the record directly.
return input::processElement;
} else if (input instanceof AsyncKeyOrderedProcessing
&& ((AsyncKeyOrderedProcessing) input).isAsyncKeyOrderedProcessingEnabled()) {
// Async key-ordered processing: the input provides its own record processor.
return ((AsyncKeyOrderedProcessing) input).getRecordProcessor(1);
} else {
// Keyed path: set the key context from the record first, then process it.
return record -> {
input.setKeyContextElement(record);
input.processElement(record);
};
}
}
AbstractStreamOperator setKey的实现
// Sets the key context for a record of the first input by applying stateKeySelector1.
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void setKeyContextElement1(StreamRecord record) throws Exception {
setKeyContextElement(record, stateKeySelector1);
}
// Extracts the key from the record's value with the given selector and makes it the
// current key. Does nothing when no selector is configured (selector == null).
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
throws Exception {
if (selector != null) {
Object key = selector.getKey(record.getValue());
setCurrentKey(key);
}
}
RuntimeContext创建
AbstractStreamOperator 会创建 RuntimeContext(具体类型是 StreamingRuntimeContext):
this.runtimeContext =
new StreamingRuntimeContext(
environment,
environment.getAccumulatorRegistry().getUserMap(),
getMetricGroup(),
getOperatorID(),
getProcessingTimeService(),
null,
environment.getExternalResourceInfoProvider());
AbstractUdfStreamOperator 会向 UDF 注入这个 RuntimeContext:
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT>{
// Runs the parent setup, then hands this operator's runtime context to the user function.
@Override
protected void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}
//FunctionUtils
// Injects the runtime context into the function, but only if it is a RichFunction;
// other functions are left untouched.
public static void setFunctionRuntimeContext(Function function, RuntimeContext context) {
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.setRuntimeContext(context);
}
}
StreamingRuntimeContext 获取状态,这就是getRuntimeContext().getState()调用的。
// Returns the ValueState for the descriptor by delegating to the keyed state store.
// The precondition check fails when no keyed state store has been set.
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
// Make sure the descriptor has a serializer before it is handed to the store.
stateProperties.initializeSerializerUnlessSet(this::createSerializer);
return keyedStateStore.getState(stateProperties);
}
// Validates the descriptor and returns the keyedStateStore member. A null store is
// rejected with a message saying keyed state requires a keyed stream (after keyBy()).
private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(
StateDescriptor<?, ?> stateDescriptor) {
checkNotNull(stateDescriptor, "The state properties must not be null");
checkNotNull(
keyedStateStore,
String.format(
"Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
stateDescriptor.getName(), stateDescriptor.getType()));
return keyedStateStore;
}
注意这个 keyedStateStore 在StreamingRuntimeContext 刚new出来时 是null,在AbstractStreamOperator 的以下函数进行初始化
// Initializes the operator's state infrastructure: builds the StreamOperatorStateContext,
// wraps it in a StreamOperatorStateHandler, selects the timer service manager, and finally
// publishes the keyed state store (or null when there is none) to the runtime context.
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
// Key serializer taken from the operator config; passed on to the state context below.
final TypeSerializer<?> keySerializer =
config.getStateKeySerializer(getUserCodeClassloader());
final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());
final CloseableRegistry streamTaskCloseableRegistry =
Preconditions.checkNotNull(containingTask.getCancelables());
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
getProcessingTimeService(),
this,
keySerializer,
streamTaskCloseableRegistry,
metrics,
config.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.STATE_BACKEND,
runtimeContext.getJobConfiguration(),
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState(),
isAsyncKeyOrderedProcessingEnabled());
stateHandler =
new StreamOperatorStateHandler(
context, getExecutionConfig(), streamTaskCloseableRegistry);
// Pick the async or the regular timer service manager from the same context.
timeServiceManager =
isAsyncKeyOrderedProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();
beforeInitializeStateHandler();
stateHandler.initializeOperatorState(this);
// This is where the runtime context's keyed state store stops being null for keyed operators.
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
StreamOperatorStateHandler 会根据有没有keyedStateBackend 来判断是不是要产生DefaultKeyedStateStore。
// Constructor: copies the backends and key serializer out of the state context and creates a
// DefaultKeyedStateStore only if a sync or async keyed state backend exists; otherwise the
// keyed state store stays null.
public StreamOperatorStateHandler(
StreamOperatorStateContext context,
ExecutionConfig executionConfig,
CloseableRegistry closeableRegistry) {
this.context = context;
this.keySerializer = context.keySerializer();
this.operatorStateBackend = context.operatorStateBackend();
this.keyedStateBackend = context.keyedStateBackend();
this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();
this.closeableRegistry = closeableRegistry;
if (keyedStateBackend != null || asyncKeyedStateBackend != null) {
keyedStateStore =
new DefaultKeyedStateStore(
keyedStateBackend,
asyncKeyedStateBackend,
// Serializer factory that creates serializers from the execution config's serializer config.
new SerializerFactory() {
@Override
public <T> TypeSerializer<T> createSerializer(
TypeInformation<T> typeInformation) {
return typeInformation.createSerializer(
executionConfig.getSerializerConfig());
}
});
} else {
// No keyed backend at all: keyed state is unavailable for this operator.
keyedStateStore = null;
}
}
getState方法如下,最终调用 keyedStateBackend相关方法。
// Returns the ValueState for the descriptor via getPartitionedState. Any exception raised
// while doing so is wrapped in a RuntimeException with the original as its cause.
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
requireNonNull(stateProperties, "The state properties must not be null");
try {
stateProperties.initializeSerializerUnlessSet(serializerFactory);
return getPartitionedState(stateProperties);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
}
}
// Fetches the state from the keyed state backend under the VoidNamespace. Requires a non-null
// synchronous keyed backend and the STATE_V1 API set; otherwise checkState fails.
protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor)
throws Exception {
checkState(
keyedStateBackend != null
&& supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1,
"Current operator does not integrate the async processing logic, "
+ "thus only supports state v1 APIs. Please use StateDescriptor under "
+ "'org.apache.flink.runtime.state'.");
return keyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
}
那context中的keyedStateBackend是怎么注入的?AbstractStreamOperator初始化产生了StreamOperatorStateContext。
StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
getProcessingTimeService(),
this,
keySerializer,
streamTaskCloseableRegistry,
metrics,
config.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.STATE_BACKEND,
runtimeContext.getJobConfiguration(),
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState(),
isAsyncKeyOrderedProcessingEnabled());
这里创建StreamOperatorStateContext实际使用 StreamTaskStateInitializerImpl ,该对象包含了操作符执行所需的各种状态后端和时间服务管理器。
主要初始化内容
1. 状态后端初始化
Keyed State Backend(键控状态后端):
- 根据 keySerializer 是否存在决定是否创建键控状态后端
- 支持同步和异步两种键控状态后端
- 通过 StateBackend.createKeyedStateBackend() 或 StateBackend.createAsyncKeyedStateBackend() 创建
Operator State Backend(操作符状态后端):
- 创建 DefaultOperatorStateBackend 来管理操作符状态
- 处理操作符级别的状态恢复
2. 原始状态输入流初始化
Raw Keyed State Inputs(原始键控状态输入):
- 为自定义键控状态提供输入流
- 处理从检查点或保存点恢复的原始键控状态数据
Raw Operator State Inputs(原始操作符状态输入):
- 为自定义操作符状态提供输入流
- 处理从检查点或保存点恢复的原始操作符状态数据
3. 时间服务管理器初始化
- Internal Timer Service Manager(内部定时器服务管理器):
- 创建和管理内部定时器服务
- 支持同步和异步状态后端的定时器管理
当 keyedStateBackend != null 时才会创建 timeServiceManager
初始化依据
1. 任务环境信息
- 通过 Environment 获取任务的基本信息,包括:
- 任务信息(TaskInfo)
- 任务状态管理器(TaskStateManager)
- 作业ID和任务索引等
2. 操作符标识
- 根据 OperatorID 从 TaskStateManager 中获取特定操作符的优先级状态信息(PrioritizedOperatorSubtaskState)
- 这包含了从检查点或保存点恢复的状态数据
3. 状态恢复信息
- 从 PrioritizedOperatorSubtaskState 获取各种状态:
- 管理的键控状态(getPrioritizedManagedKeyedState())
- 管理的操作符状态(getPrioritizedManagedOperatorState())
- 原始键控状态(getPrioritizedRawKeyedState())
- 原始操作符状态(getPrioritizedRawOperatorState())
4. 配置参数
- managedMemoryFraction:管理内存的分配比例
- isUsingCustomRawKeyedState:是否使用自定义原始键控状态
- isAsyncState:是否使用异步状态后端
Timer怎么和key绑定?
Timer 详细分析见:
调用链:
1. 用户在 KeyedProcessFunction 中调用 ctx.timerService().registerProcessingTimeTimer(...)。
2. KeyedProcessOperator 将 context 注入 KeyedProcessFunction;KeyedProcessFunction 调用 ctx.timerService() 时,实际拿到的是 KeyedProcessOperator 注入的 SimpleTimerService。
3. SimpleTimerService 将调用转发给 internalTimerService.registerProcessingTimeTimer(...)。
4. InternalTimerService(内部使用一个支持删除的索引堆,懒判断到期)向 StreamTask 的 ProcessingTimeService 注册一个回调。
KeyedProcessOperator 的 open方法 创建时间服务和Context。
// Opens the operator: creates the timestamped collector, obtains the internal timer service
// named "user-timers" with this operator as the trigger target, and wraps it in the two
// contexts handed to the user function (one for elements, one for timer callbacks).
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
// User-facing facade over the internal timer service.
TimerService timerService = new SimpleTimerService(internalTimerService);
context = new ContextImpl(userFunction, timerService);
onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
调用了 AbstractStreamOperator的方法 获取时间服务
// Returns the internal timer service registered under the given name, delegating to the
// timer service manager. Throws if the manager has not been initialized, and fails the state
// check when the operator has no key serializer (i.e. it is not a keyed operator).
public <K, N> InternalTimerService<N> getInternalTimerService(
String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
if (timeServiceManager == null) {
throw new RuntimeException("The timer service has not been initialized.");
}
@SuppressWarnings("unchecked")
InternalTimeServiceManager<K> keyedTimeServiceHandler =
(InternalTimeServiceManager<K>) timeServiceManager;
TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
return keyedTimeServiceHandler.getInternalTimerService(
name, keySerializer, namespaceSerializer, triggerable);
}
Triggerable 接口有两个方法:onEventTime(InternalTimer<K, N> timer) 和 onProcessingTime(InternalTimer<K, N> timer)。当 InternalTimerService 检测到有定时器到期时,就会调用实现了这个接口的对象的相应方法。
这个方法根据 InternalTimeServiceManagerImpl 获取 TimerService
// Looks up (or creates) the timer service for the given name, starts it with the key and
// namespace serializers and the trigger target, and returns it.
public <N> InternalTimerService<N> getInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable) {
checkNotNull(keySerializer, "Timers can only be used on keyed operators.");
// the following casting is to overcome type restrictions.
TimerSerializer<K, N> timerSerializer =
new TimerSerializer<>(keySerializer, namespaceSerializer);
InternalTimerServiceImpl<K, N> timerService =
registerOrGetTimerService(name, timerSerializer);
// startTimerService receives the triggerable that will be called back when timers fire.
timerService.startTimerService(
timerSerializer.getKeySerializer(),
timerSerializer.getNamespaceSerializer(),
triggerable);
return timerService;
}
register中保证每个名字只有一个 TimerService
// Returns the timer service cached under the given name, creating and caching it on first
// use. The async implementation is chosen when the priority-queue factory is an
// AsyncKeyedStateBackend; both variants receive the same constructor arguments, including
// the keyContext and separate processing-time / event-time timer queues.
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
String name, TimerSerializer<K, N> timerSerializer) {
InternalTimerServiceImpl<K, N> timerService =
(InternalTimerServiceImpl<K, N>) timerServices.get(name);
if (timerService == null) {
if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) {
timerService =
new InternalTimerServiceAsyncImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(
EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext);
} else {
timerService =
new InternalTimerServiceImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(
EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext);
}
// Cache it so later lookups with the same name return this instance.
timerServices.put(name, timerService);
}
return timerService;
}
startTimerService 方法是 InternalTimerServiceImpl 的初始化入口。它负责设置必要的序列化器、触发目标(通常是算子自身),并且在从故障恢复时处理已保存的定时器。
与处理时间定时器的联系点:
// ... existing code ...
this.triggerTarget = Preconditions.checkNotNull(triggerTarget);
// re-register the restored timers (if any)
// 关键点:检查处理时间定时器队列 (processingTimeTimersQueue) 的头部是否有定时器
final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
if (headTimer != null) {
// 如果存在(通常意味着是从快照恢复的),
// 则调用 processingTimeService.registerTimer 来重新注册这个最早到期的处理时间定时器。
// this::onProcessingTime 是回调方法,当定时器触发时,会调用 InternalTimerServiceImpl 的 onProcessingTime 方法。
nextTimer =
processingTimeService.registerTimer(
headTimer.getTimestamp(), this::onProcessingTime);
}
this.isInitialized = true;
} else {
// ... existing code ...
- 恢复处理时间定时器:
  - 在 if (restoredTimersSnapshot != null) 的逻辑块之后(或者如果 restoredTimersSnapshot 为 null),代码会检查 processingTimeTimersQueue。这个队列存储了当前算子实例负责的所有处理时间定时器。
  - 如果 processingTimeTimersQueue.peek() 返回一个非 null 的 headTimer,这通常意味着在任务启动时,状态后端已经恢复了之前保存的定时器到这个队列中。
  - 此时,InternalTimerServiceImpl 需要告诉底层的 ProcessingTimeService(由 StreamTask 提供,通常是基于 JVM 的 ScheduledExecutorService):“嘿,我这里最早有一个处理时间定时器需要在 headTimer.getTimestamp() 这个时间点触发,到时请调用我的 onProcessingTime 方法。”
  - processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime) 就是在执行这个注册操作。
  - this::onProcessingTime 是一个方法引用,指向 InternalTimerServiceImpl 自己的 onProcessingTime 方法。当 ProcessingTimeService 确定时间到达后,会通过 Mailbox 机制回调这个方法。
  - nextTimer 字段保存了 ProcessingTimeService 返回的 ScheduledFuture<?>,允许后续取消或管理这个已注册的系统级定时器。
所以,startTimerService 在初始化阶段确保了从状态恢复的处理时间定时器能够被正确地重新调度。
registerProcessingTimeTimer 方法是用户(通过 KeyedProcessFunction -> SimpleTimerService)实际注册一个新的处理时间定时器时调用的核心逻辑。
注意这里向Timer队列添加的时候,Timer 包含 keyContext.getCurrentKey()
// ... existing code ...
// Registers a processing-time timer for the current key and namespace. The timer object is
// built with keyContext.getCurrentKey(), which is how a timer becomes bound to a key. The
// underlying processing-time service is re-armed only when the new timer is earlier than the
// previous head of the queue.
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
// Remember the earliest timer in the processing-time queue (if any) before inserting.
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
// Insert the new timer into the processing-time queue.
// TimerHeapInternalTimer carries the timestamp, the key and the namespace;
// keyContext.getCurrentKey() is the key of the element currently being processed.
if (processingTimeTimersQueue.add(
new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
// Reached only when add(...) returned true (presumably: the queue changed, e.g. the new
// timer became the head or the queue was empty before - confirm against the queue's contract).
// Trigger time of the previous head, or Long.MAX_VALUE if the queue had no head.
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// Is the newly registered timer earlier than the previous head?
if (time < nextTriggerTime) {
// Yes: the scheduled system-level callback must be replaced.
if (nextTimer != null) {
// Cancel the previously registered system-level timer (nextTimer);
// false = do not interrupt a callback that is already running.
nextTimer.cancel(false);
}
// Register the new, earlier wake-up with the processing-time service;
// this::onProcessingTime is called back when that time is reached.
nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
}
}
}
// ... existing code ...
- 添加定时器到内部队列:
  - 首先,new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace) 创建了一个新的处理时间定时器对象。
  - processingTimeTimersQueue.add(...) 将这个新定时器添加到内部的优先队列中。这个队列会根据时间戳对定时器进行排序。
- 与 ProcessingTimeService 交互以优化调度:
  - InternalTimerServiceImpl 只会向底层的 ProcessingTimeService 注册一个系统级的定时器,即其内部队列中最早到期的那个处理时间定时器。这样做是为了避免向系统注册过多的定时器回调,提高效率。
  - InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek(); 获取在添加新定时器之前队列中最早的定时器。
  - long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE; 获取之前需要触发的时间。
  - if (time < nextTriggerTime):这个判断至关重要。它检查新注册的定时器 time 是否比当前已在 ProcessingTimeService 中注册的下一个触发时间 nextTriggerTime 更早。
    - 如果新定时器确实更早,那么之前向 ProcessingTimeService 注册的那个 nextTimer 就作废了(因为它不再是最早的了)。
    - nextTimer.cancel(false); 取消旧的系统级定时器。
    - nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime); 然后向 ProcessingTimeService 注册这个新的、更早的定时器。
  - 如果新注册的定时器并不比当前已调度的 nextTimer 更早,那么就不需要做任何操作,因为当前的 nextTimer 仍然是有效的,它会在其预定时间触发,届时 onProcessingTime 方法会处理所有到期的定时器(包括这个新加入但不是最早的定时器)。
Timer触发的时候怎么绑定key
KeyedProcessOperator 的 onProcessingTime 函数 调用触发 udf 的 onTimer
// Callback for a fired processing-time timer: erases the collector's timestamp and invokes
// the user function's onTimer in the PROCESSING_TIME domain.
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.eraseTimestamp();
invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
}
// Exposes the firing timer and its time domain through onTimerContext for the duration of
// the user's onTimer call, then resets both fields to null.
private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, VoidNamespace> timer)
throws Exception {
onTimerContext.timeDomain = timeDomain;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
// Reset so the context does not keep referring to the timer after the call.
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
而这个函数通过 InternalTimerServiceImpl 调用,这里通过timer.getKey()设置了key。
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
// Invoked when the scheduled processing-time callback fires. Drains every queued timer whose
// timestamp is <= time (stopping early on cancellation), and for each one sets the current
// key from the timer itself before calling the trigger target. This is what makes state
// access inside the callback resolve to the key that registered the timer.
void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null
&& timer.getTimestamp() <= time
&& !cancellationContext.isCancelled()) {
// Restore the key context of the timer before it is removed and fired.
keyContext.setCurrentKey(timer.getKey());
processingTimeTimersQueue.poll();
triggerTarget.onProcessingTime(timer);
taskIOMetricGroup.getNumFiredTimers().inc();
}
// A timer is still at the head of the queue and no callback has scheduled a new
// wake-up in the meantime: register one for that timer's timestamp.
if (timer != null && nextTimer == null) {
nextTimer =
processingTimeService.registerTimer(
timer.getTimestamp(), this::onProcessingTime);
}
}