当前位置: 首页 > news >正文

Java+AI开发实战与知识点归纳系列:Spring流式输出实战——LangChain4j与Ollama集成

Java+AI开发实战与知识点归纳系列:Spring流式输出实战——LangChain4j与Ollama集成

该系列会以java+AI开发为主线,"顺路"的讲解许多java知识,同时学会AI应用的基本开发。每一篇都是完整可运行的代码。文末有项目目录结构,对照文章内容即可完成开发~
此外本文附带了完整的项目代码

如果文章内容有不流畅或讲解缺失,请随时评论区留言~

在上一篇文章中,我们学会了如何通过Spring配置管理LangChain4j与Ollama的集成,实现了"改配置不用改代码"的灵活部署。但是,当我们实际使用AI聊天应用时,会发现一个问题:用户提问后需要等待很久才能看到完整的回答,体验很不好。

今天我们要解决的就是这个问题:如何实现AI回答的流式输出,让用户能够实时看到AI的思考过程,就像ChatGPT那样一个字一个字地"打字"出来

一、从问题出发:为什么需要流式输出?(如果觉得老生常谈,直接跳下一节就好)

想象一下这样的场景:你问AI一个复杂问题,比如"请详细解释Spring Boot的自动配置原理",如果使用传统的同步方式,你可能需要等待10-30秒才能看到完整答案。这种等待是很煎熬的,用户不知道系统是否还在工作,甚至可能以为程序卡死了。

而流式输出就像真人对话一样,AI一边思考一边说话,用户能实时看到回答的进展,大大提升了交互体验。这就是我们今天要实现的目标:

  1. 实时响应:用户发送消息后立即开始接收AI的回答
  2. 流式传输:AI的回答一个词一个词地传输到前端
  3. 优雅处理:妥善处理网络异常、模型错误等边界情况

二、技术选型:为什么选择WebFlux?

要实现流式输出,我们需要选择合适的技术栈。传统的Spring MVC基于Servlet API,天然是阻塞式的,不太适合处理长时间的流式响应。而Spring WebFlux基于Reactor模式,天生支持异步非阻塞,是实现流式输出的最佳选择。

WebFlux vs Spring MVC对比(在实现的时候,仔细考虑过选SseEmitter还是flux的,最终选择了WebFlux)

特性Spring MVCSpring WebFlux
编程模型阻塞式非阻塞式
线程模型一个请求一个线程少量线程处理大量请求
流式支持有限支持原生支持
学习成本较低较高

在我们的项目中,你会看到pom.xml中引入的是spring-boot-starter-webflux而不是spring-boot-starter-web,这就是为了支持响应式编程。

三、核心实现:一步步构建流式聊天

在开始实现之前,我想说如果不了解响应式编程,可能会对WebFlux的概念和使用方式感到陌生。但是,我会尽量用简单的语言和代码示例,帮助你理解流式输出的实现原理。如果刚入门确实难以看懂,AI时代,我强烈建议你合理使用AI工具,提示词也给你准备好了,5分钟后再切回来吧~

提示词:你是一个技术专家也是一个优秀的讲师。请快速的为我介绍响应式编程的基本概念和原理,为我介绍springwebflux。然后给我一个示例,是基于springwebflux的,并针对示例为我一步步讲解。目标是让我快速理解和掌握响应式编程的核心思想,熟悉springwebflux实现的响应式编程。我希望你讲的通俗易懂,自顶向下,由浅入深一些。

第一步:配置响应式Web环境

首先,我们需要在pom.xml中引入WebFlux依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

这个依赖会自动引入Reactor Core、Netty等响应式编程所需的核心库。与传统的Tomcat不同,WebFlux默认使用Netty作为Web服务器,Netty的事件循环模型天然支持高并发的异步处理。

第二步:设计流式响应的Controller

接下来是关键的Controller设计。我们需要返回一个Flux<String>类型,这是Reactor中表示多个异步数据流的类型:

@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
public class ChatController {private final ChatService chatService;@PostMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)public Flux<String> streamChat(@RequestBody ChatRequest request) {return chatService.streamChat(request.getMessage());}
}

这里有几个关键点需要理解:

  1. MediaType.APPLICATION_STREAM_JSON_VALUE:这个媒体类型告诉浏览器这是一个流式响应,数据会分批次发送
  2. Flux:Flux是Reactor中的核心类型,表示0到N个异步序列元素
  3. @RequestBody ChatRequest:我们仍然可以正常接收JSON请求体

第三步:实现核心的流式服务

最核心的部分是ChatService,这里我们需要将LangChain4j的回调模式转换为Reactor的流式模式:

