流式数据处理实战:用状态机 + scan 优雅过滤 AI 响应中的 `<think>` 标签
流式数据处理实战:用状态机 + scan 优雅过滤 AI 响应中的 <think>
标签
1. 引言:流式数据处理的挑战
在现代 AI 应用开发中,流式 API(如 OpenAI、Claude 等)能实时返回分块数据,提升用户体验。但流式数据存在一个常见问题:如何跨多个分块处理结构化内容?
例如,某些 AI 会在响应中返回 <think>...</think>
标签(内部是 AI 的推理过程,不应展示给用户)。由于流式数据是分块返回的,可能出现:
<think>
和</think>
被拆到不同分块- 标签不完整(如只收到
<think>
但未收到</think>
)
传统的字符串替换(如 replaceAll
)无法处理这种情况。本文将介绍如何用 响应式编程(Reactive Programming) 的 scan
操作符 + 状态机(State Machine) 解决该问题。
2. 核心思路:状态机 + scan
2.1 什么是状态机?
状态机(State Machine)是一种基于状态转移的编程模型,适用于处理有状态的数据流。在我们的场景中:
- 状态:
insideThink
(是否在<think>
标签内) - 事件:遇到
<think>
或</think>
2.2 什么是 scan?
scan
是响应式编程(如 Reactor、RxJava)中的一个操作符,类似于 reduce
,但会在每次元素到达时立即发出累积结果。
- 输入:
Flux<T>
(数据流) - 输出:
Flux<State>
(带状态的流)
3. 代码实现
3.1 定义状态机
public class ThinkTagState {private boolean insideThink = false; // 是否在 <think> 标签内private final StringBuilder buffer = new StringBuilder(); // 有效文本缓冲区public ThinkTagState process(String chunk) {StringBuilder output = new StringBuilder();String remaining = chunk;while (!remaining.isEmpty()) {if (!insideThink) {// 查找 <think> 标签int startIdx = remaining.indexOf("<think>");if (startIdx == -1) {output.append(remaining); // 无标签,直接保留break;}// 保留 <think> 之前的内容output.append(remaining.substring(0, startIdx));insideThink = true;remaining = remaining.substring(startIdx + "<think>".length());} else {// 查找 </think> 标签int endIdx = remaining.indexOf("</think>");if (endIdx == -1) {break; // 未闭合,等待后续分块}// 跳过 </think> 之后的内容insideThink = false;remaining = remaining.substring(endIdx + "</think>".length());}}buffer.append(output);return this;}public String getFilteredText() {return buffer.toString();}public boolean isInsideThink() {return insideThink;}
}
3.2 流式处理 Pipeline
Flux<ChatResponseDTO> filteredStream = aiCallService.stream(new Prompt(messages)).scan(new ThinkTagState(), (state, response) -> {String chunk = response.getResult().getOutput().getText();return state.process(chunk); // 更新状态}).filter(state -> !state.getFilteredText().isEmpty()) // 跳过空内容.map(state -> {ChatResponseDTO dto = new ChatResponseDTO();dto.setAnswer(state.getFilteredText());state.buffer.setLength(0); // 清空缓冲区return dto;});
4. 关键点解析
4.1 状态机如何工作?
- 初始状态:
insideThink = false
- 遇到
<think>
→insideThink = true
(跳过后续内容) - 遇到
</think>
→insideThink = false
(恢复正常处理) - 跨分块处理:如果只有
<think>
没有</think>
,会等待后续分块
4.2 为什么用 scan
而不是 map
?
map
无记忆能力,无法处理跨分块的状态scan
维护状态,适合流式数据的渐进式处理
4.3 性能优化
- 缓冲区清空:每次
map
后调用buffer.setLength(0)
避免内存增长 - 正则替代方案:如果确定标签不跨分块,可用简单
replaceAll
5. 扩展场景
5.1 处理多个标签(如 <reasoning>
)
enum TagType { THINK, REASONING, NONE }public class MultiTagState {private TagType currentTag = TagType.NONE;// 类似逻辑,但需处理多种标签
}
5.2 结合元数据(Metadata)
// 使用 Tuple2 保留原始响应
.scan(Tuples.of(new ThinkTagState(), null), (tuple, response) -> {ThinkTagState state = tuple.getT1().process(response.getText());return Tuples.of(state, response); // 保留原始响应
})
.map(tuple -> {ChatResponseMetadata metadata = tuple.getT2().getMetadata();// ...
});
6. 总结
方案 | 优点 | 缺点 |
---|---|---|
状态机 + scan | 精准处理跨分块标签 | 代码稍复杂 |
简单 replaceAll | 实现简单 | 无法处理跨分块 |
推荐选择:
- 如果 AI 响应可能跨分块 → 状态机 + scan
- 如果标签保证完整 → 正则替换
7. 进一步阅读
- Reactor 官方文档 - scan
- 状态机设计模式
希望这篇博文能帮助你优雅处理流式数据!如果有问题,欢迎留言讨论。 🚀