如何实现大模型 “边生成边显示“
今天要实现大模型 “边生成边显示” 的效果。并介绍如何Java接入DeepSeek接口。
Java 可以通过SSE(Server-Sent Events,服务器发送事件) 技术实现。SSE 是一种基于 HTTP 的单向通信协议,适合服务器持续向客户端推送数据流(如大模型的流式输出),非常符合 “边吐内容边显示” 的场景。
准备工作
进入deepseek官网 https://www.deepseek.com/
创建key,API key 仅在创建时可见可复制,请妥善保存。
创建一个Java springboot工程。
导入依赖
必要依赖
okhttp
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.9.3</version>
</dependency>
控制层
@Slf4j
@RestController
@RequestMapping("/model/v2")
public class SSEv2Controller {// 线程池:用于异步处理大模型生成(避免阻塞主线程)private final ExecutorService executor = Executors.newCachedThreadPool();// DeepSeek客户端实例private final ModelSseClient modelSseClient = new ModelSseClient();/*** 处理前端的SSE请求,返回模型流式响应** @param prompt 用户输入的提示词* @return SseEmitter(用于向前端推送流式数据)*/@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter streamResponse(@RequestParam String prompt) {// 创建SSE发射器(30分钟超时)SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);// 异步调用DeepSeek模型(避免阻塞控制器线程)executor.submit(() -> {// 通过回调处理模型返回的流式数据modelSseClient.streamChat(prompt, new StreamCallback() {@Overridepublic void onChunk(String chunk) {try {// 将模型返回的内容块推送给前端emitter.send(SseEmitter.event().data(chunk));} catch (Exception e) {// 推送失败时关闭连接emitter.completeWithError(e);}}@Overridepublic void onComplete() {try {// 模型响应结束,发送结束信号并关闭连接emitter.send(SseEmitter.event().data("[DONE]"));emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}}@Overridepublic void onError(Throwable throwable) {// 模型调用出错时,通知前端并关闭连接emitter.completeWithError(throwable);}});});// 当连接关闭时清理资源emitter.onCompletion(() -> System.out.println("SSE连接已完成"));emitter.onError(e -> System.err.println("SSE连接错误: " + e.getMessage()));emitter.onTimeout(() -> emitter.completeWithError(new RuntimeException("SSE连接超时")));return emitter;}
}
模型流式数据回调接口
// 模型流式数据回调接口
public interface StreamCallback {// 接收模型返回的内容块void onChunk(String chunk);// 流式响应结束void onComplete();// 发生错误时调用void onError(Throwable throwable);
}
模型客户端
public class ModelSseClient {// DeepSeek API地址(请确认官方最新地址)private static final String DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions";// 替换为你的DeepSeek API Keyprivate static final String API_KEY = "key";// 模型名称(根据DeepSeek官方文档填写)private static final String MODEL = "deepseek-chat";private OkHttpClient client;private ObjectMapper objectMapper;public ModelSseClient() {// 初始化OkHttp客户端(设置超时时间)this.client = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS) // 流式响应需较长读取超时.writeTimeout(30, TimeUnit.SECONDS).build();// 初始化JSON解析器(用于构建请求体和解析响应)this.objectMapper = new ObjectMapper();}/*** 调用DeepSeek流式接口,通过回调返回实时内容** @param prompt 用户输入的提示词* @param callback 回调接口(用于传递内容块、结束信号、错误信息)*/public void streamChat(String prompt, StreamCallback callback) {try {// 构建请求体(使用Jackson生成JSON,避免字符串拼接导致的格式错误)// 1. 构建messages参数List<Message> messages = new ArrayList<>();messages.add(new Message("user", prompt));// 2. 构建完整请求对象ChatRequest request = new ChatRequest(MODEL, messages, true);// 3. 转换为JSON字符串String requestBody = objectMapper.writeValueAsString(request);// 构建请求Request httpRequest = new Request.Builder().url(DEEPSEEK_API_URL).addHeader("Content-Type", "application/json").addHeader("Authorization", "Bearer " + API_KEY).post(RequestBody.create(requestBody, MediaType.parse("application/json"))).build();// 发送异步请求client.newCall(httpRequest).enqueue(new Callback() {@Overridepublic void onFailure(Call call, IOException e) {// 调用失败时通知回调callback.onError(e);}@Overridepublic void onResponse(Call call, Response response) throws IOException {if (!response.isSuccessful()) {// 非200状态码时通知错误(如401认证失败、400参数错误)callback.onError(new RuntimeException("DeepSeek请求失败: 状态码=" + response.code() + ", 信息=" + response.message()));response.close();return;}ResponseBody body = response.body();if (body == null) {callback.onError(new RuntimeException("DeepSeek响应体为空"));response.close();return;}try (BufferedSource source = body.source()) {// 循环读取流式响应while (!source.exhausted()) {String line = source.readUtf8Line();if (line == null) break;if (line.trim().isEmpty()) continue;// 解析SSE格式数据if (line.startsWith("data: ")) {String data = line.substring("data: ".length()).trim();if (data.equals("[DONE]")) {// 流式结束,通知回调callback.onComplete();break;}// 解析JSON获取内容JsonNode jsonNode = objectMapper.readTree(data);String content = jsonNode.at("/choices/0/delta/content").asText(null);if (content != null && !content.isEmpty()) {// 将内容块通过回调传递给SSE控制器callback.onChunk(content);}}}} finally {response.close();}}});} catch (Exception e) {// 构建请求体失败时通知错误callback.onError(e);}}
}
创建两个类,用来构建请求体
class ChatRequest {private String model;private List<Message> messages;private boolean stream;
}class Message { private String role;private String content;
}
