当前位置: 首页 > news >正文

Java22 stream 新特性 窗口算子 与 虚拟线程map操作:Gatherer 和 Gatherers工具类

gather() 是 Java 22 中引入的一个非常强大的新特性。

Gatherer 是什么

简单来说,Gatherer 是一个全新的、高度可定制的中间操作(Intermediate Operation)。它提供了一种比现有的 mapfilterflatMap 等操作更强大、更灵活的数据转换能力。

Gatherer 允许你实现 任意复杂的、有状态的、多对多(M-to-N) 的元素转换逻辑。

想象一下,流中的元素像传送带上的物品一个个流过来,Gatherer 就像一个在传送带旁边的复杂工作站。这个工作站可以:

  • 查看一个或多个物品(元素),然后决定输出零个、一个或多个新物品。
    • map 只能一对一转换。
    • filter 只能一对一或一对零转换。
    • flatMap 是一对多,但是是无状态的。
  • 维持自己的内部状态(State)。 比如,它可以记住之前看过的元素,然后根据历史信息来处理当前元素。
    • 例如,实现 windowed(窗口化)操作,将流中每 N 个元素分为一组。这必须记住当前窗口里已经收集了多少个元素。
  • 在所有物品都处理完后,进行最终的清理或输出。
    • 例如,一个 windowed 操作,在流末尾,即使最后一个窗口没满,也需要把这个不完整的窗口输出。

Gatherer 的四个核心组件:

Gatherer 的行为由四个函数式接口定义,这让它具备了极高的灵活性:

  1. initializer()状态初始化器。在处理第一个元素前调用,用于创建初始状态对象(比如,一个空的列表用于存放窗口元素)。
  2. integrator()集成器/处理器。这是核心逻辑,每当一个新元素到来时,它会被调用。它接收当前状态、新元素,并决定是否更新状态、是否向下游输出新元素。
  3. combiner()并行合并器。在并行流中,用于合并不同线程上的状态对象。
  4. finisher()终结者。在所有元素处理完毕后调用,用于处理最终的状态,并可能输出最后的元素。

例子:Gatherers.windowFixed(3)

  • initializer: 创建一个空的、大小为3的内部缓冲区(状态)。
  • integrator: 每来一个元素,就放入缓冲区。如果缓冲区满了,就把这个缓冲区包装成一个 List 输出到下游,并清空缓冲区。
  • finisher: 当流结束时,如果缓冲区里还有元素(比如最后只剩1个或2个),就把这个不完整的缓冲区也包装成 List 输出。

所以,Gatherer 的目标就是提供一个统一的、强大的接口,让开发者可以实现几乎任何自定义的流转换逻辑,而不仅仅局限于 map/filter 等预设的操作。

使用场景

  • 元素分组(如窗口函数)
  • 去重连续相似元素
  • 增量累加函数(前缀扫描)
  • 增量重排序函数

jdk 24引入的Gathers工具类提供了一些默认实现,后面有分析

下面将展示如何使用 Gatherer 实现这些功能:

元素分组(窗口函数)

// 实现固定大小的窗口分组
public static <T> Gatherer<T, ?, List<T>> window(int size) {return Gatherer.of(() -> new ArrayList<T>(),  // 初始化状态Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.add(element);if (state.size() == size) {downstream.push(new ArrayList<>(state));state.clear();}return true;}),(left, right) -> {  // 合并器left.addAll(right);return left;},(state, downstream) -> {  // 完成器if (!state.isEmpty()) {downstream.push(new ArrayList<>(state));}});
}// 使用示例
List<List<Integer>> windows = Stream.of(1,2,3,4,5,6).gather(window(2)).toList();  // [[1,2], [3,4], [5,6]]

去重连续相似元素

// 去重连续相似元素
public static <T> Gatherer<T, ?, T> distinctConsecutive() {return Gatherer.of(() -> new Object() { T last; boolean hasLast = false; },Gatherer.Integrator.ofGreedy((state, element, downstream) -> {if (!state.hasLast || !state.last.equals(element)) {downstream.push(element);state.last = element;state.hasLast = true;}return true;}));
}// 使用示例
List<Integer> distinct = Stream.of(1,1,2,2,2,3,2,2).gather(distinctConsecutive()).toList();  // [1,2,3,2]

增量累加函数(前缀扫描)

// 前缀扫描实现
public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<R, T, R> scanner) {return Gatherer.ofSequential(() -> new Object() { R current = initial.get(); },Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.current = scanner.apply(state.current, element);return downstream.push(state.current);}));
}// 使用示例
List<String> result = Stream.of(1,2,3,4,5).gather(scan(() -> "", (str, num) -> str + num)).toList();  // ["1", "12", "123", "1234", "12345"]

增量重排序函数

// 实现增量排序(维护一个固定大小的有序窗口)
public static <T> Gatherer<T, ?, T> incrementalSort(int windowSize, Comparator<T> comparator) {return Gatherer.of(() -> new PriorityQueue<>(comparator),Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.offer(element);if (state.size() > windowSize) {downstream.push(state.poll());}return true;}),(left, right) -> {left.addAll(right);return left;},(state, downstream) -> {while (!state.isEmpty()) {downstream.push(state.poll());}});
}// 使用示例
List<Integer> sorted = Stream.of(5,3,1,4,2).gather(incrementalSort(3, Comparator.naturalOrder())).toList();  // [1,3,4,2,5]

关键点说明

  1. 状态管理

    • 使用 initializer() 创建合适的状态容器
    • 在 integrator() 中维护和更新状态
    • 在 finisher() 中处理剩余元素
  2. 性能考虑

    • 对于无状态操作使用 defaultInitializer()
    • 对于可并行操作提供有效的 combiner()
    • 使用 Greedy 优化非短路操作
  3. 组合使用

    • 可以使用 andThen() 组合多个 Gatherer
    • 预先组合的 Gatherer 比链式调用更高效

这些实现展示了 Gatherer 的强大功能,它能够优雅地处理各种流处理场景,同时保持代码的清晰和可维护性。

核心方法

// 初始化器 - 创建中间状态
default Supplier<A> initializer()// 集成器 - 处理输入元素
Integrator<A, T, R> integrator()// 组合器 - 合并两个状态
default BinaryOperator<A> combiner()// 完成器 - 执行最终操作
default BiConsumer<A, Downstream<? super R>> finisher()

组合操作

// 将多个 Gatherer 组合在一起
default <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super R, ?, ? extends RR> that)

静态工厂方法

// 创建无状态顺序 Gatherer
static <T, R> Gatherer<T, Void, R> ofSequential(Integrator<Void, T, R> integrator)// 创建可并行化的 Gatherer
static <T, A, R> Gatherer<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher
)

 

核心关联类

  • Stream: 通过 gather(Gatherer) 方法使用 Gatherer
  • Gatherers: 提供常用 Gatherer 实现的工具类
  • Downstream: 接收 Gatherer 输出的下一阶段
  • Integrator: 处理元素集成的函数式接口

交互关系

// Stream 与 Gatherer 的关系
Stream<T>.gather(Gatherer<T,A,R>) -> Stream<R>// Gatherer 组合关系
Gatherer<T,A,R>.andThen(Gatherer<R,B,S>) -> Gatherer<T,?,S>

最佳实践

使用注意事项

  1. 状态管理

    • 无状态 Gatherer 使用 defaultInitializer()
    • 有状态 Gatherer 需正确实现状态初始化和合并
  2. 并行处理

    • 需要并行处理时提供有效的 combiner
    • 仅顺序处理时使用 defaultCombiner()
  3. 资源清理

    • 在 finisher 中执行必要的清理操作
    • 不需要清理时使用 defaultFinisher()
  4. 性能优化

    • 使用 Greedy Integrator 优化无短路操作
    • 合理使用预组合的 Gatherer 提高性能

示例代码

// 创建 map 操作的 Gatherer
public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {return Gatherer.of((unused, element, downstream) -> downstream.push(mapper.apply(element)));
}// 组合多个 Gatherer
Gatherer<Integer, ?, String> pipeline = map(i -> i + 1)          // 增1.andThen(map(i -> i.toString()));  // 转字符串


为什么需要 Gatherer.Downstream

Gatherer 的 integrator 和 finisher 需要一个回调机制来将它们处理后的结果(类型为 R)发送出去。它们不能直接访问下游的 Sink,因为这会破坏 Stream 的封装性。

Gatherer.Downstream 就是这个专门的回调接口,它充当了 Gatherer 内部逻辑和外部流水线之间的 “信使”。

// Gatherer 接口的一部分
interface Downstream<R> {boolean push(R r);// ...
}

GatherSink 同时实现了 Sink<T> 和 Gatherer.Downstream<R>,这让它成为了一个完美的“双面适配器”:

  1. 作为 Sink<T>:它面向上游,接收类型为 T 的原始元素。这是它作为标准 Stream 操作的一部分的职责。
  2. 作为 Gatherer.Downstream<R>:它面向 Gatherer 的内部逻辑 (integrator 和 finisher)。当 integrator 想要输出一个处理好的元素 R 时,它不直接和下游 Sink 对话,而是调用 downstream.push(r)

我们来看 GatherSink 的代码,就能清晰地看到这个转发过程:

