当前位置: 首页 > news >正文

Flink KeyedProcessFunction为什么能为每个key定义State和Timer?

问题描述

一个常见的开窗逻辑(12H 或者 500条):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import java.time.Duration;public class UIDWindowWithProcessFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假设输入数据流包含uid字段和其他数据DataStream<Event> inputStream = env.addSource(...); inputStream.keyBy(event -> event.uid)  // 按UID分组.process(new CustomProcessFunction()).print();env.execute("UID-based Window Processing");}public static class CustomProcessFunction extends KeyedProcessFunction<String, Event, OutputEvent> {// 状态用于计数private transient ValueState<Integer> countState;// 状态用于记录最后更新时间private transient ValueState<Long> lastTimerState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>("count", Types.INT);countState = getRuntimeContext().getState(countDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);lastTimerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Event event, Context ctx, Collector<OutputEvent> out) throws Exception {// 获取当前计数Integer currentCount = countState.value();if (currentCount == null) {currentCount = 0;}// 更新计数currentCount += 1;countState.update(currentCount);// 获取当前定时器时间戳Long currentTimer = lastTimerState.value();// 如果是第一条记录,注册12小时后的定时器if (currentCount == 1) {long timerTime = ctx.timestamp() + Duration.ofHours(12).toMillis();ctx.timerService().registerProcessingTimeTimer(timerTime);lastTimerState.update(timerTime);}// 如果达到500条,立即触发并重置if (currentCount >= 500) {// 触发处理OutputEvent output = new OutputEvent(ctx.getCurrentKey(), currentCount,System.currentTimeMillis());out.collect(output);// 清除状态countState.clear();// 取消之前的定时器if (currentTimer != null) {ctx.timerService().deleteProcessingTimeTimer(currentTimer);}lastTimerState.clear();}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {// 定时器触发时处理Integer currentCount = countState.value();if (currentCount != null && currentCount > 0) {OutputEvent output = new OutputEvent(ctx.getCurrentKey(),currentCount,timestamp);out.collect(output);// 清除状态countState.clear();lastTimerState.clear();}}}// 定义输入输出事件类public static class Event {public String uid;// 其他字段...}public static class OutputEvent {public String uid;public int count;public long timestamp;public OutputEvent(String uid, int count, long timestamp) {this.uid = uid;this.count = count;this.timestamp = timestamp;}}
}

虽然 通过uid进行shuffle,即 keyBy(event -> event.uid)。

但因为Flink的并行度,也就是subtask数量 远少于 uid数量,导致每个subtask会处理多个用户的数据。而实际上每个subtask只有一个 CustomProcessFunction。那状态计数是否会冲突?

// 获取当前计数 
Integer currentCount = countState.value();

触发的Timer又是否是只属于一个用户?

@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {// 定时器触发时处理Integer currentCount = countState.value();if (currentCount != null && currentCount > 0) {

实际上这两个问题的答案都是肯定的,实现机制在于:

  • getRuntimeContext().getState()怎么实现对于key绑定状态
  • Timer怎么绑定key?

为什么getRuntimeContext().getState()能够获得和key绑定的state?

Subtask会根据是不是keyedProcessFunction 在处理每条数据时,设置currentKey

OneInputStreamTask 通过 StreamTaskNetworkOutput 处理每一条输入数据。StreamTaskNetworkOutput则创建了recordProcessor 。

private StreamTaskNetworkOutput(Input<IN> operator, WatermarkGauge watermarkGauge, Counter numRecordsIn) {this.operator = checkNotNull(operator);this.watermarkGauge = checkNotNull(watermarkGauge);this.numRecordsIn = checkNotNull(numRecordsIn);this.recordProcessor = RecordProcessorUtils.getRecordProcessor(operator);}

RecordProcessorUtils.getRecordProcessor 根据是不是KeyStream会增加setKeyContextElement操作,这个process会设置Key再调用OP的 processElement。

    public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(Input<T> input) {boolean canOmitSetKeyContext;if (input instanceof AbstractStreamOperator) {canOmitSetKeyContext = canOmitSetKeyContext((AbstractStreamOperator<?>) input, 0);} else {canOmitSetKeyContext =input instanceof KeyContextHandler&& !((KeyContextHandler) input).hasKeyContext();}if (canOmitSetKeyContext) {return input::processElement;} else if (input instanceof AsyncKeyOrderedProcessing&& ((AsyncKeyOrderedProcessing) input).isAsyncKeyOrderedProcessingEnabled()) {return ((AsyncKeyOrderedProcessing) input).getRecordProcessor(1);} else {return record -> {input.setKeyContextElement(record);input.processElement(record);};}}

AbstractStreamOperator setKey的实现

    @Override@SuppressWarnings({"unchecked", "rawtypes"})public void setKeyContextElement1(StreamRecord record) throws Exception {setKeyContextElement(record, stateKeySelector1);}private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)throws Exception {if (selector != null) {Object key = selector.getKey(record.getValue());setCurrentKey(key);}}

