当前位置: 首页 > news >正文

深入解析Java Stream Sink接口

Sink 接口是 Stream API 内部实现的核心组件之一。你可以把它理解为数据在流管道中各个阶段之间传递的“管道工”或“接收器”。每个 Stream 操作(无论是中间操作如 filtermap,还是终端操作如 forEachcollect)在内部都会与一个或多个 Sink 实例打交道。

Sink<T> 接口定义

Sink.java

package java.util.stream;import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;/*** An extension of {@link Consumer} used to conduct values through the stages of* a stream pipeline, with additional methods to manage size information,* control flow, etc.  // ... (详细的文档注释,解释了 Sink 的作用和生命周期) ...* @param <T> type of elements for value streams* @since 1.8*/
interface Sink<T> extends Consumer<T> {// ... (方法定义) ...
}
  • interface Sink<T> extends Consumer<T>:
    • Sink<T> 是一个泛型接口,T 代表它能接收的元素类型。
    • 它继承了 java.util.function.Consumer<T> 接口。这意味着任何 Sink 的实现都必须提供 void accept(T t) 方法,用于接收流中的单个元素。

Sink 的核心方法和生命周期

Sink 接口定义了一套协议来管理数据流的生命周期和控制流:

  1. default void begin(long size):

    • 作用: 通知 Sink 即将开始接收数据。这是在向 Sink 发送任何数据(调用 accept)之前必须调用的第一个方法。
    • size 参数:
      • 如果源的大小已知,可以传递确切的大小。
      • 如果大小未知或无限,则传递 -1
      • 这个大小信息对于某些有状态的操作(如 toArray)或短路操作可能很有用。
    • 状态转换: 调用此方法会将 Sink 从初始状态转换到活动状态。
    • 默认实现: 为空方法体 {}, 允许实现类根据需要覆盖。
  2. void accept(T t): (继承自 Consumer<T>)

    • 作用: 接收流中的一个元素。这个方法会在 begin() 之后和 end() 之前被多次调用。
    • 状态: 只能在 Sink 处于活动状态时调用。
  3. default void end():

    • 作用: 通知 Sink 所有数据都已发送完毕。
    • 行为: 如果 Sink 是有状态的(例如,它在累积结果),它应该在这个时候将任何存储的状态发送到下游,并清除其内部累积的状态和资源。
    • 状态转换: 调用此方法会将 Sink 从活动状态转换回初始状态,此时它可以被重用(再次调用 begin())。
    • 默认实现: 为空方法体 {}, 允许实现类根据需要覆盖。
  4. default boolean cancellationRequested():

    • 作用: 允许 Sink 向上游(数据源或前一个阶段)发出信号,表示它不希望再接收更多的数据。这对于实现短路操作(short-circuiting operations)至关重要,例如 findFirst()anyMatch()limit() 等。
    • 行为: 数据源或上游阶段可以在发送每个元素之前轮询此方法。如果返回 true,上游可以停止发送数据。
    • 默认实现return false;,表示默认情况下不请求取消。需要短路行为的 Sink 实现会覆盖此方法。

针对基本类型的 accept 方法

为了避免基本类型装箱拆箱带来的性能开销,Sink 接口还为 intlongdouble 这三种基本类型提供了专门的 accept 方法:

  • default void accept(int value)
  • default void accept(long value)
  • default void accept(double value)

这些方法的默认实现是抛出 IllegalStateException

这种设计被称为“厨房水槽”(kitchen sink)模式,即一个接口包含了处理多种数据类型的方法,以避免为每种原始类型创建单独的接口层次结构。

内部接口:针对基本类型的特化 Sink

Sink 接口内部定义了三个特化的子接口,用于处理基本类型的流:

  • interface OfInt extends Sink<Integer>, IntConsumer:

    • 专门用于处理 int 类型元素的 Sink
    • 它继承了 Sink<Integer> (因为流中的元素最终还是对象 Integer) 和 java.util.function.IntConsumer
    • 核心: 它重新抽象了 void accept(int value) 方法(使其成为抽象方法,强制实现类提供)。
    • 它提供了一个 default void accept(Integer i) 方法,该方法会调用 accept(i.intValue()),从而将对包装类型的调用桥接到对原始类型的调用。
    • Tripwire.ENABLED 和 Tripwire.trip(...) 用于在开发模式下检测潜在的性能问题(例如,不必要地调用了接受包装类型的方法而不是原始类型的方法)。
  • interface OfLong extends Sink<Long>, LongConsumer

  • interface OfDouble extends Sink<Double>, DoubleConsumer:

    • 与 OfInt 类似

这些特化接口使得流管道在处理基本类型时可以保持高效,避免不必要的装箱和拆箱。

内部抽象类:用于构建 Sink 链的 Chained 类

Stream 的中间操作通常是将一个 Sink 连接到另一个 Sink,形成一个处理链。Sink 接口提供了几个抽象的 ChainedXxx 静态内部类作为构建这种链的辅助基类:

  • abstract static class ChainedReference<T, E_OUT> implements Sink<T>:

    • 用途: 用于创建一个接收 T 类型元素,并将其处理后(类型可能变为 E_OUT)传递给下游 Sink 的链式 Sink
    • protected final Sink<? super E_OUT> downstream;: 持有对下游 Sink 的引用。
    • 构造函数 public ChainedReference(Sink<? super E_OUT> downstream): 接收下游 Sink
    • begin()end()cancellationRequested() 方法的默认实现是直接委托给 downstream 的相应方法。
    • 关键: 子类需要实现 accept(T t) 方法,在该方法中对元素 t 进行处理,并将结果(类型为 E_OUT)通过调用 downstream.accept(...) (可能是 accept(E_OUT)accept(int)accept(long) 或 accept(double)) 传递给下游。

    文档注释中的例子很好地说明了这一点(用于 mapToInt 操作):

    // IntSink is = new Sink.ChainedReference<U>(sink) { // 假设这是 Sink.ChainedReference<U, Integer>
    //     public void accept(U u) {
    //         downstream.accept(mapper.applyAsInt(u)); // downstream 是 Sink<? super Integer>
    //     }                                            // 因此调用 downstream.accept(int)
    // };
    

    实际上,如果下游期望的是 int,那么 downstream 应该是 Sink.OfInt 或至少是能接受 int 的 Sink,然后调用 downstream.accept(mapper.applyAsInt(u))

  • abstract static class ChainedInt<E_OUT> implements Sink.OfInt

  • abstract static class ChainedLong<E_OUT> implements Sink.OfLong

  • abstract static class ChainedDouble<E_OUT> implements Sink.OfDouble:

    • 用于创建接收 基本类型 元素的链式 Sink
    • 子类需要实现 accept(基本类型 value) 方法。

