当前位置: 首页 > news >正文

gRPC从0到1系列【9】

文章目录

  • 三、StreamObserver
    • 3.1 什么是 StreamObserver?
    • 3.2 为什么需要 StreamObserver?
    • 3.3 StreamObserver 的核心方法详解
    • 3.4 四种 gRPC 调用模式下的 StreamObserver
      • 3.4.1 一元 RPC (Unary RPC)
      • 3.4.2 服务端流式 RPC (Server-side Streaming RPC)
      • 3.4.3 客户端流式 RPC (Client-side Streaming RPC)
      • 3.4.4 双向流式 RPC (Bidirectional Streaming RPC)
    • 3.5 高级用法与最佳实践
      • 3.5.1 背压 (Backpressure) 机制
      • 3.5.2 取消与超时
      • 3.5.3 错误处理
    • 3.6 总结

三、StreamObserver

3.1 什么是 StreamObserver?

StreamObserver 是 gRPC 中用于处理流式数据的核心接口。它本质上是一个观察者模式 (Observer Pattern) 的实现,用于在异步通信中传递数据和状态。

你可以把它想象成一个数据管道的 “出口”。当一方(客户端或服务端)需要发送数据时,它会通过 StreamObserver 的方法将数据 “推送” 出去。接收方则实现这个接口来 “观察” 并处理这些数据。

在 gRPC 中,每个流式调用(无论是客户端流还是服务端流)都会涉及到两个 StreamObserver

  • 请求观察者 (Request Observer):由调用方(客户端或服务端)实现,用于发送数据。
  • 响应观察者 (Response Observer):由被调用方(服务端或客户端)实现,用于接收数据和处理状态(完成或错误)。

3.2 为什么需要 StreamObserver?

gRPC 是一个高性能的 RPC 框架,其底层基于 HTTP/2。HTTP/2 的一个核心特性就是多路复用 (Multiplexing),可以在单个 TCP 连接上并发处理多个请求和响应。为了充分利用这一点,gRPC 的 Java API 被设计为全异步 (Fully Asynchronous) 的。

如果使用传统的同步阻塞模型,一个线程在等待一个 RPC 响应时就会被闲置,无法处理其他任务,这会严重影响系统吞吐量。

StreamObserver 就是这个异步模型的关键。它允许你发起一个 RPC 调用后,立即返回,而不需要等待响应。当数据准备好时,gRPC 框架会通过你提供的 StreamObserver 回调方法来通知你。

3.3 StreamObserver 的核心方法详解

StreamObserver 接口定义了三个核心方法,它们构成了流式通信的完整生命周期。

public interface StreamObserver<V> {/*** 接收一个消息。* 这是数据处理的主要入口。*/void onNext(V value);/*** 接收一个错误通知,终止流。* 一旦调用此方法,流就结束了,后续不会再调用 onNext 或 onCompleted。*/void onError(Throwable t);/*** 接收一个完成通知,正常终止流。* 表示发送方已发送完所有数据。*/void onCompleted();
}
  • onNext(V value):
    • 用途: 传递一个数据项。
    • 调用时机: 发送方可以多次调用此方法来发送多个数据项。
    • 注意: 对于一个流,onNext 可以被调用 0 次或多次。
  • onError(Throwable t):
    • 用途: 通知接收方发生了错误,流异常终止。
    • 调用时机: 当发生网络问题、业务逻辑错误或其他任何导致流无法继续的异常时调用。
    • 注意: 一个流生命周期中,onErroronCompleted 只能有一个被调用,且一旦调用,流就结束。
  • onCompleted():
    • 用途: 通知接收方所有数据都已发送完毕,流正常结束。
    • 调用时机: 发送方在发送完所有 onNext 数据后调用。
    • 注意: 一个流生命周期中,onCompletedonError 只能有一个被调用。

3.4 四种 gRPC 调用模式下的 StreamObserver

gRPC 支持四种服务方法,其中三种涉及流式传输。我们来逐一分析每种模式下 StreamObserver 的角色和交互流程。

3.4.1 一元 RPC (Unary RPC)

这是最简单的模式:客户端发送一个请求,服务端返回一个响应。

✅ 1. 流程图

ClientClientStubServerImplServer1. call unaryMethod(request, responseObserver)2. Send Request3. Receive Request4. onNext(response)5. onCompleted()6. Send Response7. responseObserver.onNext(response)8. responseObserver.onCompleted()ClientClientStubServerImplServer

