当前位置: 首页 > news >正文

沧州企业网站微平台在哪里找

沧州企业网站,微平台在哪里找,网站制作公司怎样帮客户做优化,聊城经济技术开发区人才网gather() 是 Java 22 中引入的一个非常强大的新特性。Gatherer 是什么简单来说,Gatherer 是一个全新的、高度可定制的中间操作(Intermediate Operation)。它提供了一种比现有的 map, filter, flatMap 等操作更强大、更灵活的数据转换能力。Ga…

gather() 是 Java 22 中引入的一个非常强大的新特性。

Gatherer 是什么

简单来说,Gatherer 是一个全新的、高度可定制的中间操作(Intermediate Operation)。它提供了一种比现有的 mapfilterflatMap 等操作更强大、更灵活的数据转换能力。

Gatherer 允许你实现 任意复杂的、有状态的、多对多(M-to-N) 的元素转换逻辑。

想象一下,流中的元素像传送带上的物品一个个流过来,Gatherer 就像一个在传送带旁边的复杂工作站。这个工作站可以:

  • 查看一个或多个物品(元素),然后决定输出零个、一个或多个新物品。
    • map 只能一对一转换。
    • filter 只能一对一或一对零转换。
    • flatMap 是一对多,但是是无状态的。
  • 维持自己的内部状态(State)。 比如,它可以记住之前看过的元素,然后根据历史信息来处理当前元素。
    • 例如,实现 windowed(窗口化)操作,将流中每 N 个元素分为一组。这必须记住当前窗口里已经收集了多少个元素。
  • 在所有物品都处理完后,进行最终的清理或输出。
    • 例如,一个 windowed 操作,在流末尾,即使最后一个窗口没满,也需要把这个不完整的窗口输出。

Gatherer 的四个核心组件:

Gatherer 的行为由四个函数式接口定义,这让它具备了极高的灵活性:

  1. initializer()状态初始化器。在处理第一个元素前调用,用于创建初始状态对象(比如,一个空的列表用于存放窗口元素)。
  2. integrator()集成器/处理器。这是核心逻辑,每当一个新元素到来时,它会被调用。它接收当前状态、新元素,并决定是否更新状态、是否向下游输出新元素。
  3. combiner()并行合并器。在并行流中,用于合并不同线程上的状态对象。
  4. finisher()终结者。在所有元素处理完毕后调用,用于处理最终的状态,并可能输出最后的元素。

例子:Gatherers.windowFixed(3)

  • initializer: 创建一个空的、大小为3的内部缓冲区(状态)。
  • integrator: 每来一个元素,就放入缓冲区。如果缓冲区满了,就把这个缓冲区包装成一个 List 输出到下游,并清空缓冲区。
  • finisher: 当流结束时,如果缓冲区里还有元素(比如最后只剩1个或2个),就把这个不完整的缓冲区也包装成 List 输出。

所以,Gatherer 的目标就是提供一个统一的、强大的接口,让开发者可以实现几乎任何自定义的流转换逻辑,而不仅仅局限于 map/filter 等预设的操作。

使用场景

  • 元素分组(如窗口函数)
  • 去重连续相似元素
  • 增量累加函数(前缀扫描)
  • 增量重排序函数

jdk 24引入的Gathers工具类提供了一些默认实现,后面有分析

下面将展示如何使用 Gatherer 实现这些功能:

元素分组(窗口函数)

// 实现固定大小的窗口分组
public static <T> Gatherer<T, ?, List<T>> window(int size) {return Gatherer.of(() -> new ArrayList<T>(),  // 初始化状态Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.add(element);if (state.size() == size) {downstream.push(new ArrayList<>(state));state.clear();}return true;}),(left, right) -> {  // 合并器left.addAll(right);return left;},(state, downstream) -> {  // 完成器if (!state.isEmpty()) {downstream.push(new ArrayList<>(state));}});
}// 使用示例
List<List<Integer>> windows = Stream.of(1,2,3,4,5,6).gather(window(2)).toList();  // [[1,2], [3,4], [5,6]]

去重连续相似元素

// 去重连续相似元素
public static <T> Gatherer<T, ?, T> distinctConsecutive() {return Gatherer.of(() -> new Object() { T last; boolean hasLast = false; },Gatherer.Integrator.ofGreedy((state, element, downstream) -> {if (!state.hasLast || !state.last.equals(element)) {downstream.push(element);state.last = element;state.hasLast = true;}return true;}));
}// 使用示例
List<Integer> distinct = Stream.of(1,1,2,2,2,3,2,2).gather(distinctConsecutive()).toList();  // [1,2,3,2]

增量累加函数(前缀扫描)

// 前缀扫描实现
public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<R, T, R> scanner) {return Gatherer.ofSequential(() -> new Object() { R current = initial.get(); },Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.current = scanner.apply(state.current, element);return downstream.push(state.current);}));
}// 使用示例
List<String> result = Stream.of(1,2,3,4,5).gather(scan(() -> "", (str, num) -> str + num)).toList();  // ["1", "12", "123", "1234", "12345"]

增量重排序函数

// 实现增量排序(维护一个固定大小的有序窗口)
public static <T> Gatherer<T, ?, T> incrementalSort(int windowSize, Comparator<T> comparator) {return Gatherer.of(() -> new PriorityQueue<>(comparator),Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.offer(element);if (state.size() > windowSize) {downstream.push(state.poll());}return true;}),(left, right) -> {left.addAll(right);return left;},(state, downstream) -> {while (!state.isEmpty()) {downstream.push(state.poll());}});
}// 使用示例
List<Integer> sorted = Stream.of(5,3,1,4,2).gather(incrementalSort(3, Comparator.naturalOrder())).toList();  // [1,3,4,2,5]

关键点说明

  1. 状态管理

    • 使用 initializer() 创建合适的状态容器
    • 在 integrator() 中维护和更新状态
    • 在 finisher() 中处理剩余元素
  2. 性能考虑

    • 对于无状态操作使用 defaultInitializer()
    • 对于可并行操作提供有效的 combiner()
    • 使用 Greedy 优化非短路操作
  3. 组合使用

    • 可以使用 andThen() 组合多个 Gatherer
    • 预先组合的 Gatherer 比链式调用更高效

这些实现展示了 Gatherer 的强大功能,它能够优雅地处理各种流处理场景,同时保持代码的清晰和可维护性。

核心方法

// 初始化器 - 创建中间状态
default Supplier<A> initializer()// 集成器 - 处理输入元素
Integrator<A, T, R> integrator()// 组合器 - 合并两个状态
default BinaryOperator<A> combiner()// 完成器 - 执行最终操作
default BiConsumer<A, Downstream<? super R>> finisher()

组合操作

// 将多个 Gatherer 组合在一起
default <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super R, ?, ? extends RR> that)

静态工厂方法

// 创建无状态顺序 Gatherer
static <T, R> Gatherer<T, Void, R> ofSequential(Integrator<Void, T, R> integrator)// 创建可并行化的 Gatherer
static <T, A, R> Gatherer<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher
)

 

核心关联类

  • Stream: 通过 gather(Gatherer) 方法使用 Gatherer
  • Gatherers: 提供常用 Gatherer 实现的工具类
  • Downstream: 接收 Gatherer 输出的下一阶段
  • Integrator: 处理元素集成的函数式接口

交互关系

// Stream 与 Gatherer 的关系
Stream<T>.gather(Gatherer<T,A,R>) -> Stream<R>// Gatherer 组合关系
Gatherer<T,A,R>.andThen(Gatherer<R,B,S>) -> Gatherer<T,?,S>

最佳实践

使用注意事项

  1. 状态管理

    • 无状态 Gatherer 使用 defaultInitializer()
    • 有状态 Gatherer 需正确实现状态初始化和合并
  2. 并行处理

    • 需要并行处理时提供有效的 combiner
    • 仅顺序处理时使用 defaultCombiner()
  3. 资源清理

    • 在 finisher 中执行必要的清理操作
    • 不需要清理时使用 defaultFinisher()
  4. 性能优化

    • 使用 Greedy Integrator 优化无短路操作
    • 合理使用预组合的 Gatherer 提高性能

示例代码

// 创建 map 操作的 Gatherer
public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {return Gatherer.of((unused, element, downstream) -> downstream.push(mapper.apply(element)));
}// 组合多个 Gatherer
Gatherer<Integer, ?, String> pipeline = map(i -> i + 1)          // 增1.andThen(map(i -> i.toString()));  // 转字符串


为什么需要 Gatherer.Downstream

Gatherer 的 integrator 和 finisher 需要一个回调机制来将它们处理后的结果(类型为 R)发送出去。它们不能直接访问下游的 Sink,因为这会破坏 Stream 的封装性。

Gatherer.Downstream 就是这个专门的回调接口,它充当了 Gatherer 内部逻辑和外部流水线之间的 “信使”。

// Gatherer 接口的一部分
interface Downstream<R> {boolean push(R r);// ...
}

GatherSink 同时实现了 Sink<T> 和 Gatherer.Downstream<R>,这让它成为了一个完美的“双面适配器”:

  1. 作为 Sink<T>:它面向上游,接收类型为 T 的原始元素。这是它作为标准 Stream 操作的一部分的职责。
  2. 作为 Gatherer.Downstream<R>:它面向 Gatherer 的内部逻辑 (integrator 和 finisher)。当 integrator 想要输出一个处理好的元素 R 时,它不直接和下游 Sink 对话,而是调用 downstream.push(r)

