当前位置: 首页 > news >正文

【gRPC】Java高性能远程调用之gRPC详解

gRPC详解

  • 一、什么是gRPC?
  • 二、用proto生成代码
    • 2.1、前期准备
    • 2.2、protobuf插件安装
  • 三、简单 RPC
    • 3.1、开发gRPC服务端
    • 3.2、开发gRPC客户端
    • 3.3、验证gRPC服务
  • 四、服务器端流式 RPC
    • 4.1、开发一个gRPC服务,类型是服务端流
    • 4.2、开发一个客户端,调用前面发布的gRPC服务
    • 4.3、验证
  • 五、客户端流式 RPC
    • 5.1、在proto文件中定义客户端流类型的gRPC接口
    • 5.2、开发服务端应用
    • 5.3、开发客户端应用
    • 5.4、验证
  • 六、双向流式 RPC
    • 6.1、在proto文件中定义双端流类型的gRPC接口
    • 6.2、开发服务端应用
    • 6.3、开发客户端应用
    • 6.4、验证

一、什么是gRPC?

gRPC是一款由Google开发的高性能、开源的 RPC(远程过程调用)框架。它基于 HTTP/2 协议,并使用 Protocol Buffers(Protobuf)作为默认的序列化工具。gRPC 的主要作用包括:

  • 高性能:基于 HTTP/2,支持多路复用、流式传输和二进制编码,性能优于传统的 HTTP/1.1。

  • 跨语言支持:支持多种编程语言(如 Java、Go、Python、C++ 等),适合微服务架构中的多语言环境。

  • 强类型接口:通过 Protobuf 定义服务接口,生成强类型的客户端和服务端代码,减少错误。

  • 流式通信:支持单向流、双向流等复杂的通信模式。

  • 适用于微服务:适合低延迟、高吞吐量的场景,如微服务之间的通信。

在这里插入图片描述

贴一张油管老哥基于GraphQL、REST、gRPC的测试结果,在应对较大的RPS时,gRPC需要更少的CPU和内存使用率,并且网络带宽消耗也是最小的(花费最小),同时还有着很低的时延,10万的rps响应能在20ms内。

在这里插入图片描述
基于以上优点,这篇文章详细说说gRPC怎么使用。

详细代码地址:github,有用的话,麻烦给个star

二、用proto生成代码

2.1、前期准备

环境与依赖

依赖版本
jdk1.8
maven3.9.9
springboot2.4.2
grpc1.26.0
protobuf3.19.4
grpc-client-spring-boot-starter2.13.0.RELEASE
grpc-server-spring-boot-starter2.13.0.RELEASE

项目结构

创建一个父项目grpc-test,包含三个字项目,grpc-clientgrpc-servergrpc-api

在这里插入图片描述

2.2、protobuf插件安装

  • IDEA安装Protobuf插件

在这里插入图片描述

  • grpc-test服务的pom文件中引入maven依赖,parent可替换成自己的springboot版本
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.csdn.dev</groupId>
        <artifactId>csdn-dev-boot</artifactId>
        <version>${revision}</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>

    <artifactId>grpc-test</artifactId>

    <modules>
        <module>grpc-server</module>
        <module>grpc-client</module>
        <module>grpc-api</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <java.version>1.8</java.version>
        <protobuf.version>3.19.4</protobuf.version>
        <grpc.version>1.26.0</grpc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <!-- grpc server和spring-boot集成框架 -->
        <dependency>
            <groupId>net.devh</groupId>
            <artifactId>grpc-server-spring-boot-starter</artifactId>
            <version>2.13.0.RELEASE</version>
        </dependency>

        <!-- grpc client和spring-boot集成框架 -->
        <dependency>
            <groupId>net.devh</groupId>
            <artifactId>grpc-client-spring-boot-starter</artifactId>
            <version>2.13.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
    </dependencies>
</project>
  • maven中protobuf plugin
<build>
    <!-- os系统信息插件, protobuf-maven-plugin需要获取系统信息下载相应的protobuf程序 -->
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.6.2</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>

        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>

            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>

                <!-- proto文件目录 -->
                <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
                <!-- 生成的Java文件目录 -->
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                <clearOutputDirectory>false</clearOutputDirectory>
                <!--<outputDirectory>${project.build.directory}/generated-sources/protobuf</outputDirectory>-->
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在maven中的build中的plugins中添加上面插件。