✅ 2. 流程分析

  • 客户端调用 stub 的方法,传入请求对象和一个 StreamObserver(我们称之为 responseObserver)。
  • gRPC 框架将请求发送给服务端。
  • 服务端接收到请求,处理后,创建一个 StreamObserver(我们称之为 serviceResponseObserver)。
  • 服务端调用 serviceResponseObserver.onNext(response) 发送响应数据。
  • 服务端调用 serviceResponseObserver.onCompleted() 表示响应结束。
  • 客户端的 responseObserver.onNext(response) 被调用,客户端处理响应。
  • 客户端的 responseObserver.onCompleted() 被调用,整个 RPC 结束。

3.4.2 服务端流式 RPC (Server-side Streaming RPC)

客户端发送一个请求,服务端返回一个数据流。

✅ 1. 流程图

ClientClientStubServerImplServer1. call serverStreamingMethod(request, responseObserver)2. Send Request3. Receive Request4. onNext(response1)5. Send Response16. responseObserver.onNext(response1)7. onNext(response2)8. Send Response29. responseObserver.onNext(response2)loop[Send multiple responses]10. onCompleted()11. Send Completion12. responseObserver.onCompleted()ClientClientStubServerImplServer

✅ 2. 流程分析

  • 客户端调用流式 stub 方法,传入请求和 responseObserver。
  • 服务端接收到请求,获取到一个 StreamObserver(serviceResponseObserver)。
  • 服务端可以多次调用 serviceResponseObserver.onNext(response) 来发送多个响应数据。
  • 发送完毕后,服务端调用 serviceResponseObserver.onCompleted()。
  • 客户端的 responseObserver.onNext(response) 会被多次调用,每次接收一个数据。
  • 当服务端调用 onCompleted 后,客户端的 responseObserver.onCompleted() 被调用。

3.4.3 客户端流式 RPC (Client-side Streaming RPC)

✅ 1. 流程图

ClientClientStubServerImplServer1. call clientStreamingMethod(responseObserver)2. return requestObserver3. requestObserver.onNext(request1)4. Send Request15. onNext(request1)6. requestObserver.onNext(request2)7. Send Request28. onNext(request2)loop[Send multiple requests]9. requestObserver.onCompleted()10. Send Completion11. onCompleted()12. onNext(finalResponse)13. onCompleted()14. Send Final Response15. responseObserver.onNext(finalResponse)16. responseObserver.onCompleted()ClientClientStubServerImplServer

✅ 2. 流程分析

  • 客户端调用流式 stub 方法,该方法会立即返回一个 StreamObserverrequestObserver)。
  • 客户端可以多次调用 requestObserver.onNext(request) 来发送多个请求数据。
  • 发送完毕后,客户端调用 requestObserver.onCompleted()
  • 服务端通过其 StreamObserveronNext 方法多次接收客户端的数据。
  • 服务端处理完所有数据后,创建一个响应,并通过自己的 responseObserver(通常是框架传入的)调用 onNext(response)onCompleted()
  • 客户端的 responseObserver(在第 1 步中传入)接收到响应。

3.4.4 双向流式 RPC (Bidirectional Streaming RPC)

✅ 1. 流程图

ClientClientStubServerImplServer1. call bidiStreamingMethod(responseObserver)2. return requestObserver3. requestObserver.onNext(req1)4. Send Req15. onNext(req1)6. onNext(res1)7. Send Res18. responseObserver.onNext(res1)9. requestObserver.onNext(req2)10. Send Req211. onNext(req2)12. onNext(res2)13. Send Res214. responseObserver.onNext(res2)15. requestObserver.onCompleted()16. Send Completion17. onCompleted()18. onCompleted()19. Send Completion20. responseObserver.onCompleted()ClientClientStubServerImplServer

✅ 2. 流程分析

  • 客户端调用流式 stub 方法,传入 responseObserver,并立即返回一个 requestObserver

  • 从这一刻起,通信是双向的:

    • 客户端可以随时调用 requestObserver.onNext() 发送数据。

    • 服务端可以随时调用其 responseObserver.onNext() 发送数据。

  • 当客户端发送完所有数据后,调用 requestObserver.onCompleted()。

  • 服务端在接收完客户端的数据(通过 onCompleted 通知)并发送完自己的所有数据后,调用其 responseObserver.onCompleted()。

  • 客户端收到服务端的 onCompleted 通知,整个 RPC 结束。

3.5 高级用法与最佳实践

3.5.1 背压 (Backpressure) 机制