// ... existing code ...static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {private final Sink<R> sink; // 这是真正的下游 Sink// ...// 当上游传来一个元素 T@Overridepublic void accept(T t) {// ...// 调用 integrator,并把 this (自己) 作为 Downstream 传进去proceed &= integrator.integrate(state, t, this);}// ...// 当 integrator 内部调用 downstream.push(r) 时,这个方法被触发@Overridepublic boolean push(R r) {var p = downstreamProceed;if (p)sink.accept(r); // 在这里,它把结果 r 转发给了真正的下游 sinkreturn !cancellationRequested(p);}}
// ... existing code ...
  • 解耦(Decoupling):将 Gatherer 的内部业务逻辑(如何处理数据)与 Stream 的流水线机制(如何传递数据)分离开。Gatherer 的作者不需要关心下游是什么,只需要知道有一个 Downstream 接口可以用来“推送”结果。
  • 控制(Control)Downstream 接口不仅有 push,还有 isRejecting 等方法。这给了 integrator 更多的控制权,比如在推送前检查下游是否还愿意接收数据,从而实现更复杂的短路(short-circuiting)逻辑。
  • 封装(Encapsulation)GatherSink 封装了所有复杂的逻辑,比如状态管理、短路信号的传递等,使得 Gatherer 的实现者可以专注于核心的转换算法,而不用处理 Stream 底层的复杂机制。

Gatherer 是一个强大的自定义流处理工具,而 Gatherer.Downstream 接口及其在 GatherSink 中的实现,是连接这个自定义工具和标准 Stream 流水线的、设计精巧的桥梁。

gather 具体例子

gather 的能力最好通过例子来理解。JDK 在 java.util.stream.Gatherers 这个工具类里提供了一些预置的实现,这些例子能很好地展示它的威力:

a. 窗口化 (Windowing)

想象一下,你想把一个数字流按每3个一组进行分组: Stream.of(1,2,3,4,5,6,7,8) -> [[1, 2, 3], [4, 5, 6], [7, 8]]

用传统操作很难做到,但用 gather 就非常简单:

// (代码来自 Gatherers.java 的 Javadoc 示例)
List<List<Integer>> windows =Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();

b. 滑动窗口 (Sliding Window)

或者,你想创建一个大小为2的滑动窗口: Stream.of(1,2,3,4,5,6,7,8) -> [[1, 2], [2, 3], [3, 4], [4, 5], ...]

同样,gather 也能轻松实现:

// (代码来自 Gatherers.java 的 Javadoc 示例)
List<List<Integer>> windows2 =Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();

c. 其他复杂操作

Gatherers 类还提供了 fold(类似 reduce 但严格串行)、scan(前缀和扫描)等很多有用的操作。

GathererOp 的角色

GathererOp 就是 gather() 操作在 Stream 流水线中的内部表示和执行引擎。

当调用 stream.gather(...) 时,Java 内部就会创建一个 GathererOp 实例,并将其链接到流操作链上。

// ... existing code ...
final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {@SuppressWarnings("unchecked")static <P_IN, P_OUT extends T, T, A, R> Stream<R> of(ReferencePipeline<P_IN, P_OUT> upstream,Gatherer<T, A, R> gatherer) {// ...if (upstream.getClass() == GathererOp.class) {// ... 融合优化 ...} else {return new GathererOp<>((ReferencePipeline<?, T>) upstream,gatherer);}}
// ... existing code ...

这个类非常复杂,因为它承担了所有脏活累活,包括:

  • 操作融合:它非常智能,如果发现连续两个 gather() 操作,或者 gather() 后面跟着 collect(),它会尝试将它们**融合(fuse) 成一个单一的操作来执行,极大地提升性能。
  • 串行与并行执行:它内部包含了处理串行流(GatherSink)和并行流(GathererTask,通过 evaluate 方法调度)的完整逻辑。
  • 状态管理和短路:它负责管理 Gatherer 的状态,并响应短路信号(当 integrator 返回 false 时)。

总结

  • gather() 是一个面向用户的功能,它为 Stream 提供了前所未有的灵活性,让你能定义任意复杂的有状态转换。
  • Gatherer 是一个接口,通过实现它来定义 gather() 的具体行为逻辑。
  • GathererOp 是 JDK 内部的实现细节,是驱动 Gatherer 逻辑在流中高效运行的复杂引擎。

Gatherer 接口实现分析

Gatherer<T, A, R> 接口定义了一个高度可定制的中间操作。它的核心语义是:“接收一个输入元素流(类型 T),通过一个可选的中间状态(类型 A),产生一个输出元素流(类型 R)”

与 map 或 filter 不同,Gatherer 不是简单的“一对一”或“一对零/一”的转换。它是一个 “多对多” 的转换器,可以:

  • 有状态(Stateful): 它可以维护一个在元素处理过程中不断变化的状态。
  • 缓冲(Buffering): 它可以接收多个输入元素后,才产生一个或多个输出。
  • 短路(Short-circuiting): 它可以提前终止流的处理。

你可以把它想象成一个与 Collector 类似但更强大的概念。Collector 是一个终端操作,它消费整个流并产生一个最终结果。而 Gatherer 是一个中间操作,它消费一个流并产生另一个流

 Gatherer 的四个核心组件

Gatherer 的行为由四个核心函数定义,这四个函数共同描述了整个转换过程。

// ... existing code ...
public interface Gatherer<T, A, R> {/*** A function that produces an instance of the intermediate state used for* this gathering operation.* ...*/default Supplier<A> initializer() { /*...*/ };/*** A function which integrates provided elements, potentially using* the provided intermediate state, optionally producing output to the* provided {@link Downstream}.* ...*/Integrator<A, T, R> integrator();/*** A function which accepts two intermediate states and combines them into* one.* ...*/default BinaryOperator<A> combiner() { /*...*/ }/*** A function which accepts the final intermediate state* and a {@link Downstream} object, allowing to perform a final action at* the end of input elements.* ...*/default BiConsumer<A, Downstream<? super R>> finisher() { /*...*/ }
// ... existing code ...

a. initializer(): 状态初始化器

  • 语义Supplier<A> initializer()

  • 作用: 创建并返回一个新的、可变的中间状态对象 A

  • 何时调用: 在处理一个新的数据分片(在并行流中)或整个流(在串行流中)的开始时调用。

  • 默认行为: 如果不提供,则表示这是一个无状态的 Gatherer,状态类型 A 通常为 Void

b. integrator(): 核心处理器

  • 语义Integrator<A, T, R> integrator()

  • 作用: 这是 Gatherer 的核心逻辑。它接收当前的状态 A、一个新的输入元素 T 和一个下游管道 Downstream。它负责:

    1. 根据输入元素 t 更新状态 state

    2. 决定是否要向下游 downstream 推送(push)零个、一个或多个结果 R

    3. 返回一个布尔值,true 表示继续处理,false 表示短路,请求上游停止发送数据。

  • 这是唯一必须提供的方法。

c. combiner(): 并行合并器

  • 语义BinaryOperator<A> combiner()

  • 作用: 在并行流中,当两个子任务都处理完它们的数据分片后,此方法被调用来合并它们各自的中间状态 A

  • 何时调用: 仅在并行流中调用。

  • 默认行为: 如果不提供,则表示该 Gatherer 不支持并行处理。

d. finisher(): 收尾处理器

  • 语义BiConsumer<A, Downstream<? super R>> finisher()
  • 作用: 在所有输入元素都被 integrator 处理完毕后调用。它接收最终的状态 A,允许你执行一些收尾工作,例如,将状态中缓冲的最后一个元素或最终计算结果推送到下游。
  • 何时调用: 在流的末尾调用一次。
  • 默认行为: 如果不提供,表示没有收尾操作。

Downstream 接口:向下游推送结果

Downstream 是 integrator 和 finisher 用来与下游流水线通信的桥梁。

// ... existing code ...@FunctionalInterfaceinterface Downstream<T> {/*** Pushes, if possible, the provided element downstream -- to the next* stage in the pipeline.* ...*/boolean push(T element);
// ... existing code ...

它的核心方法是 push(T element)。调用此方法会将一个结果元素 element 发送到下一个流操作。它也返回一个布尔值,如果下游也请求短路,它会返回 false

示例:用 Gatherer 实现 map

接口文档中的这个例子完美地诠释了 Gatherer 的语义:

// ... existing code ...* {@snippet lang = java:* public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {*     return Gatherer.of(*         (unused, element, downstream) -> // integrator*             downstream.push(mapper.apply(element))*     );* }* }
// ... existing code ...
  • 这是一个无状态的 Gatherer,所以 initializer 是默认的,状态 A 是 Void(由 unused 参数表示)。
  • 它的 integrator 对于每个到来的 element,直接调用 mapper 函数进行转换,然后通过 downstream.push() 将结果推送到下游。
  • 它不关心 push 的返回值,所以它会处理完所有元素(非短路)。
  • 它没有 combiner 和 finisher

Greedy接口

Gatherer.Integrator 接口的核心方法 integrate 返回一个 boolean 值,用于表示是否需要短路(short-circuit)true 表示继续,false 表示停止。

然而,在很多场景下,比如 windowFixed,我们知道 integrator 总是会处理完所有元素,永远不会提前返回 false。这种 integrator 被称为 “贪婪的”(Greedy) 。

Integrator.ofGreedy(...) 就是一个静态工厂方法,它做两件事:

  1. 它接收一个不返回布尔值的 BiConsumer(或者像这里的方法引用 FixedWindow::integrate,它的返回值会被忽略)。
  2. 它返回一个特殊的 Integrator 实现,这个实现被标记为 Integrator.Greedy 接口的实例。
// 在 Gatherer 接口内部
interface Integrator<A, T, R> {// ...@FunctionalInterfaceinterface Greedy<A, T, R> extends Integrator<A, T, R> {// ...}
}

为什么这个信号很重要?

Stream 的执行引擎(即  GathererOp)在处理流时,会检查 integrator 是否是 instanceof Integrator.Greedy

  • 如果是 Greedy:运行时就不需要在每次调用 integrate 后都去检查它的返回值。这可以消除一个分支判断,在处理大量元素的紧凑循环中,这种微小的优化会累积起来,带来可观的性能提升。
  • 如果不是 Greedy:运行时就必须在每次调用后检查返回值,以正确处理可能的短路情况。

所以,ofGreedy 的包装本质上是一种静态的性能提示,它告诉运行时:“放心执行,我这个 integrator 不会短路,你可以走更快的代码路径。”

总结

Gatherer 接口通过其四个核心组件(initializerintegratorcombinerfinisher)提供了一个强大而灵活的框架,用于定义复杂的、有状态的流转换。它的语义核心在于 integrator,它消费输入元素、更新状态,并通过 Downstream 按需产生输出,从而实现了远超传统流操作的强大功能。

Gatherers 类分析

Gatherers 是一个工具类,提供了常用的 Gatherer 实现和工厂方法,用于在流处理中执行窗口操作、并发映射、折叠和扫描等中间操作。

使用场景

  1. 窗口操作:将流元素分组为固定大小或滑动窗口
  2. 并发处理:使用虚拟线程并发执行映射操作
  3. 增量处理:执行折叠(fold)和前缀扫描(scan)操作
  4. 流转换:实现复杂的流到流的转换

窗口操作

// 固定大小窗口
public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize)
// 示例:将流元素分组为大小为3的窗口
List<List<Integer>> windows = Stream.of(1,2,3,4,5,6).gather(Gatherers.windowFixed(3)).toList(); // [[1,2,3], [4,5,6]]// 滑动窗口
public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize)
// 示例:创建大小为2的滑动窗口
List<List<Integer>> windows = Stream.of(1,2,3,4,5).gather(Gatherers.windowSliding(2)).toList(); // [[1,2], [2,3], [3,4], [4,5]]

并发处理

// 并发映射
public static <T, R> Gatherer<T,?,R> mapConcurrent(int maxConcurrency,Function<? super T, ? extends R> mapper
)
// 示例:并发执行映射操作
List<String> results = Stream.of(1,2,3,4,5).gather(Gatherers.mapConcurrent(4, i -> "Result" + i)).toList();

增量处理

// 折叠操作
public static <T, R> Gatherer<T, ?, R> fold(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> folder
)
// 示例:将数字连接成字符串
Optional<String> result = Stream.of(1,2,3,4).gather(Gatherers.fold(() -> "", (str, num) -> str + num)).findFirst(); // "1234"// 前缀扫描
public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> scanner
)
// 示例:生成累积字符串
List<String> results = Stream.of(1,2,3,4).gather(Gatherers.scan(() -> "", (str, num) -> str + num)).toList(); // ["1", "12", "123", "1234"]

 

