
The NoUniqueKey Problem and an Introduction to Regular Joins

Background

In a Flink job we ran into a NoUniqueKey join, which caused data inflation and made the downstream results inconsistent with the database.

Why does a NoUniqueKey join cause problems? One example scenario is shown below:

[Figure: example scenario of a NoUniqueKey join causing data inflation]

Why does NoUniqueKey appear? There are many causes. A common one is shown in the example below: we usually declare the auto-increment primary key as the table's PK, while the business logic joins and queries on a separate unique key. The PK columns get projected away (one might ask: can't Flink propagate the PK on its own?), so by the time the join happens it has become NoUniqueKey.

How to fix it (for the scenario below):

  1. Include the PK column in the queried fields
  2. Pre-aggregate (GROUP BY) on the join key before joining (see the sketch after the plan output below)
  3. Redefine the PK as left_code

The three options above are general-purpose fixes, especially for the common case below; the concrete cause still needs case-by-case analysis.

Example

CREATE TABLE left_table (
  `left_id`         BIGINT    COMMENT 'auto-increment primary key',
  `left_code`       STRING    COMMENT 'left_table unique key',
  `left_created_at` TIMESTAMP,
  `left_updated_at` TIMESTAMP,
  PRIMARY KEY (`left_id`) NOT ENFORCED
) WITH (
  -- connector options omitted
);

CREATE TABLE right_table (
  `right_id`         INT,
  `right_code`       STRING    COMMENT 'right_table unique key',
  `left_code`        STRING    COMMENT 'left_table unique key',
  `right_state`      STRING    COMMENT 'state',
  `right_created_at` TIMESTAMP COMMENT 'creation time',
  `right_updated_at` TIMESTAMP COMMENT 'update time',
  right_proc_time AS PROCTIME(),
  PRIMARY KEY (`right_id`) NOT ENFORCED
) WITH (
  -- connector options omitted
);

-- sink table (print_table) definition omitted
INSERT INTO print_table
SELECT
  l.left_code,
  l.left_created_at,
  l.left_updated_at,
  r.right_code,
  r.right_created_at,
  r.right_updated_at
FROM left_table l
JOIN right_table r
ON l.left_code = r.right_code;

Its execution plan shows the join as:

leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]
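
As a sketch of fix 2 (pre-aggregating on the join key): the query below groups each side by its join key before joining, so each join input's unique key is exactly the join key and the input specs become JoinKeyContainsUniqueKey. Table and column names reuse the example DDL above; the MAX() picks for the non-key columns are illustrative.

INSERT INTO print_table
SELECT
  l.left_code, l.left_created_at, l.left_updated_at,
  r.right_code, r.right_created_at, r.right_updated_at
FROM (
  SELECT
    left_code,
    MAX(left_created_at) AS left_created_at,
    MAX(left_updated_at) AS left_updated_at
  FROM left_table
  GROUP BY left_code   -- the group key left_code becomes this input's unique key
) l
JOIN (
  SELECT
    right_code,
    MAX(right_created_at) AS right_created_at,
    MAX(right_updated_at) AS right_updated_at
  FROM right_table
  GROUP BY right_code  -- likewise for right_code
) r
ON l.left_code = r.right_code;

Fix 3 is the DDL-side equivalent: declaring PRIMARY KEY (`left_code`) NOT ENFORCED makes the join key itself carry the primary key.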

A Brief Introduction to Regular Joins

Join logic

Core class: org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator

private void processElement(
        RowData input,
        JoinRecordStateView inputSideStateView,
        JoinRecordStateView otherSideStateView,
        boolean inputIsLeft)
        throws Exception {
    boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
    boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;
    boolean isAccumulateMsg = RowDataUtil.isAccumulateMsg(input);
    RowKind inputRowKind = input.getRowKind();
    input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating

    AssociatedRecords associatedRecords =
            AssociatedRecords.of(input, inputIsLeft, otherSideStateView, joinCondition);
    if (isAccumulateMsg) { // record is accumulate
        if (inputIsOuter) { // input side is outer
            OuterJoinRecordStateView inputSideOuterStateView =
                    (OuterJoinRecordStateView) inputSideStateView;
            if (associatedRecords.isEmpty()) { // there is no matched rows on the other side
                // send +I[record+null]
                outRow.setRowKind(RowKind.INSERT);
                outputNullPadding(input, inputIsLeft);
                // state.add(record, 0)
                inputSideOuterStateView.addRecord(input, 0);
            } else { // there are matched rows on the other side
                if (otherIsOuter) { // other side is outer
                    OuterJoinRecordStateView otherSideOuterStateView =
                            (OuterJoinRecordStateView) otherSideStateView;
                    for (OuterRecord outerRecord : associatedRecords.getOuterRecords()) {
                        RowData other = outerRecord.record;
                        // if the matched num in the matched rows == 0
                        if (outerRecord.numOfAssociations == 0) {
                            // send -D[null+other]
                            outRow.setRowKind(RowKind.DELETE);
                            outputNullPadding(other, !inputIsLeft);
                        } // ignore matched number > 0
                        // otherState.update(other, old + 1)
                        otherSideOuterStateView.updateNumOfAssociations(
                                other, outerRecord.numOfAssociations + 1);
                    }
                }
                // send +I[record+other]s
                outRow.setRowKind(RowKind.INSERT);
                for (RowData other : associatedRecords.getRecords()) {
                    output(input, other, inputIsLeft);
                }
                // state.add(record, other.size)
                inputSideOuterStateView.addRecord(input, associatedRecords.size());
            }
        } else { // input side not outer
            // state.add(record)
            inputSideStateView.addRecord(input);
            if (!associatedRecords.isEmpty()) { // if there are matched rows on the other side
                if (otherIsOuter) { // if other side is outer
                    OuterJoinRecordStateView otherSideOuterStateView =
                            (OuterJoinRecordStateView) otherSideStateView;
                    for (OuterRecord outerRecord : associatedRecords.getOuterRecords()) {
                        if (outerRecord.numOfAssociations
                                == 0) { // if the matched num in the matched rows == 0
                            // send -D[null+other]
                            outRow.setRowKind(RowKind.DELETE);
                            outputNullPadding(outerRecord.record, !inputIsLeft);
                        }
                        // otherState.update(other, old + 1)
                        otherSideOuterStateView.updateNumOfAssociations(
                                outerRecord.record, outerRecord.numOfAssociations + 1);
                    }
                    // send +I[record+other]s
                    outRow.setRowKind(RowKind.INSERT);
                } else {
                    // send +I/+U[record+other]s (using input RowKind)
                    outRow.setRowKind(inputRowKind);
                }
                for (RowData other : associatedRecords.getRecords()) {
                    output(input, other, inputIsLeft);
                }
            }
            // skip when there is no matched rows on the other side
        }
    } else { // input record is retract
        // state.retract(record)
        inputSideStateView.retractRecord(input);
        if (associatedRecords.isEmpty()) { // there is no matched rows on the other side
            if (inputIsOuter) { // input side is outer
                // send -D[record+null]
                outRow.setRowKind(RowKind.DELETE);
                outputNullPadding(input, inputIsLeft);
            }
            // nothing to do when input side is not outer
        } else { // there are matched rows on the other side
            if (inputIsOuter) {
                // send -D[record+other]s
                outRow.setRowKind(RowKind.DELETE);
            } else {
                // send -D/-U[record+other]s (using input RowKind)
                outRow.setRowKind(inputRowKind);
            }
            for (RowData other : associatedRecords.getRecords()) {
                output(input, other, inputIsLeft);
            }
            // if other side is outer
            if (otherIsOuter) {
                OuterJoinRecordStateView otherSideOuterStateView =
                        (OuterJoinRecordStateView) otherSideStateView;
                for (OuterRecord outerRecord : associatedRecords.getOuterRecords()) {
                    if (outerRecord.numOfAssociations == 1) {
                        // send +I[null+other]
                        outRow.setRowKind(RowKind.INSERT);
                        outputNullPadding(outerRecord.record, !inputIsLeft);
                    } // nothing else to do when number of associations > 1
                    // otherState.update(other, old - 1)
                    otherSideOuterStateView.updateNumOfAssociations(
                            outerRecord.record, outerRecord.numOfAssociations - 1);
                }
            }
        }
    }
}

The main logic is really just two steps:

  1. Store the record in this side's state
  2. Look up the other side's state by join key and match

In Flink SQL's changelog handling, records are split into two classes: Accumulate (INSERT / UPDATE_AFTER) and Retract (DELETE / UPDATE_BEFORE).

If the message is an Accumulate, the processing logic in pseudocode is:

// record is ACC
if input side is outer                     // this side is an outer join
    if no matched row on the other side
        send +[record + null]
        state.add(record, 0)               // 0 means no associated record on the other side
    else // other.size > 0
        if other side is outer
            if (associated rows in matched rows == 0)
                // the other-side record previously had no match on this side,
                // so the earlier [null + other] must be retracted
                send -[null + other]
            else
                skip
            endif
            otherState.update(other, old + 1)  // other side's association count + 1
        endif
        send +[record, other]s             // emit one row per match on the other side
        state.add(record, other.size)      // update state
    endif
else // this side is not an outer join
    state.add(record)
    if no matched row on the other side
        skip                               // nothing to output
    else // other.size > 0
        if other side is outer
            if (associated rows in matched rows == 0)
                send -[null + other]
            else
                skip
            endif
            otherState.update(other, old + 1)  // other side's association count + 1
        endif
        send +[record + other]s            // emit one row per match on the other side
    endif
endif

If the message is a Retract, the processing logic in pseudocode is:

// record is RETRACT
state.retract(record)
if no matched rows on the other side
    if input side is outer
        send -[record + null]
    endif
else // there are matched rows on the other side
    send -[record, other]s                 // retract the joined rows emitted earlier
    if other side is outer
        if the matched num in the matched rows == 0, this should never happen!
        if the matched num in the matched rows == 1, send +[null + other]
        if the matched num in the matched rows > 1, skip
        otherState.update(other, old - 1)  // other side's association count - 1
    endif
endif

Inner-join handling is comparatively simple; the crux is how many matches a record finds when it probes the other side. Because a dual-stream join must keep both sides' history in state to guarantee correctness, Flink not only lets you set a TTL on that state but also designed dedicated state structures for dual-stream joins.
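
For reference, the state TTL is a job-level table configuration; the key below exists in recent Flink versions, and the duration is illustrative:

-- SQL client syntax; join-state rows not accessed within the TTL are dropped,
-- trading correctness on late-arriving matches for bounded state size.
SET 'table.exec.state.ttl' = '1 d';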

State storage

Based on the input side's characteristics captured in JoinInputSideSpec (whether it has a unique key, and whether the join key contains that unique key), Flink SQL uses one of several state structures: JoinKeyContainsUniqueKey, InputSideHasUniqueKey, and InputSideHasNoUniqueKey, as follows:

[Figure: the JoinRecordStateView implementations]

public interface JoinRecordStateView {

    /** Add a new record to the state view. */
    void addRecord(RowData record) throws Exception;

    /** Retract the record from the state view. */
    void retractRecord(RowData record) throws Exception;

    /** Gets all the records under the current context (i.e. join key). */
    Iterable<RowData> getRecords() throws Exception;
}

All of these classes implement the JoinRecordStateView interface: add a record to state, retract a record from state, and get the associated records.

The state in StreamingJoinOperator is keyed MapState, where the key is the join key of the current record. The state structure in each case is:

[Table: state structure per JoinInputSideSpec]
JoinKeyContainsUniqueKey  ->  <JoinKey, ValueState<Record>>
InputSideHasUniqueKey     ->  <JoinKey, MapState<UniqueKey, Record>>
InputSideHasNoUniqueKey   ->  <JoinKey, MapState<Record, appear-times>>

From the table above, the state storage for each join key / unique key combination follows:

  1. If the join key contains the unique key, one join key maps to exactly one record
  2. If the join key does not contain the unique key, one join key maps to multiple records, but those records' unique keys must all differ
  3. If the input has no unique key at all, a ListState would be the natural fit; for performance reasons Flink still uses MapState, with the record itself as the map key and its appearance count as the value

The figure below gives an inner-join example where both sides use InputSideHasNoUniqueKey:

[Figure: inner-join example with InputSideHasNoUniqueKey on both sides]

Deriving and checking the unique key

Flink SQL's parsing is a layer of development on top of Calcite. Unique keys are derived and checked in the planner's Optimize phase: the AST (SqlNode) is converted into relational algebra (RelNode) plus row expressions (RexNode), and the planner then determines from the RelNode whether a unique key exists.

Core classes: org.apache.flink.table.planner.plan.metadata.FlinkRelMdUpsertKeys and org.apache.flink.table.planner.plan.metadata.FlinkRelMdUniqueKeys.

private def getJoinUniqueKeys(
    joinInfo: JoinInfo,
    joinRelType: JoinRelType,
    left: RelNode,
    right: RelNode,
    mq: RelMetadataQuery,
    ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
  val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
  val rightUniqueKeys = mq.getUniqueKeys(right, ignoreNulls)
  getJoinUniqueKeys(
    joinRelType, left.getRowType, leftUniqueKeys, rightUniqueKeys,
    mq.areColumnsUnique(left, joinInfo.leftSet, ignoreNulls),
    mq.areColumnsUnique(right, joinInfo.rightSet, ignoreNulls))
}

def getJoinUniqueKeys(
    joinRelType: JoinRelType,
    leftType: RelDataType,
    leftUniqueKeys: JSet[ImmutableBitSet],
    rightUniqueKeys: JSet[ImmutableBitSet],
    isLeftUnique: JBoolean,
    isRightUnique: JBoolean): JSet[ImmutableBitSet] = {
  // first add the different combinations of concatenated unique keys
  // from the left and the right, adjusting the right hand side keys to
  // reflect the addition of the left hand side
  //
  // NOTE zfong 12/18/06 - If the number of tables in a join is large,
  // the number of combinations of unique key sets will explode. If
  // that is undesirable, use RelMetadataQuery.areColumnsUnique() as
  // an alternative way of getting unique key information.
  val retSet = new JHashSet[ImmutableBitSet]
  val nFieldsOnLeft = leftType.getFieldCount
  val rightSet = if (rightUniqueKeys != null) {
    val res = new JHashSet[ImmutableBitSet]
    rightUniqueKeys.foreach { colMask =>
      val tmpMask = ImmutableBitSet.builder
      colMask.foreach(bit => tmpMask.set(bit + nFieldsOnLeft))
      res.add(tmpMask.build())
    }
    if (leftUniqueKeys != null) {
      res.foreach { colMaskRight =>
        leftUniqueKeys.foreach(colMaskLeft => retSet.add(colMaskLeft.union(colMaskRight)))
      }
    }
    res
  } else {
    null
  }

  // determine if either or both the LHS and RHS are unique on the
  // equi-join columns
  val leftUnique = isLeftUnique
  val rightUnique = isRightUnique

  // if the right hand side is unique on its equijoin columns, then we can
  // add the unique keys from left if the left hand side is not null
  // generating
  if (rightUnique != null
    && rightUnique
    && (leftUniqueKeys != null)
    && !joinRelType.generatesNullsOnLeft) {
    retSet.addAll(leftUniqueKeys)
  }

  // same as above except left and right are reversed
  if (leftUnique != null
    && leftUnique
    && (rightSet != null)
    && !joinRelType.generatesNullsOnRight) {
    retSet.addAll(rightSet)
  }
  retSet
}

private def getTableUniqueKeys(relOptTable: RelOptTable): JSet[ImmutableBitSet] = {
  relOptTable match {
    case sourceTable: TableSourceTable =>
      val catalogTable = sourceTable.catalogTable
      catalogTable match {
        case act: CatalogTable =>
          val builder = ImmutableSet.builder[ImmutableBitSet]()
          val schema = act.getResolvedSchema
          if (schema.getPrimaryKey.isPresent) {
            // use relOptTable's type which may be projected based on original schema
            val columns = relOptTable.getRowType.getFieldNames
            val primaryKeyColumns = schema.getPrimaryKey.get().getColumns
            // we check this because a portion of a composite primary key is not unique
            if (columns.containsAll(primaryKeyColumns)) {
              val columnIndices = primaryKeyColumns.map(c => columns.indexOf(c))
              builder.add(ImmutableBitSet.of(columnIndices: _*))
            }
          }
          val uniqueSet = sourceTable.uniqueKeysSet.orElse(null)
          if (uniqueSet != null) {
            builder.addAll(uniqueSet)
          }
          val result = builder.build()
          if (result.isEmpty) null else result
      }
    case table: FlinkPreparingTableBase => table.uniqueKeysSet.orElse(null)
    case _ => null
  }
}

def getProjectUniqueKeys(
    projects: JList[RexNode],
    typeFactory: RelDataTypeFactory,
    getInputUniqueKeys: () => util.Set[ImmutableBitSet],
    ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
  // LogicalProject maps a set of rows to a different set;
  // Without knowledge of the mapping function(whether it
  // preserves uniqueness), it is only safe to derive uniqueness
  // info from the child of a project when the mapping is f(a) => a.
  //
  // Further more, the unique bitset coming from the child needs
  // to be mapped to match the output of the project.
  val projUniqueKeySet = new JHashSet[ImmutableBitSet]()
  val mapInToOutPos = new JHashMap[Int, JArrayList[Int]]()

  def appendMapInToOutPos(inIndex: Int, outIndex: Int): Unit = {
    if (mapInToOutPos.contains(inIndex)) {
      mapInToOutPos(inIndex).add(outIndex)
    } else {
      val arrayBuffer = new JArrayList[Int]()
      arrayBuffer.add(outIndex)
      mapInToOutPos.put(inIndex, arrayBuffer)
    }
  }

  // Build an input to output position map.
  projects.zipWithIndex.foreach {
    case (projExpr, i) =>
      projExpr match {
        case ref: RexInputRef => appendMapInToOutPos(ref.getIndex, i)
        case a: RexCall if ignoreNulls && a.getOperator.equals(SqlStdOperatorTable.CAST) =>
          val castOperand = a.getOperands.get(0)
          castOperand match {
            case castRef: RexInputRef =>
              val castType = typeFactory.createTypeWithNullability(projExpr.getType, true)
              val origType = typeFactory.createTypeWithNullability(castOperand.getType, true)
              if (castType == origType) {
                appendMapInToOutPos(castRef.getIndex, i)
              }
            case _ => // ignore
          }
        // rename or cast
        case a: RexCall if (a.getKind.equals(SqlKind.AS) || isFidelityCast(a)) &&
          a.getOperands.get(0).isInstanceOf[RexInputRef] =>
          appendMapInToOutPos(a.getOperands.get(0).asInstanceOf[RexInputRef].getIndex, i)
        case _ => // ignore
      }
  }
  if (mapInToOutPos.isEmpty) {
    // if there's no RexInputRef in the projected expressions
    // return empty set.
    return projUniqueKeySet
  }

  val childUniqueKeySet = getInputUniqueKeys()
  if (childUniqueKeySet != null) {
    // Now add to the projUniqueKeySet the child keys that are fully
    // projected.
    childUniqueKeySet.foreach { colMask =>
      val filerInToOutPos = mapInToOutPos.filter { inToOut =>
        colMask.asList().contains(inToOut._1)
      }
      val keys = filerInToOutPos.keys
      if (colMask.forall(keys.contains(_))) {
        val total = filerInToOutPos.map(_._2.size).product
        for (i <- 0 to total) {
          val tmpMask = ImmutableBitSet.builder()
          filerInToOutPos.foreach { inToOut =>
            val outs = inToOut._2
            tmpMask.set(outs.get(i % outs.size))
          }
          projUniqueKeySet.add(tmpMask.build())
        }
      }
    }
  }
  projUniqueKeySet
}

def getUniqueKeysOnAggregate(grouping: Array[Int]): util.Set[ImmutableBitSet] = {
  // group by keys form a unique key
  ImmutableSet.of(ImmutableBitSet.of(grouping.indices: _*))
}

SQL covers many scenarios, so only a few are listed here:

  1. A join's unique keys are derived by inspecting and combining the unique keys of both inputs
  2. A scan's unique key is determined by whether the scanned columns fully contain the primary key defined on the table
  3. A projection's unique key is determined by whether the projected columns fully contain the unique key of the input node
  4. An aggregation's unique key is simply its GROUP BY key (see the sketch after this list)
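
A small sketch of rules 3 and 4, reusing left_table from the example DDL (the comments note the unique key the planner can derive at each step):

SELECT left_code, cnt    -- projection keeps the whole group key,
FROM (                   -- so the unique key (left_code) survives (rule 3)
  SELECT left_code, COUNT(*) AS cnt
  FROM left_table
  GROUP BY left_code     -- rule 4: the GROUP BY key forms a unique key
) t;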

Take the example above: both tables' unique key is their id column, and a projection is applied on each scan result. Since the projected columns do not include the child node's unique key, the unique key is lost.

The join is then built on these two projections, so neither join input has a unique key (hence leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey] in the quoted plan), and since this inner join cannot derive a key from key-less inputs, the projection after the join has no unique key either. A sketch of fix 1 follows below.
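
A sketch of fix 1 against the example tables: keeping the primary-key columns in the projection lets rule 3 preserve each input's unique key. The sink print_table_with_ids is hypothetical and would need the extra id columns.

INSERT INTO print_table_with_ids
SELECT
  l.left_id, l.left_code, l.left_created_at, l.left_updated_at,
  r.right_id, r.right_code, r.right_created_at, r.right_updated_at
FROM left_table l
JOIN right_table r
ON l.left_code = r.right_code;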

This can be roughly summarized as the following steps or methods (true not only of Flink SQL but of SQL engines in general):

  1. Static information: some rel nodes obtain unique-key information from table metadata, e.g. a table scan (TableSourceScan in Flink)
  2. Derivation: new unique keys are derived from the input's unique-key information, e.g. a projection
  3. Aggregation and grouping: the grouping key of an aggregation forms a new unique key
  4. Join: the result's unique key is derived from the join type and join condition — the outcome can be checked with EXPLAIN, as shown below
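
The derived input specs for any query can be inspected with a standard EXPLAIN statement; the Join node of the optimized physical plan prints the leftInputSpec/rightInputSpec quoted earlier:

-- Print the optimized plan, including the Join node's input specs
-- (the query reuses the example tables).
EXPLAIN
SELECT l.left_code, r.right_code
FROM left_table l
JOIN right_table r
ON l.left_code = r.right_code;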
