当前位置: 首页 > news >正文

Java Stream sort算子实现:SortedOps

SortedOps 是 Java Stream API 内部一个至关重要的 final 类,它的核心职责是为 Stream 流水线提供排序功能。当在代码中调用 stream.sorted() 时,背后就是由 SortedOps 类来驱动和实现具体的排序逻辑。

这个类被设计为 final 并且构造函数是 private 的,这意味着它是一个纯粹的工具类,不能被实例化或继承,只能通过其静态方法来使用。

下面我们分几个部分来详细解析它。

静态工厂方法 (Entry Points)

SortedOps 提供了一系列静态的 make 方法,作为创建排序操作的入口。这些方法根据不同的 Stream 类型(引用类型、intlongdouble)返回一个实现了排序逻辑的 Stream 操作实例。

// ... existing code .../*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T*/static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {return new OfRef<>(upstream);}/*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T* @param comparator the comparator to order elements by*/static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,Comparator<? super T> comparator) {return new OfRef<>(upstream, comparator);}/*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T*/static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {return new OfInt(upstream);}/*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T*/static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {return new OfLong(upstream);}/*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T*/static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {return new OfDouble(upstream);}
// ... existing code ...
  • makeRef: 用于对象流 (Stream<T>)。它有两个重载版本:一个用于自然排序(要求元素实现 Comparable 接口),另一个接受一个 Comparator 用于自定义排序。
  • makeIntmakeLongmakeDouble: 分别用于原始类型流 IntStreamLongStreamDoubleStream。它们总是使用对应原始类型的自然顺序进行排序。

这些 make 方法内部会 new 一个对应的内部类实例(如 OfRefOfInt 等),这些内部类才是排序操作的核心。

StatefulOp 内部类

SortedOps 包含四个核心的静态内部类:OfRefOfIntOfLongOfDouble。它们都继承自 ...Pipeline.StatefulOp,这表明 sorted() 是一个有状态的中间操作 (Stateful Intermediate Operation)

“有状态”意味着该操作需要处理完上游(upstream)的所有元素后,才能向下游(downstream)传递第一个元素。对于排序来说,这是显而易见的——不看到所有元素,就无法确定哪个元素是最小的。这也意味着排序操作需要一个缓冲区来存储所有流元素,可能会消耗大量内存。

我们以 OfRef 为例来分析其关键方法:

// ... existing code ...private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {/*** Comparator used for sorting*/private final boolean isNaturalSort;private final Comparator<? super T> comparator;// ... existing code ...@Overridepublic Sink<T> opWrapSink(int flags, Sink<T> sink) {Objects.requireNonNull(sink);// If the input is already naturally sorted and this operation// also naturally sorted then this is a no-opif (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)return sink;else if (StreamOpFlag.SIZED.isKnown(flags))return new SizedRefSortingSink<>(sink, comparator);elsereturn new RefSortingSink<>(sink, comparator);}@Overridepublic <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator,IntFunction<T[]> generator) {// If the input is already naturally sorted and this operation// naturally sorts then collect the outputif (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {return helper.evaluate(spliterator, false, generator);}else {// @@@ Weak two-pass parallel implementation; parallel collect, parallel sortT[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);Arrays.parallelSort(flattenedData, comparator);return Nodes.node(flattenedData);}}}
// ... existing code ...
  • opWrapSink(...): 这个方法用于串行流 (Sequential Stream) 的处理。它负责包装下游的 Sink(可以理解为元素的消费者)。

    1. 优化: 它首先检查上游流是否已经通过 StreamOpFlag.SORTED 标记为“已排序”。如果上游已按自然顺序排好,并且当前操作也是自然排序,那么这个排序操作就什么都不用做(no-op),直接返回下游的 sink。这是一个非常重要的性能优化,避免了对已排序的流进行重复排序。
    2. Sized vs. Unsized: 接着,它检查流是否通过 StreamOpFlag.SIZED 标记为“大小已知”。
      • 如果大小已知,它会创建一个 SizedRefSortingSink,这个 Sink 会预先分配一个足够大的数组来存放所有元素,效率更高。
      • 如果大小未知,它会创建一个 RefSortingSink,这个 Sink 内部使用 ArrayList 这种可动态调整大小的结构来存储元素。
  • opEvaluateParallel(...): 这个方法用于并行流 (Parallel Stream) 的处理。

    1. 优化: 同样,它也会检查流是否已经排序,如果是,则跳过排序步骤。
    2. 并行实现: 如果需要排序,这里的实现被注释为 "Weak two-pass parallel implementation"(弱两阶段并行实现)。
      • 阶段一 (Collect): 并行地将流中的所有元素收集到一个数组中。
      • 阶段二 (Sort): 调用 Arrays.parallelSort() 对这个大数组进行并行排序。
    3. 最后,将排序好的数组包装成一个 Node 对象返回。这种实现方式简单,但缺点是必须在内存中缓冲所有数据,对于巨大的流,内存开销会非常大。

OfIntOfLongOfDouble 的实现与 OfRef 非常相似,只是它们针对原始类型做了特化,使用原始类型数组(如 int[])和 SpinedBuffer(一种为 Stream 设计的高效可增长块状缓冲区)来优化性能和内存,避免了自动装箱/拆箱的开销。

