当前位置: 首页 > news >正文

使用 ResponseBodyEmitter 实现异步响应式数据流处理

1. 概述

1.1 什么是 ResponseBodyEmitter

ResponseBodyEmitter 是 Spring MVC 提供的一个接口,用于支持异步返回响应数据流。它允许在控制器方法中逐步发送数据给客户端,而无需一次性生成完整的响应。

1.2 使用场景

  • 实时数据推送(如股票行情、聊天消息等)。
  • 大量数据分批传输。
  • 服务器发送事件(SSE, Server-Sent Events)。

1.3 优势与局限性

优势:

  • 支持异步数据流处理。
  • 能够实时更新客户端数据。
  • 简化了复杂数据流的管理。

局限性:

  • 高并发场景下需要额外优化。
  • 客户端断开连接时需手动处理资源释放。

2. 环境准备

2.1 添加依赖

确保项目中包含以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2.2 配置 Spring Boot 项目

创建一个标准的 Spring Boot 项目,并配置好基础环境。

3. 基本使用方法

3.1 创建控制器

定义一个控制器类,用于处理 HTTP 请求。

3.2 返回 ResponseBodyEmitter 对象

通过返回 ResponseBodyEmitter 对象实现异步数据流。

3.3 发送数据给客户端

使用 emitter.send() 方法向客户端发送数据。

示例代码:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/stream")
public class StreamController {
   

    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    @GetMapping("/events")
    public ResponseBodyEmitter handleEvents() {
   
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();

        // 使用线程池管理异步任务
        executorService.execute(() -> {
   
            try {
   
                for (int i = 0; i < 5; i++) {
   
                    // 模拟延迟
                    TimeUnit.SECONDS.sleep(1);
                    // 发送数据给客户端
                    emitter.send("Event " + i + "\n");
                }
                // 完成发送
                emitter.complete();
            } catch (IOException | InterruptedException e) {
   
                // 发生错误时处理
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}

说明:

  • 使用 ExecutorService 管理异步任务,避免直接创建线程。
  • TimeUnit.SECONDS.sleep(1) 模拟每秒发送一次数据。
  • emitter.send("Event " + i + "\n") 发送数据给客户端。
  • emitter.complete() 完成数据发送。
  • emitter.completeWithError(e) 处理异常。

4. 实现服务器发送事件(SSE)

4.1 SSE 简介

SSE 是一种基于 HTTP 的协议,允许服务器向客户端推送实时更新的数据。

4.2 使用 ResponseBodyEmitter 实现 SSE

通过设置响应头 Content-Type: text/event-stream,可以实现 SSE。

示例代码:

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/sse")
public class SseController {
   

    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSse() {
   
        SseEmitter emitter = new SseEmitter();

        // 使用线程池管理异步任务
        executorService.execute(() -> {
   
            try {
   
                for (int i = 0; i < 5; i++) {
   
                    // 模拟延迟
                    TimeUnit.SECONDS.sleep(1);
                    // 发送数据给客户端
                    emitter.send(SseEmitter.event().name("message").data("Event " + i));
                }
                // 完成发送
                emitter.complete();
            } catch (IOException | InterruptedException e) {
   
                // 发生错误时处理
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}

说明:

  • 使用 SseEmitter 实现 SSE。
  • MediaType.TEXT_EVENT_STREAM_VALUE 设置响应头为 text/event-stream
  • emitter.send(SseEmitter.event().name("message").data("Event " + i)) 发送带有名称的数据。
  • emitter.complete() 完成数据发送。
  • emitter.completeWithError(e) 处理异常。

4.3 客户端代码示例

HTML 示例:

<!DOCTYPE html>
<html>
<head>
    <title>SSE Example</title>
</head>
<body>
    <div id="events"></div>
    <script>
        const eventSource = new EventSource('/sse/stream');
        eventSource.onmessage = function(event) {
     
            document.getElementById('events').innerHTML += event.data + '<br>';
        };
        eventSource.onerror = function(err) {
     
            console.error("EventSource failed:", err);
        };
    </script>
</body>
</html>

说明:

  • 使用 EventSource 连接到 SSE 流。
  • eventSource.onmessage 处理接收到的数据。
  • eventSource.onerror 处理错误。

5. 异步数据推送的最佳实践

5.1 数据流管理

  • 使用线程池管理异步任务,避免资源耗尽。
  • 设置合理的超时时间,防止连接长时间占用。

示例代码:

import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import 

相关文章:

  • 工程化与框架系列(24)--跨平台解决方案
  • 3.8【Q】cv
  • AWS 如何导入内部SSL 证书
  • VsCode导入时选择相对路径
  • 伊藤积分(Ito Integral):随机世界中的积分魔法
  • Windows下配置Conda环境路径
  • C语言中内存布局(内存模型)是怎样的?
  • 一周热点-OpenAI 推出了 GPT-4.5,这可能是其最后一个非推理模型
  • 仿真新能源充电桩管理系统
  • Linux16-数据库、HTML
  • 人工智能(AI)与 生命体智能的本质差异
  • Office/WPS接入DeepSeek等多个AI工具,开启办公新模式!
  • 【Raspberry Pi 5 测评】无显示器上手指南
  • .NET Core全屏截图,C#全屏截图
  • Windows 如何开启和使用FTP服务
  • 从零开始训练小型语言模型之minimind
  • cannon g3810打印机设置
  • Python自学指南:从入门到进阶(第一天)
  • Mysql的卸载安装配置以及简单使用
  • 【GPT入门】第3课 客服会话质检(思维链)
  • 宜春做网站的联系电话/好用吗
  • 用dw做网站怎么添加背景图片/高级seo培训
  • 中文wordpress网站模板下载失败/百度怎么找人工客服
  • 网站建设实训主要收获及体会/ seo won
  • wordpress 菜单跳转/1688关键词怎么优化
  • 做动车哪个网站查/seo网络优化是什么意思