当前位置: 首页 > news >正文

gRPC从0到1系列【14】

文章目录

  • 客户端流式RPC
    • 5.5 常见陷阱及解决方案
      • 5.5.1 忘记调用 onCompleted()
      • 5.5.2 阻塞 onNext() 方法
      • 5.5.3 服务端状态管理不当
      • 5.5.4 错误处理不完整
      • 5.5.5 忽略取消和超时
      • 5.5.6 对背压机制理解不足
      • 5.5.7 缺少流控和限速
      • 5.5.8 忽略元数据 (Metadata) 传递
      • 5.5.9 总结
    • 5.6 客户端流式 RPC 核心要点总结
      • 5.6.1 核心定义与通信模式
      • 5.6.2 关键角色: StreamObserver
      • 5.6.3 生命周期与核心操作
      • 5.6.4 最佳实践与避坑指南

客户端流式RPC

5.5 常见陷阱及解决方案

5.5.1 忘记调用 onCompleted()

这是客户端流式 RPC 中最常见、最致命的错误。

  • 问题描述:客户端发送完所有数据后,如果忘记调用 requestObserver.onCompleted(),服务端将永远不会收到流结束的信号。服务端的 onCompleted() 方法也不会被调用,导致服务端一直处于等待状态,无法执行最终的处理和响应。这会造成资源泄漏(服务端为该流维护的状态一直不释放)和RPC 超时(客户端最终会因收不到响应而超时)。

最优处理方案

  • 使用 try-finally:这是最安全、最推荐的做法。无论代码是否发生异常,都确保 onCompleted() 被调用。
  • 明确的逻辑:在你的业务逻辑中,清晰地定义 “数据流结束” 的条件,并在该条件满足时立即调用 onCompleted()
StreamObserver<MyRequest> requestObserver = asyncStub.processData(responseObserver);
try {for (MyRequest request : generateRequestsData()) {requestObserver.onNext(request);}// 所有数据发送完毕,必须调用 onCompleted()requestObserver.onCompleted();
} catch (Exception e) {// 如果发生异常,调用 onError() 来取消流requestObserver.onError(e);
}

5.5.2 阻塞 onNext() 方法

  • 问题描述:如果在 onNext() 调用之后,立即执行一个耗时的阻塞操作(如磁盘 I/O、数据库查询、或 Thread.sleep()),你会严重影响 RPC 的吞吐量。因为 gRPC 客户端需要通过事件循环来处理网络 I/O,长时间阻塞当前线程会阻塞这个循环,导致后续的 onNext() 调用和对服务端响应的处理都被延迟。
  • 最优处理方案
    • 非阻塞发送onNext() 调用应该非常快。它只是将数据放入一个内部缓冲区等待网络发送。
    • 异步生成数据:如果数据生成过程本身就很耗时(例如,从慢速磁盘读取),应该将数据生成和 onNext() 发送解耦。使用一个生产者 - 消费者模型:一个后台线程负责生成数据并将其放入一个队列,另一个线程(或事件循环)从队列中取出数据并调用 onNext() 发送。

5.5.3 服务端状态管理不当

服务端需要在内存中暂存客户端发送的所有数据,直到流结束。

  • 问题描述
    1. 内存泄漏:如果服务端在 onCompleted()onError() 中忘记清理为该流分配的资源(如累加器、列表等),这些内存将永远不会被释放,最终可能导致内存溢出(OOM)。
    2. 线程安全:如果一个客户端的流在多个线程中被处理(尽管不常见),那么对共享状态的访问必须是线程安全的。
  • 最优处理方案
    • 使用 try-finally 清理资源:在服务端的 StreamObserver 实现中,将状态维护在 try-finally 块中,确保无论流是正常结束还是异常结束,资源都能被释放。
    • 使用线程安全的数据结构:如果存在并发访问的可能,使用 AtomicLongConcurrentHashMap 或其他线程安全的集合来存储状态。
    • 及时失败:如果在 onNext() 中检测到无法处理的数据(如格式错误),应立即调用 responseObserver.onError() 并返回,停止处理后续数据。