核心关联类

  1. Gatherer:基础接口,定义了流处理中间操作的契约
  2. Stream:通过 gather() 方法使用 Gatherer
  3. FutureTask:在并发映射中用于任务管理
  4. ArrayDeque:用于管理待处理的任务队列

交互关系

// Stream 与 Gatherer 的关系
Stream<T>.gather(Gatherer<T,A,R>) -> Stream<R>// Gatherer 组合关系
Gatherer<T,A,R>.andThen(Gatherer<R,B,S>) -> Gatherer<T,?,S>// 内部实现类关系
Gatherers.Composite<T,A,R,AA,RR> implements Gatherer<T,Object,RR>

最佳实践

  1. 窗口操作

    • 注意大窗口可能占用大量内存
    • 最后一个窗口可能小于指定大小
    • 窗口内容是不可修改的列表
  2. 并发处理

    • 合理设置 maxConcurrency,避免过度并发
    • 注意处理可能的异常情况
    • 操作会保持流的顺序性
  3. 性能优化

    • 对于简单转换,优先使用内置方法
    • 合理使用组合操作减少中间状态
    • 注意内存使用,特别是大窗口操作
  4. 错误处理

    • 并发操作中的异常会被包装为 RuntimeException
    • 确保转换函数是线程安全的
    • 注意处理中断情况

Gatherers

如果说 Gatherer 接口定义了“是什么”(what),那么 Gatherers 类就提供了“怎么用”(how)的最佳范例。Gatherers 是一个 final 的、拥有私有构造函数的工具类。它的核心职责是提供一系列预先实现好的、常用的 Gatherer 实例的静态工厂方法

public final class Gatherers {private Gatherers() { } // This class is not intended to be instantiated// Public built-in Gatherers and factory methods for them// ...
}

它的定位类似于 Collectors 或 Comparators,为开发者提供了开箱即用的强大功能,避免了用户为了一些常见场景而不得不手动实现复杂的 Gatherer 接口。

Gatherers 类提供了多个非常有用的静态方法,每个方法都返回一个配置好的 Gatherer。我们来分析几个关键的实现:

windowFixed(int windowSize) - 固定窗口

  • 功能: 将流中的元素按固定大小 windowSize 分组。最后一组可能小于 windowSize
  • 示例[1,2,3,4,5] 用 windowFixed(2) 处理后得到 [[1,2], [3,4], [5]]
  • 实现分析:

    Gatherers.java

    // ... existing code ...
    public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize) {// ... 参数检查 ...class FixedWindow { // 状态类Object[] window;int at;FixedWindow() { /* 初始化 window 和 at */ }boolean integrate(TR element, Downstream<? super List<TR>> downstream) {window[at++] = element; // 元素存入数组if (at < windowSize) { // 窗口未满return true; // 继续} else { // 窗口已满// ... 创建新窗口,重置 at ...final var oldWindow = window;window = new Object[windowSize];at = 0;return downstream.push(SharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(oldWindow));}}void finish(Downstream<? super List<TR>> downstream) {if (at > 0 && !downstream.isRejecting()) { // 如果最后还有剩余元素// ... 创建一个精确大小的数组并拷贝 ...downstream.push(/* 推送最后一个窗口 */);}}}return Gatherer.<TR, FixedWindow, List<TR>>ofSequential(FixedWindow::new, // InitializerIntegrator.ofGreedy(FixedWindow::integrate), // IntegratorFixedWindow::finish // Finisher);
    }
    // ... existing code ...
    
    这个实现非常经典:
    1. 定义一个内部状态类 FixedWindow,包含一个数组 window 用于缓冲元素和一个计数器 at
    2. initializer 就是 FixedWindow::new
    3. integrator 将元素放入数组,当数组满了就 push 出去,并重置状态。
    4. finisher 负责处理流结束时可能还未满的最后一个窗口。
    5. 它被标记为 ofSequential,表示这个实现不支持并行。

SharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(lastWindow)

为什么不直接用 List.of 或 Arrays.asList? 这层包装是为了极致的性能和零拷贝(zero-copy)

我们先看看替代方案有什么问题:

  • Arrays.asList(lastWindow): 这个方法会创建一个 List 包装器,但它直接引用原始数组。如果你后续修改了这个数组,List 的内容也会变。在 Gatherer 的实现中,状态对象(如 FixedWindow)可能会复用其内部数组,这就可能导致已经 push 出去的 List 的内容被意外修改,这是非常危险的。
  • List.of(lastWindow): 这个方法会创建一个不可变的列表,这很好。但为了保证不可变性,它会创建一个内部数组的防御性拷贝(defensive copy)。也就是说,它会分配新内存并把 lastWindow 的所有元素复制过去。

在 windowFixed 这种需要频繁创建列表的场景下,每次都进行数组拷贝会带来显著的性能开销和内存压力。

SharedSecrets 是 JDK 内部的一个机制,用于在不同包之间安全地共享一些不希望公开为公共 API 的功能。jdk.internal.access 包就是专门为此设计的。

这里的 listFromTrustedArrayNullsAllowed(lastWindow) 方法,正如其名,做了以下事情:

  1. Trusted Array (受信任的数组): 它“信任”调用者,相信传入的 lastWindow 数组后续不会再被修改
  2. Zero-Copy: 基于这种信任,它直接使用这个数组作为内部存储来构造一个 List,而不进行任何拷贝
  3. Unmodifiable: 返回的 List 仍然是不可变的,任何修改操作都会抛出异常。

在 windowFixed 的实现中,lastWindow 是一个局部变量,创建后马上就用于生成 List 并被丢弃,它永远不会被再次修改。

// ... existing code ...void finish(Downstream<? super List<TR>> downstream) {if (at > 0 && !downstream.isRejecting()) {var lastWindow = new Object[at]; // 1. 创建一个新数组System.arraycopy(window, 0, lastWindow, 0, at); // 2. 拷贝内容window = null; // 3. 丢弃对旧状态数组的引用at = 0;downstream.push( // 4. 使用新数组创建 ListSharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(lastWindow));}}
// ... existing code ...

这个场景完美符合 listFromTrustedArrayNullsAllowed 的使用条件,因此可以用这种零拷贝的方式来最大化性能。

