当前位置: 首页 > news >正文

Web服务器主动推送技术(SSE)

文章目录

  • 从传统 HTTP 到 SSE:实时通信的演进之路
  • 传统 HTTP 的局限性
  • Server-Sent Events的诞生:服务器主动推送
    • SSE 工作原理
    • SSE 的局限性
    • SSE 的适用场景
    • SSE VS WebSocket
    • 实时Web核心诉求
  • 实战项目中的SSE
    • 前端
    • 后端
      • 连接接口
      • 获取聊天记录
      • 聊天接口

从传统 HTTP 到 SSE:实时通信的演进之路

  • 在Web 应用中,实时数据推送已成为许多场景的核心需求,例如实时通知、股票行情、在线聊天和物联网设备监控。然而,传统的 HTTP 请求-响应模式在实时性上存在天然缺陷。 Server-Sent Events(SSE) 实现高效、轻量的服务器主动推送。

传统 HTTP 的局限性

  1. 请求-响应模式:传统的 HTTP 协议基于客户端主动请求、服务器被动响应的模式。
// 客户端定时轮询(Polling)
setInterval(() => {
  fetch('/api/data')
    .then(response => response.json())
    .then(data => updateUI(data));
}, 5000); // 每5秒请求一次
  • 高延迟:数据更新依赖客户端轮询频率(如 5 秒一次),无法实时获取最新状态。
  • 资源浪费:频繁的请求消耗服务器和网络资源,尤其当数据未变化时。
  1. 长轮询(Long Polling):为了优化轮询效率,长轮询允许服务器在数据就绪前保持连接打开。
function longPoll() {
  fetch('/api/long-poll')
    .then(response => response.json())
    .then(data => {
      updateUI(data);
      longPoll(); // 递归调用以维持连接
    });
}
  • 尽管减少了无效请求,但长轮询仍需要反复建立连接,且复杂度较高。

Server-Sent Events的诞生:服务器主动推送

  • SSE(Server-Sent Events) 是一种基于 HTTP 的轻量协议,支持服务器向客户端单向推送事件流。其核心思想是:
  • 长连接:客户端与服务器建立一次 HTTP 连接后,保持打开状态。
  • 事件驱动:服务器可在任意时刻推送数据,客户端通过监听事件实时接收。

SSE 工作原理

  1. 客户端:通过 EventSource API 订阅服务器事件流。
    const eventSource = new EventSource('/sse');
    
    eventSource.onmessage = (event) => {
      console.log('收到数据:', event.data);
    };
    
  2. 服务器:以 text/event-stream 格式持续发送事件。
    HTTP/1.1 200 OK
    Content-Type: text/event-stream
    
    data: 这是第一条消息\n\n
    id: 1\n
    event: status\n\n
    

在这里插入图片描述

SSE 的局限性

  1. 单向通信:不支持客户端向服务器发送数据(需配合其他 HTTP 请求)。
  2. 浏览器兼容性:IE 及旧版 Edge 不支持,但可通过 Polyfill 解决。
  3. 最大连接数:浏览器对同一源的 SSE 连接数有限制(通常 6 个)。

SSE 的适用场景

场景传统HTTPSSE 方案
实时通知高延迟,资源浪费即时推送,低延迟
股票行情轮询导致数据滞后每秒推送多次价格更新
日志监控需手动刷新页面实时滚动显示日志
新闻头条用户错过更新新文章自动推送到客户端

SSE VS WebSocket

  • SSE与WebSocket的相同点:都是用来建立浏览器与服务器之间的通信渠道。
  • SSE与WebSocket的相同点的区别:
WebSocketSSE
通道类型双向全双工单向通道(服务器->浏览器)
协议类型独立协议(ws://)协议HTTP协议
复杂度需处理握手、帧协议默认支持
断线重连需手动实现自动处理
消息自定义类型不支持支持
适用场景服务器主导的推送场景双向交互(如在线游戏)

实时Web核心诉求

  • 实时 Web 技术的核心诉求:更低延迟、更高效率、更简单的实现。
  • SSE 凭借其轻量级、基于 HTTP 的特点,成为服务器推送场景的首选方案。在 Spring Boot 中,通过 S s e E m i t t e r SseEmitter SseEmitter 可以快速构建实时功能,而无需引入复杂的第三方库。

实战项目中的SSE

前端

methods: {
    initSSE(userName) {
      if (!window.EventSource) {
        console.log("浏览器不支持SSE")
        return
      }

      const source = new EventSource(`http://localhost:8443/sse/connect?userId=${userName}`)
      console.log("连接用户=", userName)
      this.currentUserName = userName

      source.addEventListener('open', () => {
        console.log("建立连接...")
      })

      source.addEventListener('add', (e) => {
        console.log("add事件...", e.data)
        const receiveMsg = e.data

        if (!this.botMsgId) {
          this.botMsgId = this.generateRandomId(12)
          const newBotContent = {
            id: "temp",
            content: receiveMsg,
            userName: '家庭医生',
            chatType: 'bot',
            botMsgId: this.botMsgId
          }
          this.chatList.push(newBotContent)
        } else {
          const chatItem = this.chatList.find(item => item.botMsgId === this.botMsgId)
          if (chatItem) {
            chatItem.content += receiveMsg
          }
        }

        this.$nextTick(() => {
          this.scrollToBottom()
        })
      })

      source.addEventListener('finish', () => {
        console.log("finish事件...")
        this.botMsgId = null
        this.scrollToBottom()
      })

      source.addEventListener('error', (e) => {
        console.log("error事件...", e)
        if (e.readyState === EventSource.CLOSED) {
          console.log('connection is closed')
        }
        source.close()
      })
    },

	 async getChatRecords(userName) {
      try {
        const result = await doctorApi.getRecords(userName)
        this.chatList = result
        this.$nextTick(() => {
          this.scrollToBottom()
        })
      } catch (err) {
        console.error('获取聊天记录失败:', err)
      }
    },

    generateRandomId(length) {
      const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
      let result = ''
      for (let i = 0; i < length; i++) {
        result += characters.charAt(Math.floor(Math.random() * characters.length))
      }
      return result
    }
  • doctor.js
import request from '@/api/request'

export const doctorApi = {
  /**
   * 获取聊天记录
   * @param {string} userId - 用户ID
   * @returns {Promise<Array>} 聊天记录列表
   */
  getRecords(userName) {
    return request({
      url: `/record/getRecordList?userName=${userName}`,
      method: 'get'
    })
  },

  /**
   * 发送聊天消息
   * @param {Object} chatData - 聊天数据
   * @param {string} chatData.currentUserName - 当前用户名
   * @param {string} chatData.message - 消息内容
   * @returns {Promise<void>}
   */
  doChat(chatData) {
    return request({
      url: '/ai/chat',
      method: 'post',
      data: chatData
    })
  }
}

后端

连接接口

  • controller
@Slf4j
@RestController
@RequestMapping("/sse")
public class SSEController {
    //连接sse服务的接口
    @GetMapping(path = "/connect", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter connect(@RequestParam String userId) {
        return SSEServer.connect(userId);
    }
}
  • service
public static SseEmitter connect(String userId) {
    // 设置超时时间,0代表永不过期;默认30秒,超时未完成任务则会抛出异常
    SseEmitter sseEmitter = new SseEmitter(0L);

    // 注册SSE的回调方法
    sseEmitter.onCompletion(completionCallback(userId));
    sseEmitter.onError(errorCallback(userId));
    sseEmitter.onTimeout(timeoutCallback(userId));

    sseClients.put(userId, sseEmitter);
    log.info("当前创建新的SSE连接,用户ID为: {}", userId);

    onlineCounts.getAndIncrement();

    return sseEmitter;
}

获取聊天记录

  • RecordController
@RestController
@RequestMapping("/record")
public class RecordController {
    @Resource
    private ChatRecordService chatRecordService;

    @GetMapping("/getRecordList")
    public List<ChatRecord> getRecordList(@RequestParam String userName) {
       return  chatRecordService.getChatRecordList(userName);
    }

}
  • RecordServiceImpl
@Override
   public List<ChatRecord> getChatRecordList(String userName) {
       QueryWrapper<ChatRecord> queryWrapper = new QueryWrapper<>();
       queryWrapper.eq("family_member", userName);
       queryWrapper.orderByAsc("chat_time");
       return chatRecordMapper.selectList(queryWrapper);
   }

聊天接口

  • ChatController
@lombok.extern.slf4j.Slf4j
@Slf4j
@RestController()
@RequestMapping("/ai")
public class ChatController {
    @Resource
    private AIService aiService;
    @PostMapping("/chat")
    public String chatWithDoctor(@RequestBody ChatEntity chatEntity) {
        log.info(chatEntity.toString());
        String currentUserName = chatEntity.getCurrentUserName();
        String message = chatEntity.getMessage();
        return aiService.chatWithDoctor(currentUserName,message);
    }
}
  • ChatServiceImpl
@Override
public String chatWithDoctor(String userName, String message) {
    if (message == null || message.isEmpty()) {
        return "message is empty";
    }
    // 保存用户消息
    chatRecordService.saveChatRecord(userName, message, ChatTypeEnum.USER);

    Prompt prompt = new Prompt(new UserMessage(message));
    log.info(prompt.toString());
    List<String> list = this.chatModel.stream(prompt).toStream().map(chatResponse -> {
        String text = chatResponse.getResult().getOutput().getText();
        SSEServer.sendMessage(userName, text, SSEMsgType.ADD);
        return text;
    }).toList();

    SSEServer.sendMessage(userName, "finish", SSEMsgType.FINISH);

    StringBuilder htmlRes= new StringBuilder();
    for (String s : list) {
        htmlRes.append(s);
    }
    // 保存AI消息
    chatRecordService.saveChatRecord(userName, htmlRes.toString(), ChatTypeEnum.BOT);

    return "success";
}
http://www.dtcms.com/a/106835.html

相关文章:

  • MongoDB 复制集实战
  • 笔记:docker安装(ubuntu 20.04)
  • C# 中充血模型和‌贫血模型
  • 从查重报告入手的精准论文降重秘籍
  • 基于 Spring Cloud 与 Spring Boot 的工程项目管理系统源码:重塑工程管理新范式​
  • 文件实时备份软件PanguFlow
  • zabbix监控网站(nginx、redis、mysql)
  • 在openharmony中编译部署早期vi工具(附带vi工具源码)
  • 生产管理系统如何破解汽车零部件行业追溯难痛点
  • (Kotlin) Android使用DialogX实现iOS风格底部弹窗(带Toggle开关)
  • 算法与数据结构面试题
  • 【硬件视界8】电源供应器(PSU):计算机的“心脏“
  • 洛谷题单3-P5720 【深基4.例4】一尺之棰-python-流程图重构
  • Tomcat 部署 Jenkins.war 详细教程(含常见问题解决)
  • 存储型XSS漏洞解析
  • springBoot统一响应类型3.5.2版本
  • 【橘子大模型】关于PromptTemplate
  • 定制化管理系统与通用管理系统,谁更胜一筹?
  • MySQL的进阶语法7(索引-B+Tree 、Hash、聚集索引 、二级索引(回表查询)、索引的使用及设计原则
  • ES6对函数参数的新设计
  • 贪心算法,其优缺点是什么?
  • 第二篇:系统分析师——1-6章
  • 深入解析 Spring Boot 测试核心注解
  • 使用UDP建立连接,会存在什么问题?
  • es使用ik分词器并自定义中文词库实现热更新
  • 软件工程13 条法则
  • 空调开机启动后发出噼里啪啦的异响分析与解决
  • C语言--字符串逆序
  • Timer的底层实现原理?
  • LETTERS(DFS)