记录一下微信小程序里使用SSE
文章目录
- SSE 是什么?
- EventSource与SSE的关系
- 在小程序里实现SSE的思路
- SSE 消息格式
- 具体实现SSE步骤
- 举个例子理解一下
- 封装工具类及页面使用
- 封装 utils/sse.js
- 在页面中使用封装好的sse.js
- 注意事项:
SSE 是什么?
SSE 是 Server-Sent Events(服务器推送事件)的缩写。
它是一种标准化的 Web 技术,允许服务器单向地、持续地向客户端(通常是浏览器)推送数据。可以把它想象成一个“来自服务器的广播”,客户端只需要收听即可。
特点:
单向通信:数据流只能从服务器 -> 客户端。客户端不能通过这个连接向服务器发送消息(只能通过常规的 HTTP 请求)。
基于 HTTP:它工作在标准的 HTTP/HTTPS 协议之上,不需要像 WebSocket 那样建立特殊的协议连接,因此兼容性好,更容易穿透防火墙。
自动重连:SSE 规范内置了断线自动重连机制。如果连接意外中断,客户端会自动尝试重新连接。
事件模型:服务器可以给不同的消息打上“事件类型”的标签,客户端可以根据不同的事件类型来注册不同的监听器,逻辑清晰。
与其它轮询、websocket有啥区别,以前写过
EventSource与SSE的关系
具体EventSource看MDN文档
SSE:是技术规范或协议。它定义了服务器应该如何格式化数据(例如 data: …\n\n),以及客户端应该如何处理这个长连接。
EventSource:是浏览器提供给 JavaScript 开发者的客户端接口 (API)。你可以在你的网页 JS 代码中使用 new EventSource(‘/my-sse-endpoint’) 来创建一个到服务器的 SSE 连接。
| 环境 | 技术/协议 | 客户端实现工具 |
|---|---|---|
| 浏览器 | SSE | EventSource API (原生) |
| 小程序 | SSE | wx.request + enableChunked (手动模拟) |
在小程序里实现SSE的思路
使用
wx.request并将enableChunked设置为true,再通过requestTask.onChunkReceived 监听数据块,正是在小程序中实现 SSE (Server-Sent Events) 的方式。
这套机制完美地模拟了浏览器原生
EventSource的核心功能:客户端发起一个长连接,服务器可以持续地、分块地向客户端推送消息。
虽然
onChunkReceived提供了接收数据的能力,但要构建一个健壮的 SSE 客户端,你还需要处理一些细节,主要是数据块的拼接和解析。因为 SSE 协议有自己的消息格式,而一个数据块(chunk)可能包含不完整的消息或多条消息。
SSE 消息格式
服务器发送的原始数据通常长这样,每条消息以
\n\n分隔:
id: 1
event: message
data: {"user": "Alice", "text": "Hello"}id: 2
event: message
data: {"user": "Bob", "text": "Hi there"}
具体实现SSE步骤
- 初始化请求:设置
enableChunked: true。 - 创建缓冲区:在
onChunkReceived外部创建一个变量(如 buffer),用于存储不完整的消息片段。 - 监听并处理数据块:
1、在onChunkReceived回调中,将收到的ArrayBuffer数据转换成字符串。
2、将新字符串追加到缓冲区buffer。
3、循环检查buffer中是否包含消息分隔符\n\n。如果包含,说明至少有一条完整的消息。将其截取出来进行处理。
4、将已处理的消息从buffer中移除。
5、重复此过程,直到buffer中没有完整消息为止。
6、解析消息:对截取出的完整消息字符串进行解析,提取event、data等字段
举个例子理解一下
处理数据块的拼接和解析:
// pages/sse/sse.js
Page({data: {messages: []},requestTask: null,buffer: '', // 用于存储不完整的消息片段onLoad: function () {this.startSSE();},startSSE: function () {this.requestTask = wx.request({url: 'https://your-server.com/sse-endpoint', // 你的 SSE 服务器地址method: 'GET',enableChunked: true, // 关键:开启分块传输header: {'Accept': 'text/event-stream' // 告知服务器需要事件流},success: () => {console.log('SSE request success (stream ended)');},fail: (err) => {console.error('SSE request failed:', err);// 这里可以添加重连逻辑}});// 监听数据块this.requestTask.onChunkReceived(this.handleChunk.bind(this));},handleChunk: function (res) {// 1. 将 ArrayBuffer 转换为字符串const chunkText = new TextDecoder('utf-8').decode(new Uint8Array(res.data));// 2. 追加到缓冲区this.buffer += chunkText;// 3. 循环处理缓冲区中的完整消息let boundaryIndex;while ((boundaryIndex = this.buffer.indexOf('\n\n')) !== -1) {// 提取一条完整的消息const message = this.buffer.substring(0, boundaryIndex);// 从缓冲区移除已处理的消息this.buffer = this.buffer.substring(boundaryIndex + 2);// 4. 解析并处理消息this.parseMessage(message);}},parseMessage: function (message) {if (!message) return;console.log('Received raw message:', message);let eventData = '';const lines = message.split('\n');for (const line of lines) {if (line.startsWith('data:')) {// 提取 data 字段内容eventData = line.substring(5).trim();}}if (eventData) {try {const jsonData = JSON.parse(eventData);console.log('Parsed data:', jsonData);// 更新页面数据const newMessages = this.data.messages.concat(jsonData);this.setData({messages: newMessages});} catch (e) {console.error('Failed to parse SSE data as JSON:', eventData);}}},onUnload: function () {// 页面销毁时,中止请求if (this.requestTask) {this.requestTask.abort();}}
});
封装工具类及页面使用
封装 utils/sse.js
/*** 小程序 SSE (Server-Sent Events) 客户端** 特性:* - 封装了 wx.request 的 enableChunked 功能* - 自动解析 SSE 消息格式 (id, event, data)* - 支持事件监听 (on, off)* - 自动处理断线重连*/
class SSE {constructor(url, options = {}) {this.url = url;this.headers = options.headers || {};this.reconnectInterval = options.reconnectInterval || 3000; // 重连间隔,单位msthis.maxReconnectAttempts = options.maxReconnectAttempts || 5; // 最大重连次数this.requestTask = null;this.listeners = {};this.buffer = '';this.reconnectAttempts = 0;this.manualClose = false;}/*** 连接到 SSE 服务器*/connect() {this.manualClose = false;this.reconnectAttempts = 0;this._createRequest();}/*** 注册事件监听* @param {string} eventName 事件名 ('open', 'message', 'error', 'close')* @param {function} callback 回调函数*/on(eventName, callback) {if (!this.listeners[eventName]) {this.listeners[eventName] = [];}this.listeners[eventName].push(callback);}/*** 注销事件监听* @param {string} eventName 事件名* @param {function} callback 回调函数*/off(eventName, callback) {if (this.listeners[eventName]) {this.listeners[eventName] = this.listeners[eventName].filter(cb => cb !== callback);}}/*** 手动关闭连接*/close() {this.manualClose = true;if (this.requestTask) {// 先移除监听器if (this.chunkHandler) {this.requestTask.offChunkReceived(this.chunkHandler);this.chunkHandler = null;}// 再中止请求this.requestTask.abort();this.requestTask = null;}this._emit('close', { message: 'Connection closed manually.' });console.log('SSE connection manually closed.');}_emit(eventName, data) {if (this.listeners[eventName]) {this.listeners[eventName].forEach(callback => {try {callback(data);} catch (e) {console.error(`Error in SSE listener for event '${eventName}':`, e);}});}}_createRequest() {this.requestTask = wx.request({url: this.url,method: 'GET',enableChunked: true,header: {'Accept': 'text/event-stream','Cache-Control': 'no-cache',...this.headers,},success: () => {// success 在流结束时触发console.log('SSE stream finished.');if (!this.manualClose) {this._emit('close', { message: 'Stream finished by server.' });this._reconnect();}},fail: (err) => {if (!this.manualClose) {this._emit('error', err);this._reconnect();}}});// 保存绑定后的函数引用this.chunkHandler = this._handleChunk.bind(this);// 监听数据块this.requestTask.onChunkReceived(this.chunkHandler);// 模拟一个 'open' 事件this._emit('open', { message: 'Connection opened.' });this.reconnectAttempts = 0; // 连接成功,重置重连次数console.log('SSE connection opened.');}_handleChunk(res) {const chunkText = new TextDecoder('utf-8').decode(new Uint8Array(res.data));this.buffer += chunkText;let boundaryIndex;while ((boundaryIndex = this.buffer.indexOf('\n\n')) !== -1) {const message = this.buffer.substring(0, boundaryIndex);this.buffer = this.buffer.substring(boundaryIndex + 2);this._parseMessage(message);}}_parseMessage(rawMessage) {if (rawMessage.startsWith(':')) { // 忽略注释/心跳return;}const message = {id: null,event: 'message', // 默认事件类型data: ''};const lines = rawMessage.split('\n');lines.forEach(line => {const [field, value] = line.split(/:(.*)/s);if (value) {const trimmedValue = value.trim();switch (field) {case 'id':message.id = trimmedValue;break;case 'event':message.event = trimmedValue;break;case 'data':message.data += trimmedValue + '\n';break;}}});// 移除最后一个换行符message.data = message.data.slice(0, -1);// 触发自定义事件或默认的 message 事件this._emit(message.event, message);}_reconnect() {if (this.manualClose || this.reconnectAttempts >= this.maxReconnectAttempts) {console.log('SSE reconnection stopped.');return;}setTimeout(() => {this.reconnectAttempts++;console.log(`SSE reconnecting... attempt ${this.reconnectAttempts}`);this._createRequest();}, this.reconnectInterval);}
}// 导出 SSE 类
export default SSE;
在页面中使用封装好的sse.js
import SSE from '../../utils/sse.js'; // 引入封装好的模块Page({data: {sseStatus: '未连接',messages: []},sse: null, // 用于存放 SSE 实例onLoad: function () {// 1. 创建 SSE 实例this.sse = new SSE('https://your-server.com/sse-endpoint', {headers: {'Authorization': 'Bearer your-token' // 可以传入自定义请求头}});// 2. 监听事件this.sse.on('open', () => {this.setData({ sseStatus: '连接成功!' });console.log('SSE connection opened by page.');});this.sse.on('message', (event) => {console.log('Received default message:', event.data);try {const data = JSON.parse(event.data);const newMessages = this.data.messages.concat(data);this.setData({messages: newMessages});} catch (e) {// 如果数据不是JSON,直接显示const newMessages = this.data.messages.concat({ text: event.data });this.setData({messages: newMessages});}});// 监听自定义事件,例如服务器发送 event: updatethis.sse.on('update', (event) => {console.log('Received custom event [update]:', event.data);// ...处理更新逻辑});this.sse.on('error', (err) => {this.setData({ sseStatus: '连接出错,正在重连...' });console.error('SSE error:', err);});this.sse.on('close', () => {this.setData({ sseStatus: '连接已关闭' });console.log('SSE connection closed by page.');});// 3. 开始连接this.sse.connect();},onUnload: function () {// 4. 页面销毁时,务必关闭连接if (this.sse) {this.sse.close();}}
});
注意事项:
- 兼容性:务必确认你的小程序目标用户的基础库版本是否支持
enableChunked。这是一个相对较新的特性。 - 服务器端:服务器必须支持长连接,并正确设置响应头(如
Content-Type: text/event-stream)。 - 客户端解析:如示例所示,客户端需要一个简单的解析器来处理消息的
拼接和提取。 - 心跳与重连:为了维持连接的稳定性,服务器最好能定时发送心跳消息(例如一个只包含注释 :heartbeat 的消息),客户端也需要实现断线重连逻辑。
