Spring Boot 2.2.6调用DeepSeek API并通过SSE将流式响应推送给前端的完整实现
1. 添加依赖 (pom.xml)
<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- SSE 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- HTTP客户端 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId></dependency>
</dependencies>
2. 配置类 (WebClientConfig.java
)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient webClient() {return WebClient.builder().baseUrl("https://api.deepseek.com/v1").defaultHeader("Authorization", "Bearer YOUR_API_KEY") // 替换为你的API密钥.build();}
}
3. 请求/响应DTO
import lombok.Data;
import java.util.List;@Data
public class DeepSeekRequest {private String model = "deepseek-chat";private List<Message> messages;private boolean stream = true;@Datapublic static class Message {private String role;private String content;public Message(String role, String content) {this.role = role;this.content = content;}}
}@Data
public class DeepSeekResponse {private List<Choice> choices;@Datapublic static class Choice {private Delta delta;}@Datapublic static class Delta {private String content;}
}
4. SSE服务实现 (DeepSeekService.java
)
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;import java.util.Collections;@Service
public class DeepSeekService {private final WebClient webClient;public DeepSeekService(WebClient webClient) {this.webClient = webClient;}public Flux<String> streamCompletion(String userMessage) {// 使用 FluxProcessor 替代 SinksFluxProcessor<String, String> processor = DirectProcessor.<String>create().serialize();FluxSink<String> sink = processor.sink();DeepSeekRequest request = new DeepSeekRequest();request.setMessages(Collections.singletonList(new DeepSeekRequest.Message("user", userMessage)));webClient.post().uri("/chat/completions").contentType(MediaType.APPLICATION_JSON).bodyValue(request).accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(String.class).subscribe(data -> {ObjectMapper objectMapper = new ObjectMapper();try {String jsonString = objectMapper.writeValueAsString(data);sink.next(jsonString);} catch (JsonProcessingException e) {sink.error(e);}},sink::error,sink::complete);return processor;}
}
5. SSE控制器 (SseController.java
)
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;@RestController
@RequestMapping("/sse")
public class SseController {private final DeepSeekService deepSeekService;public SseController(DeepSeekService deepSeekService) {this.deepSeekService = deepSeekService;}@GetMapping(path = "/deepseek", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter streamDeepSeekResponse(@RequestParam String message) {SseEmitter emitter = new SseEmitter(60 * 1000L); // 60秒超时Flux<String> responseStream = deepSeekService.streamCompletion(message);responseStream.subscribe(content -> {try {// 发送SSE事件emitter.send(SseEmitter.event().data(content).name("message"));} catch (Exception e) {emitter.completeWithError(e);}},emitter::completeWithError,emitter::complete);return emitter;}
}
6. 前端实现 (HTML + JavaScript)
<!DOCTYPE html>
<html>
<head><title>DeepSeek SSE Demo</title>
</head>
<body><input type="text" id="message" placeholder="输入你的问题"><button onclick="startSSE()">开始对话</button><div id="output" style="white-space: pre-wrap; margin-top: 20px;"></div><script>let eventSource;function startSSE() {const message = document.getElementById('message').value;const outputDiv = document.getElementById('output');outputDiv.innerHTML = ''; // 清空之前的内容if (eventSource) eventSource.close();// 创建SSE连接eventSource = new EventSource(`/sse/deepseek?message=${encodeURIComponent(message)}`);eventSource.addEventListener("message", (event) => {// 实时追加内容outputDiv.innerHTML += event.data;});eventSource.addEventListener("error", (err) => {console.error("SSE error:", err);outputDiv.innerHTML += "\n\n[连接已关闭]";eventSource.close();});}</script>
</body>
</html>
关键点说明:
SSE流式传输:
使用SseEmitter实现服务端推送
通过text/event-stream内容类型保持长连接
DeepSeek API集成:
设置stream=true启用流式响应
处理data: [DONE]结束标记
解析JSON响应中的content字段
响应式编程:
使用WebClient处理HTTP流
使用Sinks进行背压管理
Flux实现响应式流处理
前端实现:
使用EventSource API接收SSE
实时追加内容到DOM
处理连接错误和关闭
测试步骤:
1.启动Spring Boot应用
2.访问前端页面(默认端口8080)
3.输入问题并点击按钮
4.查看实时输出的思考过程
注意事项:
1.替换YOUR_API_KEY为实际的DeepSeek API密钥
2.生产环境建议:
3.添加JSON解析库(如Jackson)处理响应
4.增加错误处理和重试机制
5.添加API速率限制
6.实现更健壮的SSE连接管理
此实现能让前端实时接收并显示DeepSeek API返回的流式响应,实现"思考过程"的逐字显示效果。