Python_SSE案例实现
Python_SSE案例实现
1-Python使用Flask搭建一个Web项目框架
-
python具体的Flask搭建参考博客
-
Python使用Flask验证SSE :https://gitee.com/enzoism/python_flask_sse
-
1)场景一:标准SSE事件
-
2)场景二:自定义SSE事件
2-SpringBoot-Servelt实现SSE
SpringBoot-模拟SSE对话交互-代码地址:https://gitee.com/enzoism/springboot_sse
3-Server-Sent Events (SSE) 开发知识点整理
Server-Sent Events (SSE) 开发知识点整理
一、SSE 基础概念
-
定义:SSE 是一种允许服务器向客户端推送数据的 HTML5 技术
-
特点:
- 单向通信(服务器→客户端)
- 基于 HTTP 协议
- 使用简单文本协议
- 自动重连机制
- 支持跨域(需 CORS)
-
与 WebSocket 对比:
特性 SSE WebSocket 方向性 单向(服务器→客户端) 双向 协议 HTTP 独立协议(ws/wss) 数据格式 文本 二进制/文本 复杂度 简单 较复杂 重连机制 内置 需手动实现
二、SSE 协议格式
-
基本格式:
data: <message>\n\n
-
完整字段:
data
: 消息内容(可多行)id
: 事件ID(用于重连)event
: 事件类型(默认"message")retry
: 重连时间(ms):
注释(会被忽略)
示例:
event: update id: 123 retry: 5000 data: {"user": "John", "status": "online"} data: continued message \n\n
三、服务端实现
Flask 实现(Python)
from flask import Flask, Response
import timeapp = Flask(__name__)@app.route('/stream')
def stream():def event_stream():for i in range(5):yield f"data: Message {i}\n\n"time.sleep(1)return Response(event_stream(), content_type='text/event-stream')
Node.js 实现
const express = require('express');
const app = express();app.get('/stream', (req, res) => {res.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');let counter = 0;const interval = setInterval(() => {res.write(`data: ${counter++}\n\n`);if(counter > 10) {clearInterval(interval);res.end();}}, 1000);req.on('close', () => clearInterval(interval));
});
四、客户端实现
基本用法
const eventSource = new EventSource('/sse-endpoint');// 接收消息
eventSource.onmessage = (e) => {console.log('New message:', e.data);
};// 自定义事件
eventSource.addEventListener('update', (e) => {console.log('Update:', e.data);
});// 错误处理
eventSource.onerror = (e) => {if (e.eventPhase === EventSource.CLOSED) {console.log('Connection closed');} else {console.error('Error:', e);}
};
关闭连接
eventSource.close();
五、高级特性
-
重连机制:
- 客户端自动尝试重连
- 可通过
retry
字段设置重连间隔
-
事件ID:
- 服务端发送
id
字段 - 断线重连时,客户端会发送
Last-Event-ID
头
- 服务端发送
-
多事件类型:
// 服务端 event: notification data: New alert!// 客户端 eventSource.addEventListener('notification', ...);
六、性能优化
-
连接管理:
- 限制并发连接数
- 及时关闭闲置连接
-
消息压缩:
- 启用gzip压缩
-
心跳机制:
def event_stream():while True:yield ": heartbeat\n\n" # 注释行作为心跳time.sleep(15)
七、安全考虑
-
CORS 配置:
@app.after_request def add_cors(response):response.headers['Access-Control-Allow-Origin'] = '*'return response
-
认证授权:
- 使用Cookie或Token认证
- 敏感数据加密
-
DOS防护:
- 限制请求频率
- 验证客户端身份
八、常见问题解决
-
Nginx 代理配置:
proxy_set_header Connection ''; proxy_http_version 1.1; proxy_buffering off;
-
连接意外关闭:
- 添加心跳包
- 客户端实现手动重连
-
浏览器限制:
- Chrome/Firefox最多支持6个SSE连接
- 考虑使用单个连接多路复用
九、实际应用场景
- 实时通知:站内消息、系统警报
- 数据监控:服务器状态、股票价格
- 进度报告:文件上传/处理进度
- 动态更新:新闻推送、赛事比分
十、示例:完整聊天应用
服务端(Python)
from flask import Flask, request, Responseapp = Flask(__name__)
channels = {}@app.route('/chat/<channel>')
def chat(channel):def stream():messages = channels.setdefault(channel, [])for msg in messages:yield f"data: {msg}\n\n"while True:# 实际应用中应有更高效的消息分发机制time.sleep(1)return Response(stream(), content_type='text/event-stream')@app.route('/send/<channel>', methods=['POST'])
def send(channel):msg = request.form.get('message')channels.setdefault(channel, []).append(msg)return "OK"
客户端
<input id="message">
<button onclick="send()">Send</button>
<div id="chat"></div><script>
const channel = 'general';
const chat = new EventSource(`/chat/${channel}`);chat.onmessage = (e) => {document.getElementById('chat').innerHTML += `<div>${e.data}</div>`;
};function send() {const msg = document.getElementById('message').value;fetch(`/send/${channel}`, {method: 'POST',body: new URLSearchParams({message: msg})});
}
</script>
掌握这些知识点后,您应该能够熟练地在各种项目中实现SSE功能,并根据实际需求进行定制开发。