Sink 实现:排序的执行者

Sink 是实际执行操作的组件。SortedOps 定义了多种 Sink 实现,用于在 end() 方法被调用时执行排序。

我们来看 SizedRefSortingSink 和 RefSortingSink 的核心逻辑:

// ... existing code ...private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {private T[] array;private int offset;
// ... existing code ...@Override@SuppressWarnings("unchecked")public void begin(long size) {if (size >= Nodes.MAX_ARRAY_SIZE)throw new IllegalArgumentException(Nodes.BAD_SIZE);array = (T[]) new Object[(int) size]; // 预分配数组}@Overridepublic void end() {Arrays.sort(array, 0, offset, comparator); // 排序downstream.begin(offset);if (!cancellationRequestedCalled) {for (int i = 0; i < offset; i++)downstream.accept(array[i]); // 推送给下游}else {for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)downstream.accept(array[i]); // 支持短路}downstream.end();array = null; // 释放内存}@Overridepublic void accept(T t) {array[offset++] = t; // 接受并存入数组}}private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {private ArrayList<T> list;
// ... existing code ...@Overridepublic void begin(long size) {if (size >= Nodes.MAX_ARRAY_SIZE)throw new IllegalArgumentException(Nodes.BAD_SIZE);list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>(); // 初始化 ArrayList}@Overridepublic void end() {list.sort(comparator); // 排序downstream.begin(list.size());if (!cancellationRequestedCalled) {list.forEach(downstream::accept); // 推送给下游}
// ... existing code ...downstream.end();list = null; // 释放内存}@Overridepublic void accept(T t) {list.add(t); // 接受并存入列表}}
// ... existing code ...

这两个 Sink 的工作流程遵循一个清晰的模式:

  1. begin(size): 在处理开始时被调用,用于初始化内部的存储结构(固定大小的数组或 ArrayList)。
  2. accept(t): 每当从上游接收到一个元素时被调用,它将元素存入缓冲区。
  3. end(): 当所有元素都接收完毕后被调用。这是排序发生的时刻。它调用 Arrays.sort() 或 list.sort() 对缓冲区内的所有元素进行排序。
  4. 排序完成后,它会遍历排好序的缓冲区,并将每个元素通过 downstream.accept(t) 推送给流水线中的下一个操作。
  5. 短路 (Short-circuiting)sorted 操作本身不能短路,但它会配合下游的短路操作(如 limit(n))。cancellationRequestedCalled 标志位记录了下游是否请求提前终止。如果是,那么在 end() 方法中推送数据时,会不断检查 downstream.cancellationRequested(),以便尽快停止。
  6. 最后,清空内部缓冲区(array = null 或 list = null),以便垃圾回收器可以回收这部分内存。

总结

SortedOps 是 Java Stream sorted() 操作的幕后功臣。它通过一套精心设计的内部类和 Sink 实现,高效地完成了对不同类型、不同场景(串行/并行、大小已知/未知)的流的排序任务。

核心要点:

  • 有状态操作sorted() 是有状态的,需要缓冲所有元素,可能导致高内存占用。
  • 优化: 对已排序的流进行再次排序是几乎没有成本的。
  • 并行实现: 并行排序通过“先收集再并行排序”的两阶段方式实现,简单但内存开销大。
  • Sink 模式: 使用 beginacceptend 的 Sink 模式来处理元素流,将数据收集和处理逻辑清晰地分离开。
  • 原始类型特化: 为 intlongdouble 提供了专门的实现,以避免装箱开销,提升性能。
http://www.dtcms.com/a/335078.html

相关文章:

  • 网络层(1)
  • DeepResearch开源与闭源方案对比
  • autofit.js: 自动调整HTML元素大小的JavaScript库
  • 小智-ESP32的MQTT协议
  • linux设备驱动之字符设备驱动
  • Python-Pandas基础
  • 主从复制+哨兵
  • 移动互联网发展战略
  • Altium Designer 22使用笔记(7)---网表导入,叠层设置
  • Spring框架(IOC)
  • 程序设计|C语言教学——C语言基础4:进阶
  • TOGAF八步一法笔记2
  • day42_2025-08-16
  • TDengine 3.3.7.0 版新功能(BLOB 数据类型)
  • ZYNQ QSPI控制器说明
  • JIT 编译与解释执行机制:Java 性能加速的幕后引擎
  • Linux软件编程-线程(2)
  • Python训练营打卡 DAY 38 Dataset和Dataloader类
  • 《代码重生:杨蓉与62.webp》
  • PowerShell中搜索文件夹
  • 国内代理IP在SEO行业中的应用
  • 云安全 - The Big IAM Challenge
  • 低代码平台能力框架:可复用组件与复杂业务的实现机制
  • 现金流预测模型:12个月精准计算指南
  • 嵌入式硬件篇---电感本质
  • Tomcat架构深度解析:从Server到Servlet的全流程揭秘
  • 【数据分享】上市公司供应链成本分摊数据(2007-2024)
  • 使用Python的defaultdict处理字典中的缺失键
  • C++设计模式:面向对象设计原则
  • 肖臻《区块链技术与应用》第20-22讲 - 以太坊难度调整、权益证明和智能合约