RuntimeContext创建

AbstractStreamOperator 会创建 runtime

        this.runtimeContext =new StreamingRuntimeContext(environment,environment.getAccumulatorRegistry().getUserMap(),getMetricGroup(),getOperatorID(),getProcessingTimeService(),null,environment.getExternalResourceInfoProvider());

 AbstractUdfStreamOperator 会向udf注入runtime

      
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT>{@Overrideprotected void setup(StreamTask<?, ?> containingTask,StreamConfig config,Output<StreamRecord<OUT>> output) {super.setup(containingTask, config, output);FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());}//FunctionUtils
public static void setFunctionRuntimeContext(Function function, RuntimeContext context) {if (function instanceof RichFunction) {RichFunction richFunction = (RichFunction) function;richFunction.setRuntimeContext(context);}}

StreamingRuntimeContext 获取状态,这就是getRuntimeContext().getState()调用的。

    @Overridepublic <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);stateProperties.initializeSerializerUnlessSet(this::createSerializer);return keyedStateStore.getState(stateProperties);}// 返回成员对象private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {checkNotNull(stateDescriptor, "The state properties must not be null");checkNotNull(keyedStateStore,String.format("Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",stateDescriptor.getName(), stateDescriptor.getType()));return keyedStateStore;}

注意这个 keyedStateStore 在StreamingRuntimeContext 刚new出来时 是null,在AbstractStreamOperator 的以下函数进行初始化

    @Overridepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)throws Exception {final TypeSerializer<?> keySerializer =config.getStateKeySerializer(getUserCodeClassloader());final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());final CloseableRegistry streamTaskCloseableRegistry =Preconditions.checkNotNull(containingTask.getCancelables());final StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getJobConfiguration(),runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState(),isAsyncKeyOrderedProcessingEnabled());stateHandler =new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);timeServiceManager =isAsyncKeyOrderedProcessingEnabled()? context.asyncInternalTimerServiceManager(): context.internalTimerServiceManager();beforeInitializeStateHandler();stateHandler.initializeOperatorState(this);runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));}

StreamOperatorStateHandler 会根据有没有keyedStateBackend 来判断是不是要产生DefaultKeyedStateStore。

    public StreamOperatorStateHandler(StreamOperatorStateContext context,ExecutionConfig executionConfig,CloseableRegistry closeableRegistry) {this.context = context;this.keySerializer = context.keySerializer();this.operatorStateBackend = context.operatorStateBackend();this.keyedStateBackend = context.keyedStateBackend();this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();this.closeableRegistry = closeableRegistry;if (keyedStateBackend != null || asyncKeyedStateBackend != null) {keyedStateStore =new DefaultKeyedStateStore(keyedStateBackend,asyncKeyedStateBackend,new SerializerFactory() {@Overridepublic <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {return typeInformation.createSerializer(executionConfig.getSerializerConfig());}});} else {keyedStateStore = null;}}

