AIGC入门,从理解通信协议sse与streamhttp开始
引言
随着AIGC(生成式人工智能)的普及,大模型如GPT、通义千问等已广泛应用于对话、代码生成、数据分析等场景。这些模型在运行时需要与客户端进行高效通信,SSE(Server-Sent Events) 和 Stream HTTP(流式HTTP) 是两种核心通信协议。本文将结合理论和Java代码示例,帮助你理解这两种协议的原理、区别及在AIGC中的实际应用。
一、通信协议基础
1.1 什么是SSE?
SSE(Server-Sent Events)是一种基于HTTP的单向通信协议,允许服务器主动向客户端推送实时事件。其特点包括:
● 事件格式标准化:通过data:字段传递结构化数据(如JSON)。
● 自动重连:网络中断后客户端会自动重新连接。
● 长连接保持:服务器持续发送数据,客户端逐步接收。
典型应用场景:实时聊天、股票行情推送、大模型流式输出。
1.2 什么是Stream HTTP?
Stream HTTP 是基于 HTTP 的分块传输编码(Chunked Transfer Encoding) 的流式传输方式。服务器可以持续向客户端发送数据块,客户端逐步接收并处理。
典型应用场景:视频流、大文件下载、自定义数据流。
二、SSE与Stream HTTP的区别
三、Java代码示例
3.1 使用SSE实现大模型流式输出
服务器端(Spring Boot)
package com.lzc.ai;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@RestController
public class SseController {private final ExecutorService executor = new ThreadPoolExecutor(5, 5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // 使用线程池管理发射器@GetMapping("/sse")public SseEmitter streamData() {SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 创建一个SseEmitter实例,不设置超时时间executor.execute(() -> {try {for (int i = 0; i < 10; i++) { // 发送10条消息String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));emitter.send(SseEmitter.event().id(Thread.currentThread().getName() + "=" + i).name("msg").data("Message " + i + " " + currentTime)); // 发送事件和数据Thread.sleep(1000); // 等待1秒}emitter.complete(); // 完成SSE连接} catch (Exception e) {emitter.completeWithError(e); // 出现错误时,关闭连接并发送错误信息}});return emitter;}
}
客户端(chrome浏览器)
http://localhost:8000/sse
3.2 使用Stream HTTP分块传输
服务器端(Spring Boot)
package com.lzc.ai;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@RestController
public class StreamController {@GetMapping("/stream")public void streamData(HttpServletResponse response) throws IOException, InterruptedException {//默认即是chunked,所以不需要设置//response.setHeader("Transfer-Encoding", "chunked");//text/plain模式下,浏览器会缓存数据直到连接关闭,再一次性渲染,如果想观察数据分块传输效果,可以使用curl命令行或者其它手写java client工具response.setContentType("text/plain");PrintWriter writer = response.getWriter();for (int i = 0; i < 10; i++) {//得到当前时间 yyyy-MM-dd HH:mm:ssString currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));writer.write("Data chunk " + i + " " + currentTime + "\n");writer.flush(); // 必须调用 flush 以触发分块传输TimeUnit.SECONDS.sleep(1);}}
}
客户端(chrome浏览器)
说明:虽然从浏览器中来看,页面数据是在10秒后统一返回的,但是由于服务端设置了writer.flush(); 故每秒钟其实是有返回的,只不过浏览器进行了缓存,待全部数据拿回来之后才显示出来,如果想看过程数据,可以使用curl命令查看
四、在AIGC中的实际应用
4.1 大模型流式输出(SSE)
● OpenAI API:当调用 POST /chat/completions 并设置 stream: true 时,返回的是SSE格式的数据流。
● 代码示例(Python):
import openaiopenai.api_key = "YOUR_API_KEY"
response = openai.ChatCompletion.create(model="gpt-3.5-turbo",messages=[{"role": "user", "content": "Hello!"}],stream=True
)for chunk in response:print(chunk.choices[0].delta.get("content", ""), end="", flush=True)
4.2 大文件下载(Stream HTTP)
● 适用场景:模型权重文件、训练数据集的分块下载。
● 优势:支持断点续传,节省带宽。
五、总结
● SSE 是 AIGC 中流式输出的首选协议,因其标准化格式、自动重连和实时性优势。
● Stream HTTP 更适合大文件传输或自定义协议场景,但需手动处理数据解析和重连。
● 选择建议:
- AI对话/代码生成:使用 SSE。
- 大文件下载/视频流:使用 Stream HTTP。
通过本文的理论讲解和Java代码示例,你可以快速入门AIGC中的通信协议设计,并在实际项目中灵活应用。