我们来看 GatherSink 的代码,就能清晰地看到这个转发过程:

// ... existing code ...static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {private final Sink<R> sink; // 这是真正的下游 Sink// ...// 当上游传来一个元素 T@Overridepublic void accept(T t) {// ...// 调用 integrator,并把 this (自己) 作为 Downstream 传进去proceed &= integrator.integrate(state, t, this);}// ...// 当 integrator 内部调用 downstream.push(r) 时,这个方法被触发@Overridepublic boolean push(R r) {var p = downstreamProceed;if (p)sink.accept(r); // 在这里,它把结果 r 转发给了真正的下游 sinkreturn !cancellationRequested(p);}}
// ... existing code ...
  • 解耦(Decoupling):将 Gatherer 的内部业务逻辑(如何处理数据)与 Stream 的流水线机制(如何传递数据)分离开。Gatherer 的作者不需要关心下游是什么,只需要知道有一个 Downstream 接口可以用来“推送”结果。
  • 控制(Control)Downstream 接口不仅有 push,还有 isRejecting 等方法。这给了 integrator 更多的控制权,比如在推送前检查下游是否还愿意接收数据,从而实现更复杂的短路(short-circuiting)逻辑。
  • 封装(Encapsulation)GatherSink 封装了所有复杂的逻辑,比如状态管理、短路信号的传递等,使得 Gatherer 的实现者可以专注于核心的转换算法,而不用处理 Stream 底层的复杂机制。

Gatherer 是一个强大的自定义流处理工具,而 Gatherer.Downstream 接口及其在 GatherSink 中的实现,是连接这个自定义工具和标准 Stream 流水线的、设计精巧的桥梁。

gather 具体例子

gather 的能力最好通过例子来理解。JDK 在 java.util.stream.Gatherers 这个工具类里提供了一些预置的实现,这些例子能很好地展示它的威力:

a. 窗口化 (Windowing)

想象一下,你想把一个数字流按每3个一组进行分组: Stream.of(1,2,3,4,5,6,7,8) -> [[1, 2, 3], [4, 5, 6], [7, 8]]

用传统操作很难做到,但用 gather 就非常简单:

// (代码来自 Gatherers.java 的 Javadoc 示例)
List<List<Integer>> windows =Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();

b. 滑动窗口 (Sliding Window)

或者,你想创建一个大小为2的滑动窗口: Stream.of(1,2,3,4,5,6,7,8) -> [[1, 2], [2, 3], [3, 4], [4, 5], ...]

同样,gather 也能轻松实现:

// (代码来自 Gatherers.java 的 Javadoc 示例)
List<List<Integer>> windows2 =Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();

c. 其他复杂操作

Gatherers 类还提供了 fold(类似 reduce 但严格串行)、scan(前缀和扫描)等很多有用的操作。

GathererOp 的角色

GathererOp 就是 gather() 操作在 Stream 流水线中的内部表示和执行引擎。

当调用 stream.gather(...) 时,Java 内部就会创建一个 GathererOp 实例,并将其链接到流操作链上。

// ... existing code ...
final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {@SuppressWarnings("unchecked")static <P_IN, P_OUT extends T, T, A, R> Stream<R> of(ReferencePipeline<P_IN, P_OUT> upstream,Gatherer<T, A, R> gatherer) {// ...if (upstream.getClass() == GathererOp.class) {// ... 融合优化 ...} else {return new GathererOp<>((ReferencePipeline<?, T>) upstream,gatherer);}}
// ... existing code ...

这个类非常复杂,因为它承担了所有脏活累活,包括:

  • 操作融合:它非常智能,如果发现连续两个 gather() 操作,或者 gather() 后面跟着 collect(),它会尝试将它们**融合(fuse) 成一个单一的操作来执行,极大地提升性能。
  • 串行与并行执行:它内部包含了处理串行流(GatherSink)和并行流(GathererTask,通过 evaluate 方法调度)的完整逻辑。
  • 状态管理和短路:它负责管理 Gatherer 的状态,并响应短路信号(当 integrator 返回 false 时)。

总结

  • gather() 是一个面向用户的功能,它为 Stream 提供了前所未有的灵活性,让你能定义任意复杂的有状态转换。
  • Gatherer 是一个接口,通过实现它来定义 gather() 的具体行为逻辑。
  • GathererOp 是 JDK 内部的实现细节,是驱动 Gatherer 逻辑在流中高效运行的复杂引擎。

Gatherer 接口实现分析

Gatherer<T, A, R> 接口定义了一个高度可定制的中间操作。它的核心语义是:“接收一个输入元素流(类型 T),通过一个可选的中间状态(类型 A),产生一个输出元素流(类型 R)”

与 map 或 filter 不同,Gatherer 不是简单的“一对一”或“一对零/一”的转换。它是一个 “多对多” 的转换器,可以:

  • 有状态(Stateful): 它可以维护一个在元素处理过程中不断变化的状态。
  • 缓冲(Buffering): 它可以接收多个输入元素后,才产生一个或多个输出。
  • 短路(Short-circuiting): 它可以提前终止流的处理。

你可以把它想象成一个与 Collector 类似但更强大的概念。Collector 是一个终端操作,它消费整个流并产生一个最终结果。而 Gatherer 是一个中间操作,它消费一个流并产生另一个流

 Gatherer 的四个核心组件

Gatherer 的行为由四个核心函数定义,这四个函数共同描述了整个转换过程。

// ... existing code ...
public interface Gatherer<T, A, R> {/*** A function that produces an instance of the intermediate state used for* this gathering operation.* ...*/default Supplier<A> initializer() { /*...*/ };/*** A function which integrates provided elements, potentially using* the provided intermediate state, optionally producing output to the* provided {@link Downstream}.* ...*/Integrator<A, T, R> integrator();/*** A function which accepts two intermediate states and combines them into* one.* ...*/default BinaryOperator<A> combiner() { /*...*/ }/*** A function which accepts the final intermediate state* and a {@link Downstream} object, allowing to perform a final action at* the end of input elements.* ...*/default BiConsumer<A, Downstream<? super R>> finisher() { /*...*/ }
// ... existing code ...

a. initializer(): 状态初始化器

  • 语义Supplier<A> initializer()

  • 作用: 创建并返回一个新的、可变的中间状态对象 A

  • 何时调用: 在处理一个新的数据分片(在并行流中)或整个流(在串行流中)的开始时调用。

  • 默认行为: 如果不提供,则表示这是一个无状态的 Gatherer,状态类型 A 通常为 Void

b. integrator(): 核心处理器

  • 语义Integrator<A, T, R> integrator()

  • 作用: 这是 Gatherer 的核心逻辑。它接收当前的状态 A、一个新的输入元素 T 和一个下游管道 Downstream。它负责:

    1. 根据输入元素 t 更新状态 state

    2. 决定是否要向下游 downstream 推送(push)零个、一个或多个结果 R

    3. 返回一个布尔值,true 表示继续处理,false 表示短路,请求上游停止发送数据。

  • 这是唯一必须提供的方法。

c. combiner(): 并行合并器

  • 语义BinaryOperator<A> combiner()

  • 作用: 在并行流中,当两个子任务都处理完它们的数据分片后,此方法被调用来合并它们各自的中间状态 A

  • 何时调用: 仅在并行流中调用。

  • 默认行为: 如果不提供,则表示该 Gatherer 不支持并行处理。

d. finisher(): 收尾处理器

  • 语义BiConsumer<A, Downstream<? super R>> finisher()
  • 作用: 在所有输入元素都被 integrator 处理完毕后调用。它接收最终的状态 A,允许你执行一些收尾工作,例如,将状态中缓冲的最后一个元素或最终计算结果推送到下游。
  • 何时调用: 在流的末尾调用一次。
  • 默认行为: 如果不提供,表示没有收尾操作。

Downstream 接口:向下游推送结果

Downstream 是 integrator 和 finisher 用来与下游流水线通信的桥梁。

// ... existing code ...@FunctionalInterfaceinterface Downstream<T> {/*** Pushes, if possible, the provided element downstream -- to the next* stage in the pipeline.* ...*/boolean push(T element);
// ... existing code ...

它的核心方法是 push(T element)。调用此方法会将一个结果元素 element 发送到下一个流操作。它也返回一个布尔值,如果下游也请求短路,它会返回 false

示例:用 Gatherer 实现 map

接口文档中的这个例子完美地诠释了 Gatherer 的语义:

// ... existing code ...* {@snippet lang = java:* public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {*     return Gatherer.of(*         (unused, element, downstream) -> // integrator*             downstream.push(mapper.apply(element))*     );* }* }
// ... existing code ...
  • 这是一个无状态的 Gatherer,所以 initializer 是默认的,状态 A 是 Void(由 unused 参数表示)。
  • 它的 integrator 对于每个到来的 element,直接调用 mapper 函数进行转换,然后通过 downstream.push() 将结果推送到下游。
  • 它不关心 push 的返回值,所以它会处理完所有元素(非短路)。
  • 它没有 combiner 和 finisher

Greedy接口

Gatherer.Integrator 接口的核心方法 integrate 返回一个 boolean 值,用于表示是否需要短路(short-circuit)true 表示继续,false 表示停止。

然而,在很多场景下,比如 windowFixed,我们知道 integrator 总是会处理完所有元素,永远不会提前返回 false。这种 integrator 被称为 “贪婪的”(Greedy) 。

Integrator.ofGreedy(...) 就是一个静态工厂方法,它做两件事:

  1. 它接收一个不返回布尔值的 BiConsumer(或者像这里的方法引用 FixedWindow::integrate,它的返回值会被忽略)。
  2. 它返回一个特殊的 Integrator 实现,这个实现被标记为 Integrator.Greedy 接口的实例。
// 在 Gatherer 接口内部
interface Integrator<A, T, R> {// ...@FunctionalInterfaceinterface Greedy<A, T, R> extends Integrator<A, T, R> {// ...}
}

为什么这个信号很重要?

Stream 的执行引擎(即  GathererOp)在处理流时,会检查 integrator 是否是 instanceof Integrator.Greedy

  • 如果是 Greedy:运行时就不需要在每次调用 integrate 后都去检查它的返回值。这可以消除一个分支判断,在处理大量元素的紧凑循环中,这种微小的优化会累积起来,带来可观的性能提升。
  • 如果不是 Greedy:运行时就必须在每次调用后检查返回值,以正确处理可能的短路情况。

所以,ofGreedy 的包装本质上是一种静态的性能提示,它告诉运行时:“放心执行,我这个 integrator 不会短路,你可以走更快的代码路径。”

总结

Gatherer 接口通过其四个核心组件(initializerintegratorcombinerfinisher)提供了一个强大而灵活的框架,用于定义复杂的、有状态的流转换。它的语义核心在于 integrator,它消费输入元素、更新状态,并通过 Downstream 按需产生输出,从而实现了远超传统流操作的强大功能。

Gatherers 类分析

Gatherers 是一个工具类,提供了常用的 Gatherer 实现和工厂方法,用于在流处理中执行窗口操作、并发映射、折叠和扫描等中间操作。

使用场景

  1. 窗口操作:将流元素分组为固定大小或滑动窗口
  2. 并发处理:使用虚拟线程并发执行映射操作
  3. 增量处理:执行折叠(fold)和前缀扫描(scan)操作
  4. 流转换:实现复杂的流到流的转换

窗口操作

// 固定大小窗口
public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize)
// 示例:将流元素分组为大小为3的窗口
List<List<Integer>> windows = Stream.of(1,2,3,4,5,6).gather(Gatherers.windowFixed(3)).toList(); // [[1,2,3], [4,5,6]]// 滑动窗口
public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize)
// 示例:创建大小为2的滑动窗口
List<List<Integer>> windows = Stream.of(1,2,3,4,5).gather(Gatherers.windowSliding(2)).toList(); // [[1,2], [2,3], [3,4], [4,5]]

并发处理

// 并发映射
public static <T, R> Gatherer<T,?,R> mapConcurrent(int maxConcurrency,Function<? super T, ? extends R> mapper
)
// 示例:并发执行映射操作
List<String> results = Stream.of(1,2,3,4,5).gather(Gatherers.mapConcurrent(4, i -> "Result" + i)).toList();

增量处理

// 折叠操作
public static <T, R> Gatherer<T, ?, R> fold(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> folder
)
// 示例:将数字连接成字符串
Optional<String> result = Stream.of(1,2,3,4).gather(Gatherers.fold(() -> "", (str, num) -> str + num)).findFirst(); // "1234"// 前缀扫描
public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> scanner
)
// 示例:生成累积字符串
List<String> results = Stream.of(1,2,3,4).gather(Gatherers.scan(() -> "", (str, num) -> str + num)).toList(); // ["1", "12", "123", "1234"]

 