protoSourceRoot指定*.proto定义的消息文件路径。

outputDirectory指定输出的java文件地址。默认是输出到target中。

clearOutputDirectory是否清空输出文件,默认为是,如果是,outputDirectory下的项目工程会被清空。

执行maven,出现下图中protobuf表示插件安装成功

在这里插入图片描述

  • grpc-api模块的src/main/proto目录下新增名为helloworld.proto的文件,这里面定义了一个gRPC服务,里面含有一个接口,并且还有这个接口的入参和返回结果的定义:
syntax = "proto3";

option java_multiple_files = true;
// 生成java代码的package
option java_package = "com.demo.grpc.protocol";
option java_outer_classname = "HelloWorld";

// 入参的数据结构
message HelloRequest {
  string name = 1;
}

// 返回结果的数据结构
message HelloResponse {
  string message = 1;
}

// gRPC服务
service HelloWorldService {
  // 接口定义
  rpc SayHello (HelloRequest) returns (HelloResponse) {
  }
}
  • proto文件已经做好,接下来要根据这个文件来生成java代码,双击插件中的compilecompile-custom,分别编译beanservice。也可以执行mvn clean compile生成文件。

在这里插入图片描述

  • 生成的文件

在这里插入图片描述

三、简单 RPC

gRPC服务的开发和调用,实现的效果如下图:

在这里插入图片描述

3.1、开发gRPC服务端

  • 首先要开发的是gRPC服务端,回顾前文中helloworld.proto中定义的服务和接口,如下所示,名为HelloWorldService的服务对外提供名为SayHello接口,这就是咱们接下来的任务,创建一个springboot应用,该应用以gRPC的方式提供SayHello接口给其他应用远程调用:
// gRPC服务
service HelloWorldService {
  // 接口定义
  rpc SayHello (HelloRequest) returns (HelloResponse) {
  }
}
  • 基于springboot框架开发一个普通的gRPC服务端应用,一共需要五个步骤,如下图所示,接下来我们按照下图序号的顺序来开发:

在这里插入图片描述

  • 导入maven依赖,引入grpc-api依赖
<dependency>
    <groupId>com.csdn.dev</groupId>
    <artifactId>grpc-api</artifactId>
</dependency>
  • 这是个springboot应用,配置文件内容如下:
spring:
  application:
    name: grpc-server
grpc:
  server:
    port: 9252
  • 新建拦截类LogGrpcInterceptor.java,每当gRPC请求到来后该类会先执行,这里是将方法名字在日志中打印出来,您可以对请求响应做更详细的处理:
package com.demo.grpc.interceptor;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class LogGrpcInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
                                                                 ServerCallHandler<ReqT, RespT> serverCallHandler) {
        log.debug(serverCall.getMethodDescriptor().getFullMethodName());
        return serverCallHandler.startCall(serverCall, metadata);
    }
}
  • 为了让LogGrpcInterceptor可以在gRPC请求到来时被执行,需要做相应的配置,如下所示,在普通的bean的配置中添加注解即可:
package com.demo.grpc.config;

import com.demo.grpc.interceptor.LogGrpcInterceptor;
import io.grpc.ServerInterceptor;
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class GlobalInterceptorConfiguration {

    @GrpcGlobalServerInterceptor
    ServerInterceptor logServerInterceptor(){
        return new LogGrpcInterceptor();
    }
}
  • 应用启动类很简单
package com.demo.grpc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class GrpcServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(GrpcServerApplication.class, args);
    }
}
  • 接下来是最重要的service类,gRPC服务在此处对外暴露出去,完整代码如下,有几处要注意的地方稍后提到:
package com.demo.grpc.service;

import com.demo.grpc.protocol.HelloRequest;
import com.demo.grpc.protocol.HelloResponse;
import com.demo.grpc.protocol.HelloWorldServiceGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

import java.util.Date;

