Temporal Join,一探究竟
flink sql的join分为Regular Joins、Interval Joins、Temporal Joins和Lookup Join。
尽管官方文档中对上述几种join类型的概念和原理有很好的解释,但针对Temporal Joins仍有以下一些疑惑。
- 1.官方文档中给出的tempoal join中语法如下,但紧随着分别说明只有基于事件时间方式使用可以
FOR SYSTEM_TIME AS OF leftTable.rowtime
语法,而基于处理时间方式却只能使用temporal table function join
语法?
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
- 2.基于事件时间和处理时间方式join的实现细节是什么?如何理解不同时间语义下的“时态”?
- 3.join结果是何时、如何下发的?
- 4.表中的记录在状态中如何存储?状态如何清理?
- 5.如何判断不需要(过期)记录的版本?
- 6.基于处理时间的temporal join和lookup join的区别是什么?
带着上述问题,开始尝试从源码中(1.16)获取答案。
temporal join两种时间属性的Operator实现类在StreamExecTemporalJoin#createJoinOperator
方法中初始化。
ps: 一件事找到正确入口便已成功了30%,那么这个入口是如何找到的?在源码中通过关键字硬硬硬搜出来的……
private TwoInputStreamOperator<RowData, RowData, RowData> createJoinOperator(...) {// ...// 根据table.exec.state.ttl配置项计算最小、大留存时间,temporal join中将会根据此来计算状态的留存时间long minRetentionTime = config.getStateRetentionTime();long maxRetentionTime = TableConfigUtils.getMaxIdleStateRetentionTime(config);if (rightTimeAttributeIndex >= 0) {// 基于事件时间的temporal joinreturn new TemporalRowTimeJoinOperator(...);} else {// 基于处理时间的temporal joinif (isTemporalFunctionJoin) {// 使用temporal table function join时,初始化TemporalProcessTimeJoinOperatorreturn new TemporalProcessTimeJoinOperator(...);} else {// 否则抛出异常,官方原因如下// The exsiting TemporalProcessTimeJoinOperator has already supported temporal table join. // However, the semantic of this implementation is problematic, because the join processing for left stream doesn't wait for the complete snapshot of temporal table,// this may mislead users in production environment. See FLINK-19830 for more details.throw new TableException("Processing-time temporal join is not supported yet.");}}
}
因此Temporal Join基于事件时间和处理时间的operator实现类分别为TemporalRowTimeJoinOperator
和TemporalProcessTimeJoinOperator
,二者继承共同父类BaseTwoInputStreamOperatorWithStateRetention
。
基于处理时间的temporal join只有使用temporal table function join方式时才会初始化TemporalProcessTimeJoinOperator
,否则抛出异常。FLINK-19830中说明了这样做的原因:当前TemporalProcessTimeJoinOperator
实现中,由于左表记录并不会等待右表记录的完整快照,存在可能会误导用户的语义问题。之所以支持temporal table function join方式,是由于这种方式已经存在很长时间的历史原因,出于兼容性烤考虑,所以继续支持它。
1. BaseTwoInputStreamOperatorWithStateRetention
BaseTwoInputStreamOperatorWithStateRetention
,一个允许子类基于TTL清理它们状态的抽象TwoInputStreamOperator
。
该抽象类的主要作用是根据table.exec.state.ttl,默认值0
配置项,决定是否开启处理时间timer(table.exec.state.ttl>1
时开启)。开启后,对表中每条记录的key最多维护一个处理时间timer。子类通过实现其唯一的抽象方法cleanupState(long time)
来决定timer触发时的具体动作。
- 关键成员属性
// 取值table.exec.state.ttl
private final long minRetentionTime;
// 取值table.exec.state.ttl * 3 / 2
private final long maxRetentionTime;
// 保存已注册timer的触发时间
private transient ValueState<Long> latestRegisteredCleanupTimer;
ValueState<Long> latestRegisteredCleanupTimer
状态中保存的是已注册的处理时间timer的触发时间,通过该状态值判断处理时间timer是否注册、更新等操作。除此之外minRetentionTime
和maxRetentionTime
表示状态最小、最大留存时间,用于计算处理时间timer的时间。
- 关键方法
// 注册、更新处理时间timer,由子类主动调用
protected void registerProcessingCleanupTimer()
// timer触发时自动执行
public final void onProcessingTime
// 唯一的抽象方法,子类实现具体的timer触发时的执行逻辑
public abstract void cleanupState(long time)
registerProcessingCleanupTimer()
方法负责新增、更新处理时间timer,由子类主动调用,核心逻辑如下
if(table.exec.state.ttl>1){if(timer未注册){注册处理时间timer,timer时间=curProcessingTime+maxRetentionTime;}else if(curProcessingTime+minRetentionTime > 已注册timer的时间){删除旧的timer;更新处理时间timer,timer时间=curProcessingTime+maxRetentionTime;}
}
onProcessingTime()
方法在timer触发时自动执行,主要逻辑为执行cleanupState(long time)抽象方法,核心逻辑如下
public final void onProcessingTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {if(table.exec.state.ttl>1){if(timer已注册 && 保存在状态中的time==timer){执行cleanupState(long time),该方法由子类具体实现;清空latestRegisteredCleanupTimer状态;}}
}
画个图来理解下处理时间timer
假设table.exec.state.ttl配置为2h,则minRetentionTime=2h,maxRetentionTime=3h
1.在processTime=10:00时刻首次调用registerProcessingCleanupTimer()
方法注册timer,该timer将会在curProcesstime+maxRetentionTime(13:00) 时触发执行。
2.在(10:00,11:00)
的时间范围内,继续调用registerProcessingCleanupTimer()
方法,将不会导致timer发生变更。
3.在[11:00,-)
之后的时间内,继续调用registerProcessingCleanupTimer()
方法,由于processTime+minRetentionTime > 13:00,将导致timer不断被重置。
如果之后始终不断调用registerProcessingCleanupTimer()
方法,则timer将一直不会触发,只有在timer触发之前未调用registerProcessingCleanupTimer()
方法时,timer才会被触发执行。
2. 基于事件时间的 Temporal join Operator
TemporalRowTimeJoinOperator
的processElement1()
和processElement2()
方法分别负责处理左右表中的每条记录。两个方法的处理逻辑类似。主要为以下步骤
- 1.获取(左/右表)记录中事件时间
- 2.将记录存储到对应状态中
- 3.使用最小事件时间注册事件时间timer
- 4.调用父类中的
registerProcessingCleanupTimer()
方法,注册处理时间timer
有以下几个关键点
- 当左/右表中记录到达后并没有执行join操作并输出结果,仅仅是将数据放入到各自的状态中,并使用最小事件时间注册事件时间timer
- 为什么是使用最小事件事件注册事件时间timer?
用于存储左右表记录状态的成员属性定义如下
private transient MapState<Long, RowData> leftState;
private transient MapState<Long, RowData> rightState;
二者的区别主要体现在key实际存储的内容上,leftState
中key保存的内容是一个从1开始的自增数字,当左表中第一条数据到达后其状态中key=1,第二条数据其状态中key=2,依次递增。而rightState
中key保存的是右表中每条记录的事件时间,这样设计的原因将从emitResultAndCleanUpState(long currentWatermark)
方法中得到解释。
如何理解使用最小事件时间注册事件时间timer?
基于事件时间的temporal join的工作原理是先保存左右表中记录到状态中并注册timer,当watermark超过timer时间,在timer触发执行时来处理状态中的数据并输出结果。
如果为每个key的全部数据都注册事件时间timer,就会导致注册的timer数量巨大。例如左表记录中key=A的事件时间在流中的体现形式为{9,8,5,2,1},如果为这5个事件时间都注册timer,当收到watermark(10)时,这5个timer都将会触发,造成冗余。为了避免这种情况,对同一个数据记录key仅使用最小事件时间(事件时间1)注册的事件时间timer。这样不仅大大减少了timer数量,当收到watermark(10)时,只有一个timer被触发执行。
最小事件时间timer将会保存在ValueState<Long> registeredTimer
,由左右表中的记录共同更新,当左/右表的数据记录到达processElement1()
/processElement2()
方法后,根据当前记录的事件时间和registeredTimer
状态中已注册timer时间进行比较,使用最小事件时间重新注册或更新timer(更新操作为先删除旧timer,再注册新timer)。
上述过程在registerSmallestTimer(long timestamp)
方法中完成,核心逻辑如下
// 入参timestamp为左右表中当前记录的事件时间
private void registerSmallestTimer(long timestamp) throws IOException {Long currentRegisteredTimer = registeredTimer.value();if(currentRegisteredTimer == null){registeredTimer.update(timestamp);注册timer;}else if(currentRegisteredTimer > timestamp){删除旧timer;registeredTimer.update(timestamp);注册新timer;}
}
processElement1()
方法相比processElement2()
稍有不同的地方在于,processElement1
()方法中需要为左表中每条记录计算存储在leftState
中的自增序号。使用ValueState<Long> nextLeftIndex
状态保存自增序号。计算当前记录自增序号也很简单:状态值+1。
已注册的事件时间timer触发时onEventTime()
方法将会执行,在该方法进行状态清理和结果输出。核心逻辑如下
- 1.registeredTimer.clear();
- 2.调用
emitResultAndCleanUpState(long currentWatermark)
方法来清理状态和下发结果,该方法会返回leftState
中未处理数据中最小的事件时间(lastUnprocessedTime); - 3.lastUnprocessedTime有效的情况下,使用lastUnprocessedTime重新注册事件时间timer;
- 4.调用父类方法,注册或清理处理时间timer,该步骤具体逻辑如下
if (stateCleaningEnabled) {if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {registerProcessingCleanupTimer();} else {cleanupLastTimer();// 清空保存左表记录自增序号的状态nextLeftIndex.clear();}
}
最最最核心的join逻辑将在emitResultAndCleanUpState(long currentWatermark)
方法中揭晓。该方法主要处理逻辑如下
1.从右表记录状态(rightState
)中获取全部数据到本地list中(rightRowsSorted),该list已按照事件时间升序排序;
2.对左表记录状态(leftState
)中全部数据,按照<=currentWatermark
条件分成两部分,将满足条件的数据从状态中取出到本地map(orderedLeftRecords),并从状态中删除,该map使用TreeMap结构默认根据key(自增序号)升序排序,对不满足条件的数据部分计算其最小事件时间(lastUnprocessedTime);
因此之所以使用自增序号作为leftState
状态中key的目是为了在处理左表记录时与记录到达顺序保持一致。
3.挨个处理orderedLeftRecords中数据,对每一条记录根据其事件时间使用二分查找从rightRowsSorted中获取右表中相同事件时间的的记录,如果右表中无相同事件时间的记录,则返回右表中<左表记录事件时间且最新事件时间
的右表记录(rightRow);
4.根据第3步rightRow结果、join条件、是否是left join输出不同结果,该步骤具体逻辑如下
if (rightRow存在 && rightRow in(INSERT,UPDATE_AFTER)) {if (join条件成立) {输出左表 join 右表结果} else {if (isLeftOuterJoin) {输出左表 JOIN null结果}}
} else {if (isLeftOuterJoin) {输出左表 JOIN null结果}
}
5.orderedLeftRecords.clear();
6.删除右表状态(rightState
)中记录事件时间<=currentWatermark
的全部记录,但是在<=currentWatermark
的记录中会保留中下事件时间最大的那条记录,这意味着如果右表记录已经全部过期,rightState
中也会保留一条最新事件时间的记录。
通过记录事件时间<=currentWatermark
的条件来判断右表中记录是否已失效。
7.返回lastUnprocessedTime时间。
最后父类cleanupState(long time)
的方法实现如下。负责清空全部状态数据。
@Override
public void cleanupState(long time) {leftState.clear();rightState.clear();nextLeftIndex.clear();registeredTimer.clear();
}
以上即从TemporalRowTimeJoinOperator
的关键逻辑。可以得到该join仅支持inner join或left join,join过程是以左表数据记录为主的,右表数据记录作为维表被动等待左表记录进行匹配。这也解释了官方文档中描述中“基于事件时间的temporal join允许对版本表进行join”,为什么仅将右表称为版本表的原因。
3. 基于处理时间的 Temporal join Operator
相比于TemporalRowTimeJoinOperator
实现类,TemporalProcessTimeJoinOperator
实现类要简单很多。仅仅定义了一个ValueState<RowData> rightState
来保存右表记录中相同key的一条数据。左表记录不会通过状态进行存储,这也意味这在左表记录到达时将会立即输出join结果。
processElement1()
方法中处理左表记录的流程如下
- 1.从右表状态(
rightState
)中获取右表记录(rightSideRow); - 2.根据右表记录、join条件、是否left join来处理输出结果,该过程具体逻辑如下
if (rightSideRow == null) {// 右表记录为空if (left join) {输出左表 join null结果} else {return;}
} else {if (join条件成立) {输出左表 join 右表结果} else {if (left join) {输出左表 join null结果}}// 调用父类方法注册处理时间timer,进行状态清理registerProcessingCleanupTimer();
}
processElement2()
方法中对右表记录的处理仅存储到状态或从状态中删除,核心逻辑如下
if (记录类型 in(INSERT,UPDATE_AFTER)) {//将记录保存到状态中,调用父类方法注册处理时间timer,进行状态清理rightState.update(element.getValue());registerProcessingCleanupTimer();
} else {// 清空右表记录状态,然后删除父类中的处理时间timerrightState.clear();cleanupLastTimer();
}
最后父类最后父类cleanupState(long time)
的方法实现如下,清空保存右表记录的状态.
@Override
public void cleanupState(long time) {rightState.clear();
}
在基于处理时间的temporal join同样仅支持inner join或left join。以左表记录为主,右表记录被动等待左表记录进行匹配。
同基于事件时间的temporal join实现的最大区别在于左表记录并不会等待右表记录。当左表中key=A的记录到达后,尽管右表中同样存在key=A的数据,但是在左表key=A的记录到达时右表记录还未保存到状态中,左表记录仍然是无法正确关联到右表数据的。这就是上文提到的可能会引发用户误解的语义问题。
单纯从TemporalProcessTimeJoinOperator
的实现来看,无论是(LATERAL TemporalTableFunction(o.proctime)
)形式的 temporal table function join还是 (FOR SYSTEM_TIME AS OF leftTable.processTime
)形式的 temporal table join都是支持的。实际之所以必须使用temporal table function join,如上文提到的出于历史兼容和避免语义歧义考虑。