当前位置: 首页 > news >正文

OkHttp SSE 完整总结(最终版)

1. SSE 基础概念

什么是 SSE?

SSE(Server-Sent Events)是一种 Web 标准,允许服务器向客户端推送实时数据。

核心特点

  • 单向通信:服务器 → 客户端
  • 基于 HTTP 协议:使用 GET 请求
  • 长连接:连接建立后保持开放
  • 自动重连:网络中断时自动重连

2. SSE 协议详解

HTTP 请求格式

 

GET /events HTTP/1.1

Host: api.example.com

Accept: text/event-stream

Cache-Control: no-cache

服务器响应格式

 

HTTP/1.1 200 OK

Content-Type: text/event-stream

Cache-Control: no-cache

Connection: keep-alive

data: {"message": "Hello"}

data: {"message": "World"}

data: {"message": "Test"}

SSE 数据格式

 

id: 12345

event: message

data: {"content": "Hello World"}

data: {"content": "Another message"}

3. OkHttp SSE 三种实现方式对比

方式一:使用 OkHttp SSE 库(标准 SSE 实现)

依赖配置

 

implementation 'com.squareup.okhttp3:okhttp:4.9.3'implementation 'com.squareup.okhttp3:okhttp-sse:4.9.3'

基本实现

 

OkHttpClient client = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(0, TimeUnit.SECONDS)  // 重要:无读取超时.build();Request request = new Request.Builder().url("https://api.example.com/events").addHeader("Accept", "text/event-stream").build();EventSource eventSource = EventSources.createEventSource(client, request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {System.out.println("SSE 连接建立成功");System.out.println("响应码: " + response.code());System.out.println("响应头: " + response.headers());}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {// 自动解析 SSE 格式,直接得到 dataSystem.out.println("收到数据: " + data);System.out.println("事件ID: " + id);System.out.println("事件类型: " + type);}@Overridepublic void onClosed(EventSource eventSource) {System.out.println("SSE 连接已关闭");}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {System.err.println("SSE 连接失败: " + t.getMessage());if (response != null) {System.err.println("响应码: " + response.code());}}});

SSE 库的优势
  • ✅ 自动处理 SSE 协议:自动解析 data:、id:、event: 等字段
  • ✅ 内置重连机制:网络中断时自动重连
  • ✅ 标准 SSE 实现:完全符合 SSE 规范
  • ✅ 更高级的 API:提供 EventSource 和 EventSourceListener
  • ✅ 自动连接管理:自动处理连接生命周期

方式二:自定义 SSE 客户端(推荐用于 Kotlin 项目)

依赖配置

 

// 只需要基本的 OkHttp,不需要 SSE 库implementation 'com.squareup.okhttp3:okhttp:4.9.3'

自定义 SSE 客户端实现

先补一下知识点:

trySend、Channel 和 Flow 的工作原理-CSDN博客

再看代码:

