当前位置: 首页 > news >正文

Python_SSE案例实现

Python_SSE案例实现

1-Python使用Flask搭建一个Web项目框架

  • python具体的Flask搭建参考博客

  • Python使用Flask验证SSE :https://gitee.com/enzoism/python_flask_sse

  • 1)场景一:标准SSE事件

  • 2)场景二:自定义SSE事件


2-SpringBoot-Servelt实现SSE

SpringBoot-模拟SSE对话交互-代码地址:https://gitee.com/enzoism/springboot_sse


3-Server-Sent Events (SSE) 开发知识点整理

Server-Sent Events (SSE) 开发知识点整理

一、SSE 基础概念

  1. 定义:SSE 是一种允许服务器向客户端推送数据的 HTML5 技术

  2. 特点

    • 单向通信(服务器→客户端)
    • 基于 HTTP 协议
    • 使用简单文本协议
    • 自动重连机制
    • 支持跨域(需 CORS)
  3. 与 WebSocket 对比

    特性SSEWebSocket
    方向性单向(服务器→客户端)双向
    协议HTTP独立协议(ws/wss)
    数据格式文本二进制/文本
    复杂度简单较复杂
    重连机制内置需手动实现

二、SSE 协议格式

  1. 基本格式

    data: <message>\n\n
    
  2. 完整字段

    • data: 消息内容(可多行)
    • id: 事件ID(用于重连)
    • event: 事件类型(默认"message")
    • retry: 重连时间(ms)
    • : 注释(会被忽略)

    示例:

    event: update
    id: 123
    retry: 5000
    data: {"user": "John", "status": "online"}
    data: continued message
    \n\n
    

三、服务端实现

Flask 实现(Python)

from flask import Flask, Response
import timeapp = Flask(__name__)@app.route('/stream')
def stream():def event_stream():for i in range(5):yield f"data: Message {i}\n\n"time.sleep(1)return Response(event_stream(), content_type='text/event-stream')

Node.js 实现

const express = require('express');
const app = express();app.get('/stream', (req, res) => {res.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');let counter = 0;const interval = setInterval(() => {res.write(`data: ${counter++}\n\n`);if(counter > 10) {clearInterval(interval);res.end();}}, 1000);req.on('close', () => clearInterval(interval));
});

四、客户端实现

基本用法

const eventSource = new EventSource('/sse-endpoint');// 接收消息
eventSource.onmessage = (e) => {console.log('New message:', e.data);
};// 自定义事件
eventSource.addEventListener('update', (e) => {console.log('Update:', e.data);
});// 错误处理
eventSource.onerror = (e) => {if (e.eventPhase === EventSource.CLOSED) {console.log('Connection closed');} else {console.error('Error:', e);}
};

关闭连接

eventSource.close();

五、高级特性

  1. 重连机制

    • 客户端自动尝试重连
    • 可通过retry字段设置重连间隔
  2. 事件ID

    • 服务端发送id字段
    • 断线重连时,客户端会发送Last-Event-ID
  3. 多事件类型

    // 服务端
    event: notification
    data: New alert!// 客户端
    eventSource.addEventListener('notification', ...);
    

六、性能优化

  1. 连接管理

    • 限制并发连接数
    • 及时关闭闲置连接
  2. 消息压缩

    • 启用gzip压缩
  3. 心跳机制

    def event_stream():while True:yield ": heartbeat\n\n"  # 注释行作为心跳time.sleep(15)
    

七、安全考虑

  1. CORS 配置

    @app.after_request
    def add_cors(response):response.headers['Access-Control-Allow-Origin'] = '*'return response
    
  2. 认证授权

    • 使用Cookie或Token认证
    • 敏感数据加密
  3. DOS防护

    • 限制请求频率
    • 验证客户端身份

八、常见问题解决

  1. Nginx 代理配置

    proxy_set_header Connection '';
    proxy_http_version 1.1;
    proxy_buffering off;
    
  2. 连接意外关闭

    • 添加心跳包
    • 客户端实现手动重连
  3. 浏览器限制

    • Chrome/Firefox最多支持6个SSE连接
    • 考虑使用单个连接多路复用

九、实际应用场景

  1. 实时通知:站内消息、系统警报
  2. 数据监控:服务器状态、股票价格
  3. 进度报告:文件上传/处理进度
  4. 动态更新:新闻推送、赛事比分

十、示例:完整聊天应用

服务端(Python)

from flask import Flask, request, Responseapp = Flask(__name__)
channels = {}@app.route('/chat/<channel>')
def chat(channel):def stream():messages = channels.setdefault(channel, [])for msg in messages:yield f"data: {msg}\n\n"while True:# 实际应用中应有更高效的消息分发机制time.sleep(1)return Response(stream(), content_type='text/event-stream')@app.route('/send/<channel>', methods=['POST'])
def send(channel):msg = request.form.get('message')channels.setdefault(channel, []).append(msg)return "OK"

客户端

<input id="message">
<button onclick="send()">Send</button>
<div id="chat"></div><script>
const channel = 'general';
const chat = new EventSource(`/chat/${channel}`);chat.onmessage = (e) => {document.getElementById('chat').innerHTML += `<div>${e.data}</div>`;
};function send() {const msg = document.getElementById('message').value;fetch(`/send/${channel}`, {method: 'POST',body: new URLSearchParams({message: msg})});
}
</script>

掌握这些知识点后,您应该能够熟练地在各种项目中实现SSE功能,并根据实际需求进行定制开发。

相关文章:

  • PostgreSQL 中的序列(Sequence)
  • 深度解析Crawl4AI:面向大模型的新一代智能爬虫
  • 【合新通信】无人机天线拉远RFOF(射频光纤传输)解决方案
  • 学习黑客BitLocker与TPM详解
  • 【文献分享】机遇还是挑战:数字化转型对农业企业经营绩效的影响
  • 【markdown】介绍如何在markdown中绘制流程图
  • 具身-机器人-分层框架-大脑模块-RoboBrain1.0 RoboOS
  • DDR的PCB设计(T点)
  • 数据可视化:用一张图讲好一个故事
  • 防止网页被爬取的方法与第三方用户行为检测组件分析
  • ThreadLocal原理分析--结合Spring事务
  • Hive原理
  • 关于大语言模型的困惑度(PPL)指标优势与劣势
  • JMV 优化过程是什么?有什么效果?为什么要升级垃圾收集器?
  • vLLM部署多模态大模型Qwen2.5-VL-3B-Instruct
  • Android Studio的jks文件
  • NHANES指标推荐:sNfL
  • 5月12日星期一今日早报简报微语报早读
  • [原创](现代Delphi 12指南):[macOS 64bit App开发]: 如何获取当前用户主目录(即:~波浪符号目录)?
  • 智慧城市综合运营管理系统Axure原型
  • 学者的“好运气”:读本尼迪克特·安德森《椰壳碗外的人生》
  • 贵州省总工会正厅级副主席梁伟被查,曾任贵州省纪委副书记
  • 习近平会见古共中央第一书记、古巴国家主席迪亚斯-卡内尔
  • 公示!17个新职业、42个新工种亮相
  • 观察|印巴交火开始升级,是否会演变为第四次印巴战争?
  • 10家A股农商行去年年报:瑞丰银行营收增速领跑,常熟银行等4家净利增速超11%