当前位置: 首页 > news >正文

Go语言的gRPC教程-通信模式

上一篇介绍了如何编写 protobuf 的 idl [接口定义语言 (Interface Definition Language)],并使用 idl 生成了 gRPC 的代码,现在来看看如何编写客户端和服务端的代码。

一、Simple RPC (Unary RPC)

1.1 protobuf

syntax = "proto3";
option go_package = ".;helloworld";//  protoc --go_out=. --go-grpc_out=. helloworld.protoservice SearchService {rpc Search(SearchRequest) returns (SearchResponse) {}
}message Book {string title = 1;int32 price = 3;
}message SearchRequest {string request = 1;repeated string keywords = 2;Book book = 3;
}message SearchResponse {string response = 1;
}

定义如上的 idl,需要关注几个事项

  • 使用protobuf最新版本syntax = "proto3";
  • protoc-gen-go要求 pb 文件必须指定 go 包的路径。即option go_package = ".;helloworld";,. (点号) - 生成代码的导入路径 (Import Path),helloworld - 生成代码的Go 包名 (Package Name)。
  • service定义的method仅能有一个入参和出参数。如果需要传递多个参数需要定义成message
  • 可以使用import引用另外一个文件的 pb。

生成 go 和 grpc 的代码

protoc -I .  --go_out . --go_opt paths=source_relative  --go-grpc_out . --go-grpc_opt paths=source_relative  helloworld.proto
helloword_proto
├── helloworld.pb.go
└── helloworld_grpc.pb.go
└── helloworld.proto

1.2 server 实现

1、由 pb 文件生成的 gRPC 代码中包含了 service 的接口定义,它和我们定义的 idl 是吻合的

service SearchService {rpc Search(SearchRequest) returns (SearchResponse) {}
}
type SearchServiceServer interface {Search(context.Context, *SearchRequest) (*SearchResponse, error)mustEmbedUnimplementedSearchServiceServer()
}

2、我们的业务逻辑就是实现这个接口

package mainimport ("context"helloworld "example.com/grpc/helloworld_proto""fmt""google.golang.org/grpc""google.golang.org/grpc/codes""google.golang.org/grpc/metadata""google.golang.org/grpc/peer""google.golang.org/grpc/status""net""time"
)type HelloWorldServer struct {helloworld.UnimplementedSearchServiceServer
}func (server *HelloWorldServer) Search(ctx context.Context, r *helloworld.SearchRequest) (*helloworld.SearchResponse, error) {fmt.Println("Request received:", r.GetRequest())fmt.Println("Keywords received:", r.GetKeywords())fmt.Println("Books received:", r.GetBook())return &helloworld.SearchResponse{Response: fmt.Sprintf("hello %s", r.GetRequest()),}, nil
}

3、在实现完业务逻辑之后,我们可以创建并启动服务

func main() {// 1. 监听端口listen, err := net.Listen("tcp", ":8090")if err != nil {fmt.Println("监听端口失败", err)}// 2. 创建gRPC服务器实例grpcServer := grpc.NewServer(grpc.UnaryInterceptor(serverUnaryInterceptor),)// 3. 注册服务helloworld.RegisterSearchServiceServer(grpcServer, &HelloWorldServer{})// 4. 启动服务, grpcServer.Serve(listen) 会阻塞当前 goroutine,直到服务停止err = grpcServer.Serve(listen)if err != nil {fmt.Println("启动服务失败", err)}
}

服务端代码实现的流程如下

在这里插入图片描述

client 实现

1、由 pb 文件生成的 gRPC 代码中包含了 client 的实现,它和我们定义的 idl 也是吻合的。

type searchServiceClient struct {cc grpc.ClientConnInterface
}func NewSearchServiceClient(cc grpc.ClientConnInterface) SearchServiceClient {return &searchServiceClient{cc}
}func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) {cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)out := new(SearchResponse)err := c.cc.Invoke(ctx, SearchService_Search_FullMethodName, in, out, cOpts...)if err != nil {return nil, err}return out, nil
}

2、直接使用 client 来进行 rpc 调用