windowSliding(int windowSize) - 滑动窗口

  • 功能: 创建一个在流上滑动的窗口。每一步,窗口都向前滑动一个元素。
  • 示例[1,2,3,4] 用 windowSliding(3) 处理后得到 [[1,2,3], [2,3,4]]
  • 实现分析:
    // ... existing code ...
    public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize) {// ...class SlidingWindow {// ...boolean integrate(TR element, Downstream<? super List<TR>> downstream) {window[at++] = element;if (at < windowSize) { // 窗口未满return true;} else { // 窗口已满final var oldWindow = window;final var newWindow = new Object[windowSize];// 关键:将旧窗口的后 n-1 个元素拷贝到新窗口的前 n-1 个位置System.arraycopy(oldWindow,1, newWindow, 0, windowSize - 1);window = newWindow; // 状态更新为新窗口at -= 1;return downstream.push(/* 推送旧窗口 */);}}// ...}// ...
    }
    // ... existing code ...
    
    与 windowFixed 类似,但 integrate 的逻辑更复杂。当窗口满了之后,它不是清空窗口,而是通过 System.arraycopy 将元素向前“滑动”,为下一个元素腾出位置。

Gatherers.fold - 最终的折叠

fold 方法的行为非常类似于 Stream.reduce,它将流中的所有元素聚合成一个最终结果

  • 目标: 对一个元素序列执行一个有状态的、顺序敏感的规约操作。
  • 核心场景: 当你需要实现的规约操作不满足 reduce 所需的“结合律”(associativity),或者说,当元素的处理顺序至关重要,无法通过并行的 combiner 函数来合并中间结果时,fold 就是最佳选择。
// ... existing code ...public static <T, R> Gatherer<T, ?, R> fold(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> folder) {Objects.requireNonNull(initial, "'initial' must not be null");Objects.requireNonNull(folder, "'folder' must not be null");class State {R value = initial.get();State() {}}return Gatherer.ofSequential(State::new,Integrator.ofGreedy((state, element, downstream) -> {state.value = folder.apply(state.value, element);return true;}),(state, downstream) -> downstream.push(state.value));}
// ... existing code ...
  • 参数:
    • initial: 一个 Supplier,提供累积的初始值。
    • folder: 一个 BiFunction,接收当前的累积值和下一个元素,并返回新的累积值。
  • State 类:
    • 非常简单,只有一个字段 value,用于在整个流的处理过程中保存当前的累积结果。它在 Gatherer 初始化时通过 initial.get() 获得初始值。
  • Integrator:
    • ofGreedy 表示它会贪婪地消费所有上游元素。
    • state.value = folder.apply(state.value, element);: 这是核心逻辑。每来一个 element,就用 folder 函数更新 state.value
    • return true;: 总是返回 true,表示它永远不会主动短路,会处理完所有元素。
    • downstream 参数虽然存在,但在这里并未使用,因为 fold 在处理过程中不产生任何中间结果。
  • Finisher:
    • (state, downstream) -> downstream.push(state.value): 这是 fold 的关键。只有当所有元素都被 integrator 处理完毕后,finisher 才会被调用。它只做一件事:将最终累积的 state.value 推送给下游。因此,fold 操作的输出流中最多只有一个元素。
  • Gatherer.ofSequential:
    • fold 被定义为顺序的(没有 combiner),这意味着在并行流中,它会由 Hybrid 策略执行,保证 integrator 是按元素顺序调用的。

Gatherers.scan - 增量的扫描(前缀和)

scan 方法与 fold 不同,它会为每一个输入元素都生成一个输出结果,这个结果是到当前元素为止的累积值。它也被称为“前缀和”(Prefix Sum)操作。

  • 目标: 创建一个流,其中每个元素都是原始流到该点为止的累积结果。
  • 核心场景: 当你需要观察一个累积过程的每一步中间状态时。例如,计算账户余额随每一笔交易的变化,或者在时间序列数据中计算移动平均值的每一步。
// ... existing code ...public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> scanner) {Objects.requireNonNull(initial, "'initial' must not be null");Objects.requireNonNull(scanner, "'scanner' must not be null");class State {R current = initial.get();boolean integrate(T element, Downstream<? super R> downstream) {return downstream.push(current = scanner.apply(current, element));}}return Gatherer.ofSequential(State::new,Integrator.<State,T, R>ofGreedy(State::integrate));}
// ... existing code ...
  • 参数: 与 fold 完全相同。
  • State 类:
    • 同样只有一个字段 current,用于保存当前的累积值。
  • Integrator:
    • return downstream.push(current = scanner.apply(current, element));: 这是与 fold 的根本区别
    • 每当一个新 element 到来:
      1. current = scanner.apply(current, element): 首先,使用 scanner 函数计算出新的累积值,并更新 state.current
      2. downstream.push(...): 然后,立即将这个新计算出的累积值推送给下游。
    • 因此,每处理一个输入元素,就会产生一个输出元素。
  • Finisher:
    • scan 方法没有提供 finisher。因为所有的结果都在 integrator 中被实时推送了,当流结束时,没有额外的工作需要做。

mapConcurrent(...) - 虚拟线程并发映射

    public static <T, R> Gatherer<T,?,R> mapConcurrent(final int maxConcurrency,final Function<? super T, ? extends R> mapper) 

mapConcurrent 是一个中间操作,它允许以受控的并发级别,对流中的每个元素应用一个函数。它主要用于 I/O 密集型或其它会引起阻塞的操作(例如,发起网络请求、查询数据库),通过利用虚拟线程(Virtual Threads)来显著提高吞吐量,同时保持流元素的原始顺序。

  • 解决的问题: 标准的 Stream.map 是串行的。在并行流中,map 虽然并行,但使用的是重量级的平台线程(Platform Threads),这对于成千上万个会长时间阻塞的 I/O 任务来说,资源开销巨大且效率低下。
  • 核心武器虚拟线程。虚拟线程是轻量级的,可以大量创建而不会耗尽系统资源,非常适合用于执行 I/O 阻塞任务。
  • 设计要点:
    1. 并发执行: 对每个元素应用的 mapper 函数都在一个独立的虚拟线程中并发执行。
    2. 控制并发度: 通过 maxConcurrency 参数限制同时“在途”的任务数量,防止下游系统过载。
    3. 保持顺序: 尽管 mapper 函数是并发执行且完成时间不确定,但其结果必须按照原始流的顺序推送到下游。
    4. 支持短路: 当下游不再需要元素时(例如 findFirst()),能够尽力取消正在进行和等待中的任务。

mapConcurrent 的实现非常精巧,主要由三个部分构成:MapConcurrentTask(任务封装)、State(状态管理与核心逻辑)以及最终的 Gatherer 构建。

MapConcurrentTask - 任务的封装

// ... existing code ...final class MapConcurrentTask extends FutureTask<R> {final Thread thread;private MapConcurrentTask(Callable<R> callable) {super(callable);this.thread = Thread.ofVirtual().unstarted(this);}}
// ... existing code ...
  • extends FutureTask<R>: 每个任务都是一个 FutureTask,这使得我们可以异步地获取其执行结果(get()),检查其状态(isDone()),以及取消它(cancel())。
  • final Thread thread: 每个 FutureTask 都关联一个专属的虚拟线程。
  • Thread.ofVirtual().unstarted(this): 这是关键。它创建了一个虚拟线程,但并不立即启动。线程的 Runnable 目标就是 this,即 FutureTask 自身(因为 FutureTask 实现了 Runnable)。当调用 thread.start() 时,这个虚拟线程就会开始执行 FutureTask 的 run() 方法,进而执行我们传入的 mapper 函数。

State - 核心逻辑与状态机

这个 State 类是 mapConcurrent 这个 Gatherer (收集器) 的核心,它封装了所有用于实现并发处理的状态和逻辑。mapConcurrent 的目标是接收上游的元素,然后并发地将一个 mapper 函数应用在每个元素上,最后将结果按顺序推送到下游。State 类就是这个过程中的“总调度室”。

State 类的主要职责是:

  1. 任务管理: 接收上游流(Stream)中的元素。
  2. 并发执行: 为每个元素创建一个独立的任务(使用虚拟线程),并启动它来执行 mapper 函数。
  3. 状态跟踪: 维护一个正在处理中的任务队列(work-in-progress)。
  4. 结果回写: 按顺序从完成的任务中获取结果,并将结果推送到下游。
  5. 异常与清理: 妥善处理执行过程中的异常,并确保所有启动的线程最终都能被清理。

成员变量 (Fields)

// ... existing code ...final class State {private final ArrayDeque<MapConcurrentTask> wip =new ArrayDeque<>(Math.min(maxConcurrency, 16));// ... existing code ...

State 类只有一个成员变量 wip

  • private final ArrayDeque<MapConcurrentTask> wip:
    • wip 是 "work-in-progress" 的缩写,意为“正在进行中的工作”。
    • 它是一个 ArrayDeque(一种双端队列),用于存储所有已经启动但可能尚未完成的并发任务。
    • MapConcurrentTask 是 FutureTask 的一个内部子类,它包装了 mapper.apply(element) 这个调用。每个任务都关联一个虚拟线程。
    • 队列的初始容量被限制在 maxConcurrency 和 16 之间取较小值,这只是一个小的性能优化,ArrayDeque 可以根据需要自动扩容。
    • 关键点: 使用队列(Queue)这种数据结构保证了任务的先进先出(FIFO)顺序。这意味着元素是按它们从上游到达的顺序被处理和向下游推送的,从而维持了流的有序性

