67.实现AI流式回答的后端实现(2)
实现AI流式回答中断功能的后端实现(下)
前端交互设计
要实现流畅的中断体验,前后端需要紧密配合。前端需要:
- 在发送流式请求时获取唯一的emitter ID
- 提供中断按钮并绑定中断API调用
- 优雅处理中断后的UI状态
后端相应需要扩展:
// 扩展的流式端点,返回emitter ID
@PostMapping("/stream")
public Map<String, Object> streamResponse(...) {String emitterId = UUID.randomUUID().toString();SseEmitterWrapper wrapper = new SseEmitterWrapper(emitter);emitterRegistry.put(emitterId, wrapper);// ...原有逻辑...return Map.of("emitterId", emitterId,"emitter", wrapper.getEmitter());
}
注册表实现
完整的Emitter注册表实现:
@Component
public class EmitterRegistry {private final ConcurrentMap<String, SseEmitterWrapper> emitters = new ConcurrentHashMap<>();private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();public EmitterRegistry() {// 每小时清理一次已完成的emittercleaner.scheduleAtFixedRate(this::cleanUp, 1, 1, TimeUnit.HOURS);}public void put(String id, SseEmitterWrapper wrapper) {emitters.put(id, wrapper);}public SseEmitterWrapper get(String id) {return emitters.get(id);}public void cleanUp() {emitters.entrySet().removeIf(entry -> entry.getValue().isCompleted());}@PreDestroypublic void shutdown() {cleaner.shutdown();}
}
性能优化策略
-
连接管理:
// 在WebClient配置中
WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(30)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)))
总结:
通过这两篇文章,我完整实现了:
- 基于SSE的流式回答功能
- 用户可中断机制
- 完善的异常处理和资源管理
- 性能优化策略
关键收获:
takeUntil
是实现中断的核心操作符- 状态管理需要线程安全的设计
- 完整的生命周期管理至关重要
- 监控和指标收集对生产环境必不可少