Flink 内部状态管理:PriorityQueueSet解析
HeapPriorityQueue
如果说 AbstractHeapPriorityQueue
搭建了基于数组的优先级队列的骨架,那么 HeapPriorityQueue
就是用经典的 二叉堆(Binary Heap) 算法填充了这个骨架,使其成为一个功能完备、高性能的优先级队列。
HeapPriorityQueue
是 Flink 中一个基础且核心的内存数据结构。它是一个标准的最小堆(Min-Heap)实现,用于存储和管理实现了 HeapPriorityQueueElement
接口的对象。
- 具体实现:它继承了
AbstractHeapPriorityQueue
,并实现了其所有抽象方法,提供了完整的堆操作逻辑。 - 高性能:添加(add)和移除堆顶(poll)操作的时间复杂度都是 O(log n),而访问堆顶(peek)是 O(1)。得益于
HeapPriorityQueueElement
接口,移除任意指定元素也是 O(log n)。 - 核心用途:在 Flink 中,它被广泛用于需要优先级排序的场景,最典型的就是定时器服务(Timer Service)。当操作员(Operator)注册一个定时器时,这个定时器对象就被加入到
HeapPriorityQueue
中。此外,它也作为更复杂数据结构(如KeyGroupPartitionedPriorityQueue
)的构建块。
HeapPriorityQueue
通过实现父类的抽象方法来定义其具体的堆行为。
堆的起始索引
// .../** The index of the head element in the array that represents the heap. */private static final int QUEUE_HEAD_INDEX = 1;// ...@Overrideprotected int getHeadElementIndex() {return QUEUE_HEAD_INDEX;}
// ...
HeapPriorityQueue
选择让堆从数组索引 1
开始,而不是 0
。这是一个经典的小技巧,这样做的好处是父子节点的索引计算会变得非常简洁和高效:
- 对于索引为
i
的节点,其父节点索引是i >>> 1
(即i / 2
)。 - 其左子节点索引是
i << 1
(即i * 2
)。 - 其右子节点索引是
i << 1 + 1
(即i * 2 + 1
)。 这避免了 0-based 索引中2*i+1
和2*i+2
的加法操作,在频繁的堆操作中能带来微小的性能提升。
元素比较器
// .../** Comparator for the priority of contained elements. */@Nonnull protected final PriorityComparator<T> elementPriorityComparator;/*** Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.** @param elementPriorityComparator comparator for the priority of contained elements.* @param minimumCapacity the minimum and initial capacity of this priority queue.*/@SuppressWarnings("unchecked")public HeapPriorityQueue(@Nonnull PriorityComparator<T> elementPriorityComparator,@Nonnegative int minimumCapacity) {super(minimumCapacity);this.elementPriorityComparator = elementPriorityComparator;}
// ...
构造函数接收一个 PriorityComparator<T>
。这个比较器定义了元素“优先级”的比较逻辑。所有堆的调整操作(上浮、下沉)都依赖这个比较器来决定元素应该向哪个方向移动。例如,isElementPriorityLessThen
方法就封装了这个比较逻辑。
添加元素 addInternal
// ...@Overrideprotected void addInternal(@Nonnull T element) {final int newSize = increaseSizeByOne();moveElementToIdx(element, newSize);siftUp(newSize);}
// ...
添加元素的逻辑遵循标准堆的插入步骤:
increaseSizeByOne()
: 增加size
计数,并按需对底层数组进行扩容。moveElementToIdx(element, newSize)
: 将新元素放置在堆的末尾(即数组的size
位置)。siftUp(newSize)
: 对新加入的元素执行“上浮”操作,将其与父节点比较,如果优先级更高(值更小),则交换位置,直到找到它在堆中的正确位置。
删除元素 removeInternal
// ...@Overrideprotected T removeInternal(int removeIdx) {T[] heap = this.queue;T removedValue = heap[removeIdx];assert removedValue.getInternalIndex() == removeIdx;final int oldSize = size;if (removeIdx != oldSize) {T element = heap[oldSize];moveElementToIdx(element, removeIdx);adjustElementAtIndex(element, removeIdx);}heap[oldSize] = null;--size;return removedValue;}
// ...
删除任意位置 removeIdx
元素的逻辑如下:
- 记录下要被删除的元素
removedValue
。 - 获取堆的最后一个元素(在
oldSize
位置)。 - 用最后一个元素覆盖掉要被删除的元素
removeIdx
。 - 对这个被移动到
removeIdx
的元素执行adjustElementAtIndex
操作,恢复堆的有序性。 - 将数组原来的最后一个位置置为
null
,并减少size
。
核心堆操作
siftUp
和 siftDown
是维持堆性质(Heap Property)的两个核心方法。
siftUp(int idx)
(上浮)// ... private void siftUp(int idx) {final T[] heap = this.queue;final T currentElement = heap[idx];int parentIdx = idx >>> 1;while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {moveElementToIdx(heap[parentIdx], idx);idx = parentIdx;parentIdx >>>= 1;}moveElementToIdx(currentElement, idx); } // ...
此方法将指定索引
idx
的元素沿着其父节点路径向上移动,直到该元素的优先级不高于其父节点,或者到达堆顶。siftDown(int idx)
(下沉)// ... private void siftDown(int idx) {final T[] heap = this.queue;final int heapSize = this.size;final T currentElement = heap[idx];int firstChildIdx = idx << 1;// ... (find smaller child) ...while (isElementIndexValid(firstChildIdx, heapSize)&& isElementPriorityLessThen(heap[firstChildIdx], currentElement)) {moveElementToIdx(heap[firstChildIdx], idx);idx = firstChildIdx;// ... (find next smaller child) ...}moveElementToIdx(currentElement, idx); } // ...
此方法将指定索引
idx
的元素与其子节点中优先级最高(值最小)的那个进行比较。如果子节点优先级更高,则交换位置,然后继续向下比较和交换,直到该元素的优先级不低于其任何子节点,或者它成为叶子节点。adjustModifiedElement
&adjustElementAtIndex
// ... public void adjustModifiedElement(@Nonnull T element) {final int elementIndex = element.getInternalIndex();if (element == queue[elementIndex]) {adjustElementAtIndex(element, elementIndex);} } // ... private void adjustElementAtIndex(T element, int index) {siftDown(index);if (queue[index] == element) {siftUp(index);} } // ...
adjustModifiedElement
是一个非常实用的公共方法。当队列中某个元素的优先级发生变化后,可以调用此方法来重新调整它在堆中的位置。它的逻辑是先尝试下沉(siftDown
),如果元素没有移动(说明它的优先级没有比子节点低),再尝试上浮(siftUp
)。这样就覆盖了优先级变高和变低两种情况。removeInternal
中也复用了这个逻辑。
在 Flink 工程中的应用
通过分析相关代码,我们可以看到 HeapPriorityQueue
的实际应用场景:
- 作为元数据堆(Heap of Heaps): 在
KeyGroupPartitionedPriorityQueue
中,有一个heapOfKeyGroupedHeaps
字段,它就是一个HeapPriorityQueue
。这个队列里存放的不是最终的元素,而是其他优先级队列(每个队列对应一个 KeyGroup)。通过这个“堆中堆”的结构,KeyGroupPartitionedPriorityQueue
可以快速地从所有 KeyGroup 中找到全局优先级最高的元素。 - 作为基础被扩展:
HeapPriorityQueueSet
继承了HeapPriorityQueue
,并增加了按 KeyGroup 去重的逻辑。BatchExecutionInternalPriorityQueueSet
也继承了它,为批处理模式增加了去重功能。
总结
HeapPriorityQueue
是 AbstractHeapPriorityQueue
的一个健壮、高效的具体实现。它通过经典的二叉堆算法,结合父类提供的快速索引能力,为 Flink 提供了一个功能强大的内存优先级队列。它不仅直接服务于定时器等场景,还作为更高级、更复杂状态结构(如分区的优先级队列)的基础组件,在 Flink 的状态管理体系中扮演着不可或缺的角色。
HeapPriorityQueueSet
HeapPriorityQueueSet
是 Flink 状态管理中一个非常关键的类。它在 HeapPriorityQueue
的基础上,增加了 “集合(Set)”语义和“按键组(Key-Group)分区” 的能力。
HeapPriorityQueueSet
本质上是一个带去重功能的分区优先级队列。
- 继承关系:
public class HeapPriorityQueueSet<T> extends HeapPriorityQueue<T>
它继承了HeapPriorityQueue
,因此它本身就是一个功能完备的优先级队列,拥有堆的所有特性。 - 核心增强1:Set 语义(去重): 它要确保队列中不会有重复的元素。这里的“重复”是基于对象的
equals()
方法来判断的。 - 核心增强2:Key-Grouped(按键组分区): 它实现了
KeyGroupedInternalPriorityQueue<T>
接口。这意味着它内部管理的元素是按 Key-Group 组织的。一个HeapPriorityQueueSet
实例只负责管理当前 TaskManager 上分配到的 Key-Group 范围内的元素。
实现原理
为了同时实现“优先级队列”和“集合”这两种数据结构的特性,HeapPriorityQueueSet
采用了一种经典的设计模式:组合使用两种数据结构。
HeapPriorityQueue
(父类): 负责维护元素的优先级顺序。所有元素都存储在父类的堆数组中,保证poll()
操作能以 O(log n) 的时间复杂度返回优先级最高的元素。HashMap<T, T>[] deduplicationMapsByKeyGroup
: 负责实现快速去重和查找。这是一个HashMap
数组,数组的每个位置对应一个 Key-Group。
我们来详细看一下这个关键字段:
// .../*** This array contains one hash set per key-group. The sets are used for fast de-duplication and* deletes of elements.*/private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
// ...
- 这是一个
HashMap
的数组。数组的长度等于当前这个HeapPriorityQueueSet
实例所负责的 Key-Group 的数量。 - 数组中的每一个
HashMap
都只存储属于对应 Key-Group 的元素。 - 这个 Map 提供了 O(1) 的平均时间复杂度来检查一个元素是否存在,从而实现了高效的去重。
核心方法重写 (Override)
HeapPriorityQueueSet
重写了父类的 add
, remove
, poll
和 clear
方法,以同步维护 HeapPriorityQueue
和 deduplicationMapsByKeyGroup
的一致性。
add(@Nonnull T element)
// ...@Overridepublic boolean add(@Nonnull T element) {return getDedupMapForElement(element).putIfAbsent(element, element) == null&& super.add(element);}
// ...
添加逻辑非常清晰:
getDedupMapForElement(element)
: 首先,根据元素element
提取出它的 Key,计算出所属的 Key-Group ID,然后找到对应的HashMap
。.putIfAbsent(element, element) == null
: 尝试将元素放入这个HashMap
。putIfAbsent
的特性是:如果 Map 中不存在这个 Key(根据equals()
和hashCode()
判断),则放入并返回null
;如果已存在,则不放入并返回已存在的值。&& super.add(element)
: 只有当putIfAbsent
返回null
时(即元素是新的,不重复),才会继续调用父类的super.add(element)
将其真正加入到底层的堆数组中。
这个实现巧妙地保证了只有不重复的元素才能被加入,从而实现了 Set 语义。
remove(@Nonnull T toRemove)
// ...@Overridepublic boolean remove(@Nonnull T toRemove) {T storedElement = getDedupMapForElement(toRemove).remove(toRemove);return storedElement != null && super.remove(storedElement);}
// ...
删除逻辑与添加类似:
getDedupMapForElement(toRemove).remove(toRemove)
: 首先尝试从对应的HashMap
中删除元素。remove
方法会返回被删除的元素实例,如果不存在则返回null
。storedElement != null
: 检查是否真的从 Map 中删除了一个元素。&& super.remove(storedElement)
: 如果成功从 Map 中删除了元素,再调用父类的super.remove(storedElement)
将其从堆中也删除。注意这里传递的是storedElement
,即从 Map 中返回的那个实例,因为HeapPriorityQueue
的remove
依赖于元素实例内部存储的索引,必须是堆中存储的那个原始实例。
poll()
// ...@Override@Nullablepublic T poll() {final T toRemove = super.poll();return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;}
// ...
poll
操作是先从优先级队列的源头(堆)开始的:
super.poll()
: 从堆中移除并返回优先级最高的元素。getDedupMapForElement(toRemove).remove(toRemove)
: 如果成功从堆中 poll 出了一个元素,那么也必须将它从去重 Map 中移除,以保持数据一致性。
Key-Group 相关方法
getDedupMapForElement(T element)
: 这是一个私有辅助方法,它封装了从元素到对应HashMap
的查找过程。keyExtractor.extractKeyFromElement(element)
: 使用注入的KeyExtractorFunction
从元素中提取出 Key。KeyGroupRangeAssignment.assignToKeyGroup(...)
: 根据 Key 和总 Key-Group 数计算出全局的 Key-Group ID。getDedupMapForKeyGroup(keyGroup)
: 将全局 Key-Group ID 转换为本地的数组索引,并返回对应的HashMap
。
getSubsetForKeyGroup(int keyGroupId)
: 实现了KeyGroupedInternalPriorityQueue
接口的方法。它允许外部代码获取某个特定 Key-Group 下的所有元素集合。它直接返回了对应HashMap
的keySet()
,非常高效。
总结
HeapPriorityQueueSet
是对 HeapPriorityQueue
的一个功能强大的封装和扩展。它通过组合一个堆(HeapPriorityQueue
)和一个哈希表数组(HashMap[]
),巧妙地实现了一个同时具备优先级排序、集合去重和按键组分区能力的数据结构。
在 Flink 的 Heap State Backend(堆状态后端)中,当需要一个 Timer State(定时器状态)时,HeapPriorityQueuesManager
就会通过 HeapPriorityQueueSetFactory
创建一个 HeapPriorityQueueSet
实例来存储和管理定时器。这确保了同一个 Key 下的同一个定时器不会被重复注册,并且可以高效地按优先级触发。
BatchExecutionInternalPriorityQueueSet
从类名 BatchExecution...
就可以看出,这个类是专门为 Flink 的 批处理(BATCH)执行模式 设计的。它的存在是为了在批处理场景下,对定时器等状态进行一种特化的、更高效的管理。
核心区别:单 Key 处理 vs. 多 Key-Group 处理
这是 BatchExecutionInternalPriorityQueueSet
和 HeapPriorityQueueSet
最本质的区别。
HeapPriorityQueueSet
(流处理模式):- 设计目标是同时管理多个 Key-Group 的数据。
- 它内部有一个
HashMap<T, T>[] deduplicationMapsByKeyGroup
数组,每个HashMap
对应一个 Key-Group。 - 它需要处理来自不同 Key 的数据,并将它们路由到各自的 Key-Group 中。
BatchExecutionInternalPriorityQueueSet
(批处理模式):- 设计目标是一次只处理一个 Key 的所有数据。
- 它内部只有一个简单的
Map<T, T> dedupMap
。 - 在 Flink 的批处理模式下,数据通常会先按 Key 进行排序(Sort)。因此,同一个 Operator 在处理数据时,会连续收到属于同一个 Key 的所有数据。处理完这个 Key 的所有数据后,才会开始处理下一个 Key 的数据。
- 基于这个前提,这个优先级队列不需要同时维护多个 Key-Group 的状态。它只需要一个临时的、用于当前正在处理的 Key 的队列即可。当这个 Key 的数据处理完毕,这个队列就可以被清空(
clear()
),为下一个 Key 的数据做准备。
我们来看代码对比:
HeapPriorityQueueSet
(流模式):
// .../*** This array contains one hash set per key-group. The sets are used for fast de-duplication and* deletes of elements.*/private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
// ...
BatchExecutionInternalPriorityQueueSet
(批模式):
// ...private final Map<T, T> dedupMap = new HashMap<>();
// ...
这个结构上的简化是两者最核心的不同点。
功能上的差异
由于设计目标的差异,BatchExecutionInternalPriorityQueueSet
在功能上做了一些简化和限制。
不支持按 Key-Group 查询:
HeapPriorityQueueSet
实现了getSubsetForKeyGroup(int keyGroupId)
方法,可以返回指定 Key-Group 下的所有元素。而BatchExecutionInternalPriorityQueueSet
则直接抛出UnsupportedOperationException
。// ...@Nonnull@Overridepublic Set<T> getSubsetForKeyGroup(int keyGroupId) {throw new UnsupportedOperationException("Getting subset for key group is not supported in BATCH runtime mode.");} // ...
这是合理的,因为它本身的设计就是“无 Key-Group 感知”的,它只关心当前 Key 的数据集合。
简化的去重逻辑: 它的
add
,remove
,poll
方法的实现逻辑和HeapPriorityQueueSet
非常相似,都是组合了Map
和Heap
的操作。但由于它只有一个dedupMap
,所以不需要像HeapPriorityQueueSet
那样先根据元素计算 Key-Group ID 再找到对应的 Map。它的操作直接针对这个唯一的dedupMap
,逻辑更直接。// ...@Overridepublic boolean add(@Nonnull T element) {return dedupMap.putIfAbsent(element, element) == null && super.add(element);}@Overridepublic boolean remove(@Nonnull T toRemove) {T storedElement = dedupMap.remove(toRemove);return storedElement != null && super.remove(storedElement);} // ...
使用场景
BatchExecutionInternalPriorityQueueSet
被用在 BatchExecutionInternalTimeService
中。这个时间服务是专门为批处理模式下的算子(如 KeyedProcessOperator
)提供定时器功能的。
工作流程大致如下:
- 批处理作业开始,数据按 Key 排序后进入算子。
- 当第一个 Key 的第一条数据到达时,
BatchExecutionInternalTimeService
会创建一个BatchExecutionInternalPriorityQueueSet
实例。 - 这个 Key 的所有后续数据注册的定时器都会被加入到这个实例中。
- 当这个 Key 的所有数据都处理完毕后(通过数据流中的 Key 变化来检测),
BatchExecutionInternalTimeService
会触发这个BatchExecutionInternalPriorityQueueSet
中所有到期的定时器。 - 触发完毕后,调用
clear()
方法清空这个队列。 - 当下一个 Key 的数据到来时,重复以上过程。
这种“用完即弃”的模式,使得 BatchExecutionInternalPriorityQueueSet
的实现可以非常轻量和高效,避免了流处理模式下管理大量 Key-Group 状态的复杂性。
ForStDBCachingPriorityQueueSet
这个类是 Flink 中 ForSt
状态后端(一个基于 RocksDB 的高性能状态后端)对优先级队列的实现。它与我们之前讨论的基于内存的 HeapPriorityQueue
系列完全不同,它的核心是将状态持久化到 RocksDB 中,同时通过内存缓存来优化性能。
ForStDBCachingPriorityQueueSet
的设计目标是在提供持久化保证的同时,尽可能地减少与底层 RocksDB 的昂贵交互(如磁盘 I/O 和 seek 操作)。它通过以下方式实现:
- 持久化存储: 所有元素都序列化后存储在 RocksDB 中。这保证了即使发生故障,定时器等状态也能从快照中恢复。
- 利用 RocksDB 的有序性: RocksDB 是一个键值存储引擎,其键是按字典序排序的。
ForStDBCachingPriorityQueueSet
利用了这一特性。它要求元素的序列化器 (byteOrderProducingSerializer
) 必须保证序列化后的字节数组的字典序与元素的逻辑优先级顺序一致。这样,RocksDB 中存储的第一个键就对应着优先级最高的元素。 - 内存缓存 (
OrderedByteArraySetCache
): 为了避免每次peek()
或poll()
都去查询 RocksDB,它在内存中维护了一个固定大小的缓存。这个缓存 (orderedCache
) 存储了当前 RocksDB 中优先级最高的 N 个元素的序列化字节数组。 - 写穿透(Write-Through)策略: 对队列的任何写操作(add, remove)都会同时更新内存缓存和 RocksDB(通过
ForStDBWriteBatchWrapper
批量写入),以保证两者的数据一致性。
关键组件和字段
// .../** The RocksDB instance that serves as store. */@Nonnull private final RocksDB db;/** Handle to the column family of the RocksDB instance in which the elements are stored. */@Nonnull private final ColumnFamilyHandle columnFamilyHandle;/*** Serializer for the contained elements. The lexicographical order of the bytes of serialized* objects must be aligned with their logical order.*/@Nonnull private final TypeSerializer<E> byteOrderProducingSerializer;/** Wrapper to batch all writes to RocksDB. */@Nonnull private final ForStDBWriteBatchWrapper batchWrapper;/** The key-group id in serialized form. */@Nonnull private final byte[] groupPrefixBytes;/** In memory cache that holds a head-subset of the elements stored in RocksDB. */@Nonnull private final OrderedByteArraySetCache orderedCache;/** Cache for the head element in de-serialized form. */@Nullable private E peekCache;/** This flag is true iff all elements in RocksDB are also contained in the cache. */private boolean allElementsInCache;
// ...
db
,columnFamilyHandle
: RocksDB 的实例和列族句柄,是数据持久化的目的地。byteOrderProducingSerializer
: 关键的序列化器,保证了字节序和逻辑优先级的统一。batchWrapper
: 批量写入包装器,将多次写操作合并为一次,提升 RocksDB 写入性能。groupPrefixBytes
: 每个 RocksDB 的 Key 都以 Key-Group ID 作为前缀,用于隔离不同 Key-Group 的数据。orderedCache
: 核心的内存缓存。它是一个有序的字节数组集合(用TreeSet<byte[]>
实现),存储了优先级最高的 N 个元素的序列化形式。peekCache
: 对orderedCache
中最高优先级元素(队头)的反序列化缓存。避免每次peek()
都进行反序列化。allElementsInCache
: 一个优化标志。如果为true
,表示 RocksDB 中的所有元素都已在内存缓存中,可以避免不必要的 RocksDB 查询。
peek()
和 poll()
- 读操作
// ...@Nullable@Overridepublic E peek() {checkRefillCacheFromStore();// ... (logic to return from peekCache or deserialize from orderedCache) ...}@Nullable@Overridepublic E poll() {checkRefillCacheFromStore();final byte[] firstBytes = orderedCache.pollFirst();// ...// write-through syncremoveFromRocksDB(firstBytes);// ...}
// ...
checkRefillCacheFromStore()
: 这是所有操作的第一步。它检查内存缓存orderedCache
是否为空。如果为空,并且allElementsInCache
为false
,说明缓存需要从 RocksDB 重新填充。它会创建一个 RocksDB 迭代器,从 RocksDB 中读取数据并批量加载(bulkLoadFromOrderedIterator
)到缓存中,直到缓存满或者 RocksDB 遍历完毕。peek()
: 在确保缓存有数据后,peek()
操作优先返回peekCache
。如果peekCache
为空,它会从orderedCache
中取出第一个字节数组(不移除),反序列化它,存入peekCache
并返回。poll()
: 在确保缓存有数据后,poll()
从orderedCache
中移除第一个字节数组,然后通过batchWrapper
将对应的 Key 从 RocksDB 中也删除(写穿透),最后返回反序列化后的元素。
add()
和 remove()
- 写操作
// ...@Overridepublic boolean add(@Nonnull E toAdd) {checkRefillCacheFromStore();final byte[] toAddBytes = serializeElement(toAdd);// ... (complex logic to decide whether to add to cache) ...// write-through syncaddToRocksDB(toAddBytes);// ...}@Overridepublic boolean remove(@Nonnull E toRemove) {checkRefillCacheFromStore();// ...final byte[] toRemoveBytes = serializeElement(toRemove);// write-through syncremoveFromRocksDB(toRemoveBytes);orderedCache.remove(toRemoveBytes);// ...}
// ...
add(E toAdd)
:- 序列化新元素
toAdd
为toAddBytes
。 - 决策是否加入缓存: 这是一个核心优化。它会比较
toAddBytes
和缓存中优先级最低的元素(orderedCache.peekLast()
)。- 如果新元素的优先级高于缓存中优先级最低的元素,或者缓存未满,那么新元素有资格进入缓存。如果缓存已满,会先将缓存中优先级最低的元素踢出(
pollLast
),再将新元素加入。 - 如果新元素的优先级低于缓存中所有元素,那么它不会被加入缓存,只会写入 RocksDB。
- 如果新元素的优先级高于缓存中优先级最低的元素,或者缓存未满,那么新元素有资格进入缓存。如果缓存已满,会先将缓存中优先级最低的元素踢出(
- 写穿透: 无论是否加入缓存,新元素最终都会通过
batchWrapper
写入 RocksDB。
- 序列化新元素
remove(E toRemove)
:- 序列化要删除的元素
toRemove
为toRemoveBytes
。 - 写穿透: 同时从 RocksDB(通过
batchWrapper
)和内存缓存orderedCache
中删除这个字节数组。
- 序列化要删除的元素
总结
ForStDBCachingPriorityQueueSet
是一个为大规模、持久化状态设计的复杂但高效的优先级队列。它牺牲了纯内存操作的速度,换来了巨大的容量和故障恢复能力。其设计的精髓在于:
- 利用 RocksDB 的有序性来模拟优先级队列。
- 通过一个智能的内存缓存,将最高优先级的元素(最常被访问)保留在内存中,极大地优化了
peek
和poll
操作的性能,使其在大多数情况下能避免直接访问磁盘。 - 通过写穿透和批量写入机制,在保证数据一致性的同时,提升了写入效率。
这种设计是 Flink 能够支持 TB 级别状态的关键技术之一。
RocksDBCachingPriorityQueueSet
Flink 原生 RocksDB 状态后端 (flink-statebackend-rocksdb
) 中对持久化优先级队列的实现。而 ForStDBCachingPriorityQueueSet
是 ForSt 状态后端 (flink-statebackend-forst
) 中对应的实现。
由于 ForSt 本身就是 RocksDB 的一个分支(fork),旨在进行一些特定的优化,所以 Flink 为这两个后端提供了功能上对等的、几乎是逐行复制的优先级队列实现。
相同之处:核心架构和逻辑
RocksDBCachingPriorityQueueSet
与 ForStDBCachingPriorityQueueSet
共享完全相同的核心架构:
- 持久化存储: 使用 RocksDB 作为底层的键值存储。
- 有序性依赖: 依赖一个特殊的序列化器 (
byteOrderProducingSerializer
),该序列化器能保证元素序列化后的字节序与它们的逻辑优先级顺序一致。 - 内存缓存: 内部都有一个
OrderedByteArraySetCache
(通常由TreeSet
实现),用于缓存 RocksDB 中优先级最高的 N 个元素的序列化字节。 - 写穿透策略: 对队列的
add
/remove
操作会同时更新内存缓存和 RocksDB。 - 缓存填充: 当缓存为空时,都会从 RocksDB 中读取数据来重新填充缓存 (
checkRefillCacheFromStore
)。 peek
缓存: 都有一个peekCache
字段,用于缓存队头元素的反序列化对象,避免重复反序列化。seekHint
优化:RocksDBCachingPriorityQueueSet.java
// .../*** This holds the key that we use to seek to the first element in RocksDB, to improve* seek/iterator performance.*/@Nonnull private byte[] seekHint; // ...@Nullable@Overridepublic E poll() { // ...if (orderedCache.isEmpty()) {seekHint = firstBytes;} // ...}
seekHint
优化。当缓存被取空时,它会记录下最后一个被取出的元素。下次需要从 RocksDB 填充缓存时,迭代器可以直接seek
到这个seekHint
的位置,而无需从头开始扫描,这在队列非常大时能显著提升性能。
如果你逐一比较 add
, poll
, peek
, remove
等核心方法的实现,你会发现它们的逻辑是完全一致的。
不同之处
这两个类之间唯一的、也是最关键的区别在于它们依赖的底层库和包装类不同。
RocksDBCachingPriorityQueueSet
:- 位于包
org.apache.flink.state.rocksdb
。 - 依赖原生 RocksDB 的 Java 包装:
import org.rocksdb.RocksDB;
- 使用
RocksDBWriteBatchWrapper
来进行批量写入。 - 由
RocksDBPriorityQueueSetFactory
创建。
- 位于包
ForStDBCachingPriorityQueueSet
:- 位于包
org.apache.flink.state.forst.sync
。 - 依赖 ForSt DB 的 Java 包装:
import org.forstdb.RocksDB;
- 使用
ForStDBWriteBatchWrapper
来进行批量写入。 - 由
ForStPriorityQueueSetFactory
创建。
- 位于包
总结
RocksDBCachingPriorityQueueSet
和 ForStDBCachingPriorityQueueSet
并非功能不同的两个类,而更像是同一个设计模式在两个不同后端上的平行实现。你可以将 ForStDBCachingPriorityQueueSet
理解为将 RocksDBCachingPriorityQueueSet
的代码复制一份,然后将所有 org.rocksdb.*
的引用替换为 org.forstdb.*
的结果。
这种代码结构清晰地表明:
- Flink 对持久化优先级队列的设计是统一的。
- 为了支持不同的(但相似的)底层存储,Flink 选择了提供独立的、几乎是镜像的实现,而不是通过复杂的抽象和接口来统一它们,这可能是为了避免抽象带来的性能开销,并保持每个后端实现的独立性和内聚性。
因此,当你理解了其中一个的运作原理,就等于完全理解了另一个。它们的区别不在于“做什么”或“怎么做”,而在于“用谁的工具来做”。
AbstractHeapPriorityQueue
AbstractHeapPriorityQueue
为 Flink 中所有基于内存数组实现的优先级队列提供了通用的骨架。它的主要特点是:
- 基于数组的堆结构:内部使用一个对象数组
T[] queue
来存储元素,并通过计算索引来维护二叉堆的逻辑结构。 - 支持快速删除:通过泛型约束
T extends HeapPriorityQueueElement
,要求队列中的每个元素都能记录自身在数组中的索引。这使得在 O(log n) 时间复杂度内删除任意元素成为可能,而不仅仅是堆顶元素。这对于 Flink 的定时器管理(例如,取消一个已注册的定时器)至关重要。 - 抽象设计:它定义了优先级队列的核心逻辑和公共方法(如
poll
,peek
,add
,remove
),但将具体的堆操作(如元素的上浮siftUp
和下沉siftDown
)留给子类实现。
// ...
public abstract class AbstractHeapPriorityQueue<T extends HeapPriorityQueueElement>implements InternalPriorityQueue<T> {
// ...
<T extends HeapPriorityQueueElement>
: 这是一个关键的泛型约束。extends HeapPriorityQueueElement
: 这个约束要求所有存入该队列的元素都必须实现HeapPriorityQueueElement
接口。该接口提供了getInternalIndex()
和setInternalIndex(int)
方法。这使得每个元素实例自身都“知道”自己在堆数组中的位置。当需要删除一个特定元素时,可以直接通过getInternalIndex()
找到它,然后执行删除和堆调整操作,极大地提高了非堆顶元素的删除效率。
implements InternalPriorityQueue<T>
: 它实现了 Flink 内部的优先级队列接口,确保了其行为符合 Flink 状态后端对优先级队列的通用规范。
核心字段
// .../** The array that represents the heap-organized priority queue. */@Nonnull protected T[] queue;/** The current size of the priority queue. */@Nonnegative protected int size;
// ...
@Nonnull protected T[] queue;
: 这是优先级队列的底层存储结构,一个简单的 Java 数组。堆的父子关系是通过数组下标计算来维持的。例如,如果一个元素在索引i
,它的子节点通常在2*i+1
和2*i+2
(对于0-based堆)。@Nonnegative protected int size;
: 记录当前队列中实际存储的元素数量。size
的值也表示堆中最后一个元素之后的位置。
构造函数
// ...@SuppressWarnings("unchecked")public AbstractHeapPriorityQueue(@Nonnegative int minimumCapacity) {this.queue = (T[]) new HeapPriorityQueueElement[getHeadElementIndex() + minimumCapacity];this.size = 0;}
// ...
构造函数接收一个 minimumCapacity
(最小容量)参数。它会创建一个 HeapPriorityQueueElement
类型的数组。注意数组的长度是 getHeadElementIndex() + minimumCapacity
。getHeadElementIndex()
是一个抽象方法,这允许子类决定堆是从索引 0
开始还是从索引 1
开始。这是一个常见的设计,因为 1-based 的堆在计算父子索引时更简洁(父节点为 i/2
,子节点为 2*i
和 2*i+1
)。
主要公共方法 (Public API)
这些方法大多实现了 InternalPriorityQueue
接口。
poll()
: 移除并返回优先级最高的元素(即队头/堆顶元素)。如果队列为空,返回null
。它通过调用removeInternal(getHeadElementIndex())
来实现。peek()
: 查看队头元素但不移除它。直接返回queue
数组在getHeadElementIndex()
位置的元素。add(@Nonnull T toAdd)
: 添加一个新元素。它将实际的添加逻辑委托给抽象方法addInternal(toAdd)
。其返回值比较有意思:return toAdd.getInternalIndex() == getHeadElementIndex()
,它表示新添加的元素是否成为了新的队头。这在某些场景下(如KeyGroupPartitionedPriorityQueue
)很有用,可以用来判断是否需要更新更高层级的堆结构。remove(@Nonnull T toRemove)
: 移除一个指定的元素。这是HeapPriorityQueueElement
接口大放异彩的地方。它通过toRemove.getInternalIndex()
直接获取元素在数组中的位置,然后调用removeInternal
执行删除。返回值同样表示被删除的元素是否是队头元素。addAll(@Nullable Collection<? extends T> toAdd)
: 批量添加元素。它首先调用resizeForBulkLoad
尝试一次性将数组扩容到足够大,以避免在循环中多次扩容的开销,然后遍历集合逐个添加元素。clear()
: 清空队列。通过Arrays.fill
将数组中有效元素部分置为null
,并将size
置为0,以便垃圾回收。iterator()
: 返回一个迭代器。需要特别注意的是,这个迭代器遍历元素的顺序与优先级无关。它只是简单地按数组索引顺序遍历堆中的元素。
内部及抽象方法 (Protected & Abstract Methods)
resizeQueueArray(...)
: 负责在需要时对queue
数组进行扩容。它会尝试扩容到期望大小,如果期望大小超过了Java数组的最大限制 (MAX_ARRAY_SIZE
),它会尝试扩容到MAX_ARRAY_SIZE
。如果连最小需求都无法满足,则抛出OutOfMemoryError
。moveElementToIdx(T element, int idx)
: 这是一个非常重要的方法。当堆中元素位置发生变化时(例如,上浮或下沉操作),必须调用此方法。它做了两件事:queue[idx] = element;
: 将元素放到数组的新位置。element.setInternalIndex(idx);
: 更新元素自己记录的索引。这确保了数据的一致性。
protected abstract T removeInternal(@Nonnegative int elementIndex)
: 抽象的删除方法。子类需要实现具体的删除逻辑。通常包括:- 将要删除的元素与堆的最后一个元素交换位置。
- 将
size
减一。 - 将被交换到
elementIndex
位置的元素执行“下沉(siftDown)”操作,以恢复堆的有序性。
protected abstract void addInternal(@Nonnull T toAdd)
: 抽象的添加方法。子类需要实现具体的添加逻辑。通常包括:- 将新元素放置在数组末尾(
size
的位置)。 - 将
size
加一。 - 对新加入的元素执行“上浮(siftUp)”操作,直到找到其在堆中的正确位置。
- 将新元素放置在数组末尾(
protected abstract int getHeadElementIndex()
: 抽象方法,让子类决定堆的起始索引(通常是0或1)。
内部迭代器 HeapIterator
// ...private final class HeapIterator implements CloseableIterator<T> {private int runningIdx;private final int endIdx;HeapIterator() {this.runningIdx = getHeadElementIndex();this.endIdx = runningIdx + size;}@Overridepublic boolean hasNext() {return runningIdx < endIdx;}@Overridepublic T next() {if (runningIdx >= endIdx) {throw new NoSuchElementException("Iterator has no next element.");}return queue[runningIdx++];}@Overridepublic void close() {}}
// ...
这个迭代器的实现非常直接,它从堆的起始位置线性扫描到底部。再次强调,这不保证按优先级顺序返回元素。close()
方法是空实现,以满足 CloseableIterator
接口的要求。
总结
AbstractHeapPriorityQueue
是 Flink 中一个设计精良的抽象类。它通过将通用逻辑(如数组管理、API接口实现)与特定逻辑(堆的上浮/下沉操作)分离,提供了很好的扩展性。其最核心的设计亮点是通过 HeapPriorityQueueElement
接口实现了高效的任意元素删除,完美地契合了 Flink 流处理中对定时器进行动态添加和取消的需求。像 HeapPriorityQueue
和 KeyGroupPartitionedPriorityQueue
等关键的状态结构都建立在这个基础之上。