核心关联类

  1. Gatherer:基础接口,定义了流处理中间操作的契约
  2. Stream:通过 gather() 方法使用 Gatherer
  3. FutureTask:在并发映射中用于任务管理
  4. ArrayDeque:用于管理待处理的任务队列

交互关系

// Stream 与 Gatherer 的关系
Stream<T>.gather(Gatherer<T,A,R>) -> Stream<R>// Gatherer 组合关系
Gatherer<T,A,R>.andThen(Gatherer<R,B,S>) -> Gatherer<T,?,S>// 内部实现类关系
Gatherers.Composite<T,A,R,AA,RR> implements Gatherer<T,Object,RR>

最佳实践

  1. 窗口操作

    • 注意大窗口可能占用大量内存
    • 最后一个窗口可能小于指定大小
    • 窗口内容是不可修改的列表
  2. 并发处理

    • 合理设置 maxConcurrency,避免过度并发
    • 注意处理可能的异常情况
    • 操作会保持流的顺序性
  3. 性能优化

    • 对于简单转换,优先使用内置方法
    • 合理使用组合操作减少中间状态
    • 注意内存使用,特别是大窗口操作
  4. 错误处理

    • 并发操作中的异常会被包装为 RuntimeException
    • 确保转换函数是线程安全的
    • 注意处理中断情况

Gatherers

如果说 Gatherer 接口定义了“是什么”(what),那么 Gatherers 类就提供了“怎么用”(how)的最佳范例。Gatherers 是一个 final 的、拥有私有构造函数的工具类。它的核心职责是提供一系列预先实现好的、常用的 Gatherer 实例的静态工厂方法

public final class Gatherers {private Gatherers() { } // This class is not intended to be instantiated// Public built-in Gatherers and factory methods for them// ...
}

它的定位类似于 Collectors 或 Comparators,为开发者提供了开箱即用的强大功能,避免了用户为了一些常见场景而不得不手动实现复杂的 Gatherer 接口。

Gatherers 类提供了多个非常有用的静态方法,每个方法都返回一个配置好的 Gatherer。我们来分析几个关键的实现:

windowFixed(int windowSize) - 固定窗口

  • 功能: 将流中的元素按固定大小 windowSize 分组。最后一组可能小于 windowSize
  • 示例[1,2,3,4,5] 用 windowFixed(2) 处理后得到 [[1,2], [3,4], [5]]
  • 实现分析:

    Gatherers.java

    // ... existing code ...
    public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize) {// ... 参数检查 ...class FixedWindow { // 状态类Object[] window;int at;FixedWindow() { /* 初始化 window 和 at */ }boolean integrate(TR element, Downstream<? super List<TR>> downstream) {window[at++] = element; // 元素存入数组if (at < windowSize) { // 窗口未满return true; // 继续} else { // 窗口已满// ... 创建新窗口,重置 at ...final var oldWindow = window;window = new Object[windowSize];at = 0;return downstream.push(SharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(oldWindow));}}void finish(Downstream<? super List<TR>> downstream) {if (at > 0 && !downstream.isRejecting()) { // 如果最后还有剩余元素// ... 创建一个精确大小的数组并拷贝 ...downstream.push(/* 推送最后一个窗口 */);}}}return Gatherer.<TR, FixedWindow, List<TR>>ofSequential(FixedWindow::new, // InitializerIntegrator.ofGreedy(FixedWindow::integrate), // IntegratorFixedWindow::finish // Finisher);
    }
    // ... existing code ...
    
    这个实现非常经典:
    1. 定义一个内部状态类 FixedWindow,包含一个数组 window 用于缓冲元素和一个计数器 at
    2. initializer 就是 FixedWindow::new
    3. integrator 将元素放入数组,当数组满了就 push 出去,并重置状态。
    4. finisher 负责处理流结束时可能还未满的最后一个窗口。
    5. 它被标记为 ofSequential,表示这个实现不支持并行。

SharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(lastWindow)

为什么不直接用 List.of 或 Arrays.asList? 这层包装是为了极致的性能和零拷贝(zero-copy)

我们先看看替代方案有什么问题:

  • Arrays.asList(lastWindow): 这个方法会创建一个 List 包装器,但它直接引用原始数组。如果你后续修改了这个数组,List 的内容也会变。在 Gatherer 的实现中,状态对象(如 FixedWindow)可能会复用其内部数组,这就可能导致已经 push 出去的 List 的内容被意外修改,这是非常危险的。
  • List.of(lastWindow): 这个方法会创建一个不可变的列表,这很好。但为了保证不可变性,它会创建一个内部数组的防御性拷贝(defensive copy)。也就是说,它会分配新内存并把 lastWindow 的所有元素复制过去。

在 windowFixed 这种需要频繁创建列表的场景下,每次都进行数组拷贝会带来显著的性能开销和内存压力。

SharedSecrets 是 JDK 内部的一个机制,用于在不同包之间安全地共享一些不希望公开为公共 API 的功能。jdk.internal.access 包就是专门为此设计的。

这里的 listFromTrustedArrayNullsAllowed(lastWindow) 方法,正如其名,做了以下事情:

  1. Trusted Array (受信任的数组): 它“信任”调用者,相信传入的 lastWindow 数组后续不会再被修改
  2. Zero-Copy: 基于这种信任,它直接使用这个数组作为内部存储来构造一个 List,而不进行任何拷贝
  3. Unmodifiable: 返回的 List 仍然是不可变的,任何修改操作都会抛出异常。

在 windowFixed 的实现中,lastWindow 是一个局部变量,创建后马上就用于生成 List 并被丢弃,它永远不会被再次修改。

// ... existing code ...void finish(Downstream<? super List<TR>> downstream) {if (at > 0 && !downstream.isRejecting()) {var lastWindow = new Object[at]; // 1. 创建一个新数组System.arraycopy(window, 0, lastWindow, 0, at); // 2. 拷贝内容window = null; // 3. 丢弃对旧状态数组的引用at = 0;downstream.push( // 4. 使用新数组创建 ListSharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(lastWindow));}}
// ... existing code ...