@Service
@RequiredArgsConstructor
public class ChatService {private final StreamingChatLanguageModel streamingChatLanguageModel;public Flux<String> streamChat(String message) {return Flux.create(sink -> {try {// 发送初始连接成功消息sink.next("I am coming! \n");// 使用StreamingChatLanguageModel的chat方法streamingChatLanguageModel.chat(message,new StreamingChatResponseHandler() {@Overridepublic void onPartialResponse(String partialResponse) {sink.next(partialResponse);}@Overridepublic void onCompleteResponse(ChatResponse completeResponse) {sink.complete();}@Overridepublic void onError(Throwable error) {// 优雅的错误处理handleStreamError(sink, error);}});} catch (Exception e) {sink.error(e);}}, FluxSink.OverflowStrategy.BUFFER);}private void handleStreamError(FluxSink<String> sink, Throwable error) {if (error instanceof NullPointerException) {String errorMsg = error.getMessage() != null ? error.getMessage() : "未知空指针异常";System.err.println("警告: 捕获到空指针异常: " + errorMsg);if (errorMsg.contains("getMessage()") || errorMsg.contains("getContent()")) {sink.next("\n[系统提示: 模型返回了空响应,请重试]");sink.complete();return;}}try {System.err.println("错误: " + error.getClass().getName() + ": " + error.getMessage());sink.next("\n[系统错误: " + error.getMessage() + "]");sink.complete();} catch (Exception e) {sink.error(error);}}
}

这段代码的核心思想是桥接模式:将LangChain4j的回调式API转换为Reactor的流式API。让我们深入理解几个关键概念:

Flux.create()的工作原理

Flux.create()是Reactor提供的工厂方法,用于从外部的异步源创建Flux。它接收一个Consumer参数,FluxSink就像一个"水龙头",我们可以通过它向下游发送数据:

  • sink.next(data):发送一个数据元素
  • sink.complete():表示数据流结束
  • sink.error(throwable):发送错误信号
StreamingChatResponseHandler回调处理

LangChain4j使用回调模式处理流式响应,我们需要实现三个关键方法:

  1. onPartialResponse():每当模型生成一小段文本时调用
  2. onCompleteResponse():当模型完成整个回答时调用
  3. onError():当发生错误时调用
背压处理策略

FluxSink.OverflowStrategy.BUFFER指定了背压处理策略。背压是响应式编程中的重要概念,当生产者产生数据的速度超过消费者处理速度时,就会发生背压。BUFFER策略会缓存所有数据,适合我们这种场景。

第四步:配置Ollama流式模型

我们需要配置LangChain4j的流式聊天模型。注意这里使用的是StreamingChatLanguageModel而不是普通的ChatLanguageModel

@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "langchain4j.chat", name = "enable", havingValue = "ollama")
public class OllamaConfig {private final OllamaProperties ollamaProperties;@Beanpublic StreamingChatLanguageModel streamingChatLanguageModel() {log.info("正在初始化Ollama流式聊天模型...");return OllamaStreamingChatModel.builder().baseUrl(ollamaProperties.getBaseUrl()).modelName(ollamaProperties.getModelName()).temperature(ollamaProperties.getTemperature()).timeout(ollamaProperties.getTimeout()).build();}
}

这里的@ConditionalOnProperty注解实现了条件化配置,只有当langchain4j.chat.enable=ollama时才会创建这个Bean。这种设计让我们可以轻松切换不同的AI模型提供商。

四、前端实现:处理流式数据

后端实现了流式输出,前端也需要相应地处理流式数据。我们使用Fetch API的ReadableStream来处理:

function sendMessage() {const message = messageInput.value.trim();if (!message) return;// 添加用户消息addMessage(message, true);messageInput.value = '';// 发送请求并处理流式响应fetch('/api/chat/stream', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({ message: message })}).then(response => {if (!response.ok) {throw new Error('网络响应不正常');}// 创建流式读取器const reader = response.body.getReader();const decoder = new TextDecoder();// 处理数据流function processStream() {return reader.read().then(({ done, value }) => {if (done) {completeStreamingMessage();return;}// 解码接收到的数据const text = decoder.decode(value, { stream: true });addStreamingMessage(text);return processStream();});}return processStream();}).catch(error => {console.error('Error:', error);addMessage('发生错误: ' + error.message, false);});
}

关键技术点解析

ReadableStream API

response.body.getReader()返回一个ReadableStreamDefaultReader,它可以逐块读取响应体数据。这是浏览器原生支持的流式API,非常适合处理服务器发送的流式数据。

TextDecoder的流式解码

new TextDecoder()创建一个文本解码器,decode(value, { stream: true })中的stream: true参数很重要,它告诉解码器这是流式数据,可能存在跨块的字符边界。

递归处理模式

processStream()函数使用递归调用来持续读取数据流,直到done为true表示流结束。这种模式简洁而高效。

五、错误处理与优化

在实际应用中,我们需要考虑各种异常情况:

1. 网络异常处理

private void handleStreamError(FluxSink<String> sink, Throwable error) {if (error instanceof NullPointerException) {// 处理模型返回空响应的情况String errorMsg = error.getMessage() != null ? error.getMessage() : "未知空指针异常";if (errorMsg.contains("getMessage()") || errorMsg.contains("getContent()")) {sink.next("\n[系统提示: 模型返回了空响应,请重试]");sink.complete();return;}}// 通用错误处理sink.next("\n[系统错误: " + error.getMessage() + "]");sink.complete();
}

2. 超时处理

在application.yml中配置合理的超时时间:

langchain4j:chat:ollama:timeout: 120s  # 2分钟超时

3. 前端连接管理

// 关闭之前的连接(如果有)
if (eventSource) {eventSource.close();
}

六、性能优化与最佳实践

1. 背压控制

当AI生成速度很快时,可能会产生背压。我们使用BUFFER策略,但在生产环境中可能需要考虑DROP或ERROR策略:

Flux.create(sink -> {// 实现逻辑
}, FluxSink.OverflowStrategy.BUFFER);  // 可以改为DROP或ERROR

2. 内存管理

流式处理需要注意内存使用,避免大量数据积压:

// 在配置中限制缓冲区大小
@Bean
public StreamingChatLanguageModel streamingChatLanguageModel() {return OllamaStreamingChatModel.builder().baseUrl(ollamaProperties.getBaseUrl()).modelName(ollamaProperties.getModelName()).temperature(ollamaProperties.getTemperature()).timeout(ollamaProperties.getTimeout()).build();
}

3. 日志监控

添加适当的日志来监控流式处理的性能:

@Override
public void onPartialResponse(String partialResponse) {log.debug("接收到部分响应: {} 字符", partialResponse.length());sink.next(partialResponse);
}

七、项目结构总览

src/
└── main/├── java/│   └── com/example/ollamademo/│       ├── OllamaDemoApplication.java          # 启动类│       ├── config/│       │   ├── OllamaConfig.java              # Ollama配置│       │   └── properties/│       │       └── OllamaProperties.java      # 配置属性│       ├── controller/│       │   ├── ChatController.java            # 流式聊天控制器│       │   └── WebController.java             # 页面控制器│       ├── dto/│       │   └── ChatRequest.java               # 请求DTO│       └── service/│           └── ChatService.java               # 核心流式服务└── resources/├── application.yml                        # 配置文件└── templates/└── index.html                         # 前端页面

八、运行与测试

  1. 启动Ollama服务

    ollama serve
    ollama pull qwen3:8b
    
  2. 启动Spring Boot应用

    mvn spring-boot:run
    
  3. 访问应用
    打开浏览器访问 http://localhost:8080

  4. 测试流式输出
    输入问题,观察AI回答是否一个字一个字地出现

九、总结与展望

本文主要内容回顾:

  1. 响应式编程基础:理解了Flux、Mono等Reactor核心概念
  2. 流式API设计:掌握了如何设计流式响应的RESTful API
  3. 异步编程模式:学会了回调模式与流式模式的转换
  4. 前端流式处理:掌握了ReadableStream API的使用
  5. 错误处理策略:学会了在流式场景下的异常处理

在下一篇文章中,我们将再走一步,探讨如何实现多轮对话的上下文管理,让AI能够记住之前的对话内容,实现更智能的交互体验。


本系列文章将持续更新,每一篇都是完整可运行的项目。如果你觉得有帮助,欢迎点赞收藏,也欢迎在评论区分享你的想法和问题!
💻完整的代码可以直接下载我上传的资源。

http://www.dtcms.com/a/365688.html

相关文章:

  • Spring Authorization Server 1.5.2 使用YML配置的方式,最常用法总结
  • VAR的教师强制teacher forcing
  • Canaan 阿瓦隆 A1246I 81T矿机评测:性能、功耗与能效全面分析
  • 解锁产品说明书的“视觉密码”:多模态 RAG 与 GPT-4 的深度融合 (AI应用与技术系列)
  • 【收藏必备】大模型面试宝典:Transformer到实战应用全解析,助你斩获30W年薪offer!
  • Debezium日常分享系列之:Debezium 3.3.0.Alpha2发布
  • MySQL 行转列 (Pivot) 的 N 种实现方式:静态、动态与 GROUP_CONCAT 详解
  • C++入门小馆:C++11第一弹
  • 面试复习题-Flutter
  • https 协议与 wss 协议有什么不同
  • 详细教程:如何利用nslookup命令查询DNS解析状态?
  • 深度学习------模型的保存和使用
  • CSS 伪类与伪元素:深度解析
  • 大疆图传技术参数对比 你了解多少?
  • 2025高教社杯数模国赛【思路预约】
  • Mysql的锁退化
  • 虚拟机+ubuntu+docker+python部署,以及中途遇到的问题和解决方案
  • 计算机科学领域-CS基础
  • 信创MySQL到达梦数据库的SQL语法转换技术解析
  • 使用Java定时爬取CSDN博客并自动邮件推送
  • CPU和GPU的区别与作用域
  • prometheus+grafana搭建
  • 虚拟机NAT模式通过宿主机(Windows)上网不稳定解决办法(无法上网)(将宿主机设置固定ip并配置dns)
  • 【面试题】OOV(未登录词)问题如何解决?
  • Unity 枪械红点瞄准器计算
  • K8S 部署 NFS Dynamic Provisioning(动态存储供应)
  • Grafana可视化平台深度解析:选型、竞品、成本与资源消耗
  • SpringCloud整合分布式事务Seata
  • C语言(长期更新)第13讲:指针详解(三)
  • 毒蛇品种检测识别数据集:12个类别,6500+图像,全yolo标注