getState方法如下,最终调用 keyedStateBackend相关方法。

    @Overridepublic <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {requireNonNull(stateProperties, "The state properties must not be null");try {stateProperties.initializeSerializerUnlessSet(serializerFactory);return getPartitionedState(stateProperties);} catch (Exception e) {throw new RuntimeException("Error while getting state", e);}}protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor)throws Exception {checkState(keyedStateBackend != null&& supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1,"Current operator does not integrate the async processing logic, "+ "thus only supports state v1 APIs. Please use StateDescriptor under "+ "'org.apache.flink.runtime.state'.");return keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);}

那context中的keyedStateBackend是怎么注入的?AbstractStreamOperator初始化产生了StreamOperatorStateContext。

StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getJobConfiguration(),runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState(),isAsyncKeyOrderedProcessingEnabled());

这里创建StreamOperatorStateContext实际使用 StreamTaskStateInitializerImpl ,该对象包含了操作符执行所需的各种状态后端和时间服务管理器。

主要初始化内容

1. 状态后端初始化

  • Keyed State Backend(键控状态后端)

    • 根据 keySerializer 是否存在决定是否创建键控状态后端
    • 支持同步和异步两种键控状态后端
    • 通过 StateBackend.createKeyedStateBackend() 或 StateBackend.createAsyncKeyedStateBackend() 创建
  • Operator State Backend(操作符状态后端)

    • 创建 DefaultOperatorStateBackend 来管理操作符状态
    • 处理操作符级别的状态恢复

2. 原始状态输入流初始化

  • Raw Keyed State Inputs(原始键控状态输入)

    • 为自定义键控状态提供输入流
    • 处理从检查点或保存点恢复的原始键控状态数据
  • Raw Operator State Inputs(原始操作符状态输入)

    • 为自定义操作符状态提供输入流
    • 处理从检查点或保存点恢复的原始操作符状态数据

3. 时间服务管理器初始化

  • Internal Timer Service Manager(内部定时器服务管理器)
    • 创建和管理内部定时器服务
    • 支持同步和异步状态后端的定时器管理
    • 当 keyedStatedBackend != null 创建 timeServiceManager

初始化依据

1. 任务环境信息

  • 通过 Environment 获取任务的基本信息,包括:
    • 任务信息(TaskInfo)
    • 任务状态管理器(TaskStateManager
    • 作业ID和任务索引等

2. 操作符标识

  • 根据 OperatorID 从 TaskStateManager 中获取特定操作符的优先级状态信息(PrioritizedOperatorSubtaskState)
  • 这包含了从检查点或保存点恢复的状态数据

3. 状态恢复信息

  • 从 PrioritizedOperatorSubtaskState 获取各种状态:
    • 管理的键控状态(getPrioritizedManagedKeyedState())
    • 管理的操作符状态(getPrioritizedManagedOperatorState())
    • 原始键控状态(getPrioritizedRawKeyedState())
    • 原始操作符状态(getPrioritizedRawOperatorState())

4. 配置参数

  • managedMemoryFraction:管理内存的分配比例
  • isUsingCustomRawKeyedState:是否使用自定义原始键控状态
  • isAsyncState:是否使用异步状态后端

Timer怎么和key绑定?

Timer 详细分析见:

揭秘Fliuk Timer机制:是否多线程触发?

调用链:

  • 用户在 KeyedProcessFunction 中调用 ctx.timerService().registerProcessingTimeTimer(...)

  • KeyedProcessOperator 将 context 注入 KeyedProcessFunctionKeyedProcessFunction 调用 ctx.timerService()实际转发 KeyedProcessOperator 注入的 SimpleTimerService

  • SimpleTimerService 将调用转发给 internalTimerService.registerProcessingTimeTimer(...)

  • InternalTimerService (内部使用一个支持删除的索引堆,懒判断到期后)StreamTaskProcessingTimeService 注册一个回调。

KeyedProcessOperator 的 open方法 创建时间服务和Context。

    public void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);InternalTimerService<VoidNamespace> internalTimerService =getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);TimerService timerService = new SimpleTimerService(internalTimerService);context = new ContextImpl(userFunction, timerService);onTimerContext = new OnTimerContextImpl(userFunction, timerService);}