这些 ChainedXxx 类极大地简化了 Stream 中间操作的实现。每个中间操作只需要关注其自身的逻辑(在 accept 方法中实现),而生命周期管理(beginend)和取消请求(cancellationRequested)则可以委托给下游。

Sink 在 Stream 管道中的作用

考虑文档注释中的例子: strings.stream().filter(s -> s.startsWith("A")).mapToInt(String::length).max();

  1. 源 (Spliterator for strings): 产生 String 对象。
  2. Filter Stage:
    • 它会有一个 Sink<String> (可能是 ChainedReference<String, String>)。
    • 它的 accept(String s) 方法会检查 s.startsWith("A")
    • 如果为 true,它会将 s 传递给下游的 Sink (即 mapToInt 阶段的 Sink)。
  3. MapToInt Stage:
    • 它会有一个 Sink<String> (可能是 ChainedReference<String, Integer>),其下游是一个 Sink.OfInt
    • 它的 accept(String s) 方法会调用 String::length 得到一个 int
    • 然后它会调用其下游 Sink.OfInt 的 accept(int length) 方法。
  4. Max Stage (Terminal Operation):
    • 它会有一个 Sink.OfInt
    • 它的 accept(int value) 方法会比较当前值与已知的最大值,并更新最大值。
    • 它的 begin() 可能用于初始化最大值(例如为 Integer.MIN_VALUE)。
    • 它的 end() 可能用于最终确定结果或触发某些完成动作。
    • max() 本身会返回一个 OptionalInt,这个 Sink 的结果会用于构建这个 OptionalInt

数据流动的方向是: Spliterator -> FilterSink -> MapToIntSink -> MaxSink

每个 Sink 都遵循 begin() -> accept()* -> end() 的生命周期。如果任何一个 Sink 在其中一个 accept() 调用后通过 cancellationRequested() 返回 true,上游的 Sink 或 Spliterator 就可以停止发送数据。

总结

Sink 接口及其相关的内部接口和类是 Java Stream API 实现的基石。它们共同定义了一个强大且灵活的机制,用于:

  • 数据传递: 在流的各个阶段之间高效地传递元素,包括对基本类型的优化。
  • 生命周期管理: 通过 begin() 和 end() 方法控制数据流的开始和结束,允许有状态操作进行初始化和清理。
  • 控制流: 通过 cancellationRequested() 实现短路操作,提高效率。
  • 可组合性: 通过 ChainedXxx 类轻松构建操作链。

理解 Sink 的工作原理有助于更深入地理解 Stream API 的内部机制和性能特点。虽然开发者通常不直接实现 Sink 接口(除非编写自定义的 Stream 操作或收集器),但了解其概念对于编写高效的 Stream 代码非常有益。

http://www.dtcms.com/a/312042.html

相关文章:

  • Design Compiler:Milkyway库的创建与使用
  • 1-7〔 OSCP ◈ 研记 〕❘ 信息收集▸主动采集E:SMB基础
  • 硬件-可靠性学习DAY1——系统可靠性设计指南:从原理到实践
  • Markdown 中的图表 Mermaid 与 classDiagram
  • Thread 中的 run() 方法 和 start() 方法的
  • 笔记:C语言中指向指针的指针作用
  • MQTT协议测试环境部署
  • 错误: 找不到或无法加载主类 原因: java.lang.ClassNotFoundException
  • (nice!!!)(LeetCode 每日一题) 2561. 重排水果 (哈希表 + 贪心)
  • UNet改进(29):记忆增强注意力机制在UNet中的创新应用-原理、实现与性能提升
  • 【嵌入式汇编基础】-ARM架构基础(三)
  • 动态规划解最长回文子串:深入解析与优化问题
  • 【redis】基于工业界技术分享的内容总结
  • JS的作用域
  • 第15届蓝桥杯Python青少组中/高级组选拔赛(STEMA)2024年1月28日真题
  • sqli-labs:Less-20关卡详细解析
  • MFC 实现托盘图标菜单图标功能
  • 中州养老Day02:服务管理护理计划模块
  • 中之人模式下的虚拟主持人:动捕设备与面捕技术的协同驱动
  • 2025系规教材改革后,论文怎么写?
  • 错误处理_IncompatibleKeys
  • 在Linux上对固态硬盘进行分区、格式化和挂载的步骤
  • CH32V单片机启用 FPU 速度测试
  • MVVM——ArkUI的UI开发模式
  • 使用Python开发Ditto剪贴板数据导出工具
  • 使用C++实现日志(2)
  • MCP终极指南 - 从原理到实战(基础篇)
  • 面试实战,问题二十二,Java JDK 17 有哪些新特性,怎么回答
  • windows内核研究(软件调试-异常的处理流程)
  • 幂等性介绍和下单接口幂等性保证实现方案