这个场景完美符合 listFromTrustedArrayNullsAllowed 的使用条件,因此可以用这种零拷贝的方式来最大化性能。

windowSliding(int windowSize) - 滑动窗口

  • 功能: 创建一个在流上滑动的窗口。每一步,窗口都向前滑动一个元素。
  • 示例[1,2,3,4] 用 windowSliding(3) 处理后得到 [[1,2,3], [2,3,4]]
  • 实现分析:
    // ... existing code ...
    public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize) {// ...class SlidingWindow {// ...boolean integrate(TR element, Downstream<? super List<TR>> downstream) {window[at++] = element;if (at < windowSize) { // 窗口未满return true;} else { // 窗口已满final var oldWindow = window;final var newWindow = new Object[windowSize];// 关键:将旧窗口的后 n-1 个元素拷贝到新窗口的前 n-1 个位置System.arraycopy(oldWindow,1, newWindow, 0, windowSize - 1);window = newWindow; // 状态更新为新窗口at -= 1;return downstream.push(/* 推送旧窗口 */);}}// ...}// ...
    }
    // ... existing code ...
    
    与 windowFixed 类似,但 integrate 的逻辑更复杂。当窗口满了之后,它不是清空窗口,而是通过 System.arraycopy 将元素向前“滑动”,为下一个元素腾出位置。

Gatherers.fold - 最终的折叠

fold 方法的行为非常类似于 Stream.reduce,它将流中的所有元素聚合成一个最终结果

  • 目标: 对一个元素序列执行一个有状态的、顺序敏感的规约操作。
  • 核心场景: 当你需要实现的规约操作不满足 reduce 所需的“结合律”(associativity),或者说,当元素的处理顺序至关重要,无法通过并行的 combiner 函数来合并中间结果时,fold 就是最佳选择。
// ... existing code ...public static <T, R> Gatherer<T, ?, R> fold(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> folder) {Objects.requireNonNull(initial, "'initial' must not be null");Objects.requireNonNull(folder, "'folder' must not be null");class State {R value = initial.get();State() {}}return Gatherer.ofSequential(State::new,Integrator.ofGreedy((state, element, downstream) -> {state.value = folder.apply(state.value, element);return true;}),(state, downstream) -> downstream.push(state.value));}
// ... existing code ...
  • 参数:
    • initial: 一个 Supplier,提供累积的初始值。
    • folder: 一个 BiFunction,接收当前的累积值和下一个元素,并返回新的累积值。
  • State 类:
    • 非常简单,只有一个字段 value,用于在整个流的处理过程中保存当前的累积结果。它在 Gatherer 初始化时通过 initial.get() 获得初始值。
  • Integrator:
    • ofGreedy 表示它会贪婪地消费所有上游元素。
    • state.value = folder.apply(state.value, element);: 这是核心逻辑。每来一个 element,就用 folder 函数更新 state.value
    • return true;: 总是返回 true,表示它永远不会主动短路,会处理完所有元素。
    • downstream 参数虽然存在,但在这里并未使用,因为 fold 在处理过程中不产生任何中间结果。
  • Finisher:
    • (state, downstream) -> downstream.push(state.value): 这是 fold 的关键。只有当所有元素都被 integrator 处理完毕后,finisher 才会被调用。它只做一件事:将最终累积的 state.value 推送给下游。因此,fold 操作的输出流中最多只有一个元素。
  • Gatherer.ofSequential:
    • fold 被定义为顺序的(没有 combiner),这意味着在并行流中,它会由 Hybrid 策略执行,保证 integrator 是按元素顺序调用的。

Gatherers.scan - 增量的扫描(前缀和)

scan 方法与 fold 不同,它会为每一个输入元素都生成一个输出结果,这个结果是到当前元素为止的累积值。它也被称为“前缀和”(Prefix Sum)操作。

  • 目标: 创建一个流,其中每个元素都是原始流到该点为止的累积结果。
  • 核心场景: 当你需要观察一个累积过程的每一步中间状态时。例如,计算账户余额随每一笔交易的变化,或者在时间序列数据中计算移动平均值的每一步。
// ... existing code ...public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> scanner) {Objects.requireNonNull(initial, "'initial' must not be null");Objects.requireNonNull(scanner, "'scanner' must not be null");class State {R current = initial.get();boolean integrate(T element, Downstream<? super R> downstream) {return downstream.push(current = scanner.apply(current, element));}}return Gatherer.ofSequential(State::new,Integrator.<State,T, R>ofGreedy(State::integrate));}
// ... existing code ...
  • 参数: 与 fold 完全相同。
  • State 类:
    • 同样只有一个字段 current,用于保存当前的累积值。
  • Integrator:
    • return downstream.push(current = scanner.apply(current, element));: 这是与 fold 的根本区别
    • 每当一个新 element 到来:
      1. current = scanner.apply(current, element): 首先,使用 scanner 函数计算出新的累积值,并更新 state.current
      2. downstream.push(...): 然后,立即将这个新计算出的累积值推送给下游。
    • 因此,每处理一个输入元素,就会产生一个输出元素。
  • Finisher:
    • scan 方法没有提供 finisher。因为所有的结果都在 integrator 中被实时推送了,当流结束时,没有额外的工作需要做。

mapConcurrent(...) - 虚拟线程并发映射

    public static <T, R> Gatherer<T,?,R> mapConcurrent(final int maxConcurrency,final Function<? super T, ? extends R> mapper) 

mapConcurrent 是一个中间操作,它允许以受控的并发级别,对流中的每个元素应用一个函数。它主要用于 I/O 密集型或其它会引起阻塞的操作(例如,发起网络请求、查询数据库),通过利用虚拟线程(Virtual Threads)来显著提高吞吐量,同时保持流元素的原始顺序。

  • 解决的问题: 标准的 Stream.map 是串行的。在并行流中,map 虽然并行,但使用的是重量级的平台线程(Platform Threads),这对于成千上万个会长时间阻塞的 I/O 任务来说,资源开销巨大且效率低下。
  • 核心武器虚拟线程。虚拟线程是轻量级的,可以大量创建而不会耗尽系统资源,非常适合用于执行 I/O 阻塞任务。
  • 设计要点:
    1. 并发执行: 对每个元素应用的 mapper 函数都在一个独立的虚拟线程中并发执行。
    2. 控制并发度: 通过 maxConcurrency 参数限制同时“在途”的任务数量,防止下游系统过载。
    3. 保持顺序: 尽管 mapper 函数是并发执行且完成时间不确定,但其结果必须按照原始流的顺序推送到下游。
    4. 支持短路: 当下游不再需要元素时(例如 findFirst()),能够尽力取消正在进行和等待中的任务。

mapConcurrent 的实现非常精巧,主要由三个部分构成:MapConcurrentTask(任务封装)、State(状态管理与核心逻辑)以及最终的 Gatherer 构建。

MapConcurrentTask - 任务的封装

// ... existing code ...final class MapConcurrentTask extends FutureTask<R> {final Thread thread;private MapConcurrentTask(Callable<R> callable) {super(callable);this.thread = Thread.ofVirtual().unstarted(this);}}
// ... existing code ...
  • extends FutureTask<R>: 每个任务都是一个 FutureTask,这使得我们可以异步地获取其执行结果(get()),检查其状态(isDone()),以及取消它(cancel())。
  • final Thread thread: 每个 FutureTask 都关联一个专属的虚拟线程。
  • Thread.ofVirtual().unstarted(this): 这是关键。它创建了一个虚拟线程,但并不立即启动。线程的 Runnable 目标就是 this,即 FutureTask 自身(因为 FutureTask 实现了 Runnable)。当调用 thread.start() 时,这个虚拟线程就会开始执行 FutureTask 的 run() 方法,进而执行我们传入的 mapper 函数。

State - 核心逻辑与状态机

这个 State 类是 mapConcurrent 这个 Gatherer (收集器) 的核心,它封装了所有用于实现并发处理的状态和逻辑。mapConcurrent 的目标是接收上游的元素,然后并发地将一个 mapper 函数应用在每个元素上,最后将结果按顺序推送到下游。State 类就是这个过程中的“总调度室”。

State 类的主要职责是:

  1. 任务管理: 接收上游流(Stream)中的元素。
  2. 并发执行: 为每个元素创建一个独立的任务(使用虚拟线程),并启动它来执行 mapper 函数。
  3. 状态跟踪: 维护一个正在处理中的任务队列(work-in-progress)。
  4. 结果回写: 按顺序从完成的任务中获取结果,并将结果推送到下游。
  5. 异常与清理: 妥善处理执行过程中的异常,并确保所有启动的线程最终都能被清理。

成员变量 (Fields)

// ... existing code ...final class State {private final ArrayDeque<MapConcurrentTask> wip =new ArrayDeque<>(Math.min(maxConcurrency, 16));// ... existing code ...

State 类只有一个成员变量 wip

  • private final ArrayDeque<MapConcurrentTask> wip:
    • wip 是 "work-in-progress" 的缩写,意为“正在进行中的工作”。
    • 它是一个 ArrayDeque(一种双端队列),用于存储所有已经启动但可能尚未完成的并发任务。
    • MapConcurrentTask 是 FutureTask 的一个内部子类,它包装了 mapper.apply(element) 这个调用。每个任务都关联一个虚拟线程。
    • 队列的初始容量被限制在 maxConcurrency 和 16 之间取较小值,这只是一个小的性能优化,ArrayDeque 可以根据需要自动扩容。
    • 关键点: 使用队列(Queue)这种数据结构保证了任务的先进先出(FIFO)顺序。这意味着元素是按它们从上游到达的顺序被处理和向下游推送的,从而维持了流的有序性

