Go语言的gRPC教程-通信模式
上一篇介绍了如何编写 protobuf 的 idl [接口定义语言 (Interface Definition Language
)],并使用 idl 生成了 gRPC 的代码,现在来看看如何编写客户端和服务端的代码。
一、Simple RPC (Unary RPC)
1.1 protobuf
syntax = "proto3";
option go_package = ".;helloworld";// protoc --go_out=. --go-grpc_out=. helloworld.protoservice SearchService {rpc Search(SearchRequest) returns (SearchResponse) {}
}message Book {string title = 1;int32 price = 3;
}message SearchRequest {string request = 1;repeated string keywords = 2;Book book = 3;
}message SearchResponse {string response = 1;
}
定义如上的 idl,需要关注几个事项
- 使用
protobuf
最新版本syntax = "proto3";
protoc-gen-go
要求 pb 文件必须指定 go 包的路径。即option go_package = ".;helloworld";
,. (点号) - 生成代码的导入路径 (Import Path),helloworld - 生成代码的Go 包名 (Package Name)。service
定义的method
仅能有一个入参和出参数。如果需要传递多个参数需要定义成message
- 可以使用
import
引用另外一个文件的 pb。
生成 go 和 grpc 的代码
protoc -I . --go_out . --go_opt paths=source_relative --go-grpc_out . --go-grpc_opt paths=source_relative helloworld.proto
helloword_proto
├── helloworld.pb.go
└── helloworld_grpc.pb.go
└── helloworld.proto
1.2 server 实现
1、由 pb 文件生成的 gRPC 代码中包含了 service 的接口定义,它和我们定义的 idl 是吻合的
service SearchService {rpc Search(SearchRequest) returns (SearchResponse) {}
}
type SearchServiceServer interface {Search(context.Context, *SearchRequest) (*SearchResponse, error)mustEmbedUnimplementedSearchServiceServer()
}
2、我们的业务逻辑就是实现这个接口
package mainimport ("context"helloworld "example.com/grpc/helloworld_proto""fmt""google.golang.org/grpc""google.golang.org/grpc/codes""google.golang.org/grpc/metadata""google.golang.org/grpc/peer""google.golang.org/grpc/status""net""time"
)type HelloWorldServer struct {helloworld.UnimplementedSearchServiceServer
}func (server *HelloWorldServer) Search(ctx context.Context, r *helloworld.SearchRequest) (*helloworld.SearchResponse, error) {fmt.Println("Request received:", r.GetRequest())fmt.Println("Keywords received:", r.GetKeywords())fmt.Println("Books received:", r.GetBook())return &helloworld.SearchResponse{Response: fmt.Sprintf("hello %s", r.GetRequest()),}, nil
}
3、在实现完业务逻辑之后,我们可以创建并启动服务
func main() {// 1. 监听端口listen, err := net.Listen("tcp", ":8090")if err != nil {fmt.Println("监听端口失败", err)}// 2. 创建gRPC服务器实例grpcServer := grpc.NewServer(grpc.UnaryInterceptor(serverUnaryInterceptor),)// 3. 注册服务helloworld.RegisterSearchServiceServer(grpcServer, &HelloWorldServer{})// 4. 启动服务, grpcServer.Serve(listen) 会阻塞当前 goroutine,直到服务停止err = grpcServer.Serve(listen)if err != nil {fmt.Println("启动服务失败", err)}
}
服务端代码实现的流程如下
client 实现
1、由 pb 文件生成的 gRPC 代码中包含了 client 的实现,它和我们定义的 idl 也是吻合的。
type searchServiceClient struct {cc grpc.ClientConnInterface
}func NewSearchServiceClient(cc grpc.ClientConnInterface) SearchServiceClient {return &searchServiceClient{cc}
}func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) {cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)out := new(SearchResponse)err := c.cc.Invoke(ctx, SearchService_Search_FullMethodName, in, out, cOpts...)if err != nil {return nil, err}return out, nil
}
2、直接使用 client 来进行 rpc 调用
package mainimport ("context"helloworld "example.com/grpc/helloworld_proto""fmt""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure""google.golang.org/protobuf/types/known/wrapperspb""runtime""time"
)
func main() {conn, err := grpc.NewClient("localhost:8090",grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {panic(err)}defer conn.Close()// 1. 创建客户端实例client := helloworld.NewSearchServiceClient(conn)// 2. 调用方法resp, err := client.Search(context.Background(), &helloworld.SearchRequest{Request: "Golang",Keywords: []string{"hello", "world"},Book: &helloworld.Book{Title: "Go语言编程",Price: &wrapperspb.Int64Value{Value: 100,},},})if err != nil {panic(err)}fmt.Println(resp)
}
客户端代码实现的流程如下
1.3 总结
✨ 前文提到过protobuf协议
是平台无关的。演示的客户端和服务端都是 golang 的,即使客户端和服务端不同语言也是类似的可以通信的
✨ 对于上面介绍的的这种类似于http1.x
的模式:客户端发送请求,服务端响应请求,一问一答的模式在 gRPC 里叫做Simple RPC
(也称Unary RPC 一元RPC
)。gRPC 同时也支持其他类型的交互方式。
二、Server-Streaming RPC 服务器端流式 RPC
服务器端流式 RPC
,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求
简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。大致如图:
2.1 protobuf 定义
syntax = "proto3";option go_package = ".;streaming";service Greeter {// 服务端流模式:客户端发送一次请求,服务端持续返回数据rpc GetStream(StreamRequestData) returns (stream StreamResponseData);
}message StreamRequestData {string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}message StreamResponseData {string data = 1;
}
2.2 server 实现
✨ 注意与Simple RPC
的区别:因为我们的服务端是流式响应的,因此对于服务端来说函数入参多了一个streaming.Greeter_GetStreamServer
参数用来写入多个响应,可以把它看作是客户端的对象
✨ 可以通过调用这个流对象的Send(...)
,来往客户端写入数据
✨ 通过返回nil
或者error
来表示全部数据写完了
func (s *server) GetStream(req *streaming.StreamRequestData, res streaming.Greeter_GetStreamServer) error {i := 0for {i++_ = res.Send(&streaming.StreamResponseData{Data: fmt.Sprintf("%v", time.Now().Unix()),})time.Sleep(time.Second)if i > 10 {break}}return nil
}
2.3 client 实现
✨ 注意与Simple RPC
的区别:因为我们的服务端是流式响应的,因此 RPC 函数返回值stream
是一个流,可以把它看作是服务端的对象
✨ 使用stream
的Recv
函数来不断从服务端接收数据
✨ 当Recv
返回io.EOF
代表流已经结束
func main() {conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), grpc.WithStreamInterceptor(streamInterceptor))if err != nil {panic(err)}defer conn.Close()// 1. 创建客户端实例client := streaming.NewGreeterClient(conn)// 2. 调用方法 (服务端流模式)res, _ := client.GetStream(context.Background(), &streaming.StreamRequestData{Data: "服务端流模式"})for {a, err := res.Recv()if err != nil {fmt.Println(err)break}fmt.Println(a.Data)}
}
2.4 总结
三、Client-Streaming RPC 客户端流式 RPC
客户端流式 RPC
,显然也是单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端,大致如图:
服务端没有必要等到客户端发送完所有请求再响应,可以在收到部分请求之后就响应
3.1 protobuf 定义
syntax = "proto3";option go_package = ".;streaming";service Greeter {// 客户端流模式:客户端持续发送数据,服务端最终返回结果rpc PutStream(stream StreamRequestData) returns (StreamResponseData);
}message StreamRequestData {string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}message StreamResponseData {string data = 1;
}
3.2 server 实现
✨ 注意与Simple RPC
的区别:因为我们的客户端是流式请求的,因此请求参数cliStr streaming.Greeter_PutStreamServer
就是流对象
✨ 可以从cliStr streaming.Greeter_PutStreamServer
的Recv
函数读取消息
✨ 当Recv
返回io.EOF
代表流已经结束
✨ 使用cliStr streaming.Greeter_PutStreamServer的
SendAndClose`函数关闭并发送响应
// 在这段程序中,我们对每一个 Recv 都进行了处理
// PutStream 客户端流模式:客户端持续发送数据,服务端最终返回结果
func (s *server) PutStream(cliStr streaming.Greeter_PutStreamServer) error {for {if a, err := cliStr.Recv(); err != nil {fmt.Println(err)if err == io.EOF {// Finished reading the order streamreturn cliStr.SendAndClose(&streaming.StreamResponseData{Data: "服务端接收完成"})}break} else {fmt.Println(a.Data)}}return nil
}
3.3 Client 实现
✨ 注意与Simple RPC
的区别:因为我们的客户端是流式响应的,因此 RPC 函数返回值stream
是一个流
✨ 可以通过调用这个流对象的Send(...)
,来往这个对象写入数据
✨ 使用stream
的CloseAndRecv
函数关闭并发送响应
conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), grpc.WithStreamInterceptor(streamInterceptor))if err != nil {panic(err)}defer conn.Close()// 1. 创建客户端实例client := streaming.NewGreeterClient(conn)putS, _ := client.PutStream(context.Background())i := 0for {i++_ = putS.Send(&streaming.StreamRequestData{Data: fmt.Sprintf("客户端流模式:%d", i),})time.Sleep(time.Second)if i > 10 {// 发送超过10条消息,关闭流_, err := putS.CloseAndRecv()if err != nil {panic(err)}break}}
3.4 总结
四、Bidirectional-Streaming RPC 双向流式 RPC
双向流式 RPC
,顾名思义是双向流。由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求
首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)
假设该双向流是按顺序发送的话,大致如图:
4.1 protobuf 定义
syntax = "proto3";option go_package = ".;streaming";service Greeter {// 双向流模式:客户端持续发送数据,服务端持续返回数据rpc BidiStream(stream StreamRequestData) returns (stream StreamResponseData);
}message StreamRequestData {string data = 1; // 1 是字段编号,用于在消息定义中标识字段,不是值
}message StreamResponseData {string data = 1;
}
4.2 server 实现
✨ 函数入参allStr streaming.Greeter_BidiStreamServer
是用来写入多个响应和读取多个消息的对象引用
✨ 可以通过调用这个流对象的Send(...)
,来往这个对象写入响应
✨ 可以通过调用这个流对象的Recv(...)
函数读取消息,当Recv
返回io.EOF
代表流已经结束
✨ 通过返回nil
或者error
表示全部数据写完了
// BidiStream 双向流模式:客户端持续发送数据,服务端持续返回数据
func (s *server) BidiStream(allStr streaming.Greeter_BidiStreamServer) error {wg := sync.WaitGroup{}wg.Add(2)// 接收客户端数据go func() {defer wg.Done()for {if data, err := allStr.Recv(); err != nil {fmt.Println(err)break} else {fmt.Println("收到客户端消息:" + data.Data)}}}()// 发送数据给客户端go func() {defer wg.Done()i := 0for {i++_ = allStr.Send(&streaming.StreamResponseData{Data: fmt.Sprintf("我是服务端%v", time.Now().Unix())})time.Sleep(time.Second)if i > 10 {break}}}()wg.Wait()return nil
}
4.3 Client 实现
✨ 函数返回值allStr streaming.Greeter_BidiStreamServer
是用来获取多个响应和写入多个消息的对象引用
✨ 可以通过调用这个流对象的Send(...)
,来往这个对象写入响应
✨ 可以通过调用这个流对象的Recv(...)
函数读取消息,当Recv
返回io.EOF
代表流已经结束
// 双向流模式client := streaming.NewGreeterClient(conn)ctx, cancel := context.WithCancel(context.Background())defer cancel()allStr, _ := client.BidiStream(ctx)wg := sync.WaitGroup{}wg.Add(2)// 接收服务端消息go func() {defer wg.Done()for {data, _ := allStr.Recv()fmt.Println("收到服务端消息:" + data.Data)}}()// 发送消息给服务端go func() {defer wg.Done()i := 0for {i++_ = allStr.Send(&streaming.StreamRequestData{Data: fmt.Sprintf("我是客户端:%d", rand.IntN(100))})time.Sleep(time.Second)if i > 10 {break}}}()wg.Wait()
4.4 总结
参考资料
- https://grpc.org.cn/docs/languages/go/basics
- https://segmentfault.com/a/1190000043355301
示例代码
gitee