调用了 AbstractStreamOperator的方法 获取时间服务

    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {if (timeServiceManager == null) {throw new RuntimeException("The timer service has not been initialized.");}@SuppressWarnings("unchecked")InternalTimeServiceManager<K> keyedTimeServiceHandler =(InternalTimeServiceManager<K>) timeServiceManager;TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();checkState(keySerializer != null, "Timers can only be used on keyed operators.");return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);}

Triggerable 接口有两个方法:onEventTime(InternalTimer<K, N> timer) 和 onProcessingTime(InternalTimer<K, N> timer)。当 InternalTimerService 检测到有定时器到期时,就会调用实现了这个接口的对象的相应方法。

这个方法根据  InternalTimeServiceManagerImpl 获取 TimerService

    public <N> InternalTimerService<N> getInternalTimerService(String name,TypeSerializer<K> keySerializer,TypeSerializer<N> namespaceSerializer,Triggerable<K, N> triggerable) {checkNotNull(keySerializer, "Timers can only be used on keyed operators.");// the following casting is to overcome type restrictions.TimerSerializer<K, N> timerSerializer =new TimerSerializer<>(keySerializer, namespaceSerializer);InternalTimerServiceImpl<K, N> timerService =registerOrGetTimerService(name, timerSerializer);timerService.startTimerService(timerSerializer.getKeySerializer(),timerSerializer.getNamespaceSerializer(),triggerable);return timerService;}

register中保证每个名字只有一个 TimerService

<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {InternalTimerServiceImpl<K, N> timerService =(InternalTimerServiceImpl<K, N>) timerServices.get(name);if (timerService == null) {if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) {timerService =new InternalTimerServiceAsyncImpl<>(taskIOMetricGroup,localKeyGroupRange,keyContext,processingTimeService,createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),cancellationContext);} else {timerService =new InternalTimerServiceImpl<>(taskIOMetricGroup,localKeyGroupRange,keyContext,processingTimeService,createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),cancellationContext);}timerServices.put(name, timerService);}return timerService;}

startTimerService方法是 InternalTimerServiceImpl 的初始化入口。它负责设置必要的序列化器、触发目标(通常是算子自身),并且在从故障恢复时处理已保存的定时器。

与处理时间定时器的联系点:

// ... existing code ...this.triggerTarget = Preconditions.checkNotNull(triggerTarget);// re-register the restored timers (if any)// 关键点:检查处理时间定时器队列 (processingTimeTimersQueue) 的头部是否有定时器final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();if (headTimer != null) {// 如果存在(通常意味着是从快照恢复的),// 则调用 processingTimeService.registerTimer 来重新注册这个最早到期的处理时间定时器。// this::onProcessingTime 是回调方法,当定时器触发时,会调用 InternalTimerServiceImpl 的 onProcessingTime 方法。nextTimer =processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime);}this.isInitialized = true;} else {
// ... existing code ...
  • 恢复处理时间定时器:
    • 在 if (restoredTimersSnapshot != null) 的逻辑块之后(或者如果 restoredTimersSnapshot 为 null),代码会检查 processingTimeTimersQueue。这个队列存储了当前算子实例负责的所有处理时间定时器。
    • 如果 processingTimeTimersQueue.peek() 返回一个非 null 的 headTimer,这通常意味着在任务启动时,状态后端已经恢复了之前保存的定时器到这个队列中。
    • 此时,InternalTimerServiceImpl 需要告诉底层的 ProcessingTimeService (由 StreamTask 提供,通常是基于 JVM 的 ScheduledExecutorService):“嘿,我这里最早有一个处理时间定时器需要在 headTimer.getTimestamp() 这个时间点触发,到时请调用我的 onProcessingTime 方法。”
    • processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime) 就是在执行这个注册操作。this::onProcessingTime 是一个方法引用,指向 InternalTimerServiceImpl 自己的 onProcessingTime 方法。当 ProcessingTimeService 确定时间到达后,会通过 Mailbox 机制回调这个方法。
    • nextTimer 字段保存了 ProcessingTimeService 返回的 ScheduledFuture<?>,允许后续取消或管理这个已注册的系统级定时器。