package mainimport ("context"helloworld "example.com/grpc/helloworld_proto""fmt""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure""google.golang.org/protobuf/types/known/wrapperspb""runtime""time"
)
func main() {conn, err := grpc.NewClient("localhost:8090",grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {panic(err)}defer conn.Close()// 1. 创建客户端实例client := helloworld.NewSearchServiceClient(conn)// 2. 调用方法resp, err := client.Search(context.Background(), &helloworld.SearchRequest{Request:  "Golang",Keywords: []string{"hello", "world"},Book: &helloworld.Book{Title: "Go语言编程",Price: &wrapperspb.Int64Value{Value: 100,},},})if err != nil {panic(err)}fmt.Println(resp)
}

客户端代码实现的流程如下

在这里插入图片描述

1.3 总结

✨ 前文提到过protobuf协议是平台无关的。演示的客户端和服务端都是 golang 的,即使客户端和服务端不同语言也是类似的可以通信的

✨ 对于上面介绍的的这种类似于http1.x的模式:客户端发送请求,服务端响应请求,一问一答的模式在 gRPC 里叫做Simple RPC (也称Unary RPC 一元RPC)。gRPC 同时也支持其他类型的交互方式。

在这里插入图片描述

二、Server-Streaming RPC 服务器端流式 RPC

服务器端流式 RPC,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求

简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。大致如图:

在这里插入图片描述

2.1 protobuf 定义

syntax = "proto3";option go_package = ".;streaming";service Greeter {// 服务端流模式:客户端发送一次请求,服务端持续返回数据rpc GetStream(StreamRequestData) returns (stream StreamResponseData);
}message StreamRequestData {string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}message StreamResponseData {string data = 1;
}

2.2 server 实现

✨ 注意与Simple RPC的区别:因为我们的服务端是流式响应的,因此对于服务端来说函数入参了一个streaming.Greeter_GetStreamServer参数用来写入多个响应,可以把它看作是客户端的对象

✨ 可以通过调用这个流对象的Send(...),来往客户端写入数据

✨ 通过返回nil或者error来表示全部数据写完了

func (s *server) GetStream(req *streaming.StreamRequestData, res streaming.Greeter_GetStreamServer) error {i := 0for {i++_ = res.Send(&streaming.StreamResponseData{Data: fmt.Sprintf("%v", time.Now().Unix()),})time.Sleep(time.Second)if i > 10 {break}}return nil
}

2.3 client 实现

✨ 注意与Simple RPC的区别:因为我们的服务端是流式响应的,因此 RPC 函数返回值stream是一个流,可以把它看作是服务端的对象

✨ 使用streamRecv函数来不断从服务端接收数据

✨ 当Recv返回io.EOF代表流已经结束

func main() {conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), grpc.WithStreamInterceptor(streamInterceptor))if err != nil {panic(err)}defer conn.Close()// 1. 创建客户端实例client := streaming.NewGreeterClient(conn)// 2. 调用方法 (服务端流模式)res, _ := client.GetStream(context.Background(), &streaming.StreamRequestData{Data: "服务端流模式"})for {a, err := res.Recv()if err != nil {fmt.Println(err)break}fmt.Println(a.Data)}
}

2.4 总结

在这里插入图片描述

三、Client-Streaming RPC 客户端流式 RPC

客户端流式 RPC,显然也是单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端,大致如图:

服务端没有必要等到客户端发送完所有请求再响应,可以在收到部分请求之后就响应

在这里插入图片描述

3.1 protobuf 定义

syntax = "proto3";option go_package = ".;streaming";service Greeter {// 客户端流模式:客户端持续发送数据,服务端最终返回结果rpc PutStream(stream StreamRequestData) returns (StreamResponseData);
}message StreamRequestData {string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}message StreamResponseData {string data = 1;
}

3.2 server 实现

✨ 注意与Simple RPC的区别:因为我们的客户端是流式请求的,因此请求参数cliStr streaming.Greeter_PutStreamServer就是流对象

✨ 可以从cliStr streaming.Greeter_PutStreamServerRecv函数读取消息

✨ 当Recv返回io.EOF代表流已经结束