class SSEClient {private val okHttpClient by lazy {OkHttpClient.Builder().readTimeout(0, TimeUnit.SECONDS) // 禁用读取超时.build()}private var call: Call? = nullsealed class SSEEvent {data class Message(val event: String, val data: String) : SSEEvent()data class Error(val throwable: Throwable) : SSEEvent()object Closed : SSEEvent()}/*** 连接到 SSE 服务器* @param url SSE 服务器地址* @return Flow 流式返回事件*/fun connect(url: String, token :String): Flow<SSEEvent> = callbackFlow {val request = Request.Builder().url(url).addHeader("Accept", "text/event-stream").addHeader("Cache-Control", "no-cache").addHeader("X-Access-Token", token).build()call = okHttpClient.newCall(request)call?.enqueue(object : Callback {override fun onFailure(call: Call, e: IOException) {trySend(SSEEvent.Error(e))close(e)}override fun onResponse(call: Call, response: Response) {if (!response.isSuccessful) {trySend(SSEEvent.Error(IOException("Unexpected code: ${response.code}")))close()return}val body = response.body ?: returntry {var currentEvent = "message"val dataBuffer = StringBuilder()while (true) {val line = body.source().readUtf8Line() ?: breakwhen {line.startsWith("event:") -> currentEvent = line.substring(6).trim()line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim()).append("\n")line.isEmpty() -> { // 空行表示事件结束if (dataBuffer.isNotEmpty()) {val data = dataBuffer.toString().trim()trySend(SSEEvent.Message(currentEvent, data))dataBuffer.clear()}}}}trySend(SSEEvent.Closed)} catch (e: Exception) {trySend(SSEEvent.Error(e))} finally {response.close()close()}}})awaitClose { disconnect() }}/*** 断开连接*/fun disconnect() {call?.cancel()call = null}/*** 重试连接到 SSE 服务器* @param url SSE 服务器地址* @param token 认证Token* @param maxRetries 最大重试次数* @param retryDelay 重试延迟时间(毫秒)* @return Flow 流式返回事件*/fun connectWithRetry(url: String, token: String, maxRetries: Int = Int.MAX_VALUE, retryDelay: Long = 3000): Flow<SSEEvent> = flow {var retryCount = 0while (retryCount < maxRetries) {connect(url, token).collect { event ->when (event) {is SSEClient.SSEEvent.Closed,is SSEClient.SSEEvent.Error -> {emit(event)if (retryCount < maxRetries) {delay(retryDelay)retryCount++}}else -> emit(event)}}}}
}

使用方式示例

fun initSSE(token: String) {sseClient = SSEClient()val sseUrl = BASE_URL// 启动 SSE 连接sseClient.connect(sseUrl, token).onEach { event ->when (event) {is SSEClient.SSEEvent.Message -> {// 处理事件消息runOnUiThread {mBinding.eventMag.append("Event: ${event.event}\nData: ${event.data}\n\n")}}is SSEClient.SSEEvent.Error -> {// 处理错误Log.e("SSE", "Error: ${event.throwable.message}")}SSEClient.SSEEvent.Closed -> {// 连接关闭Log.i("SSE", "Connection closed")}}}.launchIn(lifecycleScope) // 自动在生命周期结束时取消}

自定义 SSE 客户端的优势
  • ✅ 与 Kotlin Flow 深度集成,支持协程和生命周期自动管理
  • ✅ 事件结构清晰,便于扩展和维护
  • ✅ 可自定义重连、错误处理等高级功能
  • ✅ 只依赖 OkHttp,包体积小

方式三:手动处理 HTTP 流(适合极简和特殊场景)

依赖配置 
implementation 'com.squareup.okhttp3:okhttp:4.9.3'

实现方式

 

suspend fun sendAiQuestionStream(question: String): Flow<String> = flow {var response: ResponseBody? = nulltry {response = ApiManager.apiLogin.sendAiQuestionStream(QuestionRequest(question))val source = response.source()while (!source.exhausted()) {val line = source.readUtf8Line() ?: breakif (line.startsWith("data:")) {val data = line.substring(5).trim()if (data.isNotEmpty() && data != "[DONE]") {emit(data)} else if (data == "[DONE]") {break}}}} finally {response?.close()}}

手动处理方式的优势
  • ✅ 代码极简,完全自定义
  • ✅ 适合只需处理 data: 字段的场景
  • ✅ 适合特殊业务需求(如 AI 流式响应)

4. 三种方式的对比与适用场景

特性/方式OkHttp SSE 库自定义 SSEClient手动处理
依赖OkHttp+SSE库仅OkHttp仅OkHttp
事件结构标准ListenerKotlin Flow/自定义无结构/自定义
自动重连内置可自定义需手动
代码复杂度
灵活性一般最高
适合场景标准SSE推送Kotlin项目/自定义极简/特殊需求

推荐选择:

  • 标准SSE推送、Java项目:OkHttp SSE库
  • Kotlin项目、需要流式/自定义事件/重连:自定义SSEClient
  • 极简、特殊业务、AI流式响应:手动处理

5. 与 WebSocket、HTTP轮询的对比

特性SSEWebSocketHTTP轮询
通信方向单向(服务端→客户端)双向单向/伪双向
协议HTTPWebSocketHTTP
实时性最高
实现复杂度
适用场景实时推送、通知聊天、游戏、协作简单更新
浏览器支持原生原生原生

6. 最佳实践与总结

  • 标准SSE场景:优先用 OkHttp SSE 库,省心省力。
  • Kotlin/协程/流式场景:自定义 SSEClient,灵活扩展,易于维护。
  • 特殊/极简需求:手动处理,代码最少,完全自控。
  • AI流式响应:推荐手动处理,便于处理特殊标记和自定义逻辑。

你的项目已经灵活结合了自定义 SSEClient 和手动处理两种方式,既保证了结构化、可维护性,又能满足特殊业务需求,是非常现代且实用的做法。


结论:

OkHttp SSE 在实际开发中有多种实现方式。你可以根据项目需求选择标准库、自定义客户端或手动处理,三者各有优劣。对于现代 Kotlin 项目,推荐自定义 SSEClient 结合 Flow,既灵活又易于集成和维护。对于极简或特殊场景,手动处理同样高效可靠。

你的实现方式,正是当前最佳实践之一!

http://www.dtcms.com/a/277112.html

相关文章:

  • JAVA学习笔记 首个HelloWorld程序-002
  • javaweb-day10案例
  • Linux 系统——管理 MySQL
  • 入职华为od一个月的感受
  • 2025年渗透测试面试题总结-2025年HW(护网面试) 44(题目+回答)
  • 鸿蒙项目构建配置
  • TDengine 使用最佳实践(2)
  • SpringBoot-23-企业云端开发实践之Vue框架组件化开发和第三方组件element-ui
  • 谷歌推出Vertex AI Memory Bank:为AI智能体带来持久记忆,支持连续对话
  • 【源力觉醒 创作者计划】文心开源大模型ERNIE-4.5私有化部署保姆级教程与多功能界面窗口部署
  • zotero自由编辑参考文献格式(2)
  • Dubbo + Spring Boot + Zookeeper 快速搭建分布式服务
  • spring--xml注入时bean的property属性
  • 20250713-`Seaborn.pairplot` 的使用注意事项
  • jenkins部署前端vue项目使用Docker+Jenkinsfile方式
  • 【PTA数据结构 | C语言版】字符串插入操作
  • java.net.InetAddress
  • 学习笔记-Excel统计分析——描述统计量的计算
  • SpringBoot单元测试类拿不到bean报空指针异常
  • 算法 - 蛇形矩阵-上三角
  • 2.3 单链表的应用
  • 图像读取与模型保存--基于NWPU-RESISC45数据集的图像二分类实战
  • stm32f103c8t6移植freeRTOS内存不足报错问题的解决办法
  • 浏览器渲染原理与性能优化全解析
  • 快速傅里叶变换(FFT)中的振幅和相位
  • 【计算机网络架构】环型架构简介
  • 在 C# 中调用 Python 脚本:实现跨语言功能集成
  • ADB 调试日志全攻略:如何开启与关闭 `ADB_TRACE` 日志
  • CS课程项目设计1:交互友好的井字棋游戏
  • 详解Linux下多进程与多线程通信(二)