所以,startTimerService 在初始化阶段确保了从状态恢复的处理时间定时器能够被正确地重新调度。

registerProcessingTimeTimer 方法是用户(通过 KeyedProcessFunction -> SimpleTimerService)实际注册一个新的处理时间定时器时调用的核心逻辑。

注意这里向Timer队列添加的时候,Timer 包含 keyContext.getCurrentKey()

// ... existing code ...@Overridepublic void registerProcessingTimeTimer(N namespace, long time) {// 获取当前处理时间定时器队列中最早的定时器 (如果存在)InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();// 将新的定时器添加到处理时间定时器队列中// TimerHeapInternalTimer 包含了时间戳、key 和 namespace// keyContext.getCurrentKey() 获取当前正在处理的元素的 keyif (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {// 如果添加成功 (通常意味着队列状态改变了,比如新定时器成了新的头部,或者队列之前是空的)// 获取之前队列头部的触发时间,如果队列之前为空,则认为是 Long.MAX_VALUElong nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;// 检查新注册的定时器是否比当前已调度的系统级定时器更早if (time < nextTriggerTime) {// 如果新定时器更早,说明需要重新调度if (nextTimer != null) {// 取消之前已注册的系统级定时器 (nextTimer)// false 表示不中断正在执行的任务 (如果回调已经在执行中)nextTimer.cancel(false);}// 使用 processingTimeService 注册新的、更早的定时器// 当这个新的时间点到达时,会回调 this::onProcessingTimenextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);}}}
// ... existing code ...
  • 添加定时器到内部队列:
    • 首先,new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace) 创建了一个新的处理时间定时器对象。
    • processingTimeTimersQueue.add(...) 将这个新定时器添加到内部的优先队列中。这个队列会根据时间戳对定时器进行排序。
  • 与 ProcessingTimeService 交互以优化调度:
    • InternalTimerServiceImpl 只会向底层的 ProcessingTimeService 注册一个系统级的定时器,即其内部队列中最早到期的那个处理时间定时器。这样做是为了避免向系统注册过多的定时器回调,提高效率。
    • InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek(); 获取在添加新定时器之前队列中最早的定时器。
    • long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE; 获取之前需要触发的时间。
    • if (time < nextTriggerTime): 这个判断至关重要。它检查新注册的定时器 time 是否比当前已在 ProcessingTimeService 中注册的下一个触发时间 nextTriggerTime 更早。
      • 如果新定时器确实更早,那么之前向 ProcessingTimeService 注册的那个 nextTimer 就作废了(因为它不再是最早的了)。
      • nextTimer.cancel(false); 取消旧的系统级定时器。
      • nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime); 然后向 ProcessingTimeService 注册这个新的、更早的定时器。
    • 如果新注册的定时器并不比当前已调度的 nextTimer 更早,那么就不需要做任何操作,因为当前的 nextTimer 仍然是有效的,它会在其预定时间触发,届时 onProcessingTime 方法会处理所有到期的定时器(包括这个新加入但不是最早的定时器)。

Timer触发的时候怎么绑定key

KeyedProcessOperator 的 onProcessingTime 函数 调用触发 udf 的 onTimer

       @Overridepublic void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {collector.eraseTimestamp();invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);}private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, VoidNamespace> timer)throws Exception {onTimerContext.timeDomain = timeDomain;onTimerContext.timer = timer;userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);onTimerContext.timeDomain = null;onTimerContext.timer = null;}

而这个函数通过 InternalTimerServiceImpl 调用,这里通过timer.getKey()设置了key