当数据流的生产者速度快于消费者时,会导致内存积压,甚至系统崩溃。gRPC 内置了背压机制来解决这个问题。

  • 工作原理: gRPC 框架会监控消费者的处理速度。当消费者处理不过来时,框架会自动减缓或暂停从网络接收数据,从而间接通知生产者减慢发送速度。
  • 对开发者的意义: 你通常不需要手动实现背压。只要你正确地实现了 StreamObserver,尤其是在 onNext 中避免长时间的阻塞操作,gRPC 就会处理好背压。如果 onNext 中确实有耗时操作,应将其异步化(例如提交到线程池处理),以快速释放 onNext 方法,允许 gRPC 接收更多数据。

3.5.2 取消与超时

  • 超时 (Deadline): 你可以为 RPC 调用设置一个截止时间。如果在截止时间内没有完成,RPC 会自动失败,并调用 onError
// 在客户端设置超时
asyncStub.withDeadlineAfter(5, TimeUnit.SECONDS).calculate(responseObserver);
  • 取消 (Cancellation): 客户端可以通过 ClientCall.cancel() 来主动取消一个正在进行的 RPC。对于流式调用,可以通过 StreamObserver的 onError 传递一个 Status.CANCELLED 异常来实现。

3.5.3 错误处理

  • 使用 StatusRuntimeException: gRPC 推荐使用 io.grpc.StatusRuntimeException 来传递错误。它包含了标准的错误码(如 NVALID_ARGUMENT, NOT_FOUND, UNAVAILABLE 等)和描述信息。
  • 服务端
import io.grpc.Status;
import io.grpc.StatusRuntimeException;// ...
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Division by zero").asRuntimeException());
  • 客户端
@Override
public void onError(Throwable t) {if (t instanceof StatusRuntimeException) {StatusRuntimeException e = (StatusRuntimeException) t;logger.severe("gRPC error: " + e.getStatus().getCode() + " - " + e.getStatus().getDescription());} else {logger.severe("Unexpected error: " + t.getMessage());}finishLatch.countDown();
}

3.6 总结

StreamObserver 是 gRPC 异步流式通信的基石。通过实现这个简单的接口,你可以构建出功能强大、高性能的流式数据服务。

  • 核心思想: 观察者模式,用于异步推送数据和状态。
  • 三个方法: onNext (数据)、onError (错误)、onCompleted (结束) 定义了流的完整生命周期。
  • 四种模式: 根据数据流的方向和数量,StreamObserver 在一元、服务端流式、客户端流式和双向流式 RPC 中扮演着不同但关键的角色。
  • 最佳实践: 理解并利用好背压、超时、取消和标准错误处理机制,是构建健壮 gRPC 应用的关键。

http://www.dtcms.com/a/426578.html

相关文章:

  • IDEA 2024 中创建 Maven 项目的详细步骤
  • 2025 AI 图景:从工具革命到生态重构的五大趋势
  • 网站开发者模式下载视频wordpress如何添加备案号
  • UNIX下C语言编程与实践22-UNIX 文件其他属性获取:stat 结构与 localtime 函数的使用
  • UNIX下C语言编程与实践15-UNIX 文件系统三级结构:目录、i 节点、数据块的协同工作机制
  • 青浦做网站的公司网站开发语言html5 php
  • 【分布式中间件】RabbitMQ 功能详解与高可靠实现指南
  • SOME/IP-SD报文结构和交互详解
  • 给贾维斯加“手势控制”:从原理到落地,打造多模态交互的本地智能助
  • 电商数据分析优化清理大师
  • 论文阅读:《Self-Supervised Continual Graph Learning in Adaptive Riemannian Spaces》
  • Qt事件处理全解析
  • 深入理解 LLM 分词器:BPE、WordPiece 与 Unigram
  • 【大模型评估】大模型评估的五类数据
  • 3-2 Windows 安全设置
  • 网站建设平台 汉龙举报个人备案网站做经营性
  • 做技术网站赚钱比较好用的微信社群管理软件
  • DCT与DST变换原理及其在音视频编码中的应用解析
  • 高端网络建站松岗做网站哪家便宜
  • 大连网站设计报价游戏大全免费版入口
  • 长沙人才招聘网站硅谷主角刚开始做的是软件还是网站
  • 网站正能量做网站 人员
  • 做刷票的网站阳山做网站
  • 可以做超链接或锚文本的网站有哪些西安品牌策划公司排名
  • 抽奖网站怎么制作手机端网站的建设
  • 黄岛网站建设多少钱wordpress 硬件要求
  • 网站建设开票名称怎么写做网站宣传图的网站
  • 花店网站建设课程设计论文城市生活服务app下载
  • 从哪方面建设网站开通网站必须做域名空间
  • 涡阳在北京做网站的名人如何与老板谈网站建设