integrate 方法:接收元素并发起任务

// ... existing code ...boolean integrate(T element, Downstream<? super R> downstream) {// Prepare the next task and add it to the work-in-progressfinal var task = new MapConcurrentTask(() -> mapper.apply(element));wip.addLast(task);assert wip.peekLast() == task;assert wip.size() <= maxConcurrency;// Start the next tasktask.thread.start();// Flush at least 1 element if we're at capacityreturn flush(wip.size() < maxConcurrency ? 0 : 1, downstream);}
// ... existing code ...

integrate 方法是 Gatherer 的 integrator (整合器) 的具体实现。每当上游有一个新元素 element 准备好被处理时,这个方法就会被调用。

它的工作流程如下:

  1. 创建任务new MapConcurrentTask(() -> mapper.apply(element)) 创建一个新的 MapConcurrentTask。这个任务的核心工作就是调用用户提供的 mapper 函数来处理元素。
  2. 入队wip.addLast(task) 将新创建的任务添加到 wip 队列的末尾。
  3. 启动线程task.thread.start() 立即启动与该任务关联的虚拟线程。这意味着 mapper 函数的执行会立刻在一个新的虚拟线程中开始,而 integrate 方法本身不会等待它完成,从而实现了并发。
  4. 调用 flush (冲刷/刷新): 这是实现背压 (back-pressure) 的关键。
    • wip.size() < maxConcurrency: 检查当前正在处理的任务数是否已经达到了用户设定的最大并发数 maxConcurrency
    • 如果 wip.size() 小于 maxConcurrency,则调用 flush(0, downstream)atLeastN 参数为 0 意味着 flush 方法会尝试“非阻塞地”冲刷掉所有已经完成的任务,但如果队列头部的任务还没完成,它不会等待,会立刻返回。
    • 如果 wip.size() 达到了 maxConcurrency,则调用 flush(1, downstream)atLeastN 参数为 1 意味着 flush 方法必须等待并冲刷掉至少一个任务的结果后才能返回。这起到了一个限流的作用:在处理能力达到上限时,它会阻塞上游,直到至少一个并发任务完成,腾出“空位”后,才允许上游继续提供新元素。

flush 方法:获取结果并推向下游

// ... existing code ...boolean flush(long atLeastN, Downstream<? super R> downstream) {boolean success = false, interrupted = false;try {boolean proceed = !downstream.isRejecting();MapConcurrentTask current;while (proceed&& (current = wip.peekFirst()) != null&& (current.isDone() || atLeastN > 0)) {R result;// Ensure that the task is done before proceedingfor (;;) {try {result = current.get();break;} catch (InterruptedException ie) {interrupted = true; // ignore for now, and restore later}}proceed &= downstream.push(result);atLeastN -= 1;final var correctRemoval = wip.pollFirst() == current;assert correctRemoval;}return (success = proceed); // Ensure that cleanup occurs if needed} catch (ExecutionException e) {
// ... existing code ...} finally {
// ... existing code ...}}
// ... existing code ...

flush 方法是 State 的另一个核心,负责从 wip 队列中取出已完成的任务,获取其结果,并将其推送到下游。

它的工作流程如下:

  1. 循环检查队首任务while 循环持续检查 wip 队列的头部。
    • proceed = !downstream.isRejecting(): 首先检查下游是否还能接收元素。
    • current = wip.peekFirst(): 查看队首的任务,但不移除它。
    • current.isDone() || atLeastN > 0: 这是循环能否执行的关键条件。它表示:
      • 如果队首任务已经完成 (isDone()),则可以处理它。
      • 或者,如果 atLeastN > 0,意味着调用者(integrate 或 finisher)要求必须冲刷掉至少 atLeastN 个元素。在这种情况下,即使队首任务尚未完成,循环也会继续,并在内部通过 current.get() 阻塞等待它完成。
  2. 获取结果:
    • result = current.get(): 调用 FutureTask 的 get() 方法来获取任务的执行结果。这个调用是阻塞的,如果任务还没执行完,它会一直等待直到任务完成。
    • InterruptedException 处理:如果在 get() 等待期间线程被中断,它会捕获 InterruptedException,设置 interrupted 标志位,然后继续尝试 get()。中断状态会在 finally 块中被恢复。
  3. 推向下游proceed &= downstream.push(result) 将获取到的结果 result 推送给下游。如果下游拒绝接收(例如,对于 findFirst 这样的短路操作),push 会返回 falseproceed 也将变为 false,从而终止循环。
  4. 出队wip.pollFirst(): 成功处理完一个任务后,将其从队列中移除。
  5. 异常处理与清理 (finally):
    • catch (ExecutionException e): 如果任务在执行 mapper 时抛出异常,get() 会将其包装在 ExecutionException 中抛出。这里会解开包装,将原始的 RuntimeException 重新抛出。
    • finally 块确保了健壮性。如果 flush 因为下游拒绝或异常而提前退出 (!success),它会负责清理所有仍在 wip 队列中的任务:
      1. task.cancel(true): 向所有未完成的任务发送取消信号(中断它们的执行线程)。
      2. next.thread.join(): 等待所有任务的线程都真正终止,防止产生僵尸线程。
      3. Thread.currentThread().interrupt(): 如果在执行过程中发生了中断,就在最后恢复当前线程的中断状态,这是良好的并发编程实践。

Gatherer 的构造

// ... existing code ...return Gatherer.ofSequential(State::new,Integrator.<State, T, R>ofGreedy(State::integrate),(state, downstream) -> state.flush(Long.MAX_VALUE, downstream));
// ... existing code ...