@GrpcService
public class GrpcServerService extends HelloWorldServiceGrpc.HelloWorldServiceImplBase {

    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        HelloResponse response = HelloResponse.newBuilder().setMessage("Hello " + request.getName() + ", " + new Date()).build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}
  • 上述GrpcServerService.java中有几处需要注意:
  1. 是使用@GrpcService注解,再继承HelloWorldServiceImplBase ,这样就可以借助grpc-server-spring-boot-starter库将sayHello暴露为gRPC服务;
  2. HelloWorldServiceImplBase是前文中根据proto自动生成的java代码,在grpc-api模块中;
  3. sayHello方法中处理完毕业务逻辑后,调用onNext方法填入返回内容;
  4. 调用onCompleted方法表示本次gRPC服务完成;

至此,gRPC服务端编码就完成了,咱们接着开始客户端开发;

3.2、开发gRPC客户端

  • 同理引入grpc-api依赖
<dependency>
    <groupId>com.csdn.dev</groupId>
    <artifactId>grpc-api</artifactId>
</dependency>
  • 应用配置文件grpc-client/src/main/resources/application.yml,注意address的值就是gRPC服务端的信息,我这里grpc-servergrpc-client在同一台电脑上运行,请您根据自己情况来设置:
server:
  port: 8082
spring:
  application:
    name: grpc-client

grpc:
  client:
    # gRPC配置的名字,GrpcClient注解会用到
    grpc-server:
      # gRPC服务端地址
      address: 'static://127.0.0.1:9252'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
  • 接下来要创建下图展示的类,按序号顺序创建:

在这里插入图片描述

  • 首先是拦截类LogGrpcInterceptor,与服务端的拦截类差不多,不过实现的接口不同:
package com.demo.grpc.interceptor;

import io.grpc.*;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class LogGrpcInterceptor implements ClientInterceptor {

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, 
            CallOptions callOptions, 
            Channel channel) {
        log.debug(method.getFullMethodName());
        return channel.newCall(method, callOptions);
    }
}
  • 为了让拦截类能够正常工作,即发起gRPC请求的时候被执行,需要新增一个配置类:
package com.demo.grpc.config;

import com.demo.grpc.interceptor.LogGrpcInterceptor;
import io.grpc.ClientInterceptor;
import net.devh.boot.grpc.client.interceptor.GrpcGlobalClientInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;

@Order(Ordered.LOWEST_PRECEDENCE)
@Configuration(proxyBeanMethods = false)
public class GlobalClientInterceptorConfiguration {

    @GrpcGlobalClientInterceptor
    ClientInterceptor logGrpcInterceptor() {
        return new LogGrpcInterceptor();
    }
}
  • 启动类:
package com.demo.grpc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class GrpcClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(GrpcClientApplication.class, args);
    }
}
  • 接下来是最重要的服务类GrpcClientService,有几处要注意的地方稍后会提到:
package com.demo.grpc.service;

import com.demo.grpc.protocol.HelloRequest;
import com.demo.grpc.protocol.HelloResponse;
import com.demo.grpc.protocol.HelloWorldServiceGrpc;
import io.grpc.StatusRuntimeException;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;

@Service
public class GrpcClientService {

    @GrpcClient("grpc-server")
    private HelloWorldServiceGrpc.HelloWorldServiceBlockingStub helloWorldServiceBlockingStub;

    public String sendMessage(final String name) {
        try {
            final HelloResponse response = this.helloWorldServiceBlockingStub.sayHello(HelloRequest.newBuilder().setName(name).build());
            return response.getMessage();
        } catch (StatusRuntimeException e) {
            return "FAILED with " + e.getStatus().getCode().name();
        }
    }
}
  • 上述GrpcClientService类有几处要注意的地方:
  1. @ServiceGrpcClientService注册为spring的普通bean实例;
  2. @GrpcClient修饰HelloWorldServiceBlockingStub,这样就可以通过grpc-client-spring-boot-starter库发起gRPC调用,被调用的服务端信息来自名为grpc-server的配置;
  3. HelloWorldServiceBlockingStub来自前文中根据helloworld.proto生成的java代码;
  4. helloWorldServiceBlockingStub.sayHello方法会远程调用grpc-server应用的gRPC服务;
  • 为了验证gRPC服务调用能否成功,再新增个web接口,接口内部会调用GrpcClientService.sendMessage,这样咱们通过浏览器就能验证gRPC服务是否调用成功了:
package com.demo.grpc.controller;

import com.demo.grpc.service.GrpcClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class GrpcClientController {

    @Resource
    private GrpcClientService grpcClientService;

    @GetMapping("/test1")
    public String printMessage(@RequestParam(defaultValue = "Hanson") String name) {
        return grpcClientService.sendMessage(name);
    }
}
  • 编码完成,接下来将两个服务都启动,验证gRPC服务是否正常;

3.3、验证gRPC服务

  • grpc-servergrpc-client都是普通的springboot应用,可以在IDEA中启动,点击下图红框位置,在弹出菜单中选择Run 'LocalServerApplication’即可启动grpc-server

在这里插入图片描述

  • grpc-server启动后,控制台会提示gRPC server已启动,正在监听9252端口,如下图:

在这里插入图片描述

  • grpc-client后,在浏览器输入http://localhost:8082/test1?name=Huang,可以看到响应的内容正是来自grpc-serverGrpcServerService.java

在这里插入图片描述

  • 从web端到gRPC服务端的关键节点信息如下图:

在这里插入图片描述

  • 可以看到grpc-server的拦截日志:

在这里插入图片描述

  • 还有grpc-client的拦截日志:

在这里插入图片描述

四、服务器端流式 RPC

客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息;

4.1、开发一个gRPC服务,类型是服务端流

  • src/main/proto目录下新增文件order.proto,里面定一个了一个gRPC方法ListOrders及其入参和返回对象,内容如下,要注意的是返回值要用关键字stream修饰,表示该接口类型是服务端流
syntax = "proto3";

option java_multiple_files = true;
// 生成java代码的package
option java_package = "com.demo.grpc.order";
// 类名
option java_outer_classname = "OrderProto";

// 买家ID
message Buyer {
  int32 buyerId = 1;
}

// 返回结果的数据结构
message Order {
  // 订单ID
  int32 orderId = 1;
  // 商品ID
  int32 productId = 2;
  // 交易时间
  int64 orderTime = 3;
  // 买家备注
  string buyerRemark = 4;
}

// gRPC服务,这是个在线商城的订单查询服务
service OrderQuery {
  // 服务端流式:订单列表接口,入参是买家信息,返回订单列表(用stream修饰返回值)
  rpc ListOrders (Buyer) returns (stream Order) {}
}
  • 双击插件中的compilecompile-custom,即可根据proto生成java代码:

在这里插入图片描述

  • 新生成的java代码如下图红框:

在这里插入图片描述

  • 接下来是最关键的gRPC服务,代码如下,可见responseObserver.onNext方法被多次调用,用以向客户端持续输出数据,最后通过responseObserver.onCompleted结束输出:
package com.demo.grpc.service;

import java.util.ArrayList;
import java.util.List;

import com.demo.grpc.order.Buyer;
import com.demo.grpc.order.Order;
import com.demo.grpc.order.OrderQueryGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
public class GrpcOrderServerService extends OrderQueryGrpc.OrderQueryImplBase {

    /**
     * 造一批数据
     * @return
     */
    private static List<Order> mockOrders(){
        List<Order> list = new ArrayList<>();
        Order.Builder builder = Order.newBuilder();

        for (int i = 0; i < 10; i++) {
            list.add(builder
                    .setOrderId(i)
                    .setProductId(100+i)
                    .setOrderTime(System.currentTimeMillis()/1000)
                    .setBuyerRemark(("Hanson-" + i))
                    .build());
        }

        return list;
    }

    @Override
    public void listOrders(Buyer request, StreamObserver<Order> responseObserver) {
        // 持续输出到client
        for (Order order : mockOrders()) {
            responseObserver.onNext(order);
        }
        // 结束输出
        responseObserver.onCompleted();
    }
}
  • 至此,服务端开发完成,咱们再开发一个springboot应用作为客户端,看看如何远程调用listOrders接口,得到responseObserver.onNext方法输出的数据;