// Java - 服务端@Override
public StreamObserver<NumberRequest> processNumbers(StreamObserver<SumResponse> responseObserver) {final AtomicLong sum = new AtomicLong(0);return new StreamObserver<NumberRequest>() {@Overridepublic void onNext(NumberRequest request) {sum.addAndGet(request.getNumber());}@Overridepublic void onError(Throwable t) {// 发生错误,清理工作可以在这里做,或者在 onCompleted/onError 的公共逻辑中做logger.severe("Error in stream: " + t.getMessage());responseObserver.onError(t);}@Overridepublic void onCompleted() {try {// 最终处理逻辑SumResponse response = SumResponse.newBuilder().setSum(sum.get()).build();responseObserver.onNext(response);responseObserver.onCompleted();} finally {// 如果有需要关闭的资源,可以在这里处理// 对于基本类型,这一步可能不需要}}};
}

5.5.4 错误处理不完整

错误处理在流式 RPC 中至关重要,任何一方的错误都需要正确地传递给另一方。

  • 问题描述
    • 客户端错误未传递:客户端在发送数据时发生异常,如果不调用 requestObserver.onError(),服务端将永远等待,导致资源泄漏。
    • 服务端错误处理不当:服务端在处理数据时发生异常,如果不调用 responseObserver.onError(),客户端将一直等待最终响应,直到超时。
    • 忽略 Status 细节:gRPC 的错误是通过 Status 对象传递的。简单地传递一个通用的 Exception 会丢失错误码(如 INVALID_ARGUMENT, NOT_FOUND)和详细描述,客户端无法根据具体错误类型进行差异化处理。
  • 最优处理方案
    • 总是调用 onError():在任何异常路径上,都必须调用 onError() 来通知对方。
    • 使用 gRPC Status 异常:服务端应使用 StatusRuntimeException 来封装错误信息。客户端则应检查 onError() 中接收到的 Throwable 是否为 StatusRuntimeException,并根据 getStatus().getCode() 来处理不同类型的错误。
@Override
public void onNext(NumberRequest request) {if (request.getNumber() < 0) {// 发送一个带有具体错误码和描述的异常responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Negative numbers are not allowed: " + request.getNumber()).asRuntimeException());}// ... 正常处理
}

5.5.5 忽略取消和超时

客户端可能会因为用户操作或超时而主动取消 RPC。

  • 问题描述:如果服务端不监听取消事件,即使客户端已经取消了 RPC,服务端仍会继续处理剩余的数据,造成计算资源的浪费
  • 最优处理方案
    • 服务端监听取消事件:在服务端的 StreamObserver 中,可以将其转换为 ServerCallStreamObserver,并注册一个取消回调。
    • 客户端设置超时:客户端应始终为流式 RPC 设置一个合理的超时时间,防止因网络问题或服务端无响应而导致客户端无限期等待。
// 服务端取消监听示例:
@Override
public StreamObserver<NumberRequest> processNumbers(StreamObserver<SumResponse> responseObserver) {// 转换为 ServerCallStreamObserverServerCallStreamObserver<SumResponse> serverResponseObserver =(ServerCallStreamObserver<SumResponse>) responseObserver;// 注册取消回调serverResponseObserver.setOnCancelHandler(() -> {// 当客户端取消时,这里的代码会被执行logger.info("Client has cancelled the RPC. Stopping processing.");// 可以在这里清理资源});// ... 后续的 StreamObserver 实现return new StreamObserver<NumberRequest>() { ... };
}

5.5.6 对背压机制理解不足

虽然 gRPC 自动处理背压,但开发者需要理解其工作原理以避免写出低效代码。

  • 问题描述:背压是指当服务端处理速度慢于客户端发送速度时,gRPC 会通知客户端减慢发送速度。如果你在客户端的 onNext() 中不检查流的状态,可能会误以为数据已经被发送,而实际上它可能还在客户端的缓冲区中等待。
  • 最优处理方案
    • 理解是自动的:首先要知道你不需要手动实现背压逻辑。gRPC 框架已经处理好了。
    • 监控流的状态 (高级):在高级场景下,你可以将客户端的 requestObserver 转换为 ClientCallStreamObserver,并使用 isReady() 方法来检查流是否准备好接收更多数据。如果 isReady() 返回 false,你应该暂停发送,直到 setOnReadyHandler() 中注册的回调被触发。这对于实现复杂的流控策略很有用。

5.5.7 缺少流控和限速

在某些场景下,即使有背压,你也可能需要主动限制客户端的发送速率。

  • 问题描述:如果客户端产生数据的速度远快于网络或服务端的处理能力,即使有背压,也可能导致客户端内存中积压大量待发送数据。
  • 最优处理方案
    • 实现客户端限速器:在客户端应用层实现一个限速器(如使用令牌桶算法),控制 onNext() 的调用频率,确保发送速率在一个合理的范围内。

5.5.8 忽略元数据 (Metadata) 传递

元数据(如认证令牌、请求 ID、压缩算法指示等)对于 RPC 调用非常重要。

  • 问题描述
    • 忽略初始元数据:客户端可能忘记在发起 RPC 时通过 stub.withCallCredentials()stub.withMetadata() 传递必要的元数据(如 Authorization Token),导致服务端认证失败。
    • 忽略尾随元数据:服务端可能需要在流结束时返回一些附加信息(如 Set-Cookie、操作的 ETag 等),但客户端可能不知道如何通过 ClientCallStreamObservergetTrailers() 方法来获取这些尾随元数据。
  • 最优处理方案
    • 熟悉元数据 API:学习并使用 io.grpc.Metadata 类,在客户端正确传递初始元数据。
    • 获取尾随元数据:在客户端,如果需要获取服务端的尾随元数据,可以在 responseObserveronCompleted() 方法中,将 responseObserver 转换为 ClientCallStreamObserver 来获取。

5.5.9 总结

要避开 gRPC 客户端流式 RPC 的陷阱,请牢记以下核心原则:

  1. try-finally 是你的朋友:用它来确保 onCompleted() 总是被调用。
  2. 保持 onNext() 轻量:避免在其中执行任何阻塞操作。
  3. 错误处理要彻底:任何异常都应通过 onError() 传播,并使用 gRPC 的 Status 来提供丰富的错误信息。
  4. 服务端要响应取消:监听取消事件,避免资源浪费。
  5. 善用超时机制:为客户端调用设置合理的超时。
  6. 理解并信任背压:知道 gRPC 会自动处理流量控制。

5.6 客户端流式 RPC 核心要点总结

5.6.1 核心定义与通信模式

  • 定义:客户端可以向服务端连续发送多个请求消息,而服务端在接收完所有请求消息后,才返回一个响应消息。
  • 模式(Stream of Requests) -> (Single Response)

5.6.2 关键角色: StreamObserver

  • 客户端视角
    • requestObserver(获取的)**:客户端调用 RPC 方法后,**立即获得一个 StreamObserver。客户端通过调用它的 onNext() 方法来发送流式请求数据。
    • responseObserver(提供的):客户端在发起调用时,需要自己实现并传入一个 StreamObserver,用于接收服务端最终返回的那个响应和 RPC 状态。
  • 服务端视角
    • requestObserver (实现的)**:服务端需要实现一个 StreamObserver 来 “观察” 客户端发来的流式请求。它的 onNext() 方法会被多次调用以接收数据,onCompleted() 方法被调用时标志着客户端数据发送完毕。
    • responseObserver (提供的):gRPC 框架会向服务端的方法实现提供一个 StreamObserver。服务端通过调用它的 onNext() 发送最终响应,并调用 onCompleted() 结束 RPC。

5.6.3 生命周期与核心操作

一个完整的客户端流式 RPC 生命周期包含以下几个关键步骤:

  1. 发起调用:客户端调用流式 stub 方法,传入 responseObserver,并获得 requestObserver
  2. 流式发送:客户端循环调用 requestObserver.onNext(request) 发送多个请求。
  3. 结束流:客户端调用 requestObserver.onCompleted(),明确告知服务端数据已发送完毕(这是最关键、最容易忘记的一步)
  4. 服务端处理:服务端在自己的 onCompleted() 方法中,对接收到的所有数据进行最终处理。
  5. 返回响应:服务端调用 responseObserver.onNext(response) 发送单个响应,然后调用 responseObserver.onCompleted() 结束 RPC。
  6. 客户端接收:客户端通过自己传入的 responseObserveronNext()onCompleted() 方法接收最终结果。

5.6.4 最佳实践与避坑指南

  • 始终调用 onCompleted():使用 try-finally 块确保 onCompleted() 总是被调用,防止服务端无限等待。
  • 高效发送:保持 onNext() 调用的轻量,避免在其中执行阻塞操作。
  • 健壮的错误处理:任何异常路径都应调用 onError(),并使用 gRPC 的 Status 传递详细错误信息。
  • 服务端响应取消:服务端应监听客户端的取消事件,及时停止无用的计算,释放资源。
  • 设置超时:客户端应设置合理的超时时间,防止无限期等待。

客户端流式 RPC 是一种 “先上车后买票” 的通信模式,客户端通过 StreamObserver 流式发送数据,在调用 onCompleted() 后,服务端对整批数据进行处理并返回一个最终结果,非常适合大数据上传和批量处理场景。

http://www.dtcms.com/a/435222.html

相关文章:

  • JVM的内存分配策略有哪些?
  • 卡特兰数【模板】(四个公式模板)
  • Process Monitor 学习笔记(5.5):保存/打开追踪记录——复盘、复现与分享的正确姿势
  • 【机器学习宝藏】深入解析经典人脸识别数据集:Olivetti Faces
  • 【C++】深入理解红黑树:概念、性质和实现
  • 制作卖东西网站玩具网站 下载
  • 网站建设培训课程wordpress描述插件
  • php网站超市源码下载十大永久免费crm
  • 网站色彩代码carousel wordpress
  • 帮别人做网站一般app开发费用多少
  • 上海网站建设服务市价编程做网站容易还是做软件
  • Go 语言流程控制详解:if / switch / for
  • 企业网站栏目设计h5手机网站实例
  • 操作系统应用开发(十三)RustDesk文件服务搭建——东方仙盟筑基期
  • 莱州网站建设服务程序开发的步骤是什么
  • 网站域名多少钱一年杭州seo公司排名
  • 武昌网站制作公司深圳vi设计公司推荐
  • AI驱动的软件质量保障:未来已来
  • Lama Cleaner图片去水印工具最新版IOPaint-1.5.3使用教程-优雅草卓伊凡
  • Spring Boot 配置属性绑定
  • tauri中的wry和tao是干啥的?都是什么作用
  • 个人网站建设发布信息wordpress移动端悬浮导航代码
  • 神经网络评估指标:准确率、召回率等详解(代码验证)
  • linux免密切换
  • 藏语自然语言处理入门 - 2 分词
  • 2020年美国新冠肺炎疫情数据分析与可视化
  • 天津重型网站建设推荐影响网站alexa排名的主要因素有
  • 2.Java中创建线程
  • 分段函数的傅里叶变换及其应用
  • 全网网站建设优化长江设计公司