Streaming DeepSeek Output in Spring Boot
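Step 1: Add the dependencies. The WebFlux starter provides the reactive WebClient and also brings in Jackson:
<!-- WebFlux: reactive web support and WebClient -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
The code below also uses Lombok (@RequiredArgsConstructor, @Slf4j) and Guava (RateLimiter), and the test uses StepVerifier. You therefore also need org.projectlombok:lombok, com.google.guava:guava and io.projectreactor:reactor-test (test scope).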
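Step 2: Configure a WebClient instance for sending requests to the DeepSeek API. It needs the API URL as its base URL, plus default headers. Those headers include the authentication header the API requires, Authorization: Bearer <API key>.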
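import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class DeepSeekConfig {

    @Value("${deepseek.api.key}")
    private String apiKey;

    @Value("${deepseek.api.url}")
    private String apiUrl;

    @Bean
    public WebClient deepseekWebClient() {
        return WebClient.builder()
                .baseUrl(apiUrl)
                .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                // Raise the in-memory buffer limit (default 256 KB) to 16 MB for large responses
                .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024))
                .build();
    }
}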
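For comparison, here is a minimal sketch of the same request built with the JDK's own java.net.http client instead of WebClient. This is a swapped-in technique, not part of the Spring setup. It makes the wire-level defaults visible: a POST to the configured URL, the bearer token and the JSON content type. The class DeepSeekRequests, its two methods and the explicit Accept: text/event-stream header are illustrative assumptions.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

// Hypothetical helper: mirrors the WebClient defaults with the JDK HTTP client.
final class DeepSeekRequests {
    private DeepSeekRequests() {}

    // Builds the POST that the WebClient bean would send to apiUrl.
    static HttpRequest chatRequest(String apiUrl, String apiKey, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(apiUrl))
                .header("Authorization", "Bearer " + apiKey)   // same as the AUTHORIZATION default header
                .header("Content-Type", "application/json")    // same as the CONTENT_TYPE default header
                .header("Accept", "text/event-stream")         // assumption: ask for SSE explicitly
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Sends the request and exposes the raw response body line by line as it arrives.
    static Stream<String> openLines(HttpClient client, HttpRequest request)
            throws IOException, InterruptedException {
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        if (response.statusCode() != 200) {
            response.body().close();                           // release the connection before failing
            throw new IOException("HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

Calling openLines needs a valid key and network access. It returns the raw SSE lines (data: ... lines and blank separators), which WebClient would otherwise decode for you. Close the returned stream when you are done, for example with try-with-resources.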
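Step 3: Create a service class. It sends the request through WebClient and consumes the streamed response as a Flux, one element per chunk. Each chunk is a JSON object, and the service extracts the text fragment from choices[0].delta. With the deepseek-reasoner model, the reasoning steps are carried in reasoning_content and the final answer in content. The service therefore returns whichever of the two fields is present.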
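import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.Map;

@Service
@RequiredArgsConstructor
public class DeepSeekService {

    // ObjectMapper is thread-safe once configured, so a single instance is reused
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final WebClient deepSeekClient;

    public Flux<String> streamCompletion(String prompt) {
        Map<String, Object> requestBody = Map.of(
                "model", "deepseek-reasoner",
                "messages", List.of(Map.of("role", "user", "content", prompt)),
                "stream", true,          // enable streaming (SSE)
                "temperature", 0.7);     // per DeepSeek's docs, ignored by deepseek-reasoner

        return deepSeekClient.post()
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .bodyValue(requestBody)
                .retrieve()
                // For text/event-stream, each element is the "data" payload of one event
                .bodyToFlux(String.class)
                .filter(data -> !"[DONE]".equals(data))   // drop the end-of-stream marker
                .map(this::extractContent)
                .filter(content -> !content.isEmpty())    // skip chunks that carry no text
                .onErrorMap(e -> new RuntimeException("API call failed: " + e.getMessage(), e));
    }

    // Parses one chunk: returns reasoning_content if present, otherwise content
    private String extractContent(String data) {
        try {
            JsonNode delta = MAPPER.readTree(data).at("/choices/0/delta");
            // textValue() returns null for both a missing field and JSON null
            String reasoning = delta.path("reasoning_content").textValue();
            if (reasoning != null && !reasoning.isEmpty()) {
                return reasoning;
            }
            String content = delta.path("content").textValue();
            return content != null ? content : "";
        } catch (Exception e) {
            return "";  // ignore chunks that are not valid JSON
        }
    }
}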
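The bodyToFlux(String.class) call works because WebClient's SSE decoder hands over only the data payload of each event in a text/event-stream response. DeepSeek's documentation describes the stream as data-only server-sent events terminated by data: [DONE], and the server may also send comment lines such as ": keep-alive". To make that framing concrete, here is a minimal JDK-only sketch of the decoding step. The class SseData is hypothetical, and it ignores the event, id and retry fields that a full SSE parser would handle.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper: simplified SSE decoding that keeps only the "data" field of each event.
final class SseData {
    private SseData() {}

    // Returns the data payload of every event, stopping at the [DONE] marker.
    static List<String> payloads(Stream<String> lines) {
        List<String> out = new ArrayList<>();
        List<String> dataLines = new ArrayList<>();
        Iterator<String> it = lines.iterator();
        while (it.hasNext()) {
            String line = it.next();
            if (line.isEmpty()) {
                // A blank line ends the current event; multi-line data is joined with '\n'
                String data = String.join("\n", dataLines);
                dataLines.clear();
                if (data.isEmpty()) {
                    continue;               // nothing to dispatch (e.g. after a comment)
                }
                if ("[DONE]".equals(data)) {
                    break;                  // end-of-stream marker
                }
                out.add(data);
            } else if (line.startsWith(":")) {
                // Comment line such as ": keep-alive": ignored
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // SSE strips one optional space after the colon
                dataLines.add(value.startsWith(" ") ? value.substring(1) : value);
            }
            // Other fields (event:, id:, retry:) are ignored in this sketch
        }
        return out;
    }
}
```

Each returned payload is one JSON chunk, which is what extractContent receives in the service. An event that is not followed by a blank line before the input ends is discarded, as the SSE format prescribes.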
Step 4: Create the controller, which exposes the stream to clients as Server-Sent Events (SSE).
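import com.google.common.util.concurrent.RateLimiter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;

import java.util.logging.Level;

@RestController
@RequiredArgsConstructor
@Slf4j
public class StreamController {

    // Rate limit: about 10 requests per second, shared by all callers (Guava)
    private final RateLimiter rateLimiter = RateLimiter.create(10);
    private final DeepSeekService deepSeekService;

    @GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChat(@RequestParam String message) {
        if (!rateLimiter.tryAcquire()) {
            return Flux.error(new ResponseStatusException(HttpStatus.TOO_MANY_REQUESTS, "Too many requests"));
        }
        // WebFlux writes each element as an SSE event and adds the "data:" prefix itself,
        // so adding it manually would produce "data:data: ..."
        return deepSeekService.streamCompletion(message)
                .doOnComplete(() -> log.info("Streaming finished"))
                .doOnError(e -> log.error("Streaming error: {}", e.getMessage()))
                // Detailed signal logging. For Micrometer metrics, Reactor 3.5+ replaces the deprecated
                // .metrics() operator with .tap(Micrometer.metrics(meterRegistry)) from reactor-core-micrometer
                .log("deepseek-stream", Level.FINE);
    }
}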
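RateLimiter.create(10) allows about 10 permits per second. Its tryAcquire() returns false immediately instead of waiting, which is why the controller can reject excess requests right away. The sketch below illustrates that non-blocking behaviour with a plain token bucket. It is a swapped-in, simplified algorithm: Guava's own implementation differs in details such as how it stores unused permits. The class SimpleRateLimiter and its injectable nanosecond clock are hypothetical, added so that the refill can be tested deterministically.

```java
import java.util.function.LongSupplier;

// Hypothetical token bucket: NOT Guava's algorithm, only the same non-blocking tryAcquire() contract.
final class SimpleRateLimiter {
    private final double permitsPerSecond;
    private final double capacity;
    private final LongSupplier nanoClock;
    private double tokens;
    private long lastRefill;

    SimpleRateLimiter(double permitsPerSecond, double capacity, LongSupplier nanoClock) {
        this.permitsPerSecond = permitsPerSecond;
        this.capacity = capacity;
        this.nanoClock = nanoClock;
        this.tokens = capacity;                 // start with a full bucket
        this.lastRefill = nanoClock.getAsLong();
    }

    // Consumes one permit and returns true if one is available; never blocks.
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        // Refill in proportion to the elapsed time, capped at the bucket capacity
        tokens = Math.min(capacity, tokens + (now - lastRefill) / 1e9 * permitsPerSecond);
        lastRefill = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

In production you would pass System::nanoTime as the clock. Like the controller's single field, one instance limits all callers together. Per-client limits would need one bucket per client key.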
Configuration (application.properties):
spring.application.name=deepseek_demo
server.port=8080
deepseek.api.key=sk-?????????????
deepseek.api.url=https://api.deepseek.com/v1/chat/completions
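Spring Boot reads application.properties in the standard key=value format, one entry per line. A line such as server.port=8080deepseek.api.key=... would therefore be read as a single server.port entry. The sketch below is a hypothetical pre-flight check that reports which of the two keys read by DeepSeekConfig are missing or blank. It uses java.util.Properties, not Spring's own loader. In a running application, an unresolved @Value placeholder already fails startup, so this only makes the check explicit.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical startup check for the two keys DeepSeekConfig reads with @Value.
final class DeepSeekPropertiesCheck {
    static final List<String> REQUIRED = List.of("deepseek.api.key", "deepseek.api.url");

    private DeepSeekPropertiesCheck() {}

    // Returns the required keys that are missing or blank in the given .properties text.
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // not expected for an in-memory reader
        }
        return REQUIRED.stream()
                .filter(key -> props.getProperty(key, "").isBlank())
                .collect(Collectors.toList());
    }
}
```

For example, text whose first line is server.port=8080deepseek.api.key=sk-test reports deepseek.api.key as missing. java.util.Properties splits at the first "=", so everything after it becomes the value of server.port.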
Test (it calls the real DeepSeek API, so it needs a valid key and network access):
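import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;

@SpringBootTest
class DeepseekDemoApplicationTests {

    @Autowired
    private DeepSeekService deepSeekService;

    @Test
    void streamCompletionPrintsChunks() {
        Flux<String> flux = deepSeekService.streamCompletion(
                "How do I integrate DeepSeek streaming output into Spring Boot? Give detailed steps and code.");
        StepVerifier.create(flux)
                .thenConsumeWhile(content -> {
                    System.out.print(content);  // print each chunk as it arrives
                    return true;
                })
                .expectComplete()
                .verify(Duration.ofMinutes(5)); // reasoning responses can take a while
    }
}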