public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {void onProcessingTime(long time) throws Exception {// null out the timer in case the Triggerable calls registerProcessingTimeTimer()// inside the callback.nextTimer = null;InternalTimer<K, N> timer;while ((timer = processingTimeTimersQueue.peek()) != null&& timer.getTimestamp() <= time&& !cancellationContext.isCancelled()) {keyContext.setCurrentKey(timer.getKey());processingTimeTimersQueue.poll();triggerTarget.onProcessingTime(timer);taskIOMetricGroup.getNumFiredTimers().inc();}if (timer != null && nextTimer == null) {nextTimer =processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);}}


文章转载自:

http://drUB5ZDe.pLqkz.cn
http://ecWic6ZH.pLqkz.cn
http://p2D5G0KQ.pLqkz.cn
http://WxfiLbhg.pLqkz.cn
http://wIEXUn9o.pLqkz.cn
http://cjrrHa2N.pLqkz.cn
http://KNenpqSt.pLqkz.cn
http://1H6nRRsV.pLqkz.cn
http://CeHqnsYw.pLqkz.cn
http://5eCfyZu3.pLqkz.cn
http://0uCZH6fJ.pLqkz.cn
http://g61eDfsn.pLqkz.cn
http://OLZZu388.pLqkz.cn
http://76SsoPoV.pLqkz.cn
http://CmFNCwrn.pLqkz.cn
http://tIhXRZG8.pLqkz.cn
http://DnlaVRAK.pLqkz.cn
http://zVJmcc2n.pLqkz.cn
http://pfGEcK7c.pLqkz.cn
http://bZg25lgl.pLqkz.cn
http://P1FPXifo.pLqkz.cn
http://3pg90g7D.pLqkz.cn
http://lQWxP7o3.pLqkz.cn
http://AtV4gvBt.pLqkz.cn
http://yYdC3K55.pLqkz.cn
http://NxdgX8jz.pLqkz.cn
http://bzas7Pmz.pLqkz.cn
http://Ftobi3R0.pLqkz.cn
http://8PhMSOj8.pLqkz.cn
http://5rH6GMTi.pLqkz.cn
http://www.dtcms.com/a/373795.html

相关文章:

  • 【ARDUINO】通过ESP8266连接WIFI,启动TCP,接受TCP客户端指令【待测试】
  • Azure Data Factory (ADF) vs Azure Logic Apps: 对比分析
  • 软考-系统架构设计师 企业资源规划(ERP)详细讲解
  • 农产品运输与调度服务平台的设计与实现
  • Dart → `.exe`:Flutter 桌面与纯命令行双轨编译完全指南
  • 栈专题之每日温度
  • 远场学习_FDTD_dipole(1)
  • 编译缓存工具 sccache 效果对比
  • 【MFC典型类和函数:CString的字符串魔法与Afx全局函数的便利店】
  • 【MFC】对话框属性:字体 (Font Name) 和 大小 (Font Size)
  • 搜索框设计实用指南:规范、模板与工具全解析
  • Python调用MCP:无需重构,快速为现有应用注入AI与外部服务能力!
  • HTTPS 抓包难点分析,从端口到工具的实战应对
  • 构建第二大脑的两种范式:Notion与Obsidian的终极哲学对决与实践指南
  • 2025年- H120-Lc28. 找出字符串中第一个匹配项的下标(数组)--Java版
  • 网络编程;TCP/IP协议,和 网络编程相关概念;字节序转换;0908
  • 深度剖析Windows PE程序安全:IAT HOOK与DLL劫持的攻防之道
  • ollama笔记
  • C++语言编程规范-函数
  • 如何在 FastAPI 中优雅地模拟多模块集成测试?
  • 阿德莱德大学Nat. Commun.:盐模板策略实现废弃塑料到单原子催化剂的高值转化,推动环境与能源催化应用
  • 新型APT组织“嘈杂熊“针对哈萨克斯坦能源部门发起网络间谍活动
  • Windows 11 安装 Maven、配置国内镜像
  • 软件测试|STATIC 代码静态验证工具 C/C++ 工具链设置指南
  • JavaScript 行为型设计模式详解
  • 强化学习:从 Q-Learning 到 Deep Q-Network
  • 摄像头模块在运动相机中的特殊应用
  • 雷卯针对米尔MYC-YG2UL开发板防雷防静电方案
  • 专为石油和天然气检测而开发的基于无人机的OGI相机
  • pytest(2):测试用例查找原理详解(从默认规则到高级钩子定制)