✨ 使用cliStr streaming.Greeter_PutStreamServerSendAndClose`函数关闭并发送响应

// 在这段程序中,我们对每一个 Recv 都进行了处理
// PutStream 客户端流模式:客户端持续发送数据,服务端最终返回结果
func (s *server) PutStream(cliStr streaming.Greeter_PutStreamServer) error {for {if a, err := cliStr.Recv(); err != nil {fmt.Println(err)if err == io.EOF {// Finished reading the order streamreturn cliStr.SendAndClose(&streaming.StreamResponseData{Data: "服务端接收完成"})}break} else {fmt.Println(a.Data)}}return nil
}

3.3 Client 实现

✨ 注意与Simple RPC的区别:因为我们的客户端是流式响应的,因此 RPC 函数返回值stream是一个流

✨ 可以通过调用这个流对象的Send(...),来往这个对象写入数据

✨ 使用streamCloseAndRecv函数关闭并发送响应

conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), grpc.WithStreamInterceptor(streamInterceptor))if err != nil {panic(err)}defer conn.Close()// 1. 创建客户端实例client := streaming.NewGreeterClient(conn)putS, _ := client.PutStream(context.Background())i := 0for {i++_ = putS.Send(&streaming.StreamRequestData{Data: fmt.Sprintf("客户端流模式:%d", i),})time.Sleep(time.Second)if i > 10 {// 发送超过10条消息,关闭流_, err := putS.CloseAndRecv()if err != nil {panic(err)}break}}

3.4 总结

在这里插入图片描述

四、Bidirectional-Streaming RPC 双向流式 RPC

双向流式 RPC,顾名思义是双向流。由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求

首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)

假设该双向流是按顺序发送的话,大致如图:

在这里插入图片描述

4.1 protobuf 定义

syntax = "proto3";option go_package = ".;streaming";service Greeter {// 双向流模式:客户端持续发送数据,服务端持续返回数据rpc BidiStream(stream StreamRequestData) returns (stream StreamResponseData);
}message StreamRequestData {string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}message StreamResponseData {string data = 1;
}

4.2 server 实现

✨ 函数入参allStr streaming.Greeter_BidiStreamServer是用来写入多个响应和读取多个消息的对象引用

✨ 可以通过调用这个流对象的Send(...),来往这个对象写入响应

✨ 可以通过调用这个流对象的Recv(...)函数读取消息,当Recv返回io.EOF代表流已经结束

✨ 通过返回nil或者error表示全部数据写完了

// BidiStream 双向流模式:客户端持续发送数据,服务端持续返回数据
func (s *server) BidiStream(allStr streaming.Greeter_BidiStreamServer) error {wg := sync.WaitGroup{}wg.Add(2)// 接收客户端数据go func() {defer wg.Done()for {if data, err := allStr.Recv(); err != nil {fmt.Println(err)break} else {fmt.Println("收到客户端消息:" + data.Data)}}}()// 发送数据给客户端go func() {defer wg.Done()i := 0for {i++_ = allStr.Send(&streaming.StreamResponseData{Data: fmt.Sprintf("我是服务端%v", time.Now().Unix())})time.Sleep(time.Second)if i > 10 {break}}}()wg.Wait()return nil
}

4.3 Client 实现

✨ 函数返回值allStr streaming.Greeter_BidiStreamServer是用来获取多个响应和写入多个消息的对象引用

✨ 可以通过调用这个流对象的Send(...),来往这个对象写入响应

✨ 可以通过调用这个流对象的Recv(...)函数读取消息,当Recv返回io.EOF代表流已经结束

	// 双向流模式client := streaming.NewGreeterClient(conn)ctx, cancel := context.WithCancel(context.Background())defer cancel()allStr, _ := client.BidiStream(ctx)wg := sync.WaitGroup{}wg.Add(2)// 接收服务端消息go func() {defer wg.Done()for {data, _ := allStr.Recv()fmt.Println("收到服务端消息:" + data.Data)}}()// 发送消息给服务端go func() {defer wg.Done()i := 0for {i++_ = allStr.Send(&streaming.StreamRequestData{Data: fmt.Sprintf("我是客户端:%d", rand.IntN(100))})time.Sleep(time.Second)if i > 10 {break}}}()wg.Wait()

4.4 总结

在这里插入图片描述

参考资料

  • https://grpc.org.cn/docs/languages/go/basics
  • https://segmentfault.com/a/1190000043355301

示例代码

gitee

http://www.dtcms.com/a/308333.html

相关文章:

  • 搭建 Mock 服务,实现前端自调
  • Python与MySQL的关联操作
  • AI+金融,如何跨越大模型和场景鸿沟?
  • 006 低功耗蓝牙BLE——音频数据无法直接免驱传输分析与折中方案
  • Spark SQL 的详细介绍
  • CentOS7上使用Docker安装Nacos详细步骤
  • java一个脚手架搭建
  • 常用设计模式系列(十六)—策略模式
  • sqli-labs靶场Less23
  • Jmeter全局变量跨线程组的使用
  • 四、主辅源电路
  • F12 开发者工具 使用指北
  • vk框架或者普通函数封装的一些函数可以拿取使用【会持续更新】
  • 谷歌devtools检查文本资源是否已压缩
  • 【LeetCode 热题 100】20. 有效的括号
  • 使用 Docker 部署 Apache RocketMQ
  • 数据转换能干什么?有哪些好用的数据转换方法?
  • 剖析客户服务痛点,借助 Baklib 整合多渠道反馈
  • ADW300 物联网仪表:引领能源计量智能化变革
  • STM32标准库搭建示例(STM32F103C8T6)
  • 操作系统:上下文切换(Context Switch)
  • Effective C++ 条款13:以对象管理资源
  • LLC电源原边MOS管DS增加RC吸收对ZVS的影响分析
  • Linux和shell
  • 保姆级别IDEA关联数据库方式、在IDEA中进行数据库的可视化操作(包含图解过程)
  • ceph sc 设置文件系统格式化参数
  • 前端ESLint扩展的用法详解
  • 【实时Linux实战系列】实时图像处理应用开发
  • 【PHP类的基础概念:从零开始学面向对象】
  • Elasticsearch DSL 核心语法大全:match、bool、range、聚合查询实战解析