integrate 方法:接收元素并发起任务

// ... existing code ...boolean integrate(T element, Downstream<? super R> downstream) {// Prepare the next task and add it to the work-in-progressfinal var task = new MapConcurrentTask(() -> mapper.apply(element));wip.addLast(task);assert wip.peekLast() == task;assert wip.size() <= maxConcurrency;// Start the next tasktask.thread.start();// Flush at least 1 element if we're at capacityreturn flush(wip.size() < maxConcurrency ? 0 : 1, downstream);}
// ... existing code ...

integrate 方法是 Gatherer 的 integrator (整合器) 的具体实现。每当上游有一个新元素 element 准备好被处理时,这个方法就会被调用。

它的工作流程如下:

  1. 创建任务new MapConcurrentTask(() -> mapper.apply(element)) 创建一个新的 MapConcurrentTask。这个任务的核心工作就是调用用户提供的 mapper 函数来处理元素。
  2. 入队wip.addLast(task) 将新创建的任务添加到 wip 队列的末尾。
  3. 启动线程task.thread.start() 立即启动与该任务关联的虚拟线程。这意味着 mapper 函数的执行会立刻在一个新的虚拟线程中开始,而 integrate 方法本身不会等待它完成,从而实现了并发。
  4. 调用 flush (冲刷/刷新): 这是实现背压 (back-pressure) 的关键。
    • wip.size() < maxConcurrency: 检查当前正在处理的任务数是否已经达到了用户设定的最大并发数 maxConcurrency
    • 如果 wip.size() 小于 maxConcurrency,则调用 flush(0, downstream)atLeastN 参数为 0 意味着 flush 方法会尝试“非阻塞地”冲刷掉所有已经完成的任务,但如果队列头部的任务还没完成,它不会等待,会立刻返回。
    • 如果 wip.size() 达到了 maxConcurrency,则调用 flush(1, downstream)atLeastN 参数为 1 意味着 flush 方法必须等待并冲刷掉至少一个任务的结果后才能返回。这起到了一个限流的作用:在处理能力达到上限时,它会阻塞上游,直到至少一个并发任务完成,腾出“空位”后,才允许上游继续提供新元素。

flush 方法:获取结果并推向下游

// ... existing code ...boolean flush(long atLeastN, Downstream<? super R> downstream) {boolean success = false, interrupted = false;try {boolean proceed = !downstream.isRejecting();MapConcurrentTask current;while (proceed&& (current = wip.peekFirst()) != null&& (current.isDone() || atLeastN > 0)) {R result;// Ensure that the task is done before proceedingfor (;;) {try {result = current.get();break;} catch (InterruptedException ie) {interrupted = true; // ignore for now, and restore later}}proceed &= downstream.push(result);atLeastN -= 1;final var correctRemoval = wip.pollFirst() == current;assert correctRemoval;}return (success = proceed); // Ensure that cleanup occurs if needed} catch (ExecutionException e) {
// ... existing code ...} finally {
// ... existing code ...}}
// ... existing code ...

flush 方法是 State 的另一个核心,负责从 wip 队列中取出已完成的任务,获取其结果,并将其推送到下游。

它的工作流程如下:

  1. 循环检查队首任务while 循环持续检查 wip 队列的头部。
    • proceed = !downstream.isRejecting(): 首先检查下游是否还能接收元素。
    • current = wip.peekFirst(): 查看队首的任务,但不移除它。
    • current.isDone() || atLeastN > 0: 这是循环能否执行的关键条件。它表示:
      • 如果队首任务已经完成 (isDone()),则可以处理它。
      • 或者,如果 atLeastN > 0,意味着调用者(integrate 或 finisher)要求必须冲刷掉至少 atLeastN 个元素。在这种情况下,即使队首任务尚未完成,循环也会继续,并在内部通过 current.get() 阻塞等待它完成。
  2. 获取结果:
    • result = current.get(): 调用 FutureTask 的 get() 方法来获取任务的执行结果。这个调用是阻塞的,如果任务还没执行完,它会一直等待直到任务完成。
    • InterruptedException 处理:如果在 get() 等待期间线程被中断,它会捕获 InterruptedException,设置 interrupted 标志位,然后继续尝试 get()。中断状态会在 finally 块中被恢复。
  3. 推向下游proceed &= downstream.push(result) 将获取到的结果 result 推送给下游。如果下游拒绝接收(例如,对于 findFirst 这样的短路操作),push 会返回 falseproceed 也将变为 false,从而终止循环。
  4. 出队wip.pollFirst(): 成功处理完一个任务后,将其从队列中移除。
  5. 异常处理与清理 (finally):
    • catch (ExecutionException e): 如果任务在执行 mapper 时抛出异常,get() 会将其包装在 ExecutionException 中抛出。这里会解开包装,将原始的 RuntimeException 重新抛出。
    • finally 块确保了健壮性。如果 flush 因为下游拒绝或异常而提前退出 (!success),它会负责清理所有仍在 wip 队列中的任务:
      1. task.cancel(true): 向所有未完成的任务发送取消信号(中断它们的执行线程)。
      2. next.thread.join(): 等待所有任务的线程都真正终止,防止产生僵尸线程。
      3. Thread.currentThread().interrupt(): 如果在执行过程中发生了中断,就在最后恢复当前线程的中断状态,这是良好的并发编程实践。

Gatherer 的构造

// ... existing code ...return Gatherer.ofSequential(State::new,Integrator.<State, T, R>ofGreedy(State::integrate),(state, downstream) -> state.flush(Long.MAX_VALUE, downstream));
// ... existing code ...