4.2、开发一个客户端,调用前面发布的gRPC服务

  • 客户端模块的基本功能是提供一个web接口,其内部会调用服务端的listOrders接口,将得到的数据返回给前端,如下图:

在这里插入图片描述

  • 服务端的listOrders接口返回的Order对象里面有很多gRPC相关的内容,不适合作为web接口的返回值,因此定义一个OrderDTO类作为web接口返回值:
package com.demo.grpc.entity;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class OrderDTO {

    private int orderId;
    private int productId;
    private String orderTime;
    private String buyerRemark;
}
  • 重点来了,GrpcOrderClientService.java,里面展示了如何远程调用gRPC服务的listOrders接口,可见对于服务端流类型的接口,客户端这边通过stub调用会得到Iterator类型的返回值,接下来要做的就是遍历Iterator
package com.demo.grpc.service;

import com.demo.grpc.entity.OrderDTO;
import com.demo.grpc.order.Buyer;
import com.demo.grpc.order.Order;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

@Slf4j
@Service
public class GrpcOrderClientService {

    @GrpcClient("grpc-server")
    private com.demo.grpc.order.OrderQueryGrpc.OrderQueryBlockingStub orderQueryBlockingStub;

    public List<OrderDTO> listOrders(final String name) {
        // gRPC的请求参数
        Buyer buyer = Buyer.newBuilder().setBuyerId(101).build();

        // gRPC的响应
        Iterator<Order> orderIterator;

        // 当前方法的返回值
        List<OrderDTO> orders = new ArrayList<>();

        // 通过stub发起远程gRPC请求
        try {
            orderIterator = orderQueryBlockingStub.listOrders(buyer);
        } catch (final StatusRuntimeException e) {
            log.error("error grpc invoke", e);
            return new ArrayList<>();
        }

        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        log.info("start put order to list");
        while (orderIterator.hasNext()) {
            Order order = orderIterator.next();

            orders.add(new OrderDTO(order.getOrderId(),
                    order.getProductId(),
                    // 使用DateTimeFormatter将时间戳转为字符串
                    dtf.format(LocalDateTime.ofEpochSecond(order.getOrderTime(), 0, ZoneOffset.of("+8"))),
                    order.getBuyerRemark()));
            log.info("");
        }

        log.info("end put order to list");

        return orders;
    }
}
  • 最后做一个controller类,对外提供一个web接口,里面会调用GrpcOrderClientService 的方法:
package com.demo.grpc.controller;

import com.demo.grpc.entity.OrderVO;
import com.demo.grpc.service.GrpcClientService;
import com.demo.grpc.service.GrpcOrderClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;


@RestController
public class GrpcClientController {

    @Resource
    private GrpcOrderClientService grpcOrderClientService;

    @GetMapping("/test2")
    public List<OrderVO> printMessage2(@RequestParam(defaultValue = "Hanson") String name) {
        return grpcOrderClientService.listOrders(name);
    }
}
  • 至此,编码完成,开始验证

4.3、验证

  • 启动grpc-server,启动成功后会监听9252端口:

在这里插入图片描述

  • 启动grpc-client,再在浏览器访问:http://localhost:8082/test2?name=Huang ,得到结果如下,可见成功地获取了gRPC的远程数据:

在这里插入图片描述

五、客户端流式 RPC

客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应;

5.1、在proto文件中定义客户端流类型的gRPC接口

  • 首先要做的就是定义gRPC接口,打开order.proto,在里面新增方法和相关的数据结构,需要重点关注的是AddToCart方法的入参ProductOrder前面添加了stream修饰,代表该方法是客户端流类型:
// 提交购物车时的产品信息
message ProductOrder {
  // 商品ID
  int32 productId = 1;
  // 商品数量
  int32 number = 2;
}

// 提交购物车返回结果的数据结构
message AddCartResponse {
  // 返回码
  int32 code = 1;
  // 描述信息
  string message = 2;
}

// gRPC服务,这是个在线商城的购物车服务
service CartService {
  // 客户端流式:添加多个商品到购物车
  rpc AddToCart (stream ProductOrder) returns (AddCartResponse) {}
}
  • 双击插件中的compilecompile-custom,即可根据proto生成java代码:

在这里插入图片描述

  • 新生成的java代码如下图红框:

在这里插入图片描述

5.2、开发服务端应用

  • 重点是提供grpc服务的GrpcServerService.java,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNextonCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount`,这样就可以记录总数了:
package com.demo.grpc.service;

import com.demo.grpc.order.AddCartResponse;
import com.demo.grpc.order.CartServiceGrpc;
import com.demo.grpc.order.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@Slf4j
@GrpcService
public class GrpcCartServerService extends CartServiceGrpc.CartServiceImplBase {


    @Override
    public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartResponse> responseObserver) {
        // 返回匿名类,给上层框架使用
        return new StreamObserver<ProductOrder>() {

            // 记录处理产品的总量
            private int totalCount = 0;

            @Override
            public void onNext(ProductOrder value) {
                log.info("正在处理商品[{}],数量为[{}]",
                        value.getProductId(),
                        value.getNumber());

                // 增加总量
                totalCount += value.getNumber();
            }

            @Override
            public void onError(Throwable t) {
                log.error("添加购物车异常", t);
            }

            @Override
            public void onCompleted() {
                log.info("添加购物车完成,共计[{}]件商品", totalCount);
                responseObserver.onNext(AddCartResponse.newBuilder()
                        .setCode(10000)
                        .setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount))
                        .build());
                responseObserver.onCompleted();
            }
        };
    }
}

5.3、开发客户端应用

  • 正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:
package com.demo.grpc.service;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver<T> extends StreamObserver<T> {
    String getExtra();
}
  • 重头戏来了,看看如何远程调用客户端流类型的gRPC接口,前面小结提到的2、3、4点都会涉及到,代码中已经添加详细注释:
package com.demo.grpc.service;

import com.demo.grpc.order.AddCartResponse;
import com.demo.grpc.order.CartServiceGrpc;
import com.demo.grpc.order.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class GrpcCartClientService {

    @GrpcClient("grpc-server")
    private CartServiceGrpc.CartServiceStub cartServiceStub;

    public String addToCart(int count) {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // responseObserver的onNext和onCompleted会在另一个线程中被执行,
        // ExtendResponseObserver继承自StreamObserver
        ExtendResponseObserver<AddCartResponse> responseObserver = new ExtendResponseObserver<AddCartResponse>() {

            String extraStr;

            @Override
            public String getExtra() {
                return extraStr;
            }

            private int code;

            private String message;

            @Override
            public void onNext(AddCartResponse value) {
                log.info("on next");
                code = value.getCode();
                message = value.getMessage();
            }

            @Override
            public void onError(Throwable t) {
                log.error("gRPC request error", t);
                extraStr = "gRPC error, " + t.getMessage();
                countDownLatch.countDown();
            }

            @Override
            public void onCompleted() {
                log.info("on complete");
                extraStr = String.format("返回码[%d],返回信息:%s" , code, message);
                countDownLatch.countDown();
            }
        };

        // 远程调用,此时数据还没有给到服务端
        StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);

        for(int i=0; i<count; i++) {
            // 发送一笔数据到服务端
            requestObserver.onNext(build(101 + i, 1 + i));
        }

        // 客户端告诉服务端:数据已经发完了
        requestObserver.onCompleted();

        try {
            // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
            // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
            // await的超时时间设置为2秒
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("countDownLatch await error", e);
        }

        log.info("service finish");
        // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
        return responseObserver.getExtra();
    }

    /**
     * 创建ProductOrder对象
     * @param productId
     * @param num
     * @return
     */
    private static ProductOrder build(int productId, int num) {
        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
    }
}
  • 最后做个web接口,可以通过web请求验证远程调用:
package com.demo.grpc.controller;


import com.demo.grpc.entity.OrderVO;
import com.demo.grpc.service.GrpcCartClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;


@RestController
public class GrpcClientController {

    @Resource
    private GrpcCartClientService grpcCartClientService;

    @GetMapping("/test3")
    public String printMessage(@RequestParam(defaultValue = "1") int count) {
        return grpcCartClientService.addToCart(count);
    }
}
  • 编码完成,开始验证

5.4、验证

  • 启动client和server

  • 浏览器输入http://localhost:8082/test3?count=100,响应如下,可见远程调用gRPC服务成功:

在这里插入图片描述

  • 下面是服务端日志,可见逐一处理了客户端的每一笔数据:

在这里插入图片描述

  • 下面是客户端日志,可见由于CountDownLatch的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted在另一个线程被执行完后,才会继续执行:

在这里插入图片描述

六、双向流式 RPC

双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;

其实就是结合了4,5两章节内容,快速过一下

6.1、在proto文件中定义双端流类型的gRPC接口

  • 首先要做的就是定义gRPC接口,打开order.proto,在里面新增方法和相关的数据结构,需要重点关注的是BatchDeduct方法的入参ProductOrder和返回值DeductResponse都添加了stream修饰(ProductOrder是上一章定义的),代表该方法是双向流类型:
// 扣减库存返回结果的数据结构
message DeductResponse {
  // 返回码
  int32 code = 1;
  // 描述信息
  string message = 2;
}

// gRPC服务,这是个在线商城的库存服务
service StockService {
  // 双向流式:批量扣减库存
  rpc BatchDeduct (stream ProductOrder) returns (stream DeductResponse) {}
}


在这里插入图片描述

其他相同地方不赘述

6.2、开发服务端应用

  • 重点是提供grpc服务的GrpcStockServerService.java,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNextonCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount,这样就可以记录总数了,由于请求参数是流,因此匿名类的onNext会被多次调用,并且由于返回值是流,因此onNext中调用了responseObserver.onNext方法来响应流中的每个请求,这样客户端就不断收到服务端的响应数据(即客户端的onNext方法会被多次调用):
package com.demo.grpc.service;

import com.demo.grpc.order.DeductResponse;
import com.demo.grpc.order.ProductOrder;
import com.demo.grpc.order.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
@Slf4j
public class GrpcStockServerService extends StockServiceGrpc.StockServiceImplBase {

    @Override
    public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductResponse> responseObserver) {
        // 返回匿名类,给上层框架使用
        return new StreamObserver<ProductOrder>() {

            private int totalCount = 0;

            @Override
            public void onNext(ProductOrder value) {
                log.info("正在处理商品[{}],数量为[{}]",
                        value.getProductId(),
                        value.getNumber());

                // 增加总量
                totalCount += value.getNumber();

                int code;
                String message;

                // 假设单数的都有库存不足的问题
                if (0 == value.getNumber() % 2) {
                    code = 10000;
                    message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber());
                } else {
                    code = 10001;
                    message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber());
                }

                responseObserver.onNext(DeductResponse.newBuilder()
                        .setCode(code)
                        .setMessage(message)
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                log.error("批量减扣库存异常", t);
            }

            @Override
            public void onCompleted() {
                log.info("批量减扣库存完成,共计[{}]件商品", totalCount);
                responseObserver.onCompleted();
            }
        };
    }
}

6.3、开发客户端应用

  • 看看如何远程调用双向流类型的gRPC接口,代码中已经添加详细注释:
package com.demo.grpc.service;

import com.demo.grpc.order.DeductResponse;
import com.demo.grpc.order.ProductOrder;
import com.demo.grpc.order.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author hanson.huang
 * @version V1.0
 * @ClassName GrpcStockClientService
 * @date 2025/3/5 22:15
 **/
@Service
@Slf4j
public class GrpcStockClientService {
    
    @GrpcClient("grpc-server")
    private StockServiceGrpc.StockServiceStub stockServiceStub;

    /**
     * 批量减库存
     * @param count
     * @return
     */
    public String batchDeduct(int count) {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // responseObserver的onNext和onCompleted会在另一个线程中被执行,
        // ExtendResponseObserver继承自StreamObserver
        ExtendResponseObserver<DeductResponse> responseObserver = new ExtendResponseObserver<DeductResponse>() {

            // 用stringBuilder保存所有来自服务端的响应
            private StringBuilder stringBuilder = new StringBuilder();

            @Override
            public String getExtra() {
                return stringBuilder.toString();
            }

            /**
             * 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应,
             * 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容
             * @param value
             */
            @Override
            public void onNext(DeductResponse value) {
                log.info("batch deduct on next");
                // 放入匿名类的成员变量中
                stringBuilder.append(String.format("返回码[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));
            }

            @Override
            public void onError(Throwable t) {
                log.error("batch deduct gRPC request error", t);
                stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
                countDownLatch.countDown();
            }

            /**
             * 服务端确认响应完成后,这里的onCompleted方法会被调用
             */
            @Override
            public void onCompleted() {
                log.info("batch deduct on complete");
                // 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,
                // 会继续往下执行
                countDownLatch.countDown();
            }
        };

        // 远程调用,此时数据还没有给到服务端
        StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);

        for(int i=0; i<count; i++) {
            // 每次执行onNext都会发送一笔数据到服务端,
            // 服务端的onNext方法都会被执行一次
            requestObserver.onNext(build(101 + i, 1 + i));
        }

        // 客户端告诉服务端:数据已经发完了
        requestObserver.onCompleted();

        try {
            // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
            // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
            // await的超时时间设置为2秒
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("countDownLatch await error", e);
        }

        log.info("service finish");
        // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
        return responseObserver.getExtra();
    }

    /**
     * 创建ProductOrder对象
     * @param productId
     * @param num
     * @return
     */
    private static ProductOrder build(int productId, int num) {
        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
    }
}
  • 最后做个web接口,可以通过web请求验证远程调用:
package com.demo.grpc.controller;

import com.demo.grpc.service.GrpcStockClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;


@RestController
public class GrpcClientController {

    @Resource
    private GrpcStockClientService grpcStockClientService;

    @GetMapping("/test4")
    public String printMessage4(@RequestParam(defaultValue = "1") int count) {
        return grpcStockClientService.batchDeduct(count);
    }
}

6.4、验证

  • 这里要改:浏览器输入http://localhost:8082/test3?count=10,响应如下,可见远程调用gRPC服务成功,流式响应的每一笔返回都被客户端收到:

在这里插入图片描述

  • 服务端

在这里插入图片描述

  • 客服端

在这里插入图片描述

四种类型的gRPC服务及其客户端开发就完成了

创作不易,不妨点赞、收藏、关注支持一下,各位的支持就是我创作的最大动力❤️

在这里插入图片描述

http://www.dtcms.com/a/51617.html

相关文章:

  • 大语言模型中温度参数(Temperature)的核心原理
  • 大学至今的反思与总结
  • python-leetcode-零钱兑换 II
  • EasyRTC嵌入式视频通话SDK的跨平台适配,构建web浏览器、Linux、ARM、安卓等终端的低延迟音视频通信
  • 内核编程七:Linux 内核日志的级别
  • DeepSeek大模型深度解析:架构、技术与应用全景
  • SAP-ABAP:SAP第二代增强之隐式增强(Implicit Enhancements)和Enhancement Framework 的详细解析
  • 密码学(一)
  • 混合专家模型(MoE):高效处理复杂任务的智能架构,DeepSeek性能出色的秘诀
  • SpringCloud微服务开发工程细节
  • 1.15-16-17-18迭代器与生成器,函数,数据结构,模块
  • LeetCode 718.最长重复子数组(动态规划,Python)
  • DeepSeek学术写作全流程提示词
  • LLM自动金融量化-CFGPT
  • 肠胃镜过程描述(普通、无痛)
  • 学习记录-缺陷
  • 如何用FFmpeg高效拉流(避坑指南)
  • 数据库原理4
  • 为AI聊天工具添加一个知识系统 之135 详细设计之76 通用编程语言 之6
  • java项目之基于ssm的在线视频网站开发(源码+文档)
  • java8中young gc的垃圾回收器选型,您了解嘛
  • 基于SpringBoot+mybatis+layui就业管理系统设计和实现
  • Git安装与配置
  • 第一个 C++ 程序
  • C++————引用
  • SpringTask 引起的错误
  • 【折线图 Line】——12
  • PHP之常量
  • [数据结构]设计循环队列
  • 【由技及道】量子构建交响曲:Jenkinsfile流水线的十一维编程艺术【人工智障AI2077的开发日志008】