最后,mapConcurrent 方法通过 Gatherer.ofSequential 将 State 类的各个部分组装成一个完整的 Gatherer

  • State::newInitializer (初始化器)。当流开始处理时,会调用它来创建一个新的 State 实例。
  • State::integrateIntegrator (整合器)。如前所述,每当有新元素时,会调用 state.integrate(...)ofGreedy 表示这个整合器会尽可能多地处理元素,直到下游的 push 返回 false
  • (state, downstream) -> state.flush(Long.MAX_VALUE, downstream)Finisher (完成器)。当上游所有元素都处理完毕后,会调用这个 finisher。它调用 flush 并传入 Long.MAX_VALUE,这会强制 flush 方法等待并冲刷掉 wip 队列中所有剩余的任务,确保没有任何结果丢失。

    总结

    Gatherers.mapConcurrent 是一个高度优化的并发工具。它巧妙地将虚拟线程、FutureTask 和 Gatherer 的状态模型结合在一起,实现了:

    • 高并发:利用虚拟线程处理阻塞操作。
    • 顺序保留:通过一个先进先出的工作队列和阻塞式的 get() 调用来保证输出顺序。
    • 资源可控:通过 maxConcurrency 参数实现背压,防止资源耗尽。
    • 健壮性:具备完善的异常处理和任务取消机制。

    它完美地展示了 Gatherer 接口的强大扩展能力,使得开发者可以在 Stream API 内部实现复杂的、有状态的、甚至是并发的流处理逻辑。

    enum Value 

    enum Value 的核心目标是为 Gatherer 的各个函数式接口(initializerfinishercombiner)提供一个统一的、默认的、无操作的(no-op)实现

    它的存在主要是为了解决以下几个问题:

    1. 避免使用 null: 在 API 设计中,使用 null 来表示“不存在”或“未提供”某个功能,常常会导致代码中充斥着繁琐的 null 检查,并且是 NullPointerException 的主要来源。Value 枚举通过提供一个具体的对象实例来代表“默认行为”,从而完全避免了 null 的使用。
    2. 提供可识别的默认值: 当 Gatherer 的实现者没有提供某个功能(例如,一个无状态的 Gatherer 不需要 initializer),API 内部可以使用 Value.DEFAULT 这个唯一的、可识别的“哨兵值”(Sentinel Value)来填充。
    3. 提升性能和代码简洁性: API 内部可以通过简单的引用比较(例如 if (finisher == Value.DEFAULT)) 来判断是否需要执行某个操作,这比调用一个空的 lambda 表达式更高效,也让代码意图更清晰。

    这种设计是软件工程中空对象模式(Null Object Pattern) 的一个经典应用。


    代码逐段分析

    a. 枚举定义与单例实例
    // ... existing code ...enum Value implements Supplier, BinaryOperator, BiConsumer {DEFAULT;
    // ... existing code ...
    
    • enum Value: 将 Value 定义为一个枚举类型。
    • implements Supplier, BinaryOperator, BiConsumer: 它同时实现了 Gatherer 所需的三个核心函数式接口。这意味着 Value 的实例本身就可以被当作这三种接口的任意一种来使用。
    • DEFAULT;: 这是该枚举的唯一实例。利用 Java 枚举的特性,DEFAULT 是一个天然的、线程安全的单例。在整个 JDK 中,所有需要“默认无操作行为”的地方,都会引用这同一个 Value.DEFAULT 对象。
    b. 特殊的 statelessCombiner
    // ... existing code ...final BinaryOperator<Void> statelessCombiner = new BinaryOperator<>() {@Override public Void apply(Void left, Void right) { return null; }};
    // ... existing code ...
    
    • 这是一个非常特殊的字段。它是一个 BinaryOperator<Void>,专门用于无状态(stateless) 的 Gatherer
    • 无状态的 Gatherer 其状态容器(State/Accumulator)的类型是 Void,在实践中通常用 null 来表示。当并行执行流时,需要合并(combine)两个分支的状态。合并两个 null 状态的结果,自然还是 null
    • 所以,statelessCombiner 的 apply 方法逻辑非常简单:接收两个 Void 类型的参数(实际上都会是 null),然后返回 null。这为无状态的 Gatherer 提供了一个正确且可执行的 combiner
    c. 接口方法的默认实现
    // ... existing code ...// BiConsumer@Override public void accept(Object state, Object downstream) {}// BinaryOperator@Override public Object apply(Object left, Object right) {throw new UnsupportedOperationException("This combiner cannot be used!");}// Supplier@Override public Object get() { return null; }
    // ... existing code ...
    

    这里是 Value 作为三个接口的实例时,其方法的具体实现:

    • accept(Object state, Object downstream): 这是 BiConsumer 接口的方法。它的实现是一个空方法体。当 Value.DEFAULT 被用作 finisher (完成器) 时,表示在流的末尾不需要执行任何最终操作。调用它不会产生任何效果。
    • get(): 这是 Supplier 接口的方法。它的实现是返回 null。当 Value.DEFAULT 被用作 initializer (初始化器) 时,表示这是一个无状态的 Gatherer,其初始状态就是 null
    • apply(Object left, Object right): 这是 BinaryOperator 接口的方法。它的实现是直接抛出 UnsupportedOperationException。这非常关键,它传达了一个明确的信息:这个默认的 combiner 是一个“虚拟”的,它不能被实际调用。这通常用于那些本身就不支持并行化(non-concurrent)的 Gatherer。如果有人错误地尝试并行化一个这样的 Gatherer,就会立刻得到一个清晰的错误,而不是潜在的数据不一致问题。
    d. 便捷的类型转换工厂方法
    // ... existing code ...@ForceInline@SuppressWarnings("unchecked")<A> Supplier<A> initializer() { return (Supplier<A>)this; }@ForceInline@SuppressWarnings("unchecked")<T> BinaryOperator<T> combiner() { return (BinaryOperator<T>) this; }@ForceInline@SuppressWarnings("unchecked")<T, R> BiConsumer<T, Gatherer.Downstream<? super R>> finisher() {return (BiConsumer<T, Downstream<? super R>>) this;}
    // ... existing code ...
    

    这三个方法是 Value 枚举的点睛之笔。它们是便捷的工厂方法,用于获取类型安全的默认实例。

    • 作用: 它们都只做一件事——返回 this(即 Value.DEFAULT 对象),但通过泛型和强制类型转换,将其“伪装”成调用者所期望的、带有正确泛型参数的函数式接口。
    • 示例: 当 Gatherer.of(...) 工厂方法需要一个默认的 initializer 时,它不需要自己写 () -> null,而是可以直接调用 Value.DEFAULT.initializer()。这样做既简洁,又能从编译器那里获得正确的类型 Supplier<A>
    • @ForceInline: 这个注解是给 JIT (Just-In-Time) 编译器的提示,建议它将这些方法的调用进行内联。因为方法体极其简单,内联后几乎没有额外的方法调用开销,性能极高。
    • @SuppressWarnings("unchecked"): 因为这里涉及到底层实现中的强制类型转换,所以用这个注解来抑制编译器关于“未经检查的转换”的警告。这是在明确知道类型安全可以由外部逻辑保证的情况下,一种常见的底层库编程技巧。

    总结

    enum Value 是 Gatherer API 内部一个优雅而高效的实现细节。它通过一个单例枚举实例,完美地扮演了“空对象”的角色,为 Gatherer 的 initializerfinisher 和 combiner 提供了统一、安全、高效的默认行为。

    它的设计哲学体现了现代 Java API 的优秀实践:

    • 用特定对象代替 null,增强代码的健壮性。
    • 利用接口和泛型,提供类型安全且灵活的 API。
    • 提供可识别的哨兵值,允许进行高效的内部优化。

    record GathererImpl<T, A, R> 

    GathererImpl 的设计目标非常纯粹:作为一个简单、透明、不可变的数据载体(Data Carrier),用于封装定义一个 Gatherer 所需的全部四个核心函数。

    它利用了 Java 16 引入的 record(记录)特性,该特性旨在简化用于“纯粹数据聚合”的类的创建。

    a. record 定义与泛型参数
    // ... existing code ...record GathererImpl<T, A, R>(@Override Supplier<A> initializer,@Override Integrator<A, T, R> integrator,@Override BinaryOperator<A> combiner,@Override BiConsumer<A, Downstream<? super R>> finisher) implements Gatherer<T, A, R> {
    // ... existing code ...
    
    • record GathererImpl<T, A, R>:

      • record 关键字告诉编译器,这是一个记录类。编译器会自动为它生成:
        1. 一个接收所有组件(initializerintegrator 等)的全参构造函数
        2. 为每个组件生成一个同名的公共访问方法(例如 public Supplier<A> initializer())。
        3. equals()hashCode() 和 toString() 方法的默认实现。
      • 这使得 GathererImpl 成为一个天然的、简洁的、不可变的(因为其组件都是 final 的)数据聚合对象。
    • 泛型参数:

      • T: The type of input elements. (输入元素的类型)
      • A: The type of the mutable accumulator (or state). (可变累加器/状态的类型)
      • R: The type of result elements. (输出/结果元素的类型) 这三个泛型参数与 Gatherer 接口的定义完全一致,贯穿了整个操作。
    • implements Gatherer<T, A, R>:

      • 这明确表示 GathererImpl 是 Gatherer 接口的一个具体实现。
      • 因为 record 自动生成的访问器方法(initializer()integrator() 等)与 Gatherer 接口中定义的方法签名完全匹配,所以 GathererImpl 天然地满足了 Gatherer 接口的契约。
    b. 四个核心组件

    GathererImpl 的主体就是它的四个组件,它们是定义一个 Gatherer 行为的“四要素”:

    1. Supplier<A> initializer (初始化器):

      • 职责: 创建并返回一个新的、可变的累加器(状态容器)。
      • 调用时机: 在流处理开始时,或者在并行处理中为每个子任务开始时调用。
      • 示例: 对于 toList()initializer 就是 () -> new ArrayList<>()
    2. Integrator<A, T, R> integrator (整合器):

      • 职责: 这是核心处理逻辑。它定义了如何将一个输入元素 T 合并到累加器 A 中,并有选择地将零个或多个结果 R 推送到下游。
      • 调用时机: 对于上游流中的每一个元素,都会调用此方法。
      • 示例: 对于 map(f)integrator 的逻辑就是 (state, element, downstream) -> downstream.push(f.apply(element))
    3. BinaryOperator<A> combiner (合并器):

      • 职责: 在并行流(parallel stream)中,定义如何将两个并行的子任务产生的累加器 A 合并成一个。
      • 调用时机: 当并行流的各个分支需要汇总结果时调用。
      • 示例: 对于 toList()combiner 的逻辑就是 (list1, list2) -> { list1.addAll(list2); return list1; }
    4. BiConsumer<A, Downstream<? super R>> finisher (完成器):

      • 职责: 在所有输入元素都被整合(integrated)完毕后,执行最终的处理。这对于有状态的操作至关重要,例如,它可以在最后将累加器中剩余的元素全部推送到下游。
      • 调用时机: 当上游流的所有元素都处理完成后,在流的末尾调用一次。
      • 示例: 对于 windowFixed(size),当流结束时,如果累加器里还有未满一个窗口的元素,finisher 就会负责将这个最后的、不完整的窗口也推送出去。
    c. 静态工厂方法 of
    // ... existing code ...static <T, A, R> GathererImpl<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher) {return new GathererImpl<>(Objects.requireNonNull(initializer,"initializer"),Objects.requireNonNull(integrator, "integrator"),Objects.requireNonNull(combiner, "combiner"),Objects.requireNonNull(finisher, "finisher"));}
    // ... existing code ...
    
    • 职责: 这是创建 GathererImpl 实例的标准方式。它提供了一个清晰的、统一的构造入口。
    • Objects.requireNonNull(...): 这个工厂方法最重要的一个功能就是防御性编程。它对传入的每一个函数组件都进行了非空检查。
      • 这确保了任何一个 GathererImpl 实例内部的四个组件都绝对不会是 null
      • 这极大地简化了 JDK 内部处理 Gatherer 的代码,因为它们可以放心地调用 gatherer.initializer() 等方法,而无需担心 NullPointerException
      • 如果用户在构造 Gatherer 时忘记提供某个必要的函数(并且没有使用 Value.DEFAULT 这样的默认值),requireNonNull 会立即抛出异常,并带有明确的错误信息(如 "initializer"),这是一种“快速失败”(fail-fast)的设计原则,有助于及早发现和修复 bug。

    总结

    GathererImpl 可以被看作是 Gatherer 接口的一个标准实现模板。它本身没有任何复杂的逻辑,只是一个“容器”。

    • 角色: 它是一个不可变的元组(Tuple),忠实地持有定义 Gatherer 行为的四个函数。
    • 优点:
      • 简洁: 使用 record 大大减少了样板代码。
      • 不可变: 一旦创建,其行为(即它包含的四个函数)就固定了,这使得它在并发环境中是线程安全的。
      • 健壮: 通过 of 工厂方法的非空检查,保证了实例的有效性和内部状态的一致性。

    当你调用 Gatherer.of(...) 或 Gatherers 类中任何一个预定义的工厂方法(如 mapfilterwindowFixed 等)时,它们在内部创建并返回的,就是这样一个 GathererImpl 的实例,其中填充了实现特定操作所需的 initializerintegratorcombiner 和 finisher

    Composite

    Composite 类的唯一目标是:将两个 Gatherer 对象(我们称之为 left 和 right)串联起来,形成一个全新的、功能上等同于两者顺序执行的单一 Gatherer

    当用户写下 stream.gather(gatherer1.andThen(gatherer2)) 时,andThen 方法在内部就会创建一个 Composite 实例,left 是 gatherer1right 是 gatherer2。这个 Composite 对象本身也是一个 Gatherer,可以被 stream.gather() 方法直接使用。

    它实现了函数式编程中经典的函数组合(Function Composition)思想,但将其应用在了更复杂的 Gatherer 结构上。

    // ... existing code ...static final class Composite<T, A, R, AA, RR> implements Gatherer<T, Object, RR> {private final Gatherer<T, A, ? extends R> left;private final Gatherer<? super R, AA, ? extends RR> right;// FIXME change `impl` to a computed constant when availableprivate GathererImpl<T, Object, RR> impl;
    // ... existing code ...
    
    • implements Gatherer<T, Object, RR>: 这是理解 Composite 的关键。一个组合后的 Gatherer,其:

      • 输入类型 T: 与第一个 Gatherer (left) 的输入类型相同。
      • 输出类型 RR: 与第二个 Gatherer (right) 的输出类型相同。
      • 累加器类型 Object: 这是一个难点。因为 left 和 right 各有自己的累加器(类型分别为 A 和 AA),组合后的 Gatherer 需要同时管理这两个累加器。为了类型擦除和实现的方便,这里统一使用 Object 作为对外暴露的累加器类型,内部再通过一个包装对象来持有真正的 A 和 AA 实例。
    • 泛型参数:

      • Gatherer<T, A, R> left: 第一个操作。它接收 T 类型的输入,使用 A 类型的累加器,产生 R 类型的输出。
      • Gatherer<R, AA, RR> right: 第二个操作。它接收的输入类型 R 必须是 left 输出类型的超类,使用 AA 类型的累加器,最终产生 RR 类型的输出。
    • 成员变量:

      • left 和 right: 分别存储了要组合的两个 Gatherer
      • private GathererImpl<T, Object, RR> impl: 这是组合逻辑的核心。Composite 类本身只是一个外壳,它真正的 Gatherer 行为(即 initializerintegrator 等四个核心函数)是被**懒加载(lazily computed)**并存储在这个 impl 字段中的。FIXME 注释表明,开发者希望未来能用更现代的 Java 特性(如 lazy 关键字)来优化这个懒加载过程。

    构造与工厂方法

    // ... existing code ...static <T, A, R, AA, RR> Composite<T, A, R, AA, RR> of(Gatherer<T, A, ? extends R> left,Gatherer<? super R, AA, ? extends RR> right) {return new Composite<>(left, right);}private Composite(Gatherer<T, A, ? extends R> left,Gatherer<? super R, AA, ? extends RR> right) {this.left = left;this.right = right;}
    // ... existing code ...
    
    • of(...) 是一个标准的静态工厂方法,用于创建 Composite 实例。
    • 构造函数非常简单,只是将传入的 left 和 right Gatherer 存储起来。真正的组合逻辑被推迟到了第一次使用时。

    核心:impl() 懒加载方法

    // ... existing code ...@SuppressWarnings("unchecked")private GathererImpl<T, Object, RR> impl() {// ATTENTION: this method currently relies on a "benign" data-race// as it should deterministically produce the same result even if// initialized concurrently on different threads.var i = impl;return i != null? i: (impl = (GathererImpl<T, Object, RR>)impl(left, right));}
    // ... existing code ...
    

    这是 Composite 类中最复杂也最核心的方法。它负责在第一次被需要时,动态地构建出那个代表了组合行为的 GathererImpl

    • 懒加载模式:

      1. var i = impl;: 先读取一次 impl 字段。
      2. i != null ? i : ...: 如果 impl 已经计算过了(不为 null),就直接返回它。
      3. ... : (impl = ...): 如果 impl 还是 null,就调用另一个 impl(left, right) 静态方法(这是真正干活的方法)来计算出新的 GathererImpl,将其赋值给 impl 字段,然后返回。
    • “良性数据竞争” (Benign Data-Race):

      • ATTENTION 注释解释了这里的并发处理方式。这个懒加载没有使用 synchronized 或 volatile
      • 这意味着,如果在多线程环境下,多个线程同时调用 impl() 并且 impl 恰好是 null,那么 impl(left, right) 这个计算过程可能会被执行多次。
      • 但设计者认为这是“良性的”,因为 impl(left, right) 的计算是确定性的(deterministic)——无论它被计算多少次,对于相同的 left 和 right,其结果都是一个等价的 GathererImpl
      • 最终,多个线程中的一个会成功地将计算结果写入 impl 字段,其他线程的写入可能会覆盖它,但这没关系,因为覆盖物和被覆盖物是相同的。这是一种以牺牲一点点计算资源为代价,来避免锁开销的常见优化技巧。

    组合逻辑的实现(impl(left, right) 方法

    这个方法是 Gatherer 组合魔法的真正实现者,它负责将两个独立的 Gathererleft 和 right)“编织”成一个功能完备的、全新的 GathererImpl

    这个方法的代码很长,因为它包含了针对不同情况的优化路径。我们可以将其分为两个主要部分:特殊优化路径通用路径

    1. 准备工作:提取函数与特征检测

    // ... existing code ...static final <T, A, R, AA, RR> GathererImpl<T, ?, RR> impl(Gatherer<T, A, R> left, Gatherer<? super R, AA, RR> right) {final var leftInitializer = left.initializer();final var leftIntegrator = left.integrator();
    // ... existing code ...final var rightFinisher = right.finisher();final var leftStateless = leftInitializer == Gatherer.defaultInitializer();final var rightStateless = rightInitializer == Gatherer.defaultInitializer();final var leftGreedy = leftIntegrator instanceof Integrator.Greedy;final var rightGreedy = rightIntegrator instanceof Integrator.Greedy;
    // ... existing code ...
    

    在做任何事情之前,代码首先做了两件准备工作:

    1. 提取函数: 将 left 和 right Gatherer 的四个核心函数(initializerintegratorcombinerfinisher)分别提取到局部变量中。这可以提高代码可读性,并可能带来微小的性能提升(避免重复调用接口方法)。

    2. 特征检测: 它检查了 left 和 right 的几个重要特征:

      • leftStateless / rightStateless: 通过比较它们的 initializer 是否是默认的 initializer (Value.DEFAULT),来判断它们是否是无状态的 (stateless)。无状态的 Gatherer 不需要创建和维护状态对象。
      • leftGreedy / rightGreedy: 通过 instanceof 检查,判断它们的 integrator 是否是贪婪的 (greedy)。贪婪的 integrator 保证了它在处理一个元素时,要么向下游推送一个元素,要么不推送,但绝不会“短路”(即向下游返回 false 来终止流)。

    这些特征检测的结果,将决定接下来走哪条执行路径。


    2. 特殊优化路径:双重无状态与贪婪 (Stateless & Greedy Fast Path)

    // ... existing code ...if (leftStateless && rightStateless && leftGreedy && rightGreedy) {return new GathererImpl<>(Gatherer.defaultInitializer(),Gatherer.Integrator.ofGreedy((unused, element, downstream) ->leftIntegrator.integrate(null,element,r -> rightIntegrator.integrate(null, r, downstream))),
    // ... existing code ...);} else {
    // ... existing code ...
    

    这是为最理想情况设计的“快速通道”。当两个 Gatherer 都是无状态且都是贪婪的时(例如 map(...).andThen(filter(...))),代码会进行大幅优化。

    • initializer: 因为两者都无状态,所以组合后的 Gatherer 也是无状态的,直接使用默认的 initializer
    • integrator: 这是优化的核心。它创建了一个新的、也是贪婪的 integrator。其逻辑非常直接:
      (unused, element, downstream) ->leftIntegrator.integrate(null, // 因为 left 无状态,状态对象是 nullelement,// 这里是关键:leftIntegrator 的下游(downstream)// 被一个 lambda 表达式替代了。r -> rightIntegrator.integrate(null, // 因为 right 无状态,状态对象是 nullr,    // r 是 left 处理完 element 后的输出downstream // 最终的下游))
      
      这形成了一个极其高效的函数调用链。处理一个元素 element 的过程,就是简单地调用 leftIntegrator,其结果 r 又被立刻送入 rightIntegrator,最后的结果直接推送到真正的下游。整个过程没有任何状态管理、没有短路检查,开销极小。
    • combiner 和 finisher: 也做了相应的简化,因为没有状态,所以逻辑变得非常简单。

    3. 通用路径:有状态或非贪婪 (Stateful or Non-Greedy General Path)

    如果 left 或 right 中任何一个是有状态的非贪婪的,就必须走这条更复杂的通用路径。这条路径的核心是定义了一个新的内部类 State

    a. State 类:组合状态的容器
    // ... existing code ...} else {class State {final A leftState;final AA rightState;boolean leftProceed;boolean rightProceed;
    // ... existing code ...
    

    这个 State 类是组合后 Gatherer 的统一状态容器。它封装了:

    • final A leftStateleft Gatherer 的状态对象。
    • final AA rightStateright Gatherer 的状态对象。
    • boolean leftProceed / rightProceed: 用于跟踪 left 和 right integrator 是否已经短路。这是处理非贪婪 Gatherer 的关键。如果 left 短路了,就不应该再给它提供新元素。如果 right 短路了,left 也不应该再把处理结果推送给它。
    b. State 类的核心方法
    • State() (构造函数):

      • 负责初始化。它会检查 left 和 right 是否是无状态的,如果是,则其状态就是 null;否则,就调用它们各自的 initializer 来创建初始状态。
      • leftProceed 和 rightProceed 初始都为 true
    • integrate(T t, Downstream<? super RR> c):

      • 这是组合后的 integrator 的核心逻辑。
      • 它调用 leftIntegrator.integrate(...),但巧妙地将 left 的下游替换为了 this::rightIntegrate
      • 这意味着,left 处理完元素后,不会直接推送到最终下游,而是会调用 State 类的 rightIntegrate 方法。
      • 它还负责检查和更新 leftProceed 和 rightProceed 这两个短路标志位。
    • rightIntegrate(R r, Downstream<? super RR> downstream):

      • 这个方法充当了 left 和 right 之间的“管道”。
      • 当 left 产生一个中间结果 r 时,这个方法被调用。
      • 它会检查 right 是否还能继续(rightProceed),如果可以,就调用 rightIntegrator.integrate(rightState, r, downstream),将 r 送入 right Gatherer 进行处理,并将最终结果推送到下游。
    • finish(Downstream<? super RR> c):

      • 组合后的 finisher
      • 它会依次调用 left.finisher() 和 right.finisher()
      • 调用 left.finisher() 时,同样会将其下游重定向到 rightIntegrate,以确保 left 在结束时产生的任何剩余元素也能被 right 正确处理。
    c. 构建最终的 GathererImpl
    // ... existing code ...return new GathererImpl<T, State, RR>(State::new,(leftGreedy && rightGreedy)? Integrator.<State, T, RR>ofGreedy(State::integrate): Integrator.<State, T, RR>of(State::integrate),// ...State::finish);
    // ... existing code ...
    

    最后,通用路径会使用 State 类的方法引用来构建一个新的 GathererImpl

    • initializerState::new
    • integratorState::integrate (并根据 left 和 right 是否都为贪婪来决定组合后的 integrator 是否也为贪婪)
    • combinerState::joinLeft
    • finisherState::finish

    总结

    Composite.impl 静态方法是一个高度工程化的杰作。它展示了如何在提供一个通用、强大功能(组合 Gatherer)的同时,不牺牲性能。

    1. 它首先识别特殊情况:当两个 Gatherer 都是无状态和贪婪时,它会生成一个极度优化的、几乎没有额外开销的函数调用链。
    2. 然后提供通用解决方案:对于其他所有情况,它通过创建一个新的 State 类来封装两个 Gatherer 的状态和短路逻辑,确保即使在复杂的有状态和非贪婪场景下,组合行为也能正确无误地执行。

    这种“快速通道 + 通用后备”的设计模式,是高性能库中非常常见的技术,它确保了简单用例的性能最大化,同时保证了复杂用例的正确性。

    andThen

    提供了一种流畅的、函数式的方式来将两个 Gatherer 串联(compose)起来。如果说 Composite 类是实现 Gatherer 组合的“幕后黑手”,那么 andThen 方法就是暴露给开发者使用的、优雅的公开 API。

    它的核心思想是函数组合:创建一个新的 Gatherer,它会先用第一个 Gathererthis)处理输入流,然后将第一个 Gatherer 的输出作为第二个 Gathererthat)的输入,最终新 Gatherer 的输出是第二个 Gatherer 的输出。这完全等同于 g2(g1(input)) 的概念。

    1. andThen 方法源码

    这是 Gatherer 接口中 andThen 方法的实现:

    // ... existing code ...default <R_OUT> Gatherer<T, ?, R_OUT> andThen(Gatherer<? super R, ?, ? extends R_OUT> after) {Objects.requireNonNull(after);return new Composite<>(this, after);}// ... existing code ...
    

    可以看到,它的实现非常简洁:

    1. 非空检查:使用 Objects.requireNonNull(after) 确保传入的 Gatherer 不为 null
    2. 创建 Composite 实例:直接 new Composite<>(this, after),将当前的 Gatherer (this) 和传入的 Gatherer (after) 作为构造函数参数,创建一个 Composite 对象。Composite 类会处理所有复杂的组合逻辑,正如我们之前详细分析的那样。

    2. 泛型解析

    andThen 方法的泛型设计确保了类型安全,我们来解读一下:

    <R_OUT> Gatherer<T, ?, R_OUT> andThen(Gatherer<? super R, ?, ? extends R_OUT> after)
    
    • this:调用 andThen 的 Gatherer 实例,其类型是 Gatherer<T, A, R>
      • T: 它的输入元素类型。
      • R: 它的输出结果类型。
    • after: 作为参数传入的 Gatherer,其类型是 Gatherer<? super R, ?, ? extends R_OUT>
      • ? super Rafter 的输入类型必须是 this 输出类型 R 的超类(或者是 R 本身)。这保证了 this 的输出可以安全地作为 after 的输入。
      • ? extends R_OUTafter 的输出类型是 R_OUT 或其子类
    • 返回值Gatherer<T, ?, R_OUT>
      • T: 组合后 Gatherer 的输入类型与 this 的输入类型相同。
      • ?: 组合后 Gatherer 的中间累加器类型被隐藏了。这是因为 Composite 内部会管理两个 Gatherer 各自的累加器,这个复杂的内部状态对外部调用者是不可见的,用 ? 表示最为合适。
      • R_OUT: 组合后 Gatherer 的最终输出类型与 after 的输出类型相同。

    andThen 方法是 Gatherer API 可组合性的关键体现。它优雅地隐藏了 Composite 类的实现细节,为开发者提供了一个符合直觉的、链式调用的接口来构建复杂的流处理管道。

    通过 gatherer1.andThen(gatherer2).andThen(gatherer3) 这样的调用,开发者可以像拼接乐高积木一样,将简单、专一的 Gatherer 组合成功能强大的复合 Gatherer,这正是函数式编程和现代 API 设计所推崇的模式。

    总结

    Composite 类是 Gatherer API 可组合性的基石。它本身是一个轻量级的包装器,通过懒加载的方式,在需要时动态地将两个 Gatherer 的核心行为函数(四要素)进行精密的“编织”,生成一个新的、代表了两者串联行为的 GathererImpl

    它完美地展示了如何将复杂的操作分解为可组合的、独立的单元,并通过一个通用的组合器(Composite)将它们无缝地连接起来,这正是函数式和声明式编程强大威力的体现。

    总结

    Gatherers 类是 Stream.gather() 功能不可或缺的一部分。它不仅提供了多个即插即用的高效 Gatherer 实现,解决了窗口、扫描、并发映射等常见但复杂的流处理问题,而且其源代码本身就是学习如何正确、高效地实现 Gatherer 接口的最佳教材。通过分析 windowFixedscanmapConcurrent 等方法的实现,我们可以深刻理解 Gatherer 接口中状态、集成、收尾等概念的实际应用。

    http://www.dtcms.com/a/584292.html

    相关文章:

  1. 青岛 企业网站建站wordpress 每页 关高
  2. 已经备案的网站新增ip怎么做wordpress下载页面天涯
  3. 天津网站公司怎么生成网站地图
  4. docker可以做网站吗视频会议
  5. 做网站还 淘宝logofree制作网站
  6. 请问下网站开发怎么弄企业网站推广的一般策略
  7. 网站优化排名公司哪家好手机建立网站
  8. 企业高端网站建设公司采购平台
  9. 网站建设培训学校广州公网ip做网站
  10. 网站快照明天更新是什么情况前端微信小程序开发教程
  11. 常州公司网站模板建站化妆品网站 源码
  12. 网站字体大小是多少合适自己动手做衣服网站
  13. erp是什么办公软件网站优化吧
  14. 做网站上传照片的尺寸wordpress 教学下载
  15. 做礼品公司网站的费用乡镇社区教育中心网站建设
  16. 贵州省建设厅官网站北京企业网站设计报价
  17. 免费学建筑知识网站app引导页模板html
  18. 网站建设的进度网站开发设计资讯
  19. 有没有返利网站做京东的移动互联网的概念是什么
  20. 网站排名突然掉了怎么回事如何创建公司网站
  21. 新手做网站的注意事项房屋平面图在线制作网站
  22. 做网站是不是很简单wordpress采集网页文章
  23. 图书馆建设投稿网站站群宝塔批量建站
  24. 什么叫做营销型网站门店营销活动策划方案
  25. 泉州那几个公司网站建设比较好宝塔反代wordpress
  26. 网站风格设计描述网站开发项目经验
  27. 怎样在外贸网站上做土特产html做网站项目案例
  28. wordpress怎么开启东莞短视频seo制作
  29. 网站那个做的比较好中国十大装修公司加盟
  30. 天津品牌网站制作网站板块设计