【gRPC】Java高性能远程调用之gRPC详解
gRPC详解
- 一、什么是gRPC?
- 二、用proto生成代码
- 2.1、前期准备
- 2.2、protobuf插件安装
- 三、简单 RPC
- 3.1、开发gRPC服务端
- 3.2、开发gRPC客户端
- 3.3、验证gRPC服务
- 四、服务器端流式 RPC
- 4.1、开发一个gRPC服务,类型是服务端流
- 4.2、开发一个客户端,调用前面发布的gRPC服务
- 4.3、验证
- 五、客户端流式 RPC
- 5.1、在proto文件中定义客户端流类型的gRPC接口
- 5.2、开发服务端应用
- 5.3、开发客户端应用
- 5.4、验证
- 六、双向流式 RPC
- 6.1、在proto文件中定义双端流类型的gRPC接口
- 6.2、开发服务端应用
- 6.3、开发客户端应用
- 6.4、验证
一、什么是gRPC?
gRPC是一款由Google开发的高性能、开源的 RPC(远程过程调用)框架。它基于 HTTP/2 协议,并使用 Protocol Buffers(Protobuf)作为默认的序列化工具。gRPC 的主要作用包括:
-
高性能:基于 HTTP/2,支持多路复用、流式传输和二进制编码,性能优于传统的 HTTP/1.1。
-
跨语言支持:支持多种编程语言(如 Java、Go、Python、C++ 等),适合微服务架构中的多语言环境。
-
强类型接口:通过 Protobuf 定义服务接口,生成强类型的客户端和服务端代码,减少错误。
-
流式通信:支持单向流、双向流等复杂的通信模式。
-
适用于微服务:适合低延迟、高吞吐量的场景,如微服务之间的通信。
贴一张油管老哥基于GraphQL、REST、gRPC的测试结果,在应对较大的RPS时,gRPC需要更少的CPU和内存使用率,并且网络带宽消耗也是最小的(花费最小),同时还有着很低的时延
,10万的rps响应能在20ms内。
基于以上优点,这篇文章详细说说gRPC怎么使用。
详细代码地址:github,有用的话,麻烦给个star
二、用proto生成代码
2.1、前期准备
环境与依赖
依赖 | 版本 |
---|---|
jdk | 1.8 |
maven | 3.9.9 |
springboot | 2.4.2 |
grpc | 1.26.0 |
protobuf | 3.19.4 |
grpc-client-spring-boot-starter | 2.13.0.RELEASE |
grpc-server-spring-boot-starter | 2.13.0.RELEASE |
项目结构
创建一个父项目grpc-test
,包含三个字项目,grpc-client
、grpc-server
和grpc-api
2.2、protobuf插件安装
- IDEA安装
Protobuf
插件
grpc-test
服务的pom文件中引入maven依赖,parent可替换成自己的springboot版本
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.csdn.dev</groupId>
<artifactId>csdn-dev-boot</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<artifactId>grpc-test</artifactId>
<modules>
<module>grpc-server</module>
<module>grpc-client</module>
<module>grpc-api</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<java.version>1.8</java.version>
<protobuf.version>3.19.4</protobuf.version>
<grpc.version>1.26.0</grpc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- grpc server和spring-boot集成框架 -->
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>2.13.0.RELEASE</version>
</dependency>
<!-- grpc client和spring-boot集成框架 -->
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<version>2.13.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
</dependencies>
</project>
- maven中
protobuf plugin
<build>
<!-- os系统信息插件, protobuf-maven-plugin需要获取系统信息下载相应的protobuf程序 -->
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<!-- proto文件目录 -->
<protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
<!-- 生成的Java文件目录 -->
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<!--<outputDirectory>${project.build.directory}/generated-sources/protobuf</outputDirectory>-->
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
在maven中的build中的plugins中添加上面插件。
protoSourceRoot
指定*.proto定义的消息文件路径。
outputDirectory
指定输出的java文件地址。默认是输出到target中。
clearOutputDirectory
是否清空输出文件,默认为是,如果是,outputDirectory下的项目工程会被清空。
执行maven,出现下图中protobuf表示插件安装成功
- 在
grpc-api
模块的src/main/proto
目录下新增名为helloworld.proto
的文件,这里面定义了一个gRPC服务,里面含有一个接口,并且还有这个接口的入参和返回结果的定义:
syntax = "proto3";
option java_multiple_files = true;
// 生成java代码的package
option java_package = "com.demo.grpc.protocol";
option java_outer_classname = "HelloWorld";
// 入参的数据结构
message HelloRequest {
string name = 1;
}
// 返回结果的数据结构
message HelloResponse {
string message = 1;
}
// gRPC服务
service HelloWorldService {
// 接口定义
rpc SayHello (HelloRequest) returns (HelloResponse) {
}
}
- proto文件已经做好,接下来要根据这个文件来生成java代码,双击插件中的
compile
和compile-custom
,分别编译bean
和service
。也可以执行mvn clean compile
生成文件。
- 生成的文件
三、简单 RPC
gRPC服务的开发和调用,实现的效果如下图:
3.1、开发gRPC服务端
- 首先要开发的是gRPC服务端,回顾前文中
helloworld.proto
中定义的服务和接口,如下所示,名为HelloWorldService的服务对外提供名为SayHello接口,这就是咱们接下来的任务,创建一个springboot应用,该应用以gRPC的方式提供SayHello接口给其他应用远程调用:
// gRPC服务
service HelloWorldService {
// 接口定义
rpc SayHello (HelloRequest) returns (HelloResponse) {
}
}
- 基于springboot框架开发一个普通的gRPC服务端应用,一共需要五个步骤,如下图所示,接下来我们按照下图序号的顺序来开发:
- 导入maven依赖,引入grpc-api依赖
<dependency>
<groupId>com.csdn.dev</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
- 这是个springboot应用,配置文件内容如下:
spring:
application:
name: grpc-server
grpc:
server:
port: 9252
- 新建拦截类
LogGrpcInterceptor.java
,每当gRPC请求到来后该类会先执行,这里是将方法名字在日志中打印出来,您可以对请求响应做更详细的处理:
package com.demo.grpc.interceptor;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LogGrpcInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
log.debug(serverCall.getMethodDescriptor().getFullMethodName());
return serverCallHandler.startCall(serverCall, metadata);
}
}
- 为了让
LogGrpcInterceptor
可以在gRPC请求到来时被执行,需要做相应的配置,如下所示,在普通的bean的配置中添加注解即可:
package com.demo.grpc.config;
import com.demo.grpc.interceptor.LogGrpcInterceptor;
import io.grpc.ServerInterceptor;
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class GlobalInterceptorConfiguration {
@GrpcGlobalServerInterceptor
ServerInterceptor logServerInterceptor(){
return new LogGrpcInterceptor();
}
}
- 应用启动类很简单
package com.demo.grpc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class GrpcServerApplication {
public static void main(String[] args) {
SpringApplication.run(GrpcServerApplication.class, args);
}
}
- 接下来是最重要的service类,gRPC服务在此处对外暴露出去,完整代码如下,有几处要注意的地方稍后提到:
package com.demo.grpc.service;
import com.demo.grpc.protocol.HelloRequest;
import com.demo.grpc.protocol.HelloResponse;
import com.demo.grpc.protocol.HelloWorldServiceGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import java.util.Date;
@GrpcService
public class GrpcServerService extends HelloWorldServiceGrpc.HelloWorldServiceImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
HelloResponse response = HelloResponse.newBuilder().setMessage("Hello " + request.getName() + ", " + new Date()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
- 上述GrpcServerService.java中有几处需要注意:
- 是使用
@GrpcService
注解,再继承HelloWorldServiceImplBase
,这样就可以借助grpc-server-spring-boot-starter
库将sayHello
暴露为gRPC服务; HelloWorldServiceImplBase
是前文中根据proto自动生成的java代码,在grpc-api
模块中;sayHello
方法中处理完毕业务逻辑后,调用onNext
方法填入返回内容;- 调用
onCompleted
方法表示本次gRPC服务完成;
至此,gRPC服务端编码就完成了,咱们接着开始客户端开发;
3.2、开发gRPC客户端
- 同理引入
grpc-api
依赖
<dependency>
<groupId>com.csdn.dev</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
- 应用配置文件
grpc-client/src/main/resources/application.yml
,注意address的值就是gRPC服务端的信息,我这里grpc-server
和grpc-client
在同一台电脑上运行,请您根据自己情况来设置:
server:
port: 8082
spring:
application:
name: grpc-client
grpc:
client:
# gRPC配置的名字,GrpcClient注解会用到
grpc-server:
# gRPC服务端地址
address: 'static://127.0.0.1:9252'
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext
- 接下来要创建下图展示的类,按序号顺序创建:
- 首先是拦截类
LogGrpcInterceptor
,与服务端的拦截类差不多,不过实现的接口不同:
package com.demo.grpc.interceptor;
import io.grpc.*;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LogGrpcInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel channel) {
log.debug(method.getFullMethodName());
return channel.newCall(method, callOptions);
}
}
- 为了让拦截类能够正常工作,即发起gRPC请求的时候被执行,需要新增一个配置类:
package com.demo.grpc.config;
import com.demo.grpc.interceptor.LogGrpcInterceptor;
import io.grpc.ClientInterceptor;
import net.devh.boot.grpc.client.interceptor.GrpcGlobalClientInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
@Order(Ordered.LOWEST_PRECEDENCE)
@Configuration(proxyBeanMethods = false)
public class GlobalClientInterceptorConfiguration {
@GrpcGlobalClientInterceptor
ClientInterceptor logGrpcInterceptor() {
return new LogGrpcInterceptor();
}
}
- 启动类:
package com.demo.grpc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class GrpcClientApplication {
public static void main(String[] args) {
SpringApplication.run(GrpcClientApplication.class, args);
}
}
- 接下来是最重要的服务类
GrpcClientService
,有几处要注意的地方稍后会提到:
package com.demo.grpc.service;
import com.demo.grpc.protocol.HelloRequest;
import com.demo.grpc.protocol.HelloResponse;
import com.demo.grpc.protocol.HelloWorldServiceGrpc;
import io.grpc.StatusRuntimeException;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
@Service
public class GrpcClientService {
@GrpcClient("grpc-server")
private HelloWorldServiceGrpc.HelloWorldServiceBlockingStub helloWorldServiceBlockingStub;
public String sendMessage(final String name) {
try {
final HelloResponse response = this.helloWorldServiceBlockingStub.sayHello(HelloRequest.newBuilder().setName(name).build());
return response.getMessage();
} catch (StatusRuntimeException e) {
return "FAILED with " + e.getStatus().getCode().name();
}
}
}
- 上述GrpcClientService类有几处要注意的地方:
- 用
@Service
将GrpcClientService
注册为spring的普通bean实例; - 用
@GrpcClient
修饰HelloWorldServiceBlockingStub
,这样就可以通过grpc-client-spring-boot-starter
库发起gRPC调用,被调用的服务端信息来自名为grpc-server
的配置; HelloWorldServiceBlockingStub
来自前文中根据helloworld.proto
生成的java代码;helloWorldServiceBlockingStub.sayHello
方法会远程调用grpc-server
应用的gRPC服务;
- 为了验证gRPC服务调用能否成功,再新增个web接口,接口内部会调用GrpcClientService.sendMessage,这样咱们通过浏览器就能验证gRPC服务是否调用成功了:
package com.demo.grpc.controller;
import com.demo.grpc.service.GrpcClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class GrpcClientController {
@Resource
private GrpcClientService grpcClientService;
@GetMapping("/test1")
public String printMessage(@RequestParam(defaultValue = "Hanson") String name) {
return grpcClientService.sendMessage(name);
}
}
- 编码完成,接下来将两个服务都启动,验证gRPC服务是否正常;
3.3、验证gRPC服务
grpc-server
和grpc-client
都是普通的springboot应用,可以在IDEA中启动,点击下图红框位置,在弹出菜单中选择Run 'LocalServerApplication’
即可启动grpc-server
:
grpc-server
启动后,控制台会提示gRPC server已启动,正在监听9252端口,如下图:
grpc-client
后,在浏览器输入http://localhost:8082/test1?name=Huang
,可以看到响应的内容正是来自grpc-server
的GrpcServerService.java
:
- 从web端到gRPC服务端的关键节点信息如下图:
- 可以看到
grpc-server
的拦截日志:
- 还有
grpc-client
的拦截日志:
四、服务器端流式 RPC
客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息;
4.1、开发一个gRPC服务,类型是服务端流
- 在
src/main/proto
目录下新增文件order.proto
,里面定一个了一个gRPC方法ListOrders
及其入参和返回对象,内容如下,要注意的是返回值要用关键字stream
修饰,表示该接口类型是服务端流:
syntax = "proto3";
option java_multiple_files = true;
// 生成java代码的package
option java_package = "com.demo.grpc.order";
// 类名
option java_outer_classname = "OrderProto";
// 买家ID
message Buyer {
int32 buyerId = 1;
}
// 返回结果的数据结构
message Order {
// 订单ID
int32 orderId = 1;
// 商品ID
int32 productId = 2;
// 交易时间
int64 orderTime = 3;
// 买家备注
string buyerRemark = 4;
}
// gRPC服务,这是个在线商城的订单查询服务
service OrderQuery {
// 服务端流式:订单列表接口,入参是买家信息,返回订单列表(用stream修饰返回值)
rpc ListOrders (Buyer) returns (stream Order) {}
}
- 双击插件中的
compile
和compile-custom
,即可根据proto生成java代码:
- 新生成的java代码如下图红框:
- 接下来是最关键的gRPC服务,代码如下,可见
responseObserver.onNext
方法被多次调用,用以向客户端持续输出数据,最后通过responseObserver.onCompleted
结束输出:
package com.demo.grpc.service;
import java.util.ArrayList;
import java.util.List;
import com.demo.grpc.order.Buyer;
import com.demo.grpc.order.Order;
import com.demo.grpc.order.OrderQueryGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService
public class GrpcOrderServerService extends OrderQueryGrpc.OrderQueryImplBase {
/**
* 造一批数据
* @return
*/
private static List<Order> mockOrders(){
List<Order> list = new ArrayList<>();
Order.Builder builder = Order.newBuilder();
for (int i = 0; i < 10; i++) {
list.add(builder
.setOrderId(i)
.setProductId(100+i)
.setOrderTime(System.currentTimeMillis()/1000)
.setBuyerRemark(("Hanson-" + i))
.build());
}
return list;
}
@Override
public void listOrders(Buyer request, StreamObserver<Order> responseObserver) {
// 持续输出到client
for (Order order : mockOrders()) {
responseObserver.onNext(order);
}
// 结束输出
responseObserver.onCompleted();
}
}
- 至此,服务端开发完成,咱们再开发一个springboot应用作为客户端,看看如何远程调用
listOrders
接口,得到responseObserver.onNext
方法输出的数据;
4.2、开发一个客户端,调用前面发布的gRPC服务
- 客户端模块的基本功能是提供一个web接口,其内部会调用服务端的listOrders接口,将得到的数据返回给前端,如下图:
- 服务端的
listOrders
接口返回的Order
对象里面有很多gRPC相关的内容,不适合作为web接口的返回值,因此定义一个OrderDTO
类作为web接口返回值:
package com.demo.grpc.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class OrderDTO {
private int orderId;
private int productId;
private String orderTime;
private String buyerRemark;
}
- 重点来了,
GrpcOrderClientService.java
,里面展示了如何远程调用gRPC服务的listOrders
接口,可见对于服务端流类型的接口,客户端这边通过stub
调用会得到Iterator
类型的返回值,接下来要做的就是遍历Iterator
:
package com.demo.grpc.service;
import com.demo.grpc.entity.OrderDTO;
import com.demo.grpc.order.Buyer;
import com.demo.grpc.order.Order;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@Slf4j
@Service
public class GrpcOrderClientService {
@GrpcClient("grpc-server")
private com.demo.grpc.order.OrderQueryGrpc.OrderQueryBlockingStub orderQueryBlockingStub;
public List<OrderDTO> listOrders(final String name) {
// gRPC的请求参数
Buyer buyer = Buyer.newBuilder().setBuyerId(101).build();
// gRPC的响应
Iterator<Order> orderIterator;
// 当前方法的返回值
List<OrderDTO> orders = new ArrayList<>();
// 通过stub发起远程gRPC请求
try {
orderIterator = orderQueryBlockingStub.listOrders(buyer);
} catch (final StatusRuntimeException e) {
log.error("error grpc invoke", e);
return new ArrayList<>();
}
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
log.info("start put order to list");
while (orderIterator.hasNext()) {
Order order = orderIterator.next();
orders.add(new OrderDTO(order.getOrderId(),
order.getProductId(),
// 使用DateTimeFormatter将时间戳转为字符串
dtf.format(LocalDateTime.ofEpochSecond(order.getOrderTime(), 0, ZoneOffset.of("+8"))),
order.getBuyerRemark()));
log.info("");
}
log.info("end put order to list");
return orders;
}
}
- 最后做一个controller类,对外提供一个web接口,里面会调用
GrpcOrderClientService
的方法:
package com.demo.grpc.controller;
import com.demo.grpc.entity.OrderVO;
import com.demo.grpc.service.GrpcClientService;
import com.demo.grpc.service.GrpcOrderClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
@RestController
public class GrpcClientController {
@Resource
private GrpcOrderClientService grpcOrderClientService;
@GetMapping("/test2")
public List<OrderVO> printMessage2(@RequestParam(defaultValue = "Hanson") String name) {
return grpcOrderClientService.listOrders(name);
}
}
- 至此,编码完成,开始验证
4.3、验证
- 启动
grpc-server
,启动成功后会监听9252端口:
- 启动
grpc-client
,再在浏览器访问:http://localhost:8082/test2?name=Huang ,得到结果如下,可见成功地获取了gRPC的远程数据:
五、客户端流式 RPC
客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应;
5.1、在proto文件中定义客户端流类型的gRPC接口
- 首先要做的就是定义gRPC接口,打开
order.proto
,在里面新增方法和相关的数据结构,需要重点关注的是AddToCart
方法的入参ProductOrder
前面添加了stream
修饰,代表该方法是客户端流类型:
// 提交购物车时的产品信息
message ProductOrder {
// 商品ID
int32 productId = 1;
// 商品数量
int32 number = 2;
}
// 提交购物车返回结果的数据结构
message AddCartResponse {
// 返回码
int32 code = 1;
// 描述信息
string message = 2;
}
// gRPC服务,这是个在线商城的购物车服务
service CartService {
// 客户端流式:添加多个商品到购物车
rpc AddToCart (stream ProductOrder) returns (AddCartResponse) {}
}
- 双击插件中的
compile
和compile-custom
,即可根据proto生成java代码:
- 新生成的java代码如下图红框:
5.2、开发服务端应用
- 重点是提供grpc服务的
GrpcServerService.java,咱们要做的就是给上层框架返回一个匿名类,至于里面的
onNext、
onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量
totalCount`,这样就可以记录总数了:
package com.demo.grpc.service;
import com.demo.grpc.order.AddCartResponse;
import com.demo.grpc.order.CartServiceGrpc;
import com.demo.grpc.order.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
@Slf4j
@GrpcService
public class GrpcCartServerService extends CartServiceGrpc.CartServiceImplBase {
@Override
public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartResponse> responseObserver) {
// 返回匿名类,给上层框架使用
return new StreamObserver<ProductOrder>() {
// 记录处理产品的总量
private int totalCount = 0;
@Override
public void onNext(ProductOrder value) {
log.info("正在处理商品[{}],数量为[{}]",
value.getProductId(),
value.getNumber());
// 增加总量
totalCount += value.getNumber();
}
@Override
public void onError(Throwable t) {
log.error("添加购物车异常", t);
}
@Override
public void onCompleted() {
log.info("添加购物车完成,共计[{}]件商品", totalCount);
responseObserver.onNext(AddCartResponse.newBuilder()
.setCode(10000)
.setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount))
.build());
responseObserver.onCompleted();
}
};
}
}
5.3、开发客户端应用
- 正常情况下我们都是用
StreamObserver
处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver
中取出业务数据,于是定一个新接口,继承自StreamObserver
,新增getExtra
方法可以返回String
对象,详细的用法稍后会看到:
package com.demo.grpc.service;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver<T> extends StreamObserver<T> {
String getExtra();
}
- 重头戏来了,看看如何远程调用客户端流类型的gRPC接口,前面小结提到的2、3、4点都会涉及到,代码中已经添加详细注释:
package com.demo.grpc.service;
import com.demo.grpc.order.AddCartResponse;
import com.demo.grpc.order.CartServiceGrpc;
import com.demo.grpc.order.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class GrpcCartClientService {
@GrpcClient("grpc-server")
private CartServiceGrpc.CartServiceStub cartServiceStub;
public String addToCart(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted会在另一个线程中被执行,
// ExtendResponseObserver继承自StreamObserver
ExtendResponseObserver<AddCartResponse> responseObserver = new ExtendResponseObserver<AddCartResponse>() {
String extraStr;
@Override
public String getExtra() {
return extraStr;
}
private int code;
private String message;
@Override
public void onNext(AddCartResponse value) {
log.info("on next");
code = value.getCode();
message = value.getMessage();
}
@Override
public void onError(Throwable t) {
log.error("gRPC request error", t);
extraStr = "gRPC error, " + t.getMessage();
countDownLatch.countDown();
}
@Override
public void onCompleted() {
log.info("on complete");
extraStr = String.format("返回码[%d],返回信息:%s" , code, message);
countDownLatch.countDown();
}
};
// 远程调用,此时数据还没有给到服务端
StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);
for(int i=0; i<count; i++) {
// 发送一笔数据到服务端
requestObserver.onNext(build(101 + i, 1 + i));
}
// 客户端告诉服务端:数据已经发完了
requestObserver.onCompleted();
try {
// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
// 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
// await的超时时间设置为2秒
countDownLatch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("countDownLatch await error", e);
}
log.info("service finish");
// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
return responseObserver.getExtra();
}
/**
* 创建ProductOrder对象
* @param productId
* @param num
* @return
*/
private static ProductOrder build(int productId, int num) {
return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
}
}
- 最后做个web接口,可以通过web请求验证远程调用:
package com.demo.grpc.controller;
import com.demo.grpc.entity.OrderVO;
import com.demo.grpc.service.GrpcCartClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
@RestController
public class GrpcClientController {
@Resource
private GrpcCartClientService grpcCartClientService;
@GetMapping("/test3")
public String printMessage(@RequestParam(defaultValue = "1") int count) {
return grpcCartClientService.addToCart(count);
}
}
- 编码完成,开始验证
5.4、验证
-
启动client和server
-
浏览器输入http://localhost:8082/test3?count=100,响应如下,可见远程调用gRPC服务成功:
- 下面是服务端日志,可见逐一处理了客户端的每一笔数据:
- 下面是客户端日志,可见由于
CountDownLatch
的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted
在另一个线程被执行完后,才会继续执行:
六、双向流式 RPC
双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;
其实就是结合了4,5两章节内容,快速过一下
6.1、在proto文件中定义双端流类型的gRPC接口
- 首先要做的就是定义gRPC接口,打开
order.proto
,在里面新增方法和相关的数据结构,需要重点关注的是BatchDeduct
方法的入参ProductOrder
和返回值DeductResponse
都添加了stream
修饰(ProductOrder是上一章定义的),代表该方法是双向流类型:
// 扣减库存返回结果的数据结构
message DeductResponse {
// 返回码
int32 code = 1;
// 描述信息
string message = 2;
}
// gRPC服务,这是个在线商城的库存服务
service StockService {
// 双向流式:批量扣减库存
rpc BatchDeduct (stream ProductOrder) returns (stream DeductResponse) {}
}
其他相同地方不赘述
6.2、开发服务端应用
- 重点是提供grpc服务的
GrpcStockServerService.java
,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext
、onCompleted
方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount
,这样就可以记录总数了,由于请求参数是流,因此匿名类的onNext
会被多次调用,并且由于返回值是流,因此onNext
中调用了responseObserver.onNext
方法来响应流中的每个请求,这样客户端就不断收到服务端的响应数据(即客户端的onNext
方法会被多次调用):
package com.demo.grpc.service;
import com.demo.grpc.order.DeductResponse;
import com.demo.grpc.order.ProductOrder;
import com.demo.grpc.order.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService
@Slf4j
public class GrpcStockServerService extends StockServiceGrpc.StockServiceImplBase {
@Override
public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductResponse> responseObserver) {
// 返回匿名类,给上层框架使用
return new StreamObserver<ProductOrder>() {
private int totalCount = 0;
@Override
public void onNext(ProductOrder value) {
log.info("正在处理商品[{}],数量为[{}]",
value.getProductId(),
value.getNumber());
// 增加总量
totalCount += value.getNumber();
int code;
String message;
// 假设单数的都有库存不足的问题
if (0 == value.getNumber() % 2) {
code = 10000;
message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber());
} else {
code = 10001;
message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber());
}
responseObserver.onNext(DeductResponse.newBuilder()
.setCode(code)
.setMessage(message)
.build());
}
@Override
public void onError(Throwable t) {
log.error("批量减扣库存异常", t);
}
@Override
public void onCompleted() {
log.info("批量减扣库存完成,共计[{}]件商品", totalCount);
responseObserver.onCompleted();
}
};
}
}
6.3、开发客户端应用
- 看看如何远程调用双向流类型的gRPC接口,代码中已经添加详细注释:
package com.demo.grpc.service;
import com.demo.grpc.order.DeductResponse;
import com.demo.grpc.order.ProductOrder;
import com.demo.grpc.order.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author hanson.huang
* @version V1.0
* @ClassName GrpcStockClientService
* @date 2025/3/5 22:15
**/
@Service
@Slf4j
public class GrpcStockClientService {
@GrpcClient("grpc-server")
private StockServiceGrpc.StockServiceStub stockServiceStub;
/**
* 批量减库存
* @param count
* @return
*/
public String batchDeduct(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted会在另一个线程中被执行,
// ExtendResponseObserver继承自StreamObserver
ExtendResponseObserver<DeductResponse> responseObserver = new ExtendResponseObserver<DeductResponse>() {
// 用stringBuilder保存所有来自服务端的响应
private StringBuilder stringBuilder = new StringBuilder();
@Override
public String getExtra() {
return stringBuilder.toString();
}
/**
* 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应,
* 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容
* @param value
*/
@Override
public void onNext(DeductResponse value) {
log.info("batch deduct on next");
// 放入匿名类的成员变量中
stringBuilder.append(String.format("返回码[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));
}
@Override
public void onError(Throwable t) {
log.error("batch deduct gRPC request error", t);
stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
countDownLatch.countDown();
}
/**
* 服务端确认响应完成后,这里的onCompleted方法会被调用
*/
@Override
public void onCompleted() {
log.info("batch deduct on complete");
// 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,
// 会继续往下执行
countDownLatch.countDown();
}
};
// 远程调用,此时数据还没有给到服务端
StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);
for(int i=0; i<count; i++) {
// 每次执行onNext都会发送一笔数据到服务端,
// 服务端的onNext方法都会被执行一次
requestObserver.onNext(build(101 + i, 1 + i));
}
// 客户端告诉服务端:数据已经发完了
requestObserver.onCompleted();
try {
// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
// 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
// await的超时时间设置为2秒
countDownLatch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("countDownLatch await error", e);
}
log.info("service finish");
// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
return responseObserver.getExtra();
}
/**
* 创建ProductOrder对象
* @param productId
* @param num
* @return
*/
private static ProductOrder build(int productId, int num) {
return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
}
}
- 最后做个web接口,可以通过web请求验证远程调用:
package com.demo.grpc.controller;
import com.demo.grpc.service.GrpcStockClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
@RestController
public class GrpcClientController {
@Resource
private GrpcStockClientService grpcStockClientService;
@GetMapping("/test4")
public String printMessage4(@RequestParam(defaultValue = "1") int count) {
return grpcStockClientService.batchDeduct(count);
}
}
6.4、验证
- 这里要改:浏览器输入http://localhost:8082/test3?count=10,响应如下,可见远程调用gRPC服务成功,流式响应的每一笔返回都被客户端收到:
- 服务端
- 客服端
四种类型的gRPC服务及其客户端开发就完成了
创作不易,不妨点赞、收藏、关注支持一下,各位的支持就是我创作的最大动力❤️