最后,mapConcurrent 方法通过 Gatherer.ofSequential 将 State 类的各个部分组装成一个完整的 Gatherer

  • State::newInitializer (初始化器)。当流开始处理时,会调用它来创建一个新的 State 实例。
  • State::integrateIntegrator (整合器)。如前所述,每当有新元素时,会调用 state.integrate(...)ofGreedy 表示这个整合器会尽可能多地处理元素,直到下游的 push 返回 false
  • (state, downstream) -> state.flush(Long.MAX_VALUE, downstream)Finisher (完成器)。当上游所有元素都处理完毕后,会调用这个 finisher。它调用 flush 并传入 Long.MAX_VALUE,这会强制 flush 方法等待并冲刷掉 wip 队列中所有剩余的任务,确保没有任何结果丢失。

    总结

    Gatherers.mapConcurrent 是一个高度优化的并发工具。它巧妙地将虚拟线程、FutureTask 和 Gatherer 的状态模型结合在一起,实现了:

    • 高并发:利用虚拟线程处理阻塞操作。
    • 顺序保留:通过一个先进先出的工作队列和阻塞式的 get() 调用来保证输出顺序。
    • 资源可控:通过 maxConcurrency 参数实现背压,防止资源耗尽。
    • 健壮性:具备完善的异常处理和任务取消机制。

    它完美地展示了 Gatherer 接口的强大扩展能力,使得开发者可以在 Stream API 内部实现复杂的、有状态的、甚至是并发的流处理逻辑。

    enum Value 

    enum Value 的核心目标是为 Gatherer 的各个函数式接口(initializerfinishercombiner)提供一个统一的、默认的、无操作的(no-op)实现

    它的存在主要是为了解决以下几个问题:

    1. 避免使用 null: 在 API 设计中,使用 null 来表示“不存在”或“未提供”某个功能,常常会导致代码中充斥着繁琐的 null 检查,并且是 NullPointerException 的主要来源。Value 枚举通过提供一个具体的对象实例来代表“默认行为”,从而完全避免了 null 的使用。
    2. 提供可识别的默认值: 当 Gatherer 的实现者没有提供某个功能(例如,一个无状态的 Gatherer 不需要 initializer),API 内部可以使用 Value.DEFAULT 这个唯一的、可识别的“哨兵值”(Sentinel Value)来填充。
    3. 提升性能和代码简洁性: API 内部可以通过简单的引用比较(例如 if (finisher == Value.DEFAULT)) 来判断是否需要执行某个操作,这比调用一个空的 lambda 表达式更高效,也让代码意图更清晰。

    这种设计是软件工程中空对象模式(Null Object Pattern) 的一个经典应用。


    代码逐段分析

    a. 枚举定义与单例实例
    // ... existing code ...enum Value implements Supplier, BinaryOperator, BiConsumer {DEFAULT;
    // ... existing code ...
    
    • enum Value: 将 Value 定义为一个枚举类型。
    • implements Supplier, BinaryOperator, BiConsumer: 它同时实现了 Gatherer 所需的三个核心函数式接口。这意味着 Value 的实例本身就可以被当作这三种接口的任意一种来使用。
    • DEFAULT;: 这是该枚举的唯一实例。利用 Java 枚举的特性,DEFAULT 是一个天然的、线程安全的单例。在整个 JDK 中,所有需要“默认无操作行为”的地方,都会引用这同一个 Value.DEFAULT 对象。
    b. 特殊的 statelessCombiner
    // ... existing code ...final BinaryOperator<Void> statelessCombiner = new BinaryOperator<>() {@Override public Void apply(Void left, Void right) { return null; }};
    // ... existing code ...
    
    • 这是一个非常特殊的字段。它是一个 BinaryOperator<Void>,专门用于无状态(stateless) 的 Gatherer
    • 无状态的 Gatherer 其状态容器(State/Accumulator)的类型是 Void,在实践中通常用 null 来表示。当并行执行流时,需要合并(combine)两个分支的状态。合并两个 null 状态的结果,自然还是 null
    • 所以,statelessCombiner 的 apply 方法逻辑非常简单:接收两个 Void 类型的参数(实际上都会是 null),然后返回 null。这为无状态的 Gatherer 提供了一个正确且可执行的 combiner
    c. 接口方法的默认实现
    // ... existing code ...// BiConsumer@Override public void accept(Object state, Object downstream) {}// BinaryOperator@Override public Object apply(Object left, Object right) {throw new UnsupportedOperationException("This combiner cannot be used!");}// Supplier@Override public Object get() { return null; }
    // ... existing code ...
    

    这里是 Value 作为三个接口的实例时,其方法的具体实现:

    • accept(Object state, Object downstream): 这是 BiConsumer 接口的方法。它的实现是一个空方法体。当 Value.DEFAULT 被用作 finisher (完成器) 时,表示在流的末尾不需要执行任何最终操作。调用它不会产生任何效果。
    • get(): 这是 Supplier 接口的方法。它的实现是返回 null。当 Value.DEFAULT 被用作 initializer (初始化器) 时,表示这是一个无状态的 Gatherer,其初始状态就是 null
    • apply(Object left, Object right): 这是 BinaryOperator 接口的方法。它的实现是直接抛出 UnsupportedOperationException。这非常关键,它传达了一个明确的信息:这个默认的 combiner 是一个“虚拟”的,它不能被实际调用。这通常用于那些本身就不支持并行化(non-concurrent)的 Gatherer。如果有人错误地尝试并行化一个这样的 Gatherer,就会立刻得到一个清晰的错误,而不是潜在的数据不一致问题。
    d. 便捷的类型转换工厂方法
    // ... existing code ...@ForceInline@SuppressWarnings("unchecked")<A> Supplier<A> initializer() { return (Supplier<A>)this; }@ForceInline@SuppressWarnings("unchecked")<T> BinaryOperator<T> combiner() { return (BinaryOperator<T>) this; }@ForceInline@SuppressWarnings("unchecked")<T, R> BiConsumer<T, Gatherer.Downstream<? super R>> finisher() {return (BiConsumer<T, Downstream<? super R>>) this;}
    // ... existing code ...
    

    这三个方法是 Value 枚举的点睛之笔。它们是便捷的工厂方法,用于获取类型安全的默认实例。

    • 作用: 它们都只做一件事——返回 this(即 Value.DEFAULT 对象),但通过泛型和强制类型转换,将其“伪装”成调用者所期望的、带有正确泛型参数的函数式接口。
    • 示例: 当 Gatherer.of(...) 工厂方法需要一个默认的 initializer 时,它不需要自己写 () -> null,而是可以直接调用 Value.DEFAULT.initializer()。这样做既简洁,又能从编译器那里获得正确的类型 Supplier<A>
    • @ForceInline: 这个注解是给 JIT (Just-In-Time) 编译器的提示,建议它将这些方法的调用进行内联。因为方法体极其简单,内联后几乎没有额外的方法调用开销,性能极高。
    • @SuppressWarnings("unchecked"): 因为这里涉及到底层实现中的强制类型转换,所以用这个注解来抑制编译器关于“未经检查的转换”的警告。这是在明确知道类型安全可以由外部逻辑保证的情况下,一种常见的底层库编程技巧。

    总结

    enum Value 是 Gatherer API 内部一个优雅而高效的实现细节。它通过一个单例枚举实例,完美地扮演了“空对象”的角色,为 Gatherer 的 initializerfinisher 和 combiner 提供了统一、安全、高效的默认行为。

    它的设计哲学体现了现代 Java API 的优秀实践:

    • 用特定对象代替 null,增强代码的健壮性。
    • 利用接口和泛型,提供类型安全且灵活的 API。
    • 提供可识别的哨兵值,允许进行高效的内部优化。

    record GathererImpl<T, A, R> 

    GathererImpl 的设计目标非常纯粹:作为一个简单、透明、不可变的数据载体(Data Carrier),用于封装定义一个 Gatherer 所需的全部四个核心函数。

    它利用了 Java 16 引入的 record(记录)特性,该特性旨在简化用于“纯粹数据聚合”的类的创建。

    a. record 定义与泛型参数
    // ... existing code ...record GathererImpl<T, A, R>(@Override Supplier<A> initializer,@Override Integrator<A, T, R> integrator,@Override BinaryOperator<A> combiner,@Override BiConsumer<A, Downstream<? super R>> finisher) implements Gatherer<T, A, R> {
    // ... existing code ...
    
    • record GathererImpl<T, A, R>:

      • record 关键字告诉编译器,这是一个记录类。编译器会自动为它生成:
        1. 一个接收所有组件(initializerintegrator 等)的全参构造函数
        2. 为每个组件生成一个同名的公共访问方法(例如 public Supplier<A> initializer())。
        3. equals()hashCode() 和 toString() 方法的默认实现。
      • 这使得 GathererImpl 成为一个天然的、简洁的、不可变的(因为其组件都是 final 的)数据聚合对象。
    • 泛型参数:

      • T: The type of input elements. (输入元素的类型)
      • A: The type of the mutable accumulator (or state). (可变累加器/状态的类型)
      • R: The type of result elements. (输出/结果元素的类型) 这三个泛型参数与 Gatherer 接口的定义完全一致,贯穿了整个操作。
    • implements Gatherer<T, A, R>:

      • 这明确表示 GathererImpl 是 Gatherer 接口的一个具体实现。
      • 因为 record 自动生成的访问器方法(initializer()integrator() 等)与 Gatherer 接口中定义的方法签名完全匹配,所以 GathererImpl 天然地满足了 Gatherer 接口的契约。
    b. 四个核心组件

    GathererImpl 的主体就是它的四个组件,它们是定义一个 Gatherer 行为的“四要素”:

    1. Supplier<A> initializer (初始化器):

      • 职责: 创建并返回一个新的、可变的累加器(状态容器)。
      • 调用时机: 在流处理开始时,或者在并行处理中为每个子任务开始时调用。
      • 示例: 对于 toList()initializer 就是 () -> new ArrayList<>()
    2. Integrator<A, T, R> integrator (整合器):

      • 职责: 这是核心处理逻辑。它定义了如何将一个输入元素 T 合并到累加器 A 中,并有选择地将零个或多个结果 R 推送到下游。
      • 调用时机: 对于上游流中的每一个元素,都会调用此方法。
      • 示例: 对于 map(f)integrator 的逻辑就是 (state, element, downstream) -> downstream.push(f.apply(element))
    3. BinaryOperator<A> combiner (合并器):

      • 职责: 在并行流(parallel stream)中,定义如何将两个并行的子任务产生的累加器 A 合并成一个。
      • 调用时机: 当并行流的各个分支需要汇总结果时调用。
      • 示例: 对于 toList()combiner 的逻辑就是 (list1, list2) -> { list1.addAll(list2); return list1; }
    4. BiConsumer<A, Downstream<? super R>> finisher (完成器):

      • 职责: 在所有输入元素都被整合(integrated)完毕后,执行最终的处理。这对于有状态的操作至关重要,例如,它可以在最后将累加器中剩余的元素全部推送到下游。
      • 调用时机: 当上游流的所有元素都处理完成后,在流的末尾调用一次。
      • 示例: 对于 windowFixed(size),当流结束时,如果累加器里还有未满一个窗口的元素,finisher 就会负责将这个最后的、不完整的窗口也推送出去。
    c. 静态工厂方法 of
    // ... existing code ...static <T, A, R> GathererImpl<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher) {return new GathererImpl<>(Objects.requireNonNull(initializer,"initializer"),Objects.requireNonNull(integrator, "integrator"),Objects.requireNonNull(combiner, "combiner"),Objects.requireNonNull(finisher, "finisher"));}
    // ... existing code ...
    
    • 职责: 这是创建 GathererImpl 实例的标准方式。它提供了一个清晰的、统一的构造入口。
    • Objects.requireNonNull(...): 这个工厂方法最重要的一个功能就是防御性编程。它对传入的每一个函数组件都进行了非空检查。
      • 这确保了任何一个 GathererImpl 实例内部的四个组件都绝对不会是 null
      • 这极大地简化了 JDK 内部处理 Gatherer 的代码,因为它们可以放心地调用 gatherer.initializer() 等方法,而无需担心 NullPointerException
      • 如果用户在构造 Gatherer 时忘记提供某个必要的函数(并且没有使用 Value.DEFAULT 这样的默认值),requireNonNull 会立即抛出异常,并带有明确的错误信息(如 "initializer"),这是一种“快速失败”(fail-fast)的设计原则,有助于及早发现和修复 bug。

    总结

    GathererImpl 可以被看作是 Gatherer 接口的一个标准实现模板。它本身没有任何复杂的逻辑,只是一个“容器”。

    • 角色: 它是一个不可变的元组(Tuple),忠实地持有定义 Gatherer 行为的四个函数。
    • 优点:
      • 简洁: 使用 record 大大减少了样板代码。
      • 不可变: 一旦创建,其行为(即它包含的四个函数)就固定了,这使得它在并发环境中是线程安全的。
      • 健壮: 通过 of 工厂方法的非空检查,保证了实例的有效性和内部状态的一致性。

    当你调用 Gatherer.of(...) 或 Gatherers 类中任何一个预定义的工厂方法(如 mapfilterwindowFixed 等)时,它们在内部创建并返回的,就是这样一个 GathererImpl 的实例,其中填充了实现特定操作所需的 initializerintegratorcombiner 和 finisher

    Composite

    Composite 类的唯一目标是:将两个 Gatherer 对象(我们称之为 left 和 right)串联起来,形成一个全新的、功能上等同于两者顺序执行的单一 Gatherer

    当用户写下 stream.gather(gatherer1.andThen(gatherer2)) 时,andThen 方法在内部就会创建一个 Composite 实例,left 是 gatherer1right 是 gatherer2。这个 Composite 对象本身也是一个 Gatherer,可以被 stream.gather() 方法直接使用。

    它实现了函数式编程中经典的函数组合(Function Composition)思想,但将其应用在了更复杂的 Gatherer 结构上。

    // ... existing code ...static final class Composite<T, A, R, AA, RR> implements Gatherer<T, Object, RR> {private final Gatherer<T, A, ? extends R> left;private final Gatherer<? super R, AA, ? extends RR> right;// FIXME change `impl` to a computed constant when availableprivate GathererImpl<T, Object, RR> impl;
    // ... existing code ...
    
    • implements Gatherer<T, Object, RR>: 这是理解 Composite 的关键。一个组合后的 Gatherer,其:

      • 输入类型 T: 与第一个 Gatherer (left) 的输入类型相同。
      • 输出类型 RR: 与第二个 Gatherer (right) 的输出类型相同。
      • 累加器类型 Object: 这是一个难点。因为 left 和 right 各有自己的累加器(类型分别为 A 和 AA),组合后的 Gatherer 需要同时管理这两个累加器。为了类型擦除和实现的方便,这里统一使用 Object 作为对外暴露的累加器类型,内部再通过一个包装对象来持有真正的 A 和 AA 实例。
    • 泛型参数:

      • Gatherer<T, A, R> left: 第一个操作。它接收 T 类型的输入,使用 A 类型的累加器,产生 R 类型的输出。
      • Gatherer<R, AA, RR> right: 第二个操作。它接收的输入类型 R 必须是 left 输出类型的超类,使用 AA 类型的累加器,最终产生 RR 类型的输出。
    • 成员变量:

      • left 和 right: 分别存储了要组合的两个 Gatherer
      • private GathererImpl<T, Object, RR> impl: 这是组合逻辑的核心。Composite 类本身只是一个外壳,它真正的 Gatherer 行为(即 initializerintegrator 等四个核心函数)是被**懒加载(lazily computed)**并存储在这个 impl 字段中的。FIXME 注释表明,开发者希望未来能用更现代的 Java 特性(如 lazy 关键字)来优化这个懒加载过程。

    构造与工厂方法

    // ... existing code ...static <T, A, R, AA, RR> Composite<T, A, R, AA, RR> of(Gatherer<T, A, ? extends R> left,Gatherer<? super R, AA, ? extends RR> right) {return new Composite<>(left, right);}private Composite(Gatherer<T, A, ? extends R> left,Gatherer<? super R, AA, ? extends RR> right) {this.left = left;this.right = right;}
    // ... existing code ...
    
    • of(...) 是一个标准的静态工厂方法,用于创建 Composite 实例。
    • 构造函数非常简单,只是将传入的 left 和 right Gatherer 存储起来。真正的组合逻辑被推迟到了第一次使用时。

    核心:impl() 懒加载方法

    // ... existing code ...@SuppressWarnings("unchecked")private GathererImpl<T, Object, RR> impl() {// ATTENTION: this method currently relies on a "benign" data-race// as it should deterministically produce the same result even if// initialized concurrently on different threads.var i = impl;return i != null? i: (impl = (GathererImpl<T, Object, RR>)impl(left, right));}
    // ... existing code ...
    

    这是 Composite 类中最复杂也最核心的方法。它负责在第一次被需要时,动态地构建出那个代表了组合行为的 GathererImpl

    • 懒加载模式:

      1. var i = impl;: 先读取一次 impl 字段。
      2. i != null ? i : ...: 如果 impl 已经计算过了(不为 null),就直接返回它。
      3. ... : (impl = ...): 如果 impl 还是 null,就调用另一个 impl(left, right) 静态方法(这是真正干活的方法)来计算出新的 GathererImpl,将其赋值给 impl 字段,然后返回。
    • “良性数据竞争” (Benign Data-Race):

      • ATTENTION 注释解释了这里的并发处理方式。这个懒加载没有使用 synchronized 或 volatile
      • 这意味着,如果在多线程环境下,多个线程同时调用 impl() 并且 impl 恰好是 null,那么 impl(left, right) 这个计算过程可能会被执行多次。
      • 但设计者认为这是“良性的”,因为 impl(left, right) 的计算是确定性的(deterministic)——无论它被计算多少次,对于相同的 left 和 right,其结果都是一个等价的 GathererImpl
      • 最终,多个线程中的一个会成功地将计算结果写入 impl 字段,其他线程的写入可能会覆盖它,但这没关系,因为覆盖物和被覆盖物是相同的。这是一种以牺牲一点点计算资源为代价,来避免锁开销的常见优化技巧。

    组合逻辑的实现(impl(left, right) 方法

    这个方法是 Gatherer 组合魔法的真正实现者,它负责将两个独立的 Gathererleft 和 right)“编织”成一个功能完备的、全新的 GathererImpl

    这个方法的代码很长,因为它包含了针对不同情况的优化路径。我们可以将其分为两个主要部分:特殊优化路径通用路径

    1. 准备工作:提取函数与特征检测

    // ... existing code ...static final <T, A, R, AA, RR> GathererImpl<T, ?, RR> impl(Gatherer<T, A, R> left, Gatherer<? super R, AA, RR> right) {final var leftInitializer = left.initializer();final var leftIntegrator = left.integrator();
    // ... existing code ...final var rightFinisher = right.finisher();final var leftStateless = leftInitializer == Gatherer.defaultInitializer();final var rightStateless = rightInitializer == Gatherer.defaultInitializer();final var leftGreedy = leftIntegrator instanceof Integrator.Greedy;final var rightGreedy = rightIntegrator instanceof Integrator.Greedy;
    // ... existing code ...
    

    在做任何事情之前,代码首先做了两件准备工作:

    1. 提取函数: 将 left 和 right Gatherer 的四个核心函数(initializerintegratorcombinerfinisher)分别提取到局部变量中。这可以提高代码可读性,并可能带来微小的性能提升(避免重复调用接口方法)。

    2. 特征检测: 它检查了 left 和 right 的几个重要特征:

      • leftStateless / rightStateless: 通过比较它们的 initializer 是否是默认的 initializer (Value.DEFAULT),来判断它们是否是无状态的 (stateless)。无状态的 Gatherer 不需要创建和维护状态对象。
      • leftGreedy / rightGreedy: 通过 instanceof 检查,判断它们的 integrator 是否是贪婪的 (greedy)。贪婪的 integrator 保证了它在处理一个元素时,要么向下游推送一个元素,要么不推送,但绝不会“短路”(即向下游返回 false 来终止流)。

    这些特征检测的结果,将决定接下来走哪条执行路径。


    2. 特殊优化路径:双重无状态与贪婪 (Stateless & Greedy Fast Path)

    // ... existing code ...if (leftStateless && rightStateless && leftGreedy && rightGreedy) {return new GathererImpl<>(Gatherer.defaultInitializer(),Gatherer.Integrator.ofGreedy((unused, element, downstream) ->leftIntegrator.integrate(null,element,r -> rightIntegrator.integrate(null, r, downstream))),
    // ... existing code ...);} else {
    // ... existing code ...
    

    这是为最理想情况设计的“快速通道”。当两个 Gatherer 都是无状态且都是贪婪的时(例如 map(...).andThen(filter(...))),代码会进行大幅优化。

    • initializer: 因为两者都无状态,所以组合后的 Gatherer 也是无状态的,直接使用默认的 initializer
    • integrator: 这是优化的核心。它创建了一个新的、也是贪婪的 integrator。其逻辑非常直接:
      (unused, element, downstream) ->leftIntegrator.integrate(null, // 因为 left 无状态,状态对象是 nullelement,// 这里是关键:leftIntegrator 的下游(downstream)// 被一个 lambda 表达式替代了。r -> rightIntegrator.integrate(null, // 因为 right 无状态,状态对象是 nullr,    // r 是 left 处理完 element 后的输出downstream // 最终的下游))
      
      这形成了一个极其高效的函数调用链。处理一个元素 element 的过程,就是简单地调用 leftIntegrator,其结果 r 又被立刻送入 rightIntegrator,最后的结果直接推送到真正的下游。整个过程没有任何状态管理、没有短路检查,开销极小。
    • combiner 和 finisher: 也做了相应的简化,因为没有状态,所以逻辑变得非常简单。

    3. 通用路径:有状态或非贪婪 (Stateful or Non-Greedy General Path)

    如果 left 或 right 中任何一个是有状态的非贪婪的,就必须走这条更复杂的通用路径。这条路径的核心是定义了一个新的内部类 State

    a. State 类:组合状态的容器
    // ... existing code ...} else {class State {final A leftState;final AA rightState;boolean leftProceed;boolean rightProceed;
    // ... existing code ...
    

    这个 State 类是组合后 Gatherer 的统一状态容器。它封装了:

    • final A leftStateleft Gatherer 的状态对象。
    • final AA rightStateright Gatherer 的状态对象。
    • boolean leftProceed / rightProceed: 用于跟踪 left 和 right integrator 是否已经短路。这是处理非贪婪 Gatherer 的关键。如果 left 短路了,就不应该再给它提供新元素。如果 right 短路了,left 也不应该再把处理结果推送给它。
    b. State 类的核心方法
    • State() (构造函数):

      • 负责初始化。它会检查 left 和 right 是否是无状态的,如果是,则其状态就是 null;否则,就调用它们各自的 initializer 来创建初始状态。
      • leftProceed 和 rightProceed 初始都为 true
    • integrate(T t, Downstream<? super RR> c):

      • 这是组合后的 integrator 的核心逻辑。
      • 它调用 leftIntegrator.integrate(...),但巧妙地将 left 的下游替换为了 this::rightIntegrate
      • 这意味着,left 处理完元素后,不会直接推送到最终下游,而是会调用 State 类的 rightIntegrate 方法。
      • 它还负责检查和更新 leftProceed 和 rightProceed 这两个短路标志位。
    • rightIntegrate(R r, Downstream<? super RR> downstream):

      • 这个方法充当了 left 和 right 之间的“管道”。
      • 当 left 产生一个中间结果 r 时,这个方法被调用。
      • 它会检查 right 是否还能继续(rightProceed),如果可以,就调用 rightIntegrator.integrate(rightState, r, downstream),将 r 送入 right Gatherer 进行处理,并将最终结果推送到下游。
    • finish(Downstream<? super RR> c):

      • 组合后的 finisher
      • 它会依次调用 left.finisher() 和 right.finisher()
      • 调用 left.finisher() 时,同样会将其下游重定向到 rightIntegrate,以确保 left 在结束时产生的任何剩余元素也能被 right 正确处理。
    c. 构建最终的 GathererImpl
    // ... existing code ...return new GathererImpl<T, State, RR>(State::new,(leftGreedy && rightGreedy)? Integrator.<State, T, RR>ofGreedy(State::integrate): Integrator.<State, T, RR>of(State::integrate),// ...State::finish);
    // ... existing code ...
    

    最后,通用路径会使用 State 类的方法引用来构建一个新的 GathererImpl

    • initializerState::new
    • integratorState::integrate (并根据 left 和 right 是否都为贪婪来决定组合后的 integrator 是否也为贪婪)
    • combinerState::joinLeft
    • finisherState::finish

    总结

    Composite.impl 静态方法是一个高度工程化的杰作。它展示了如何在提供一个通用、强大功能(组合 Gatherer)的同时,不牺牲性能。

    1. 它首先识别特殊情况:当两个 Gatherer 都是无状态和贪婪时,它会生成一个极度优化的、几乎没有额外开销的函数调用链。
    2. 然后提供通用解决方案:对于其他所有情况,它通过创建一个新的 State 类来封装两个 Gatherer 的状态和短路逻辑,确保即使在复杂的有状态和非贪婪场景下,组合行为也能正确无误地执行。

    这种“快速通道 + 通用后备”的设计模式,是高性能库中非常常见的技术,它确保了简单用例的性能最大化,同时保证了复杂用例的正确性。

    andThen

    提供了一种流畅的、函数式的方式来将两个 Gatherer 串联(compose)起来。如果说 Composite 类是实现 Gatherer 组合的“幕后黑手”,那么 andThen 方法就是暴露给开发者使用的、优雅的公开 API。

    它的核心思想是函数组合:创建一个新的 Gatherer,它会先用第一个 Gathererthis)处理输入流,然后将第一个 Gatherer 的输出作为第二个 Gathererthat)的输入,最终新 Gatherer 的输出是第二个 Gatherer 的输出。这完全等同于 g2(g1(input)) 的概念。

    1. andThen 方法源码

    这是 Gatherer 接口中 andThen 方法的实现:

    // ... existing code ...default <R_OUT> Gatherer<T, ?, R_OUT> andThen(Gatherer<? super R, ?, ? extends R_OUT> after) {Objects.requireNonNull(after);return new Composite<>(this, after);}// ... existing code ...
    

    可以看到,它的实现非常简洁:

    1. 非空检查:使用 Objects.requireNonNull(after) 确保传入的 Gatherer 不为 null
    2. 创建 Composite 实例:直接 new Composite<>(this, after),将当前的 Gatherer (this) 和传入的 Gatherer (after) 作为构造函数参数,创建一个 Composite 对象。Composite 类会处理所有复杂的组合逻辑,正如我们之前详细分析的那样。

    2. 泛型解析

    andThen 方法的泛型设计确保了类型安全,我们来解读一下:

    <R_OUT> Gatherer<T, ?, R_OUT> andThen(Gatherer<? super R, ?, ? extends R_OUT> after)
    
    • this:调用 andThen 的 Gatherer 实例,其类型是 Gatherer<T, A, R>
      • T: 它的输入元素类型。
      • R: 它的输出结果类型。
    • after: 作为参数传入的 Gatherer,其类型是 Gatherer<? super R, ?, ? extends R_OUT>
      • ? super Rafter 的输入类型必须是 this 输出类型 R 的超类(或者是 R 本身)。这保证了 this 的输出可以安全地作为 after 的输入。
      • ? extends R_OUTafter 的输出类型是 R_OUT 或其子类
    • 返回值Gatherer<T, ?, R_OUT>
      • T: 组合后 Gatherer 的输入类型与 this 的输入类型相同。
      • ?: 组合后 Gatherer 的中间累加器类型被隐藏了。这是因为 Composite 内部会管理两个 Gatherer 各自的累加器,这个复杂的内部状态对外部调用者是不可见的,用 ? 表示最为合适。
      • R_OUT: 组合后 Gatherer 的最终输出类型与 after 的输出类型相同。

    andThen 方法是 Gatherer API 可组合性的关键体现。它优雅地隐藏了 Composite 类的实现细节,为开发者提供了一个符合直觉的、链式调用的接口来构建复杂的流处理管道。

    通过 gatherer1.andThen(gatherer2).andThen(gatherer3) 这样的调用,开发者可以像拼接乐高积木一样,将简单、专一的 Gatherer 组合成功能强大的复合 Gatherer,这正是函数式编程和现代 API 设计所推崇的模式。

    总结

    Composite 类是 Gatherer API 可组合性的基石。它本身是一个轻量级的包装器,通过懒加载的方式,在需要时动态地将两个 Gatherer 的核心行为函数(四要素)进行精密的“编织”,生成一个新的、代表了两者串联行为的 GathererImpl

    它完美地展示了如何将复杂的操作分解为可组合的、独立的单元,并通过一个通用的组合器(Composite)将它们无缝地连接起来,这正是函数式和声明式编程强大威力的体现。

    总结

    Gatherers 类是 Stream.gather() 功能不可或缺的一部分。它不仅提供了多个即插即用的高效 Gatherer 实现,解决了窗口、扫描、并发映射等常见但复杂的流处理问题,而且其源代码本身就是学习如何正确、高效地实现 Gatherer 接口的最佳教材。通过分析 windowFixedscanmapConcurrent 等方法的实现,我们可以深刻理解 Gatherer 接口中状态、集成、收尾等概念的实际应用。

    http://www.dtcms.com/a/347647.html

    相关文章:

  1. 告别静态网页:我用Firefly AI + Spline,构建次世代交互式Web体验
  2. 学习Java24天
  3. React学习(十二)
  4. IDEA相关的设置和技巧
  5. C语言第十一章内存在数据中的存储
  6. Redis资料
  7. JAVA读取项目内的文件或图片
  8. springboot项目结构
  9. Axure:如何打开自定义操作界面
  10. 顺序表(ArrayList)
  11. 刷题日记0823
  12. [特殊字符] 数据库知识点总结(SQL Server 方向)
  13. MySQL:事务管理
  14. games101 作业0 环境搭建与熟悉线性代数库
  15. H264编解码过程简述
  16. 数据结构 -- 哈希表
  17. RAGFlow (一) 开发环境搭建
  18. imx6ull-驱动开发篇37——Linux MISC 驱动实验
  19. [机械结构设计-18]:Solidworks - 特征(Feature)是构成三维模型的基本单元,是设计意图的载体,也是参数化设计的核心。
  20. 深入剖析分布式事务的Java实现:从理论到Seata实战
  21. c语言中enum与#define的用法区别
  22. 算法题(189):食物链
  23. 如何利用数据库事务,来防止数据不一致的问题
  24. 云原生概述
  25. [e3nn] 归一化 | BatchNorm normalize2mom
  26. 自然语言处理——06 迁移学习(上)
  27. MATLAB实现CNN-LSTM-Attention 时序和空间特征结合-融合注意力机制混合神经网络模型的风速预测
  28. 云计算-K8s 运维:Python SDK 操作 Job/Deployment/Pod+RBAC 权限配置及自定义 Pod 调度器实战
  29. Kubernetes相关问题集(四)
  30. 「数据获取」《贵港统计年鉴》(2008-2023)(2016、2017缺失)(获取方式看绑定的资源)