Java+AI开发实战与知识点归纳系列:Spring流式输出实战——LangChain4j与Ollama集成
Java+AI开发实战与知识点归纳系列:Spring流式输出实战——LangChain4j与Ollama集成
该系列会以java+AI开发为主线,"顺路"的讲解许多java知识,同时学会AI应用的基本开发。每一篇都是完整可运行的代码。文末有项目目录结构,对照文章内容即可完成开发~
此外本文附带了完整的项目代码如果文章内容有不流畅或讲解缺失,请随时评论区留言~
在上一篇文章中,我们学会了如何通过Spring配置管理LangChain4j与Ollama的集成,实现了"改配置不用改代码"的灵活部署。但是,当我们实际使用AI聊天应用时,会发现一个问题:用户提问后需要等待很久才能看到完整的回答,体验很不好。
今天我们要解决的就是这个问题:如何实现AI回答的流式输出,让用户能够实时看到AI的思考过程,就像ChatGPT那样一个字一个字地"打字"出来。
一、从问题出发:为什么需要流式输出?(如果觉得老生常谈,直接跳下一节就好)
想象一下这样的场景:你问AI一个复杂问题,比如"请详细解释Spring Boot的自动配置原理",如果使用传统的同步方式,你可能需要等待10-30秒才能看到完整答案。这种等待是很煎熬的,用户不知道系统是否还在工作,甚至可能以为程序卡死了。
而流式输出就像真人对话一样,AI一边思考一边说话,用户能实时看到回答的进展,大大提升了交互体验。这就是我们今天要实现的目标:
- 实时响应:用户发送消息后立即开始接收AI的回答
- 流式传输:AI的回答一个词一个词地传输到前端
- 优雅处理:妥善处理网络异常、模型错误等边界情况
二、技术选型:为什么选择WebFlux?
要实现流式输出,我们需要选择合适的技术栈。传统的Spring MVC基于Servlet API,天然是阻塞式的,不太适合处理长时间的流式响应。而Spring WebFlux基于Reactor模式,天生支持异步非阻塞,是实现流式输出的最佳选择。
WebFlux vs Spring MVC对比(在实现的时候,仔细考虑过选SseEmitter还是flux的,最终选择了WebFlux)
特性 | Spring MVC | Spring WebFlux |
---|---|---|
编程模型 | 阻塞式 | 非阻塞式 |
线程模型 | 一个请求一个线程 | 少量线程处理大量请求 |
流式支持 | 有限支持 | 原生支持 |
学习成本 | 较低 | 较高 |
在我们的项目中,你会看到pom.xml中引入的是spring-boot-starter-webflux
而不是spring-boot-starter-web
,这就是为了支持响应式编程。
三、核心实现:一步步构建流式聊天
在开始实现之前,我想说如果不了解响应式编程,可能会对WebFlux的概念和使用方式感到陌生。但是,我会尽量用简单的语言和代码示例,帮助你理解流式输出的实现原理。如果刚入门确实难以看懂,AI时代,我强烈建议你合理使用AI工具,提示词也给你准备好了,5分钟后再切回来吧~
提示词:你是一个技术专家也是一个优秀的讲师。请快速的为我介绍响应式编程的基本概念和原理,为我介绍springwebflux。然后给我一个示例,是基于springwebflux的,并针对示例为我一步步讲解。目标是让我快速理解和掌握响应式编程的核心思想,熟悉springwebflux实现的响应式编程。我希望你讲的通俗易懂,自顶向下,由浅入深一些。
第一步:配置响应式Web环境
首先,我们需要在pom.xml中引入WebFlux依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
这个依赖会自动引入Reactor Core、Netty等响应式编程所需的核心库。与传统的Tomcat不同,WebFlux默认使用Netty作为Web服务器,Netty的事件循环模型天然支持高并发的异步处理。
第二步:设计流式响应的Controller
接下来是关键的Controller设计。我们需要返回一个Flux<String>
类型,这是Reactor中表示多个异步数据流的类型:
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
public class ChatController {private final ChatService chatService;@PostMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)public Flux<String> streamChat(@RequestBody ChatRequest request) {return chatService.streamChat(request.getMessage());}
}
这里有几个关键点需要理解:
- MediaType.APPLICATION_STREAM_JSON_VALUE:这个媒体类型告诉浏览器这是一个流式响应,数据会分批次发送
- Flux:Flux是Reactor中的核心类型,表示0到N个异步序列元素
- @RequestBody ChatRequest:我们仍然可以正常接收JSON请求体
第三步:实现核心的流式服务
最核心的部分是ChatService,这里我们需要将LangChain4j的回调模式转换为Reactor的流式模式:
@Service
@RequiredArgsConstructor
public class ChatService {private final StreamingChatLanguageModel streamingChatLanguageModel;public Flux<String> streamChat(String message) {return Flux.create(sink -> {try {// 发送初始连接成功消息sink.next("I am coming! \n");// 使用StreamingChatLanguageModel的chat方法streamingChatLanguageModel.chat(message,new StreamingChatResponseHandler() {@Overridepublic void onPartialResponse(String partialResponse) {sink.next(partialResponse);}@Overridepublic void onCompleteResponse(ChatResponse completeResponse) {sink.complete();}@Overridepublic void onError(Throwable error) {// 优雅的错误处理handleStreamError(sink, error);}});} catch (Exception e) {sink.error(e);}}, FluxSink.OverflowStrategy.BUFFER);}private void handleStreamError(FluxSink<String> sink, Throwable error) {if (error instanceof NullPointerException) {String errorMsg = error.getMessage() != null ? error.getMessage() : "未知空指针异常";System.err.println("警告: 捕获到空指针异常: " + errorMsg);if (errorMsg.contains("getMessage()") || errorMsg.contains("getContent()")) {sink.next("\n[系统提示: 模型返回了空响应,请重试]");sink.complete();return;}}try {System.err.println("错误: " + error.getClass().getName() + ": " + error.getMessage());sink.next("\n[系统错误: " + error.getMessage() + "]");sink.complete();} catch (Exception e) {sink.error(error);}}
}
这段代码的核心思想是桥接模式:将LangChain4j的回调式API转换为Reactor的流式API。让我们深入理解几个关键概念:
Flux.create()的工作原理
Flux.create()
是Reactor提供的工厂方法,用于从外部的异步源创建Flux。它接收一个Consumer参数,FluxSink就像一个"水龙头",我们可以通过它向下游发送数据:
sink.next(data)
:发送一个数据元素sink.complete()
:表示数据流结束sink.error(throwable)
:发送错误信号
StreamingChatResponseHandler回调处理
LangChain4j使用回调模式处理流式响应,我们需要实现三个关键方法:
- onPartialResponse():每当模型生成一小段文本时调用
- onCompleteResponse():当模型完成整个回答时调用
- onError():当发生错误时调用
背压处理策略
FluxSink.OverflowStrategy.BUFFER
指定了背压处理策略。背压是响应式编程中的重要概念,当生产者产生数据的速度超过消费者处理速度时,就会发生背压。BUFFER策略会缓存所有数据,适合我们这种场景。
第四步:配置Ollama流式模型
我们需要配置LangChain4j的流式聊天模型。注意这里使用的是StreamingChatLanguageModel
而不是普通的ChatLanguageModel
:
@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "langchain4j.chat", name = "enable", havingValue = "ollama")
public class OllamaConfig {private final OllamaProperties ollamaProperties;@Beanpublic StreamingChatLanguageModel streamingChatLanguageModel() {log.info("正在初始化Ollama流式聊天模型...");return OllamaStreamingChatModel.builder().baseUrl(ollamaProperties.getBaseUrl()).modelName(ollamaProperties.getModelName()).temperature(ollamaProperties.getTemperature()).timeout(ollamaProperties.getTimeout()).build();}
}
这里的@ConditionalOnProperty
注解实现了条件化配置,只有当langchain4j.chat.enable=ollama
时才会创建这个Bean。这种设计让我们可以轻松切换不同的AI模型提供商。
四、前端实现:处理流式数据
后端实现了流式输出,前端也需要相应地处理流式数据。我们使用Fetch API的ReadableStream来处理:
function sendMessage() {const message = messageInput.value.trim();if (!message) return;// 添加用户消息addMessage(message, true);messageInput.value = '';// 发送请求并处理流式响应fetch('/api/chat/stream', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({ message: message })}).then(response => {if (!response.ok) {throw new Error('网络响应不正常');}// 创建流式读取器const reader = response.body.getReader();const decoder = new TextDecoder();// 处理数据流function processStream() {return reader.read().then(({ done, value }) => {if (done) {completeStreamingMessage();return;}// 解码接收到的数据const text = decoder.decode(value, { stream: true });addStreamingMessage(text);return processStream();});}return processStream();}).catch(error => {console.error('Error:', error);addMessage('发生错误: ' + error.message, false);});
}
关键技术点解析
ReadableStream API
response.body.getReader()
返回一个ReadableStreamDefaultReader,它可以逐块读取响应体数据。这是浏览器原生支持的流式API,非常适合处理服务器发送的流式数据。
TextDecoder的流式解码
new TextDecoder()
创建一个文本解码器,decode(value, { stream: true })
中的stream: true
参数很重要,它告诉解码器这是流式数据,可能存在跨块的字符边界。
递归处理模式
processStream()
函数使用递归调用来持续读取数据流,直到done
为true表示流结束。这种模式简洁而高效。
五、错误处理与优化
在实际应用中,我们需要考虑各种异常情况:
1. 网络异常处理
private void handleStreamError(FluxSink<String> sink, Throwable error) {if (error instanceof NullPointerException) {// 处理模型返回空响应的情况String errorMsg = error.getMessage() != null ? error.getMessage() : "未知空指针异常";if (errorMsg.contains("getMessage()") || errorMsg.contains("getContent()")) {sink.next("\n[系统提示: 模型返回了空响应,请重试]");sink.complete();return;}}// 通用错误处理sink.next("\n[系统错误: " + error.getMessage() + "]");sink.complete();
}
2. 超时处理
在application.yml中配置合理的超时时间:
langchain4j:chat:ollama:timeout: 120s # 2分钟超时
3. 前端连接管理
// 关闭之前的连接(如果有)
if (eventSource) {eventSource.close();
}
六、性能优化与最佳实践
1. 背压控制
当AI生成速度很快时,可能会产生背压。我们使用BUFFER策略,但在生产环境中可能需要考虑DROP或ERROR策略:
Flux.create(sink -> {// 实现逻辑
}, FluxSink.OverflowStrategy.BUFFER); // 可以改为DROP或ERROR
2. 内存管理
流式处理需要注意内存使用,避免大量数据积压:
// 在配置中限制缓冲区大小
@Bean
public StreamingChatLanguageModel streamingChatLanguageModel() {return OllamaStreamingChatModel.builder().baseUrl(ollamaProperties.getBaseUrl()).modelName(ollamaProperties.getModelName()).temperature(ollamaProperties.getTemperature()).timeout(ollamaProperties.getTimeout()).build();
}
3. 日志监控
添加适当的日志来监控流式处理的性能:
@Override
public void onPartialResponse(String partialResponse) {log.debug("接收到部分响应: {} 字符", partialResponse.length());sink.next(partialResponse);
}
七、项目结构总览
src/
└── main/├── java/│ └── com/example/ollamademo/│ ├── OllamaDemoApplication.java # 启动类│ ├── config/│ │ ├── OllamaConfig.java # Ollama配置│ │ └── properties/│ │ └── OllamaProperties.java # 配置属性│ ├── controller/│ │ ├── ChatController.java # 流式聊天控制器│ │ └── WebController.java # 页面控制器│ ├── dto/│ │ └── ChatRequest.java # 请求DTO│ └── service/│ └── ChatService.java # 核心流式服务└── resources/├── application.yml # 配置文件└── templates/└── index.html # 前端页面
八、运行与测试
-
启动Ollama服务:
ollama serve ollama pull qwen3:8b
-
启动Spring Boot应用:
mvn spring-boot:run
-
访问应用:
打开浏览器访问http://localhost:8080
-
测试流式输出:
输入问题,观察AI回答是否一个字一个字地出现
九、总结与展望
本文主要内容回顾:
- 响应式编程基础:理解了Flux、Mono等Reactor核心概念
- 流式API设计:掌握了如何设计流式响应的RESTful API
- 异步编程模式:学会了回调模式与流式模式的转换
- 前端流式处理:掌握了ReadableStream API的使用
- 错误处理策略:学会了在流式场景下的异常处理
在下一篇文章中,我们将再走一步,探讨如何实现多轮对话的上下文管理,让AI能够记住之前的对话内容,实现更智能的交互体验。
本系列文章将持续更新,每一篇都是完整可运行的项目。如果你觉得有帮助,欢迎点赞收藏,也欢迎在评论区分享你的想法和问题!
💻完整的代码可以直接下载我上传的资源。