gRPC学习笔记
微服务
单体架构的弊端:
-
一旦某个服务器宕机,会引起整个应用不可用,隔离性差
-
只能整体应用进行伸缩,浪费资源,可伸缩性差
-
代码耦合在一起,可维护性差
微服务架构:解决了单体架构的弊端
可以按照服务进行单独扩容
各个服务之间可以独立开发,独立部署
同时引入了新的问题
-
代码冗余
-
服务和服务之间存在调用关系
grpc
的优点
-
生态好:Google
-
跨语言
-
性能好
-
强类型
-
流式处理
安装gRPC
go get google.golang.org/grpc go get google.golang.org/protobuf
安装protoc
protobuf
是可用于通信协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式
链接:百度网盘 请输入提取码 提取码:jw8a
把解压出来的bin
目录放在系统变量中
D:\demo_rely\protoc-3.15.5-win64\bin
// cmd中测试 protoc --version
etcd
链接:百度网盘 请输入提取码 提取码:7777 etcdkeeper
链接:百度网盘 请输入提取码 提取码:7777
Protobuf Support
插件可以让.proto
文件有语法高亮
// 下载安装protoc-gen-go git clone https://gitcode.com/golang/protobuf.git // 进入到 protobuf/protoc-gen-go 目录执行以下命令: go build -o protoc-gen-go main.go // 复制 protoc-gen-go.exe 到 GOROOT的 bin 目录 // 我的protoc-gen-go.exe文件在GO/bin目录下
// cmd 转换 cd ./user/ protoc -I internal/service/pb internal/service/pb/*.proto --go_out=plugins=grpc:. // internal/service/pb pb所在的文件 internal/service/pb/*.proto是需要生成的文件
// 跨语言:protoc-gen-go是go语言的 go install github.com/golang/protobuf/protoc-gen-go // 需要配环境变量, 或者把exe文件放到gobin目录下
原生rpc
// server/ser.go
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"net/rpc"
)

// Server is the receiver whose exported methods are published through net/rpc.
type Server struct{}

// Req carries the two operands of an Add call.
type Req struct {
	Num1 int
	Num2 int
}

// Res carries the result of an Add call.
type Res struct {
	Num int
}

// Add stores req.Num1 + req.Num2 in res.Num and prints the request.
// It always returns nil.
func (s Server) Add(req Req, res *Res) error {
	res.Num = req.Num1 + req.Num2
	fmt.Println("请求来了", req)
	return nil
}

func main() {
	// Register the RPC service. Register fails when the receiver has no
	// suitable exported methods, so the error must not be dropped.
	if err := rpc.Register(new(Server)); err != nil {
		log.Fatalf("register rpc service: %v", err)
	}
	rpc.HandleHTTP()

	listen, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatalf("listen on :8080: %v", err)
	}
	// http.Serve only returns when serving fails.
	if err := http.Serve(listen, nil); err != nil {
		log.Fatalf("serve: %v", err)
	}
}
// client/cli.go
package main

import (
	"fmt"
	"net/rpc"
)

// Req carries the two operands sent to Server.Add.
type Req struct {
	Num1 int
	Num2 int
}

// Res receives the result of Server.Add.
type Res struct {
	Num int
}

func main() {
	client, err := rpc.DialHTTP("tcp", ":8080")
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the underlying connection once the call is done.
	defer client.Close()

	req := Req{Num1: 1, Num2: 2}
	var res Res
	// The method is addressed as "<receiver type>.<method name>".
	err = client.Call("Server.Add", req, &res)
	fmt.Println(res, err)
}
原生rpc
的问题:
-
编写相对复杂,需要自己去关注实现过程
-
没有代码提示,容易写错
hello world 服务端
protobuf
文件相当于接口文件(或者约束)服务端客户端可以都有
// grpc_proto/hello.proto
syntax = "proto3"; // proto language version
package hello_grpc; // default proto package name

// go_package sets the Go import path of the generated code. The optional
// "path;name" form additionally sets the Go package name after the ';'.
// Here only the path "/hello_grpc" is given.
option go_package = "/hello_grpc";

// HelloService is the RPC service definition.
service HelloService {
  // SayHello is a unary RPC method.
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

// HelloRequest is the request payload (generated as a Go struct).
message HelloRequest {
  string name = 1;
  string message = 2;
}

// HelloResponse is the response payload.
message HelloResponse{
  string name = 1;
  string message = 2;
}
若想让proto
文件有高光提示,就在插件中下载gRPC
// .proto转换.go // -I . 指定了包含 .proto 文件的目录 protoc -I . --go_out=plugins=grpc:. grpc_proto\hello.proto protoc --go-grpc_out=. hello.proto // 前两者都有 gRPC 插件 protoc --go_out=. hello.proto // 没grpc插件
服务端
-
编写一个结构体
-
实现
protobuf
中的所有方法 -
监听端口
-
注册服务
// server/grpc_server.go package main import ( "context" "fmt" hello_grpc "grpc_study/grpc_proto" "net" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" ) type HelloService struct { } func (HelloService) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (res *hello_grpc.HelloResponse, err error) { fmt.Println(request) return &hello_grpc.HelloResponse{ Name: "江小年", Message: "ok", }, nil } func main() { // 监听端口 listen, err := net.Listen("tcp", ":8080") if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } // 创建一个grpc服务器实例 s := grpc.NewServer() server := HelloService{} // 将server结构体注册为grpc服务 hello_grpc.RegisterHelloServiceServer(s, &server) fmt.Println("grpc server running :8080") // 开始处理客户端请求 err = s.Serve(listen) }
客户端
// client/grpc_client.go package main import ( "context" "fmt" hello_grpc "grpc_study/grpc_proto" "log" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) func main() { addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 client := hello_grpc.NewHelloServiceClient(conn) result, err := client.SayHello(context.Background(), &hello_grpc.HelloRequest{ Name: "江小年", Message: "ok", }) fmt.Println(result, err) }
proto
基本数据类型
// grpc_proto/hello.proto
syntax = "proto3"; // proto language version
package hello_grpc; // default proto package name

// go_package sets the Go import path (and optionally, after a ';', the Go
// package name) used for the generated .go file.
option go_package = "/hello_grpc";

// HelloService is the RPC service definition.
service HelloService {
  // SayHello is a unary RPC method.
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

// HelloRequest is the request payload.
message HelloRequest {
  string name = 1;
  string message = 2;
}

// HelloResponse is the response payload.
message HelloResponse{
  string name = 1;
  string message = 2;
}
proto
语法
service
对应的就是go
里面的接口,可以作为服务端,客户端
rpc
对应的就是结构体中的方法
message
对应的也是结构体
数据类型
基本数据类型
// Request lists every proto3 scalar type, one field per type; the trailing
// comments give the Go type protoc-gen-go generates for each.
message Request {
  double a1 = 1;      // float64
  float a2 = 2;       // float32
  int32 a3 = 3;       // int32
  uint32 a4 = 4;      // uint32
  uint64 a5 = 5;      // uint64
  sint32 a6 = 6;      // int32
  sint64 a7 = 7;      // int64
  fixed32 a8 = 8;     // uint32
  fixed64 a9 = 9;     // uint64
  sfixed32 a10 = 10;  // int32
  sfixed64 a11 = 11;  // int64
  bool a12 = 12;      // bool
  string a13 = 13;    // string
  bytes a14 = 14;     // []byte
  repeated string name = 15; // slice of strings
}
数组类型 repeated
关键字
// grpc_study/grpc_proto/type_grpc/type_grpc.proto
syntax = "proto3"; // proto language version

// Go import path of the generated package.
option go_package = "/type_grpc";

// TypeService exposes a single unary method.
service TypeService {
  rpc Say(Request)returns(Response){}
}

// Request carries one field of each scalar type.
message Request{
  double a1 = 1;
  float a2 = 2;
  int32 a3 = 3;
  uint32 a4 = 4;
  uint64 a5 = 5;
  sint32 a6 = 6;
  sint64 a7 = 7;
  fixed32 a8 = 8;
  fixed64 a9 = 9;
  sfixed32 a10 = 10;
  sfixed64 a11 = 11;
  bool a12 = 12;
  string a13 = 13;
  bytes a14 = 14;
}

// Item is a plain message (a data structure, not a remote method); it is
// used as the element type of ArrayRequest.item_list.
message Item {
  string name = 1;
  fixed32 code = 2;
}

// ArrayRequest demonstrates repeated (list) fields.
message ArrayRequest {
  repeated int64 i6_list = 1; // list of int64
  repeated string s_list = 2;
  repeated Item item_list = 3;
}

// Response is intentionally empty.
message Response{ }
map
类型
键只能是基本类型
// MapRequest demonstrates map fields. Values may be scalar or message types,
// as the three fields below show; Item is a message declared elsewhere.
message MapRequest {
  map<int64, string> i_s = 1;
  map<string, bool> s_b = 2;
  map<string, Item> s_item = 3;
}
嵌套类型
// Q1 demonstrates a nested message declaration. Message names use CamelCase
// so that the field type Q2 below resolves to the nested message (the
// lowercase q1/q2 declarations left the type Q2 undefined).
message Q1 {
  // Q2 is declared inside Q1 and is addressed from outside as Q1.Q2.
  message Q2 {
    string name = 1;
  }
  string name = 1;
  Q2 q2 = 2;
}
多服务
// grpc_study/grpc_proto/type_grpc/duo.proto
syntax = "proto3"; // proto language version

// Go import path of the generated package.
option go_package = "/duo_proto";

// VideoService is the first of two services declared in this file.
service VideoService {
  rpc Look(Request)returns(Response){}
}

// Request is shared by both services.
message Request{
  string name = 1;
}

// Response is shared by both services.
message Response{
  string name = 1;
}

// OrderService is the second service; it reuses Request and Response.
service OrderService {
  rpc Buy(Request)returns(Response){}
}
服务端
// grpc_study/server/duo_server.go package main type VideoServer struct { } func (VideoS erver)Look(ctx context.Context, request *duo_proto.Request) (res *duo_proto.Response, err error) { fmt.Println("video:", request) return &duo_proto.Response{ Name: "江小年", }, nil } type OrderServer struct { } func (OrderServer)Buy(ctx context.Context, request *duo_proto.Request) (res *duo_proto.Response, err error) { fmt.Println("video:", request) return &duo_proto.Response{ Name: "江小年", }, nil } func main() { listen, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal(err) } s := grpc.NewServer() duo_proto.RegisterVideoServiceServer(s, &VideoServer{}) duo_proto.RegisterOrderServiceServer(s, &OrderServer{}) fmt.Println("grpc server程序运行在:8080") err = s.Serve(listen) }
客户端
// grpc_study/client/duo_client.go package main import ( "context" "fmt" "grpc_test/grpc_proto/hello_grpc" "log" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) func main() { addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 orderClient := duo_proto.NewOrderServiceClient(conn) res, err := orderClient.Buy(context.Background(), &duo_proto.Request{ Name: "江小年", }) fmt.Println(res, err) videoClient := duo_proto.NewVideoServiceClient(conn) res, err = videoClient.Look(context.Background(), &duo_proto.Request{ Name: "江小年", }) fmt.Println(res, err) }
多proto
文件
当项目大起来之后,会有很多个service,rpc,message
我们会将不同的服务放在不同的proto
文件中
还可以放一些公共的proto
文件
其实本质就是生成go文件,需要在一个包内
头部最好一样,
package
必须一样
// grpc_study/service_proto/video.proto
syntax = "proto3";
package proto;
option go_package = "/proto";

// Request and Response are declared in common.proto.
import "common.proto";

// VideoService exposes a single unary method.
service VideoService {
  rpc Look(Request)returns(Response){}
}
// grpc_study/service_proto/order.proto
syntax = "proto3";
package proto;
option go_package = "/proto";

// Request and Response are declared in common.proto.
import "common.proto";

// OrderService exposes a single unary method.
service OrderService {
  rpc Buy(Request)returns(Response){}
}
// grpc_study/service_proto/common.proto
syntax = "proto3";
package proto;
option go_package = "/proto";

// Request is the shared request message imported by the service files.
message Request{
  string name = 1;
}

// Response is the shared response message imported by the service files.
message Response{
  string name = 1;
}
// grpc_study/service_proto/XXX.proto protoc -I .\service_proto --go_out=plugins=grpc:./service_proto .\service_proto\video.proto protoc -I .\service_proto --go_out=plugins=grpc:./service_proto .\service_proto\order.proto protoc -I .\service_proto --go_out=plugins=grpc:./service_proto .\service_proto\common.proto
服务端流式传输
// grpc_study/stream_proto/stream.proto
syntax = "proto3";
option go_package = "/proto";

// Simple is a plain unary service.
service Simple{
  rpc Fun(Request)returns(Response){}
}

message Request {
  string name = 1;
}

message Response {
  string Text = 1;
}

// Server and client code for the unary service: same as above.

// Server-side streaming: the "stream" keyword on the return type lets the
// server send a sequence of Response messages for one Request.
service ServiceStream{
  rpc Fun(Request)returns(stream Response){}
}
// grpc_study/server/服务端流式serve.go package main import "grpc_study/stream_proto/proto" type ServiceStream struct {} func (ServiceStream)Fun(request *proto.Request, stream proto.ServiceStream_FunServer) error { fmt.Println(request) for i := 0; i < 10; i++ { stream.Send(&proto.Response{ Text: fmt.Sprintf("第%d轮数据", i) }) // 响应 } return nil } func main() { listen, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal(err) } server := grpc.NewServer() proto.RegisterServiceStreamServer(server, &ServiceStream) server.Server(listen) }
// grpc_study/client/服务端流式client.go func main() { addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 client := proto.NewServiceStreamClient(conn) stream, err := client.Fun(context.Background(), &proto.Request{ Name: "江小年", }) for i := 0; i < 10; i++ { response, err := stream.Recv() fmt.Println(response, err) } }
客户端不知道服务端什么时候结束
服务端流式案例_下载文件
客户端不知道服务端什么时候结束
// grpc_study/client/服务端流式client.go func main() { addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 client := proto.NewServiceStreamClient(conn) stream, err := client.Fun(context.Background(), &proto.Request{ Name: "江小年", }) for { response, err := stream.Recv() if err == io.EOF { // EOF就是结束的err报错 break } fmt.Println(response) } }
下载文件,文件大时需要多次传输,这时不知道多会儿结束怎么办
// grpc_study/stream_proto/stream.proto
syntax = "proto3";
option go_package = "/proto";

// Simple is a plain unary service.
service Simple{
  rpc Fun(Request)returns(Response){}
}

message Request {
  string name = 1;
}

message Response {
  string Text = 1;
}

// FileResponse is one chunk of a streamed file.
message FileResponse {
  string file_name = 1;
  bytes content = 2;
}

// Server and client code for the unary service: same as above.

// Server-side streaming service.
service ServiceStream{
  rpc Fun(Request)returns(stream Response){}
  // DownLoadFile streams a file back as a sequence of FileResponse chunks.
  rpc DownLoadFile(Request)returns(stream FileResponse){}
}

// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\stream.proto
创建一个文件夹static
放一个大文件
package main // grpc_study/server/服务端流式serve.go import ( "fmt" proto "grpc_study/grpc_proto" "io" "log" "net" "os" "google.golang.org/grpc" ) type ServiceStream struct{} func (ServiceStream) Fun(request *proto.Request, stream proto.ServiceStream_FunServer) error { fmt.Println(request) for i := 0; i < 10; i++ { stream.Send(&proto.Response{ Text: fmt.Sprintf("第%d轮数据", i), }) // 响应 } return nil } func (ServiceStream) DownLoadFile(request *proto.Request, stream proto.ServiceStream_DownLoadFileServer) error { fmt.Println(request) file, err := os.Open("abc.txt") if err != nil { panic(err) } defer file.Close() for { buf := make([]byte, 2048) _, err = file.Read(buf) if err == io.EOF { break } if err != nil { break } stream.Send(&proto.FileResponse{ Content: buf, }) } return nil } func main() { listen, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal(err) } server := grpc.NewServer() proto.RegisterServiceStreamServer(server, &ServiceStream{}) server.Serve(listen) }
package main import ( "bufio" "context" "fmt" proto "grpc_study/grpc_proto" "io" "log" "os" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // grpc_study/client/服务端流式client.go func main() { addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 client := proto.NewServiceStreamClient(conn) stream, err := client.DownLoadFile(context.Background(), &proto.Request{ Name: "江小年", }) file, err := os.OpenFile("abc.txt", os.O_CREATE|os.O_WRONLY, 0600) if err != nil { log.Fatalln(err) } defer file.Close() writer := bufio.NewWriter(file) var index int for { index++ response, err := stream.Recv() if err == io.EOF { // EOF就是结束的err报错 break } fmt.Printf("第%d次 写入 %d 数据", index, len(response.Content)) writer.Write(response.Content) } writer.Flush() }
客户端流式传输
// grpc_study/stream_proto/stream.proto
syntax = "proto3";
option go_package = "/proto";

// Simple is a plain unary service.
service Simple{
  rpc Fun(Request)returns(Response){}
}

message Request {
  string name = 1;
}

message Response {
  string Text = 1;
}

// FileResponse is one chunk of a downloaded file.
message FileResponse {
  string file_name = 1;
  bytes content = 2;
}

// Server and client code for the unary service: same as above.

// Server-side streaming service.
service ServiceStream{
  rpc Fun(Request)returns(stream Response){}
  rpc DownLoadFile(Request)returns(stream FileResponse){}
}

// FileRequest is one chunk of an uploaded file.
message FileRequest {
  string file_name = 1;
  bytes content = 2;
}

// Client-side streaming: the "stream" keyword on the parameter lets the
// client send a sequence of FileRequest messages and get one Response.
service ClientStream {
  rpc UploadFile(stream FileRequest)returns(Response){}
}

// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\stream.proto
// grpc_study/server/客户端流式serve.go package main import "grpc_study/stream_proto/proto" type ClientStream struct {} func (ClientStream)UpLoadFile(stream proto.ClientStream_UploadFileServer) error { for i := 0; i < 10; i++ { response, err := stream.Recv() fmt.Println(response, err) } stream.SendAndClose(&proto.Response{Text: "完毕"}) return nil } func main() { listen, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal(err) } server := grpc.NewServer() proto.RegisterClientStreamServer(server, &ClientStream) server.Server(listen) }
// grpc_study/client/客户端流式client.go func main() { addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 client := proto.NewClientStreamClient(conn) stream, err := client.UploadFile(context.Background()) for i := 0; i < 10; i++ { stream.Send(&proto.FileRequest{FileName: fmt.Sprintf("第%d次", i)}) } response, err := stream.CloseAndRecv() fmt.Println(response, err) }
客户端流式案例_上传文件
// grpc_study/client/客户端流式client.go func main() { addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 client := proto.NewClientStreamClient(conn) stream, err := client.UploadFile(context.Background()) file, err := os.Open("abc.txt") if err != nil { log.Fatalln(err) } defer file.Close() for { buf := make([]byte, 2048) _, err = file.Read(buf) if err == io.EOF { break } if err != nil { break } stream.Send(&proto.FileRequest{ FileName: "x.png" Content: buf, }) } response, err := stream.CloseAndRecv() fmt.Println(response, err) }
// grpc_study/server/客户端流式serve.go package main import "grpc_study/stream_proto/proto" type ClientStream struct {} func (ClientStream)UpLoadFile(stream proto.ClientStream_UploadFileServer) error { file, err := os.OpenFile("abc.txt", os.O_CREATE|os.O_WRONLY, 0600) if err != nil { log.Fatalln(err) } defer file.Close() writer := bufio.NewWriter(file) var index int for{ index ++ response, err := stream.Recv() if err == io.EOF { break } writer.Writer(response.Context) fmt.Printf("第%d次", index) } writer.Flush() stream.SendAndClose(&proto.Response{Text: "完毕"}) return nil } func main() { listen, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal(err) } server := grpc.NewServer() proto.RegisterClientStreamServer(server, &ClientStream) server.Server(listen) }
双向流传输
// grpc_study/stream_proto/stream.proto
syntax = "proto3";
option go_package = "/proto";

// Simple is a plain unary service.
service Simple{
  rpc Fun(Request)returns(Response){}
}

message Request {
  string name = 1;
}

message Response {
  string Text = 1;
}

// FileResponse is one chunk of a downloaded file.
message FileResponse {
  string file_name = 1;
  bytes content = 2;
}

// Server and client code for the unary service: same as above.

// Server-side streaming service.
service ServiceStream{
  rpc Fun(Request)returns(stream Response){}
  rpc DownLoadFile(Request)returns(stream FileResponse){}
}

// FileRequest is one chunk of an uploaded file.
message FileRequest {
  string file_name = 1;
  bytes content = 2;
}

// Client-side streaming service.
service ClientStream {
  rpc UploadFile(stream FileRequest)returns(Response){}
}

// Bidirectional streaming: both the parameter and the return type carry the
// "stream" keyword.
service BothStream{
  rpc Chat(stream Request)returns(stream Response){}
}

// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\stream.proto
// grpc_study/server/双向流式serve.go package main type BothStream struct {} func (BothStream)Chat(stream proto.BothStream_ChatServer) error { for i := 0; i < 10; i++ { request, _ := stream.Recv() fmt.Println(request) stream.Send(&proto.Response{ Text: "你好", }) } return nil } func main() { listen, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal(err) } server := grpc.NewServer() proto.RegisterBothStreamServer(server, &BothStream) server.Server(listen) }
// grpc_study/client/双向流式client.go func main() { addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 client := proto.NewBothStreamClient(conn) stream, err := client.Chat(context.Background()) for i:=0;i<10;i++{ stream.Send(&proto.Request{ Name: fmt.Sprintf("第%d次", i) }) response, err := stream.Recv() fmt.Println(response, err) } }
认证及安全传输说明
认证不是用户的身份认证,而是指多个server和client之间,如何识别对方是谁,并且可以安全的进行数据传输
-
SSL/TLS
认证方式(采用http2
协议) -
基于
Token
的认证方式(基于安全连接)
TLS
协议主要解决如下三个网络安全问题
-
保密,通过加密encryption实现,所有信息都加密传输,第三方无法嗅探
-
完整性,通过
MAC
校验机制,一旦被篡改,通信双方会立刻发现 -
认证,双方认证,双方都可以配备证书,防止身份被冒充
生产环境可以购买证书或使用一些平台发放的免费证书
key
:服务器上的私钥文件,用于对发送给客户端的数据进行加密,以及对从客户端接收到的数据进行解密
csr
:证书签名请求文件,用于提交给证书颁发机构(CA)对证书签名
crt
:由证书颁发机构(CA)签名后的证书,或者是开发者亲自签名的证书,包含持证人的信息,持有人的公钥,以及签署者的签名等信息
pem
:是基于Base64
编码的证书,扩展名包括PEM
、CRT
和CER
TLS
认证实现
首先通过openssl
生成证书和私钥
-
官网下载,Downloads | OpenSSL Library
-
其他人做的便携安装包,Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions
-
配置环境变量
D:\...\OpenSSL-Win64\bin
-
命令行测试
openssl
生成证书
# cd ./key/ # 1、生成私钥 openssl genrsa -out server.key 2048 # 2、生成证书 全部回车即可,可以不填 openssl req -new -x509 -key server.key -out server.crt -days 36500 # 国家名称 Country Name (2 letter code) [Au]:CN # 省名称 State or Province Name (full name) [Some-State]:GuangDong # 城市名称 Locality Name (eg, city) []:Meizhou # 公司组织名称 Organization Name (eg, company) [Internet widgits Pty Ltd]:Xuexiangban # 部门名称 Organizational Unit Name (eg, section) []:go # 服务器or网站名称 Common Name (e.g. server FQDN or YOUR name) []:kuangstudy # 邮件 Email Address []:3509309412@qq.com # 3、生成csr openssl req -new -key server.key -out server.csr# 更改openssl.cnf (Linux是openssl.cfg) # 1)复制一份安装的openssl的bin目录下的openssl.cnf文件到你项目所在的证书目录下 # 2)找到 [ CA_default ],打开 copy_extensions = copy (就是把前面的#去掉) # 3)找到 [ req ],打开 req_extensions = v3_req # The extensions to add to a certificate request # 4)找到 [ v3_req ],添加subjectAltName = @alt_names # 5)添加新的标签 [ alt_names ],和标签字段 DNS.1 = *.huajiangsll.com # 可设置多个域名,只有通过域名才能访问到代码 # 步骤5)是设置能访问的网站# 生成证书私钥test.key openssl genpkey -algorithm RSA -out test.key # 通过私钥test.key生成证书请求文件test.csr(cnf, 注意windows系统是cfg) openssl req -new -nodes -key test.key -out test.csr -days 3650 -subj "/C=cn/OU=myorg/O=mycomp/CN=myname" -config ./openssl.cfg -extensions v3_req # test.csr是上面生成的证书请求文件。 ca.crt/server.key是CA证书文件和key,用来对test.csr进行签名认证。这两个文件在第一部分生成 # 生成SAN证书 pem openssl x509 -req -days 365 -in test.csr -out test.pem -CA server.crt -CAkey server.key -CAcreateserial -extfile ./openssl.cfg -extensions v3_req
// server/grpc_server.go package main import ( "context" "fmt" "grpc_test/grpc_proto/hello_grpc" "net" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" ) type HelloService struct { } func (HelloService) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (res *hello_grpc.HelloResponse, err error) { fmt.Println(request) return &hello_grpc.HelloResponse{ Name: "江小年", Message: "ok", }, nil } func main() { creds, _ := credentials.NewServerTLSFromFile("D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.pem", "D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.key") // 监听端口 listen, err := net.Listen("tcp", ":8080") if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } // 创建一个grpc服务器实例 s := grpc.NewServer(grpc.Creds(creds)) server := HelloService{} // 将server结构体注册为grpc服务 hello_grpc.RegisterHelloServiceServer(s, &server) fmt.Println("grpc server running :8080") // 开始处理客户端请求 err = s.Serve(listen) }
// grpc_client.go package main import ( "context" "fmt" "grpc_test/grpc_proto/hello_grpc" "log" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) func main() { creds, _ := credentials.NewClientTLSFromFile("D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.pem", "*.helloword.com") addr := ":8080" // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds)) if err != nil { log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) } defer conn.Close() // 初始化客户端 client := hello_grpc.NewHelloServiceClient(conn) result, err := client.SayHello(context.Background(), &hello_grpc.HelloRequest{ Name: "江小年", Message: "ok", }) fmt.Println(result, err) }
Token
认证
gRPC
提供的一个接口,接口中有两个方法,接口位于credentials包下,这个接口需要客户端来实现
// PerRPCCredentials is the interface a client implements to attach
// credentials (for example a token) to every RPC.
type PerRPCCredentials interface {
	// GetRequestMetadata returns the key-value pairs to send as request
	// metadata. ctx carries the timeout and cancellation of the call; uri is
	// the URI of the request entry point.
	GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
	// RequireTransportSecurity reports whether these credentials may only be
	// sent over a secured (TLS) connection.
	RequireTransportSecurity() bool
}
第一个方法就是获取元数据信息,也就是客户端提供的key-value
对,context
用于控制超时和取消,uri
是请求入口处的uri
第二个方法的作用是声明是否需要基于TLS
认证进行安全传输,如果返回值是true
,则必须加上TLS
认证,返回值是false
则不用
// grpc_client.go package main import ( "context" "fmt" "grpc_test/grpc_proto/hello_grpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // type PerRPCCredentials interface { // GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) // RequireTransportSecurity() bool // } type ClientTokenAuth struct { } func (c ClientTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { return map[string]string{ "appId": "jiangxiaonian", "appKey": "123456", }, nil } func (c ClientTokenAuth) RequireTransportSecurity() bool { return false // 不开启安全认证 是否需要基于`TLS`认证进行安全传输 } func main() { addr := ":8080" // creds, _ := credentials.NewClientTLSFromFile("D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.pem", // "*.helloword.com") var opts []grpc.DialOption opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) opts = append(opts, grpc.WithPerRPCCredentials(new(ClientTokenAuth))) conn, err := grpc.Dial(addr, opts...) // 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接 // 此处使用不安全的证书来实现 SSL/TLS // conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds)) // if err != nil { // log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err)) // } defer conn.Close() // 初始化客户端 client := hello_grpc.NewHelloServiceClient(conn) result, err := client.SayHello(context.Background(), &hello_grpc.HelloRequest{ Name: "江小年", Message: "ok", }) fmt.Println(result, err) }
<!-- 服务端应该写一个拦截器 -->
// server/grpc_server.go package main import ( "context" "errors" "fmt" "grpc_test/grpc_proto/hello_grpc" "net" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" ) type HelloService struct { } func (HelloService) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (res *hello_grpc.HelloResponse, err error) { // 获取元数据的信息 md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, errors.New("未传输token") } var appId string var appKey string if v, ok := md["appid"]; ok { appId = v[0] } if v, ok := md["appkey"]; ok { appKey = v[0] } // 用户id 查appid if appId != "jiangxiaonian" || appKey != "123456" { return nil, errors.New("token不正确") } fmt.Println(request) return &hello_grpc.HelloResponse{ Name: "江小年", Message: "ok", }, nil } func main() { // creds, _ := credentials.NewServerTLSFromFile("D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.pem", // "D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.key") // 监听端口 listen, err := net.Listen("tcp", ":8080") if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } // 创建一个grpc服务器实例 s := grpc.NewServer(grpc.Creds(insecure.NewCredentials())) server := HelloService{} // 将server结构体注册为grpc服务 hello_grpc.RegisterHelloServiceServer(s, &server) fmt.Println("grpc server running :8080") // 开始处理客户端请求 err = s.Serve(listen) }
<!-- `etcd` -->
etcd
etcd
是一个go语言编写的分布式、高可用的一致性键值存储系统,用于提供可靠的分布式键值对存储,配置共享和服务发现功能
etcd
链接:百度网盘 请输入提取码 提取码:7777 etcdkeeper
链接:百度网盘 请输入提取码 提取码:7777
etcd文档
etcd源码
etcd不同系统安装
Etcd实现服务注册和发现
-
go语言客户端
etcd
基本操作 -
etcd
消息发布订阅逻辑 不如redis
数据类型丰富 -
服务注册和服务发现的具体实现逻辑
etcd
配置文件
# etcdconf/etcd0.yaml
# Node name
name: "etcdnode0"
# Data directory
data-dir: "/etcd-data/data"
# Write-ahead log directory
wal-dir: "/etcd-data/wal"
# URLs this member listens on for traffic from other cluster members
listen-peer-urls: "http://0.0.0.0:2380"
# URLs this member listens on for client traffic; external clients must
# reach etcd through this IP:port
listen-client-urls: "http://0.0.0.0:2379"
# Peer URLs of this member advertised to the rest of the cluster
initial-advertise-peer-urls: "http://192.168.239.149:2380"
# Initial cluster members, the core bootstrap setting of a static etcd
# deployment: a comma-separated list of <node name>=<peer URL> entries
initial-cluster: "etcdnode0=http://192.168.239.149:2380,etcdnode1=http://192.168.239.149:12380,etcdnode2=http://192.168.239.149:22380"
# Initial cluster state (new or existing); existing adds a node to an
# already running cluster
initial-cluster-state: "new"
# Initial cluster token used during bootstrap; keeps different clusters from
# interacting with each other
initial-cluster-token: "etcd-cluster"
# Client URLs of this member advertised to clients
advertise-client-urls: "http://192.168.239.149:2379"
logger: "zap"
# Log level; only debug, info, warn, error, panic, or fatal are supported
log-level: "warn"
log-outputs: "stderr"
docker run -d -p 2379:2379 -p 2380:2380 -v /tmp/etcd0-data:/etcd-data -v /home/nick/work/etcdconf:/etcd-conf --name etcd0 quay.io/coreos/etcd:v3.5.5 /usr/local/bin/etcd --config-file=/etcd-conf/etcd0.yaml
docker run: 这是 Docker 的命令行工具,用于运行一个新的容器 -d: 表示以守护进程模式运行容器,即在后台运行 -p 2379:2379 -p 2380:2380: 这部分是端口映射,将容器的 2379 和 2380 端口映射到宿主机的相同端口。2379 端口通常用于客户端通信,2380 端口用于集群节点间的通信 -v /tmp/etcd0-data:/etcd-data: 这是卷(volume)挂载,将宿主机的 /tmp/etcd0-data 目录挂载到容器内的 /etcd-data 目录,用于存储 etcd 的数据 -v /home/nick/work/etcdconf:/etcd-conf: 这是另一个卷挂载,将宿主机的 /home/nick/work/etcdconf 目录挂载到容器内的 /etcd-conf 目录,用于存储 etcd 的配置文件 --name etcd0: 为容器指定一个名称,这里是 etcd0 quay.io/coreos/etcd:v3.5.5: 指定要运行的 Docker 镜像,这里是 etcd 的官方镜像,版本为 3.5.5 /usr/local/bin/etcd --config-file=/etcd-conf/etcd0.yaml: 这是容器启动后要执行的命令,这里指定使用 etcd0.yaml 配置文件启动 etcd 服务
docker exec -it etcd0 bash etcdctl endpoint status // 访问当前etcd节点 etcdctl endpoint status --cluster // 访问所有节点 etcdctl endpoint status --cluster -w table // 可读性差,输出为表格 etcdctl member add -h // 增加节点的help命令
基本操作
-
下载依赖
// 下划看到Install 和代码 go get go.etcd.io/etcd/client/v3
// D:/etcd_study/main.go package main import clientv3 "go.etcd.io/etcd/client/v3" func main() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379", "localhost:12379", "localhost:22379"}, DialTimeout: 5 * time.Second, }) if err != nil { // handle error! } defer cli.Close() }
-
新建
etcd/
etcd/etcd.go
// etcd/etcd.go
package etcd

import "time"

// DialTimeout is the connection timeout shared by the etcd examples.
const DialTimeout = 5 * time.Second

// GetEtcdEndpoints returns the addresses of the etcd cluster nodes.
// Replace localhost with the node IPs of the real cluster.
func GetEtcdEndpoints() []string {
	endpoints := []string{
		"localhost:2379",
		"localhost:12379",
		"localhost:22379",
	}
	return endpoints
}
-
新建
etcd/kv.go
// Store the key-value pair and ask for the previous pair to be returned.
// etcdctl flags generally have a matching option function, e.g. the
// PrevKv flag shown by "etcdctl put -h" corresponds to WithPrevKV().
putRes, err := cli.Put(context.Background(), "key1", "value1", clientv3.WithPrevKV())
if err != nil {
	log.Fatalln(err)
}
// Print the previous value. log.Println is used because log.Panicln would
// panic right after printing.
log.Println(putRes.PrevKv)
// etcd/kv.go package etcd import ( "context" "fmt" "log" clientv3 "go.etcd.io/etcd/client/v3" ) // put/get/delete key-value func KvDemo() { // 没有连接池,每次请求都会新建连接,所以性能不高 cli, err := clientv3.New(clientv3.Config{ Endpoints: GetEtcdEndpoints(), DialTimeout: DialTimeout, }) if err != nil { log.Fatalln(err) } defer cli.Close() _, err = cli.Put(context.Background(), "key1", "value1") if err != nil { log.Fatalln(err) } putRes, err := cli.Put(context.Background(), "key1", "value11", clientv3.WithPrevKV()) if err != nil { log.Fatalln(err) } if putRes.PrevKv != nil { fmt.Println(putRes.PrevKv) } getRes, err := cli.Get(context.Background(), "key", clientv3.WithPrefix()) if err != nil { log.Fatalln(err) } fmt.Println(getRes) delRes, err := cli.Delete(context.Background(), "key", clientv3.WithPrefix()) if err != nil { log.Fatalln(err) } fmt.Println(delRes.Deleted) }
// etcd_study/main.go package main import ( "etcdstudy/etcd" ) func main() { etcd.KvDemo() }
权限部分
-
官方文档
-
Auth
通过命令行做操作export ETCDCTL_API=3 ENDPOINTS=localhost:2379 etcdctl --endpoints=${ENDPOINTS} role add root etcdctl --endpoints=${ENDPOINTS} role get root etcdctl --endpoints=${ENDPOINTS} user add root etcdctl --endpoints=${ENDPOINTS} user grant-role root root etcdctl --endpoints=${ENDPOINTS} user get root etcdctl --endpoints=${ENDPOINTS} role add role0 etcdctl --endpoints=${ENDPOINTS} role grant-permission role0 readwrite foo etcdctl --endpoints=${ENDPOINTS} user add user0 etcdctl --endpoints=${ENDPOINTS} user grant-role user0 role0 etcdctl --endpoints=${ENDPOINTS} auth enable # 所有客户端请求都要经过验证 etcdctl --endpoints=${ENDPOINTS} --user=user0:123 put foo bar etcdctl --endpoints=${ENDPOINTS} get foo # 权限被拒绝,用户名为空,因为请求未发出身份验证请求 etcdctl --endpoints=${ENDPOINTS} --user=user0:123 get foo # User0可以读取键foo etcdctl --endpoints=${ENDPOINTS} --user=user0:123 get foo1
服务注册发现
-
服务提供者向
etcd
注册(key值写入etcd
) -
客户端向
etcd
做查询 -
做完之后,客户端就能访问到服务了
为什么要服务注册与服务发现?
其实就是一个解耦的过程(服务的地址和客户端做了一个解耦)
server端流程:
启动服务
向
etcd
注册服务信息声明租约并续约
client端流程:
第一次,获取服务信息
创建监听,被动接收
更新本地的服务信息
-
新建
discovery/proto/hello.proto
// discovery\proto\hello.proto
syntax = "proto3";

// Go import path of the generated package.
option go_package = "discovery/proto";

package hello;

// Greeter exposes a single unary method.
service Greeter {
  rpc SayHello(HelloRequest) returns (HelloReply) {}
}

// HelloRequest is the request payload.
message HelloRequest {
  string msg = 1;
}

// HelloReply is the response payload.
message HelloReply {
  string msg = 1;
}
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest protoc --go_out=plugins=grpc:. discovery/proto/hello.proto protoc --go-grpc_out=. discovery/proto/hello.proto protoc --go_out=. discovery/proto/hello.proto
protoc --proto_path=discovery/proto --go_out=discovery/proto --go_out=paths=source_relative --go-grpc_out=discovery/proto --go-grpc_out=paths=source_relative discovery/proto/hello.proto ---------- protoc: 这是 Protocol Buffers 编译器的命令 --proto_path=discovery/proto: 指定 .proto 文件所在的目录,编译器会在这个目录下查找所有 import 语句中引用的 .proto 文件 --go_out=discovery/proto: 指定生成的 Go 代码的输出目录 --go_out=paths=source_relative: 指示 protoc 生成的 Go 代码中的 import 路径是相对于源 .proto 文件的路径 --go-grpc_out=discovery/proto: 指定生成的 Go gRPC 代码的输出目录 --go-grpc_out=paths=source_relative: 指示 protoc 生成的 Go gRPC 代码中的 import 路径也是相对于源 .proto 文件的路径 discovery/proto/hello.proto: 指定要编译的 .proto 文件的路径
-
新建
discovery/server/server.go
discovery/client/client.go
// discovery/server/server.go package main import ( "context" "flag" "fmt" "log" "net" pb "etcdstudy/discovery/proto" "google.golang.org/grpc" ) var ( port = flag.Int("port", 50051, "") ) type server struct { pb.UnimplementedDiscoveryServer } func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { fmt.Printf("Recv Client msg: %v \n", in.Msg) return &pb.HelloReply{ Msg: "Hello Client", }, nil } func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalln(err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalln(err) } }
// discovery/client/client.go package main import ( "context" pb "etcdstudy/discovery/proto" "fmt" "log" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) func getServerAddr() string { return "localhost:50051" } func sayHello() { addr := getServerAddr() conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalln(err) } defer conn.Close() c := pb.NewGreeterClient(conn) in := &pb.HelloRequest{ Msg: "hello server", } r, err := c.SayHello(context.Background(), in) if err != nil { log.Fatalln(err) } fmt.Println("Recv server msg:", r.Msg) } func main() { sayHello() }
-
服务发现 新建
discovery.go
// discovery/discovery.go package discovery import ( "context" "etcdstudy/etcd" "fmt" "log" "sync" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" ) type Service struct { Name string IP string Port string Protocol string } func ServiceRegister(s *Service) { // 没有连接池,每次请求都会新建连接,所以性能不高 cli, err := clientv3.New(clientv3.Config{ Endpoints: etcd.GetEtcdEndpoints(), DialTimeout: etcd.DialTimeout, }) if err != nil { log.Fatalln(err) } defer cli.Close() // 注册服务 var grantLease bool var leaseID clientv3.LeaseID ctx := context.Background() // 判断key是否存在 getRes, err := cli.Get(ctx, s.Name, clientv3.WithCountOnly()) if err != nil { log.Fatalln(err) } if getRes.Count == 0 { grantLease = true } // 租约声明 if grantLease { leaseResp, err := cli.Grant(ctx, 10) // 10秒租约 if err != nil { log.Fatalln(err) } leaseID = leaseResp.ID // 拿到租约 } // 事务 kv := clientv3.NewKV(cli) txn := kv.Txn(ctx) _, err = txn.If(clientv3.Compare(clientv3.CreateRevision(s.Name), "=", 0)). Then( clientv3.OpPut(s.Name, s.Name, clientv3.WithLease(leaseID)), clientv3.OpPut(s.Name+".ip", s.IP, clientv3.WithLease(leaseID)), clientv3.OpPut(s.Name+".port", s.Port, clientv3.WithLease(leaseID)), clientv3.OpPut(s.Name+".protocol", s.Protocol, clientv3.WithLease(leaseID)), ). Else( clientv3.OpPut(s.Name, s.Name, clientv3.WithIgnoreLease()), clientv3.OpPut(s.Name+".ip", s.IP, clientv3.WithIgnoreLease()), clientv3.OpPut(s.Name+".port", s.Port, clientv3.WithIgnoreLease()), clientv3.OpPut(s.Name+".protocol", s.Protocol, clientv3.WithIgnoreLease()), ). 
Commit() if err != nil { log.Fatalln(err) } if grantLease { leaseKeepalive, err := cli.KeepAlive(ctx, leaseID) if err != nil { log.Fatalln(err) } for lease := range leaseKeepalive { fmt.Printf("leaseID: %d, ttl: %d\n", lease.ID, lease.TTL) } } } // 服务发现 type Services struct { services map[string]*Service sync.RWMutex } var myServices = &Services{ services: map[string]*Service{}, } func ServiceDiscovery(svcName string) *Service { var s *Service = nil myServices.RLock() s, _ = myServices.services[svcName] myServices.RUnlock() return s } func WatchServiceName(svcName string) { // 没有连接池,每次请求都会新建连接,所以性能不高 cli, err := clientv3.New(clientv3.Config{ Endpoints: etcd.GetEtcdEndpoints(), DialTimeout: etcd.DialTimeout, }) if err != nil { log.Fatalln(err) } defer cli.Close() // 监听服务 getRes, err := cli.Get(context.Background(), svcName, clientv3.WithPrefix()) if err != nil { log.Fatalln(err) } // 解析服务 if getRes.Count > 0 { mp := sliceToMap(getRes.Kvs) s := &Service{} if kv, ok := mp[svcName]; ok { s.Name = string(kv.Value) } if kv, ok := mp[svcName+".ip"]; ok { s.IP = string(kv.Value) } if kv, ok := mp[svcName+".port"]; ok { s.Port = string(kv.Value) } if kv, ok := mp[svcName+".protocol"]; ok { s.Protocol = string(kv.Value) } myServices.Lock() myServices.services[svcName] = s myServices.Unlock() } rch := cli.Watch(context.Background(), svcName, clientv3.WithPrefix()) for wres := range rch { for _, ev := range wres.Events { if ev.Type == clientv3.EventTypeDelete { myServices.Lock() delete(myServices.services, svcName) myServices.Unlock() } if ev.Type == clientv3.EventTypePut { myServices.Lock() if _, ok := myServices.services[svcName]; !ok { myServices.services[svcName] = &Service{} } switch string(ev.Kv.Key) { case svcName: myServices.services[svcName].Name = string(ev.Kv.Value) case svcName + ".ip": myServices.services[svcName].IP = string(ev.Kv.Value) case svcName + ".port": myServices.services[svcName].Port = string(ev.Kv.Value) case svcName + ".protocol": 
myServices.services[svcName].Protocol = string(ev.Kv.Value) } myServices.Unlock() } } } } func sliceToMap(list []*mvccpb.KeyValue) map[string]*mvccpb.KeyValue { mp := make(map[string]*mvccpb.KeyValue, 0) for _, item := range list { mp[string(item.Key)] = item } return mp }
// server/server.go package main import ( "context" "flag" "fmt" "log" "net" "strconv" "etcdstudy/discovery" pb "etcdstudy/discovery/proto" "google.golang.org/grpc" ) var ( port = flag.Int("port", 50051, "") ) type server struct { pb.UnimplementedDiscoveryServer } func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { fmt.Printf("Recv Client msg: %v \n", in.Msg) return &pb.HelloReply{ Msg: "Hello Client", }, nil } func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalln(err) } s := grpc.NewServer() serverRegister(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalln(err) } } func serverRegister(s grpc.ServiceRegistrar, srv pb.GreeterServer) { pb.RegisterGreeterServer(s, srv) s1 := &discovery.Service{ Name: "hello Greeter", Port: strconv.Itoa(*port), IP: "localhost", Protocol: "grpc", } go discovery.ServiceRegister(s1) }
// client/client.go package main import ( "context" "etcdstudy/discovery" pb "etcdstudy/discovery/proto" "fmt" "log" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) func getServerAddr(svcName string) string { s := discovery.ServiceDiscovery(svcName) if s == nil { return "" } if s.IP == "" || s.Port == "" { return "" } return s.IP + ":" + s.Port } func sayHello() { addr := getServerAddr("hello.Greeter") if addr == "" { log.Println("未发现可用服务") return } log.Println("连接服务:", addr) conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Println(err) return } defer conn.Close() c := pb.NewGreeterClient(conn) in := &pb.HelloRequest{ Msg: "hello server", } r, err := c.SayHello(context.Background(), in) if err != nil { log.Println(err) return } fmt.Println("Recv server msg:", r.Msg) } func main() { log.SetFlags(log.Llongfile) go discovery.WatchServiceName("hello.Greeter") for { sayHello() time.Sleep(time.Second * 2) } }
bili
<!-- -->
gin+grpc
大型项目管理/协同系统
原则:高内聚,低耦合,更适合大团队协同开发
公认的工程目录
cmd
:可执行文件,可能有多个main文件
internal
:内部代码,不希望外部访问
pkg
:公开代码,外部可以访问
config/configs/etc
:配置文件
scripts
:脚本
docs
:文档
third_party
:三方辅助工具
bin
:编译的二进制文件
build
:持续集成相关
deploy
:部署相关
test
:测试文件
api
:开放的api
接口
init
:初始化函数
创建数据库表操作
搭建项目
-
mkdir ms_project cd ms_project go work init mkdir project-user cd project-user go mod init test.com/project-user cd .. go work use ./project-user go get -u github.com/gin-gonic/gin
-
// ms_project/project-user/main.go package main import ( "context" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/gin-gonic/gin" ) func main() { r := gin.Default() srv := &http.Server{ Addr: ":80", Handler: r, } // 保证下面的启停,go携程,不阻塞直接执行完 go func() { log.Printf("web server running in %s \n", srv.Addr) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalln(err) } }() // quit := make(chan os.Signal) // SIGINT 用户发送INRT字符(Ctrl+C)触发 // SIGTERM 结束程序(可以被捕获、阻塞或忽略) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit log.Println("Shutting Down project web server...") // ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() if err := srv.Shutdown(ctx); err != nil { log.Fatalln("web server Shutdown, cause by :", err) } select { case <-ctx.Done(): log.Println("wait timeout...") } log.Println("web server stop success...") }
-
由于有其他模块都需要用到启停, 将其抽取到公共模块
common
-
// ms_project/project-common/go.mod module test.com/project-common go 1.22.1
-
// ms_project go work use ./project-common
-
// ms_project/project-common/run.go package common import ( "context" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/gin-gonic/gin" ) func Run(r *gin.Engine, srvName string, addr string) { srv := &http.Server{ Addr: addr, Handler: r, } // 保证下面的启停,go携程,不阻塞直接执行完 go func() { log.Printf("%s running in %s \n", srvName, srv.Addr) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalln(err) } }() // quit := make(chan os.Signal) // SIGINT 用户发送INRT字符(Ctrl+C)触发 // SIGTERM 结束程序(可以被捕获、阻塞或忽略) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit log.Println("Shutting Down project %s...\n", srvName) // ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() if err := srv.Shutdown(ctx); err != nil { log.Fatalf("%s Shutdown, cause by :%v\n", srvName, err) } select { case <-ctx.Done(): log.Println("wait timeout...") } log.Printf("%s stop success...\n", srvName) }
-
// ms_project/project-user/main.go package main import ( "github.com/gin-gonic/gin" srv "test.com/project-common" ) func main() { r := gin.Default() srv.Run(r, "project-user", ":80") }
路由
// ms_project\project-user\router\router.go package router import ( "github.com/gin-gonic/gin" "test.com/project-user/api/user" ) type Router interface { Route(r *gin.Engine) } type RouterRegister struct { } func New() *RouterRegister { return &RouterRegister{} } func (*RouterRegister) Route(ro Router, r *gin.Engine) { ro.Route(r) } func InitRouter(r *gin.Engine) { rg := New() rg.Route(&user.RouterUser{}, r) }
// ms_project\project-user\api\user\route.go package user import "github.com/gin-gonic/gin" type RouterUser struct { } func (*RouterUser) Route(r *gin.Engine) { h := HandlerUser{} r.POST("/project/login/getCaptcha", h.getCaptcha) // localhost/project/login/getCaptcha }
// ms_project\project-user\api\user\user.go package user import "github.com/gin-gonic/gin" type HandlerUser struct { } func (*HandlerUser) getCaptcha(ctx *gin.Context) { ctx.JSON(200, "getCaptcha success") }
第二种方式(在第一种的基础上更改)
// ms_project\project-user\router\router.go package router import ( "github.com/gin-gonic/gin" ) type Router interface { Route(r *gin.Engine) } type RouterRegister struct { } func New() *RouterRegister { return &RouterRegister{} } func (*RouterRegister) Route1(ro Router, r *gin.Engine) { ro.Route(r) } var routers []Router func InitRouter(r *gin.Engine) { // rg := New() // rg.Route1(&user.RouterUser{}, r) for _, ro := range routers { ro.Route(r) } } func Register(ro ...Router) { routers = append(routers, ro...) }
// ms_project\project-user\api\user\route.go package user import ( "log" "github.com/gin-gonic/gin" "test.com/project-user/router" ) func init() { log.Println("init user router") router.Register(&RouterUser{}) } type RouterUser struct { } func (*RouterUser) Route(r *gin.Engine) { h := HandlerUser{} r.POST("/project/login/getCaptcha", h.getCaptcha) // localhost/project/login/getCaptcha }
// ms_project\project-user\api\api.go package api import ( _ "test.com/project-user/api/user" )
// ms_project/project-user/main.go package main import ( "github.com/gin-gonic/gin" srv "test.com/project-common" _ "test.com/project-user/api" "test.com/project-user/router" ) func main() { r := gin.Default() router.InitRouter(r) srv.Run(r, "project-user", ":80") }
发送验证码接口
// ms_project\project-common\model.go package common type BusinessCode int type Result struct { Code BusinessCode `json:"code"` Msg string `json:"msg"` Data any `json:"data"` } func (r *Result) Success(data any) *Result { r.Code = 200 r.Msg = "success" r.Data = data return r } func (r *Result) Fail(code BusinessCode, msg string) *Result { r.Code = code r.Msg = msg return r }
// ms_project\project-user\api\user\user.go package user import ( "github.com/gin-gonic/gin" common "test.com/project-common" ) type HandlerUser struct { } func (*HandlerUser) getCaptcha(ctx *gin.Context) { rsp := &common.Result{} ctx.JSON(200, rsp.Success("123456")) }
代码思路:
获取参数
校验参数
生成验证码(随机四位1000~9999或者六位100000~999999)
调用短信平台(三方 放入go协程中执行 接口可以快速响应)
存储验证码
redis
中 过期时间15分钟
// ms_project\project-user\api\user\user.go package user import ( "log" "net/http" "time" "github.com/gin-gonic/gin" common "test.com/project-common" "test.com/project-user/pkg/model" ) type HandlerUser struct { } func (*HandlerUser) getCaptcha(ctx *gin.Context) { rsp := &common.Result{} // 获取参数 mobile := ctx.PostForm("mobile") // 校验参数 if !common.VerifyMobile(mobile) { ctx.JSON(http.StatusOK, rsp.Fail(model.NoLegalMobile, "手机号码不合法")) return } // 生成验证码(随机四位1000~9999或者六位100000~999999) code := "123456" // 调用短信平台(三方 放入go协程中执行 接口可以快速响应) go func() { time.Sleep(2 * time.Second) log.Println("短信平台调用成功,发送短信") // 存储验证码`redis`中 过期时间15分钟 log.Printf("将手机号和验证码存入Redis成功: REGISTER_%v : %v", mobile, code) }() ctx.JSON(http.StatusOK, rsp.Success(code)) }
// ms_project\project-common\validate.go package common import "regexp" // VerifyMobile 验证手机号合法性 func VerifyMobile(mobile string) bool { if mobile == "" { return false } regular := "^((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|166|198|199|(147))\\d{8}$" reg := regexp.MustCompile(regular) return reg.MatchString(mobile) }
// ms_project\project-user\pkg\model\code.go package model import common "test.com/project-common" const ( NoLegalMobile common.BusinessCode = 2001 // 手机号不合法 )
导入redis
支持_接口应用
启动Redis
go get github.com/go-redis/redis/v8
// ms_project\project-user\api\user\user.go package user import ( "context" "log" "net/http" "time" "github.com/gin-gonic/gin" common "test.com/project-common" "test.com/project-user/pkg/dao" "test.com/project-user/pkg/model" "test.com/project-user/pkg/repo" ) type HandlerUser struct { cache repo.Cache } func New() *HandlerUser { return &HandlerUser{ cache: dao.Rc, } } func (h *HandlerUser) getCaptcha(ctx *gin.Context) { rsp := &common.Result{} // 获取参数 mobile := ctx.PostForm("mobile") // 校验参数 if !common.VerifyMobile(mobile) { ctx.JSON(http.StatusOK, rsp.Fail(model.NoLegalMobile, "手机号码不合法")) return } // 生成验证码(随机四位1000~9999或者六位100000~999999) code := "123456" // 调用短信平台(三方 放入go协程中执行 接口可以快速响应) go func() { time.Sleep(2 * time.Second) log.Println("短信平台调用成功,发送短信") // redis设置后续缓存可能存在mysql当中,也可能存在mongo,也可能存在memcache中 c, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() err := h.cache.Put(c, "REGISTER_"+mobile, code, 15*time.Second) if err != nil { log.Printf("验证码存入redis出错,cause by: %v\n", err) } // 存储验证码`redis`中 过期时间15分钟 }() ctx.JSON(http.StatusOK, rsp.Success(code)) }
// ms_project\project-user\pkg\repo\cache.go
package repo

import (
	"context"
	"time"
)

// Cache is the string key/value store abstraction used by the handlers, so
// that the concrete backend can be replaced without touching them.
type Cache interface {
	// Put stores value under key with the given expiry duration.
	Put(ctx context.Context, key, value string, expire time.Duration) error
	// Get returns the value stored under key.
	Get(ctx context.Context, key string) (string, error)
}
// ms_project\project-user\pkg\dao\redis.go package dao import ( "context" "time" "github.com/go-redis/redis/v8" ) var Rc *RedisCache type RedisCache struct { rdb *redis.Client } func init() { rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 0, }) Rc = &RedisCache{ rdb: rdb, } } func (rc *RedisCache) Put(ctx context.Context, key, value string, expire time.Duration) error { err := rc.rdb.Set(ctx, key, value, expire).Err() return err } func (rc *RedisCache) Get(ctx context.Context, key string) (string, error) { result, err := rc.rdb.Get(ctx, key).Result() return result, err }
// ms_project\project-user\api\user\route.go package user import ( "log" "github.com/gin-gonic/gin" "test.com/project-user/router" ) func init() { log.Println("init user router") router.Register(&RouterUser{}) } type RouterUser struct { } func (*RouterUser) Route(r *gin.Engine) { h := New() r.POST("/project/login/getCaptcha", h.getCaptcha) // localhost/project/login/getCaptcha }
日志
原生日志,并不能很好的区分日志级别,所以需要集成一个流行的日志库进来
uber
开源的zap
日志库:https://github.com/uber-go/zap
// 安装zap库 go get -u go.uber.org/zap // 安装日志分割库 go get -u github.com/natefinch/lumberjack
日志是公用的,所以可以放到common进行多次使用
日志存储有多种方式:按日志级别存储到不同文件,按业务逻辑来分别记录不同级别的日志,按照包结构划分不同级别日志
项目庞大时,日志记录越细分越能便捷定位
一般按业务记录即可
这里按日志级别:debug以上记录一个,info以上记录一个,warn以上记录一个
// ms_project\project-common\logs\logs.go
package logs

import (
	"net"
	"net/http"
	"net/http/httputil"
	"os"
	"runtime/debug"
	"strings"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/natefinch/lumberjack"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// LG is the package-level logger. It is nil until InitLogger has been called.
var LG *zap.Logger

// LogConfig holds the three log file names (one per level threshold) and the
// rotation settings that are passed to lumberjack for each of them.
type LogConfig struct {
	DebugFileName string `json:"debugFileName"`
	InfoFileName  string `json:"infoFileName"`
	WarnFileName  string `json:"warnFileName"`
	MaxSize       int    `json:"maxsize"`
	MaxAge        int    `json:"max_age"`
	MaxBackups    int    `json:"max_backups"`
}

// InitLogger builds the logger from cfg, stores it in LG and installs it as
// zap's global logger.
//
// Four cores are combined: one JSON file core per threshold (debug, info,
// warn) and a console core on stdout at debug level. The named result err is
// never assigned in this function, so the returned error is always nil.
func InitLogger(cfg *LogConfig) (err error) {
	writeSyncerDebug := getLogWriter(cfg.DebugFileName, cfg.MaxSize, cfg.MaxBackups, cfg.MaxAge)
	writeSyncerInfo := getLogWriter(cfg.InfoFileName, cfg.MaxSize, cfg.MaxBackups, cfg.MaxAge)
	writeSyncerWarn := getLogWriter(cfg.WarnFileName, cfg.MaxSize, cfg.MaxBackups, cfg.MaxAge)
	encoder := getEncoder()
	// File output.
	debugCore := zapcore.NewCore(encoder, writeSyncerDebug, zapcore.DebugLevel)
	infoCore := zapcore.NewCore(encoder, writeSyncerInfo, zap.InfoLevel)
	warnCore := zapcore.NewCore(encoder, writeSyncerWarn, zap.WarnLevel)
	// Standard output.
	consoleEncoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig())
	std := zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), zapcore.DebugLevel)
	core := zapcore.NewTee(debugCore, infoCore, warnCore, std)
	LG = zap.New(core, zap.AddCaller())
	zap.ReplaceGlobals(LG) // replace zap's global logger so other packages can simply call zap.L()
	return
}

// getEncoder returns the JSON encoder used for the log files: ISO8601
// timestamps under the key "time", capitalised level names, durations in
// seconds and short caller paths.
func getEncoder() zapcore.Encoder {
	encoderConfig := zap.NewProductionEncoderConfig()
	encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	encoderConfig.TimeKey = "time"
	encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
	encoderConfig.EncodeDuration = zapcore.SecondsDurationEncoder
	encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
	return zapcore.NewJSONEncoder(encoderConfig)
}

// getLogWriter wraps a lumberjack rotating file logger, configured with the
// given file name and rotation limits, as a zap WriteSyncer.
func getLogWriter(filename string, maxSize, maxBackup, maxAge int) zapcore.WriteSyncer {
	lumberJackLogger := &lumberjack.Logger{
		Filename:   filename,
		MaxSize:    maxSize,
		MaxBackups: maxBackup,
		MaxAge:     maxAge,
	}
	return zapcore.AddSync(lumberJackLogger)
}

// Ginlogger returns a gin middleware that logs one Info entry per request
// through LG after the remaining handlers have run: status, method, path,
// query, client IP, user agent, private gin errors and elapsed time.
func Ginlogger() gin.HandlerFunc {
	return func(c *gin.Context) {
		start := time.Now()
		// Capture path and query before c.Next(), then log after it returns.
		path := c.Request.URL.Path
		query := c.Request.URL.RawQuery
		c.Next()

		cost := time.Since(start)
		LG.Info(path,
			zap.Int("status", c.Writer.Status()),
			zap.String("method", c.Request.Method),
			zap.String("path", path),
			zap.String("query", query),
			zap.String("ip", c.ClientIP()),
			zap.String("user-agent", c.Request.UserAgent()),
			zap.String("errors", c.Errors.ByType(gin.ErrorTypePrivate).String()),
			zap.Duration("cost", cost),
		)
	}
}

// GinRecovery returns a gin middleware that recovers from panics raised by
// later handlers and logs them through LG. When stack is true the stack
// trace is included in the log entry. Unless the panic was caused by a
// broken pipe, the request is aborted with HTTP 500.
func GinRecovery(stack bool) gin.HandlerFunc {
	return func(c *gin.Context) {
		defer func() {
			if err := recover(); err != nil {
				// Check for a broken connection, as it is not really a
				// condition that warrants a panic stack trace.
				var brokenPipe bool
				if ne, ok := err.(*net.OpError); ok {
					if se, ok := ne.Err.(*os.SyscallError); ok {
						// An earlier version tested for "broken pipe" twice
						// in one condition; a single test is sufficient.
						if strings.Contains(strings.ToLower(se.Error()), "broken pipe") {
							brokenPipe = true
						}
					}
				}

				// Dump the request without its body for the log entry.
				httpRequest, _ := httputil.DumpRequest(c.Request, false)
				if brokenPipe {
					LG.Error(c.Request.URL.Path,
						zap.Any("error", err),
						zap.String("request", string(httpRequest)),
					)
					// If the connection is dead, we can't write a status to it.
					c.Error(err.(error)) // nolint: errcheck
					c.Abort()
					return
				}

				if stack {
					LG.Error("[Recovery from panic]",
						zap.Any("error", err),
						zap.String("request", string(httpRequest)),
						zap.String("stack", string(debug.Stack())),
					)
				} else {
					LG.Error("[Recovery from panic]",
						zap.Any("error", err),
						zap.String("request", string(httpRequest)),
					)
				}
				c.AbortWithStatus(http.StatusInternalServerError)
			}
		}()
		c.Next()
	}
}
// ms_project/project-user/main.go package main import ( "log" "github.com/gin-gonic/gin" srv "test.com/project-common" "test.com/project-common/logs" _ "test.com/project-user/api" "test.com/project-user/router" ) func main() { r := gin.Default() lc := &logs.LogConfig{ DebugFileName: "D:\\Godemo\\gRPC_Gin项目\\ms_project\\project-common\\logs\\debug\\project-debug.log", InfoFileName: "D:\\Godemo\\gRPC_Gin项目\\ms_project\\project-common\\logs\\info\\project-info.log", WarnFileName: "D:\\Godemo\\gRPC_Gin项目\\ms_project\\project-common\\logs\\error\\project-error.log", MaxSize: 500, MaxAge: 28, MaxBackups: 3, } err := logs.InitLogger(lc) if err != nil { log.Fatalln(err) } router.InitRouter(r) srv.Run(r, "project-user", ":80") }
// ms_project\project-user\api\user\user.go package user import ( "context" "log" "net/http" "time" "github.com/gin-gonic/gin" "go.uber.org/zap" common "test.com/project-common" "test.com/project-common/logs" "test.com/project-user/pkg/dao" "test.com/project-user/pkg/model" "test.com/project-user/pkg/repo" ) type HandlerUser struct { cache repo.Cache } func New() *HandlerUser { return &HandlerUser{ cache: dao.Rc, } } func (h *HandlerUser) getCaptcha(ctx *gin.Context) { rsp := &common.Result{} // 获取参数 mobile := ctx.PostForm("mobile") // 校验参数 if !common.VerifyMobile(mobile) { ctx.JSON(http.StatusOK, rsp.Fail(model.NoLegalMobile, "手机号码不合法")) return } // 生成验证码(随机四位1000~9999或者六位100000~999999) code := "123456" // 调用短信平台(三方 放入go协程中执行 接口可以快速响应) go func() { time.Sleep(2 * time.Second) zap.L().Info("短信平台调用成功,发送短信 Info") logs.LG.Debug("短信平台调用成功,发送短信 Debug") zap.L().Error("短信平台调用成功,发送短信 Error") // redis设置后续缓存可能存在mysql当中,也可能存在mongo,也可能存在memcache中 c, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() err := h.cache.Put(c, "REGISTER_"+mobile, code, 15*time.Second) if err != nil { log.Printf("验证码存入redis出错,cause by: %v\n", err) } // 存储验证码`redis`中 过期时间15分钟 }() ctx.JSON(http.StatusOK, rsp.Success(code)) }
配置 |gRPC
| etcd
go get github.com/spf13/viper
# ms_project\project-user\config\app.yaml
读配置
<!-- 备忘录gRPC -->
主要依赖
-
gin
-
gorm
-
grpc
-
mysql
安装gRPC
go get google.golang.org/grpc go get google.golang.org/protobuf
安装protoc
protobuf
是可用于通信协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式
链接:百度网盘 请输入提取码 提取码:jw8a
把解压出来的bin
目录放在系统变量中
D:\demo_rely\protoc-3.15.5-win64\bin
// cmd中测试 protoc --version
项目结构
grpc_todolist
项目总体
grpc-todolist/ ├── app // 各个微服务 │ ├── gateway // 网关 │ ├── task // 任务模块微服务 │ └── user // 用户模块微服务 ├── bin // 编译后的二进制文件模块 ├── config // 配置文件 ├── consts // 定义的常量 ├── doc // 接口文档 ├── idl // protoc文件 │ └── pb // 放置生成的pb文件 ├── logs // 放置打印日志模块 ├── pkg // 各种包 │ ├── e // 统一错误状态码 │ ├── discovery // etcd服务注册、keep-alive、获取服务信息等等 │ ├── res // 统一response接口返回 │ └── util // 各种工具、JWT、Logger等等.. └── types // 定义各种结构体
proto
文件定义gateway
网关部分
gateway/ ├── cmd // 启动入口 ├── internal // 业务逻辑(不对外暴露) │ ├── handler // 视图层 │ └── service // 服务层 │ └── pb // 放置生成的pb文件 ├── logs // 放置打印日志模块 ├── middleware // 中间件 ├── routes // http 路由模块 └── rpc // rpc 调用
user && task
用户与任务模块
user/ ├── cmd // 启动入口 └──internal // 业务逻辑(不对外暴露) ├── service // 业务服务 └── repository // 持久层 └── db // 视图层 ├── dao // 对数据库进行操作 └── model // 定义数据库的模型
etcd
链接:百度网盘 请输入提取码 提取码:7777 etcdkeeper
链接:百度网盘 请输入提取码 提取码:7777
-
api-gateway/
-
user/
-
cmd/
-
main.go
-
-
config/
-
config.go
-
config.yml
-
-
discovery/
-
internal/
-
handler/
-
repository/
-
db_init.go
-
-
service/
-
-
pkg/
-
e/
-
res/
-
util/
-
-
user
模块创建数据库
// config/config.yml server: domain: user version: 1.0 jwtSecret: FanOne grpcAddress: "127.0.0.1:10001" mysql: driverName: mysql host: 127.0.0.1 port: 3306 database: grpc_todolist_tmp username: root password: l20030328 charset: utf8mb4 etcd: address: 127.0.0.1:2379 redis: address: 127.0.0.1:6379 password:
// cmd cd user // /user> go get github.com/spf13/viper
// config/config.go package config import ( "fmt" "os" "path/filepath" "github.com/spf13/viper" ) func InitConfig() { wDir, _ := os.Getwd() // 获取当前工作目录 workDir := filepath.Dir(wDir) // 获取到文件的上层目录 fmt.Println(workDir) fmt.Println("-------------------") viper.SetConfigName("config") // 配置文件的文件名 viper.SetConfigType("yml") // 配置文件的后缀 viper.AddConfigPath(workDir + "/config") // 获取到配置文件的路径 err := viper.ReadInConfig() if err != nil { fmt.Println("err:", err) return } }
// user/internal/repository/db_init.go package repository import ( "strings" "time" "github.com/gin-gonic/gin" "github.com/spf13/viper" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gorm/logger" "gorm.io/gorm/schema" ) var DB *gorm.DB func InitDB() { host := viper.GetString("mysql.host") port := viper.GetString("mysql.port") database := viper.GetString("mysql.database") username := viper.GetString("mysql.username") password := viper.GetString("mysql.password") charset := viper.GetString("mysql.charset") dsn := strings.Join([]string{username, ":", password, "@tcp(", host, ":", port, ")/", database, "?charset=" + charset + "&parseTime=true"}, "") err := Database(dsn) if err != nil { panic(err) } } // gorm的定义 func Database(dsn string) error { var ormLogger logger.Interface if gin.Mode() == "debug" { ormLogger = logger.Default.LogMode(logger.Info) } else { ormLogger = logger.Default } db, err := gorm.Open(mysql.New(mysql.Config{ DSN: dsn, DefaultStringSize: 256, DisableDatetimePrecision: true, // 禁止datatime的精度, mysql5.6之前的数据是不支持的 DontSupportRenameIndex: true, // 重命名索引的时候采取删除并新建的方式 DontSupportRenameColumn: true, // 用change重命名列 SkipInitializeWithVersion: false, // 根据版本自动配置 // 使用gorm.Open和MySQL的Config结构来初始化GORM连接 }), &gorm.Config{ // &gorm.Config设置GORM的额外配置,如日志记录器、命名策略 Logger: ormLogger, NamingStrategy: schema.NamingStrategy{ SingularTable: true, // 设置表名不自动加s }, }) if err != nil { return err } sqlDB, _ := db.DB() sqlDB.SetMaxIdleConns(20) // 设置连接池,空闲 sqlDB.SetMaxOpenConns(100) // 设置最大连接数 sqlDB.SetConnMaxLifetime(time.Second * 30) DB = db migration() return err }
// cmd go get github.com/gin-gonic/gin go get gorm.io/driver/mysql
-
user/internal/repository/
-
migration.go
-
// user/internal/repository/migration.go package repository import "fmt" func migration() { err := DB.Set("gorm:table_options", "charset=utf8mb4"). AutoMigrate( &User{}, ) //GORM提供的一个非常方便的函数,用于自动迁移或创建表结构 // Set为即将迁移的表设置一些额外的选项, 设置了字符集为utf8mb4 if err != nil { fmt.Println("migration err", err) } }
-
user/internal/repository/
-
user.go
-
// user/internal/repository/user.go package repository type User struct { UserId uint `gorm:"primarykey"` UserName string `gorm:"unique"` NickName string PasswordDigest string }
// cmd net start mysql mysql -u root -p create database grpc_todolist_tmp charset=utf8mb4; net stop mysql
-
user/cmd/
-
main.go
-
// user/cmd/main.go package main import ( "memo_grpc/user/config" "memo_grpc/user/internal/repository" ) func main() { config.InitConfig() repository.InitDB() }
user
模块dao
层
-
user/internal/service/
-
pb/userModels.proto
-
// user/internal/service/pb/userModels.proto syntax="proto3"; package pb; option go_package="/internal/service;service"; // /internal/service 生成文件所在的路径 ;service属于service包 message UserModel{ // @inject_tag: json:"user_id" uint32 UserID=1; // @inject_tag: json:"user_name" string UserName=2; // @inject_tag: json:"nick_name" string NickName=3; }
-
user/internal/service/pb/
-
userService.proto
-
// user/internal/service/pb/userService.proto syntax = "proto3"; package pb; import "userModels.proto"; option go_package="/internal/service;service"; // /internal/service 生成文件所在的路径 ;service属于service包 message UserRequest { //@inject_tag: json:"nick_name" form:"nick_name" string NickName=1; //@inject_tag: json:"user_name" form:"user_name" string UserName=2; //@inject_tag: json:"password" form:"password" string password=3; //@inject_tag: json:"password_confirm" form:"password_confirm" string PasswordConfirm=4; } message UserDetailResponse{ UserModel UserDetail=1; uint32 Code=2; } service UserService { rpc UserLogin(UserRequest) returns(UserDetailResponse); rpc UserRegister(UserRequest) returns(UserDetailResponse); }
// 下载安装protoc-gen-go git clone https://gitcode.com/golang/protobuf.git 进入到 protobuf/protoc-gen-go 目录执行以下命令: go build -o protoc-gen-go main.go 复制 protoc-gen-go.exe 到 GOROOT的 bin 目录 // 我的protoc-gen-go.exe文件在GO/bin目录下
// cmd cd ./user/ protoc -I internal/service/pb internal/service/pb/*.proto --go_out=plugins=grpc:. // internal/service/pb pb所在的文件 internal/service/pb/*.proto是需要生成的文件
-
user/pkg/e/
-
code.go
-
msg.go
-
// user/pkg/e/code.go
package e

// Status codes returned by the services. They are untyped integer constants;
// the values are numerically the same as HTTP 200, 500 and 400.
const (
	Success       = 200 // the request succeeded
	Error         = 500 // the request failed
	InvalidParams = 400 // the request parameters were invalid
)
// user/pkg/e/msg.go package e var MsgFlags = map[uint]string{ Success: "ok", Error: "fail", InvalidParams: "请求的参数错误", } // GetMsg 获取状态码对应的信息 func GetMsg(code uint) string { msg, ok := MsgFlags[code] if ok { return msg } return MsgFlags[Error] }
// cmd 加密包 go get golang.org/x/crypto/bcrypt
// user/internal/repository/user.go package repository import ( "errors" "memo_grpc/user/internal/service" "golang.org/x/crypto/bcrypt" "gorm.io/gorm" ) type User struct { UserId uint `gorm:"primarykey"` UserName string `gorm:"unique"` NickName string PasswordDigest string } const ( PasswordCost = 12 // 密码加密难度 ) // CheckUserExist检查用户是否存在 func (user *User) CheckUserExist(req *service.UserRequest) bool { if err := DB.Where("user_name=?", req.UserName).First(&user).Error; err == gorm.ErrRecordNotFound { return false } return true } // ShowUserInfo获取用户信息 func (user *User) ShowUserInfo(req *service.UserRequest) error { if exist := user.CheckUserExist(req); exist { return nil } return errors.New("UserName Not Exist") } // UserCreate 创建用户 func (*User) UserCreate(req *service.UserRequest) error { var count int64 DB.Where("username=?", req.UserName).Count(&count) if count != 0 { return errors.New("UserName Exist") } user := User{ UserName: req.UserName, NickName: req.NickName, } // 密码的加密 _ = user.SetPassword(req.Password) err := DB.Create(&user).Error; return err } // SetPassword 加密密码 func (user *User) SetPassword(password string) error { bytes, err := bcrypt.GenerateFromPassword([]byte(password), PasswordCost) if err != nil { return err } user.PasswordDigest = string(bytes) return nil } // CheckPassword 检验密码 func (user *User) CheckPassword(password string) bool { err := bcrypt.CompareHashAndPassword([]byte(user.PasswordDigest), []byte(password)) return err == nil } // BuildUser序列化User func BuildUser(item User) *service.UserModel { userModel := service.UserModel{ UserID: uint32(item.UserId), UserName: item.UserName, NickName: item.NickName, } return &userModel }
-
user/internal/handler/
-
user.go
-
// user/internal/handler/user.go package handler import ( "context" "memo_grpc/user/internal/repository" "memo_grpc/user/internal/service" "memo_grpc/user/pkg/e" ) type UserService struct { } func NewUserService() *UserService { return &UserService{} } // UserLogin 用户登录 token不在服务层,不对数据库进行操作,在网关层操作 func (*UserService) UserLogin(ctx context.Context, req *service.UserRequest) (resp *service.UserDetailResponse, err error) { var user repository.User resp = new(service.UserDetailResponse) resp.Code = e.Success err = user.ShowUserInfo(req) if err != nil { resp.Code = e.Error return resp, err } resp.UserDetail = repository.BuildUser(user) return resp, nil } // 用户注册 func (*UserService) UserRegister(ctx context.Context, req *service.UserRequest) (resp *service.UserDetailResponse, err error) { var user repository.User resp = new(service.UserDetailResponse) resp.Code = e.Success err = user.UserCreate(req) if err != nil { resp.Code = e.Error return resp, err } resp.UserDetail = repository.BuildUser(user) return resp, nil }
etcd
模块1
// cmd go get go.etcd.io/etcd/client/v3 go get github.com/sirupsen/logrus go get google.golang.org/grpc/resolver
-
user/discovery/
-
register.go
-
resolver.go
-
instance.go
-
// user/discovery/register.go
package discovery

import (
	"github.com/sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// Register holds the state needed to register one Server in etcd.
// NOTE(review): the methods that use these fields are not part of this
// snippet; the field comments below are inferred from names and types and
// should be confirmed against the implementation.
type Register struct {
	EtcdAddrs   []string // presumably the etcd endpoints to connect to
	DialTimeout int      // timeout; unit not visible here — TODO confirm

	closeCh     chan struct{}                           // presumably signals shutdown
	leasesID    clientv3.LeaseID                        // etcd lease ID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse // lease keep-alive responses

	srvInfo Server // the service instance being registered
	srvTTL  int64  // presumably the lease TTL — TODO confirm unit
	cli     *clientv3.Client
	logger  *logrus.Logger
}
// user/discovery/resolver.go
package discovery

import (
	"github.com/sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/resolver"
)

// Resolver holds the state of an etcd-backed gRPC name resolver.
// NOTE(review): the methods that use these fields are not part of this
// snippet; the field comments below are inferred from names and types and
// should be confirmed against the implementation.
type Resolver struct {
	schema      string   // presumably the resolver scheme name
	EtcdAddrs   []string // presumably the etcd endpoints to connect to
	DialTimeout int      // timeout; unit not visible here — TODO confirm

	closeCh      chan struct{}       // presumably signals shutdown
	watchCh      clientv3.WatchChan  // etcd watch channel
	cli          *clientv3.Client    // etcd client
	keyPrifix    string              // (sic) presumably the watched key prefix
	srvAddrsList []resolver.Address  // presumably the currently known addresses

	cc     resolver.ClientConn
	logger *logrus.Logger
}
// user/discovery/instance.go package discovery import ( "encoding/json" "errors" "fmt" "strings" ) type Server struct { Name string `json:"name"` Addr string `json:"addr"` Version string `json:"version"` // 版本 Weight int64 `json:"weight"` // 权重 } func BuildPrefix(server Server) string { if server.Version == "" { return fmt.Sprintf("/%s/", server.Name) } return fmt.Sprintf("/%s/%s/", server.Name, server.Version) } func BuildRegisterPath(server Server) string { return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr) } // ParseValue 将value值反序列化到一个Server实例当中 func ParseValue(value []byte) (Server, error) { server := Server{} if err := json.Unmarshal(value, &server); err != nil { return server, err } return server, nil } func SplitPath(path string) (Server, error) { server := Server{} strs := strings.Split(path, "/") if len(strs) == 0 { return server, errors.New("invalid path") } server.Addr = strs[len(strs)-1] return server, nil }
etcd
模块2
// user/discovery/instance.go package discovery import ( "encoding/json" "errors" "fmt" "strings" "google.golang.org/grpc/resolver" ) type Server struct { Name string `json:"name"` Addr string `json:"addr"` Version string `json:"version"` // 版本 Weight int64 `json:"weight"` // 权重 } func BuildPrefix(server Server) string { if server.Version == "" { return fmt.Sprintf("/%s/", server.Name) } return fmt.Sprintf("/%s/%s/", server.Name, server.Version) } func BuildRegisterPath(server Server) string { return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr) } // ParseValue 将value值反序列化到一个Server实例当中 func ParseValue(value []byte) (Server, error) { server := Server{} if err := json.Unmarshal(value, &server); err != nil { return server, err } return server, nil } func SplitPath(path string) (Server, error) { server := Server{} strs := strings.Split(path, "/") if len(strs) == 0 { return server, errors.New("invalid path") } server.Addr = strs[len(strs)-1] return server, nil } func Exist(l []resolver.Address, addr resolver.Address) bool { for i := range l { if l[i].Addr == addr.Addr { return true } } return false }
// user/discovery/register.go package discovery import ( "context" "encoding/json" "errors" "fmt" "strings" "time" "github.com/sirupsen/logrus" clientv3 "go.etcd.io/etcd/client/v3" ) type Register struct { EtcdAddrs []string DialTimeout int // 超时时间 closeCh chan struct{} // 通常用于通知或同步,关闭与etcd的连接或停止某些操作 leasesID clientv3.LeaseID keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse srvInfo Server srvTTL int64 cli *clientv3.Client logger *logrus.Logger } // NewRegister 基于ETCD创建一个register func NewRegister(etcdAddrs []string, logger *logrus.Logger) *Register { return &Register{ EtcdAddrs: etcdAddrs, DialTimeout: 3, logger: logger, } } // 初始化自己的register func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) { var err error if strings.Split(srvInfo.Addr, ":")[0] == "" { return nil, errors.New("invalid ip address") } // 初始化 新建一个连接 if r.cli, err = clientv3.New(clientv3.Config{ Endpoints: r.EtcdAddrs, DialTimeout: time.Duration(r.DialTimeout) * time.Second, }); err != nil { return nil, err } r.srvInfo = srvInfo r.srvTTL = ttl if err = r.register(); err != nil { return nil, err } // 注册 r.closeCh = make(chan struct{}) go r.keepAlive() return r.closeCh, nil } // 创建etcd自带的那些实例 func (r *Register) register() error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second) defer cancel() // defer cancel() 的作用是确保无论函数何时返回(正常返回或因为错误而返回),cancel 函数都会被调用 leaseResp, err := r.cli.Grant(ctx, r.srvTTL) if err != nil { return err } r.leasesID = leaseResp.ID if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), r.leasesID); err != nil { return err } data, err := json.Marshal(r.srvInfo) if err != nil { return err } _, err = r.cli.Put(context.Background(), BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID)) return err } func (r *Register) keepAlive() { ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second) for { select { case <-r.closeCh: if err := r.unregister(); err != nil { // 注销服务器信息 
fmt.Println("unregister failed error", err) } if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil { fmt.Println("revoke fail") // 撤销租约(调用Etcd客户端的Revoke方法) } case res := <-r.keepAliveCh: if res == nil { // 尝试重新注册服务器信息 if err := r.register(); err != nil { fmt.Println("register err") } } case <-ticker.C: if r.keepAliveCh == nil { if err := r.register(); err != nil { fmt.Println("register err") } } } } } func (r *Register) unregister() error { _, err := r.cli.Delete(context.Background(), BuildRegisterPath(r.srvInfo)) return err }
etcd
模块3
// user/cmd/main.go package main import ( "memo_grpc/user/config" "memo_grpc/user/discovery" "memo_grpc/user/internal/handler" "memo_grpc/user/internal/repository" "memo_grpc/user/internal/service" "net" "github.com/sirupsen/logrus" "github.com/spf13/viper" "google.golang.org/grpc" ) func main() { config.InitConfig() repository.InitDB() // etcd 地址 etcdAddress := []string{viper.GetString("etcd.address")} // 服务的注册 etcdRegister := discovery.NewRegister(etcdAddress, logrus.New()) grpcAddress := viper.GetString("server.grpcAddress") userNode := discovery.Server{ Name: viper.GetString("server.domain"), Addr: grpcAddress, } server := grpc.NewServer() defer server.Stop() // 绑定服务 返回的服务实例绑定到GRPC服务器上->客户端可以通过GRPC调用此服务的方法 service.RegisterUserServiceServer(server, handler.NewUserService()) lis, err := net.Listen("tcp", grpcAddress) // 在指定的GRPC地址上监听TCP连接 if err != nil { panic(err) } if _, err := etcdRegister.Register(userNode, 10); err != nil { panic(err) // 将服务器节点注册到etcd上,并指定租约时间为10秒 } if err = server.Serve(lis); err != nil { panic(err) // 启动GRPC服务器,并使其开始监听并处理传入的连接 } }
网关模块_user
接入网关1
-
api-gateway/
-
cmd/
-
config/
-
internal/
-
handler/
-
service/
-
pb/
-
复制
user
层pb
下的内容
-
-
复制
user
层service
下的内容
-
-
-
middlerware/
-
pkg/
-
e/
-
copy
-
-
res/
-
util/
-
-
routes/
-
wrappers/
-
// cmd
# enter the gateway module directory
cd api-gateway
# create go.mod for the module named api-gateway
go mod init api-gateway
# add missing and drop unused module requirements
go mod tidy
<!-- 参数类型前的 ... 表示可变参数:调用方可以传入任意数量的参数,函数内部以切片形式接收,例如 ...interface{} 在函数体内的类型是 []interface{} -->
-
api-gateway/internal/handler/
-
pkg.go
-
user.go
-
// api-gateway/internal/hander/pkg.go package handler import "errors" func PanicIfUserError(err error) { if err != nil { err = errors.New("User Service--" + err.Error()) panic(err) } }
// api-gateway/internal/hander/user.go package handler import ( "api-gateway/internal/service" "api-gateway/pkg/e" "api-gateway/pkg/res" "api-gateway/pkg/util" "context" "net/http" "github.com/gin-gonic/gin" ) // UserRegister 用户注册 func UserRegister(ginCtx *gin.Context) { var userReq service.UserRequest PanicIfUserError(ginCtx.Bind(&userReq)) // 尝试将HTTP请求体中的数据绑定到userReq中 // gin.Key 中获取服务实例 userService := ginCtx.Keys["user"].(service.UserServiceClient) // 断言为service.UserServiceClient类型 userResp, err := userService.UserRegister(context.Background(), &userReq) PanicIfUserError(err) r := res.Response{ Data: userResp, Status: uint(userResp.Code), Msg: e.GetMsg(uint(userResp.Code)), Error: err.Error(), } ginCtx.JSON(http.StatusOK, r) } // UserLogin 用户登录 func UserLogin(ginCtx *gin.Context) { var userReq service.UserRequest PanicIfUserError(ginCtx.Bind(&userReq)) // 尝试将HTTP请求体中的数据绑定到userReq中 // gin.Key 中获取服务实例 userService := ginCtx.Keys["user"].(service.UserServiceClient) // 断言为service.UserServiceClient类型 userResp, err := userService.UserLogin(context.Background(), &userReq) PanicIfUserError(err) token, err := util.GenerateToken(uint(userResp.UserDetail.UserID)) r := res.Response{ Data: res.TokenData{ User: userResp.UserDetail, Token: token, }, Status: uint(userResp.Code), Msg: e.GetMsg(uint(userResp.Code)), Error: err.Error(), } ginCtx.JSON(http.StatusOK, r) }
-
// api-gateway/routes/
-
router.go
-
// api-gateway/routes/router.go package routes import ( "api-gateway/internal/handler" "api-gateway/middlerware" "github.com/gin-gonic/gin" ) func NewRouter(service ...interface{}) *gin.Engine { // ...表示可变参数或切片参数,允许传入不定量的参数,...interface=>[]interface{} ginRouter := gin.Default() ginRouter.Use(middlerware.Cors(), middlerware.InitMiddleWare(service)) v1 := ginRouter.Group("api/v1") { v1.GET("ping", func(c *gin.Context) { c.JSON(200, "success") }) // 用户服务 v1.POST("/user/register", handler.UserRegister) v1.POST("/user/login", handler.UserLogin) } return ginRouter }
-
api-gateway/middlerware/
-
cors.go
-
jwt.go
-
// api-gateway/middlerware/cors.go // copy
// api-gateway/middlerware/jwt.go // 之后写
-
api-gateway/pkg/res/
-
response.go
-
// api-gateway/pkg/res/response.go
package res

import (
	"api-gateway/pkg/e"

	"github.com/gin-gonic/gin"
)

// Response is the base JSON envelope returned by the gateway.
type Response struct {
	Status uint        `json:"Status"`
	Data   interface{} `json:"Data"`
	Msg    string      `json:"Msg"`
	Error  string      `json:"Error"`
}

// DataList is a Data payload that carries a total count.
type DataList struct {
	Item  interface{} `json:"Item"`
	Total uint        `json:"Total"`
}

// TokenData is a Data payload that carries a token.
type TokenData struct {
	User  interface{} `json:"User"`
	Token string      `json:"Token"`
}

// ginH builds a gin.H with "code", "msg" (looked up through e.GetMsg) and
// "data".
func ginH(msgCode int, data interface{}) gin.H {
	return gin.H{
		"code": msgCode,
		"msg":  e.GetMsg(uint(msgCode)),
		"data": data,
	}
}
-
api-gateway/pkg/util/
-
token.go
-
// api-gateway/pkg/util/token.go package util import ( "time" "github.com/dgrijalva/jwt-go" "github.com/spf13/viper" ) var jwtSecret = []byte(viper.GetString("service.jwtSecret")) type Claims struct { UserId uint `json:"user_id"` jwt.StandardClaims } // GenerateToken 签发用户Token func GenerateToken(userID uint) (string, error) { nowTime := time.Now() expireTime := nowTime.Add(24 * time.Hour) claims := Claims{ UserId: userID, StandardClaims: jwt.StandardClaims{ ExpiresAt: expireTime.Unix(), Issuer: "38384-SearchEngine", }, } tokenClaims := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) // 加密 token, err := tokenClaims.SignedString(jwtSecret) // 签名 return token, err } // ParseToken 验证用户token func ParseToken(token string) (*Claims, error) { tokenClaims, err := jwt.ParseWithClaims(token, &Claims{}, func(token *jwt.Token) (interface{}, error) { return jwtSecret, nil }) if tokenClaims != nil { if claims, ok := tokenClaims.Claims.(*Claims); ok && tokenClaims.Valid { return claims, nil } } return nil, err }
-
api-gateway/middlerware/
-
init.go
-
// api-gateway/middlerware/init.go package middlerware import "github.com/gin-gonic/gin" func InitMiddleWare(service []interface{}) gin.HandlerFunc { return func(c *gin.Context) { c.Keys = make(map[string]interface{}) c.Keys["user"] = service[0] c.Next() } }
-
api-gateway/config/
-
config.yml
-
config.go
-
# api-gateway/config/config.yml
#
# Keys are grouped the way the gateway code reads them:
#   server.port        - read by cmd/main.go for the HTTP listen address
#   etcd.address       - read by cmd/main.go for service discovery
#   service.jwtSecret  - read by pkg/util/token.go to sign tokens
# The original file only had a "service" section with the misspelled key
# "prot", so "server.port" resolved to an empty string.
server:
  port: ":4000"
etcd:
  # NOTE(review): taken from the task service config; confirm for the gateway.
  address: 127.0.0.1:2379
service:
  domain: api-gateway
  jwtSecret: slllbyxinniiuniu
  version: 1.0
// api-gateway/config/config.go package config import ( "fmt" "os" "path/filepath" "github.com/spf13/viper" ) func InitConfig() { wDir, _ := os.Getwd() // 获取当前工作目录 workDir := filepath.Dir(wDir) // 获取到文件的上层目录 viper.SetConfigName("config") // 配置文件的文件名 viper.SetConfigType("yml") // 配置文件的后缀 viper.AddConfigPath(workDir + "/config") // 获取到配置文件的路径 err := viper.ReadInConfig() if err != nil { fmt.Println("err:", err) return } }
-
api-gateway/cmd/
-
main.go
-
// api-gateway/cmd/main.go
package main

import "api-gateway/config"

// main only calls config.InitConfig at this stage of the walkthrough.
func main() {
	config.InitConfig()
}
网关模块_user
接入网关2
-
copy->user/discovery
// api-gateway/discovery/resolver.go package discovery import ( "context" "time" "github.com/sirupsen/logrus" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/resolver" ) const ( schema = "etcd" ) // Resolver for grpc client type Resolver struct { schema string EtcdAddrs []string DialTimeout int closeCh chan struct{} watchCh clientv3.WatchChan cli *clientv3.Client keyPrifix string srvAddrsList []resolver.Address cc resolver.ClientConn logger *logrus.Logger } // NewResolver create a new resolver.Builder base on etcd func NewResolver(etcdAddrs []string, logger *logrus.Logger) *Resolver { return &Resolver{ schema: schema, EtcdAddrs: etcdAddrs, DialTimeout: 3, logger: logger, } } // Scheme returns the scheme supported by this resolver. func (r *Resolver) Scheme() string { return r.schema } // Build creates a new resolver.Resolver for the given target func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { r.cc = cc r.keyPrifix = BuildPrefix(Server{Name: target.Endpoint, Version: target.Authority}) if _, err := r.start(); err != nil { return nil, err } return r, nil } // ResolveNow resolver.Resolver interface func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {} // Close resolver.Resolver interface func (r *Resolver) Close() { r.closeCh <- struct{}{} } // start func (r *Resolver) start() (chan<- struct{}, error) { var err error r.cli, err = clientv3.New(clientv3.Config{ Endpoints: r.EtcdAddrs, DialTimeout: time.Duration(r.DialTimeout) * time.Second, }) if err != nil { return nil, err } resolver.Register(r) r.closeCh = make(chan struct{}) if err = r.sync(); err != nil { return nil, err } go r.watch() return r.closeCh, nil } // watch update events func (r *Resolver) watch() { ticker := time.NewTicker(time.Minute) r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix()) for { select { case <-r.closeCh: return case res, ok := <-r.watchCh: if ok { 
r.update(res.Events) } case <-ticker.C: if err := r.sync(); err != nil { r.logger.Error("sync failed", err) } } } } // update func (r *Resolver) update(events []*clientv3.Event) { for _, ev := range events { var info Server var err error switch ev.Type { case clientv3.EventTypePut: info, err = ParseValue(ev.Kv.Value) if err != nil { continue } addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight} if !Exist(r.srvAddrsList, addr) { r.srvAddrsList = append(r.srvAddrsList, addr) r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList}) } case clientv3.EventTypeDelete: info, err = SplitPath(string(ev.Kv.Key)) if err != nil { continue } addr := resolver.Address{Addr: info.Addr} if s, ok := Remove(r.srvAddrsList, addr); ok { r.srvAddrsList = s r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList}) } } } } // sync 同步获取所有地址信息 func (r *Resolver) sync() error { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix()) if err != nil { return err } r.srvAddrsList = []resolver.Address{} for _, v := range res.Kvs { info, err := ParseValue(v.Value) if err != nil { continue } addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight} r.srvAddrsList = append(r.srvAddrsList, addr) } r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList}) return nil }
// api-gateway/discovery/instance.go package discovery import ( "encoding/json" "errors" "fmt" "strings" "google.golang.org/grpc/resolver" ) type Server struct { Name string `json:"name"` Addr string `json:"addr"` Version string `json:"version"` // 版本 Weight int64 `json:"weight"` // 权重 } func BuildPrefix(server Server) string { if server.Version == "" { return fmt.Sprintf("/%s/", server.Name) } return fmt.Sprintf("/%s/%s/", server.Name, server.Version) } func BuildRegisterPath(server Server) string { return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr) } // ParseValue 将value值反序列化到一个Server实例当中 func ParseValue(value []byte) (Server, error) { server := Server{} if err := json.Unmarshal(value, &server); err != nil { return server, err } return server, nil } func SplitPath(path string) (Server, error) { server := Server{} strs := strings.Split(path, "/") if len(strs) == 0 { return server, errors.New("invalid path") } server.Addr = strs[len(strs)-1] return server, nil } func Exist(l []resolver.Address, addr resolver.Address) bool { for i := range l { if l[i].Addr == addr.Addr { return true } } return false } func Remove(s []resolver.Address, addr resolver.Address) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr.Addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false }
// api-gateway/cmd/main.go package main import ( "api-gateway/config" "api-gateway/discovery" "api-gateway/internal/service" "api-gateway/routes" "fmt" "net/http" "os" "os/signal" "syscall" "time" "github.com/sirupsen/logrus" "github.com/spf13/viper" "google.golang.org/grpc" "google.golang.org/grpc/resolver" ) func main() { config.InitConfig() // 服务发现 etcdAddress := []string{viper.GetString("etcd.address")} etcdRegister := discovery.NewResolver(etcdAddress, logrus.New()) resolver.Register(etcdRegister) go startListen() { osSignal := make(chan os.Signal, 1) signal.Notify(osSignal, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) s := <-osSignal fmt.Println("exit!", s) } fmt.Println("gateway listen on :3000") } func startListen() { opts := []grpc.DialOption{} userConn, _ := grpc.Dial("127.0.0.1:10001", opts...) userService := service.NewUserServiceClient(userConn) ginRouter := routes.NewRouter(userService) service := &http.Server{ Addr: viper.GetString("server.port"), Handler: ginRouter, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } err := service.ListenAndServe() if err != nil { fmt.Println("绑定失败, 端口可能被占用", err) } }
user
模块接口测试
// cmd
# start the MySQL service
net start mysql
# log in as root (prompts for the password)
mysql -u root -p
# stop the MySQL service
net stop mysql
// api-gateway/cmd/main.go package main import ( "api-gateway/config" "api-gateway/discovery" "api-gateway/internal/service" "api-gateway/routes" "fmt" "net/http" "os" "os/signal" "syscall" "time" "github.com/sirupsen/logrus" "github.com/spf13/viper" "google.golang.org/grpc" "google.golang.org/grpc/resolver" ) func main() { config.InitConfig() // 服务发现 etcdAddress := []string{viper.GetString("etcd.address")} etcdRegister := discovery.NewResolver(etcdAddress, logrus.New()) resolver.Register(etcdRegister) go startListen() { osSignal := make(chan os.Signal, 1) signal.Notify(osSignal, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) s := <-osSignal fmt.Println("exit!", s) } fmt.Println("gateway listen on :4000") } func startListen() { opts := []grpc.DialOption{ grpc.WithInsecure(), } userConn, err := grpc.Dial("127.0.0.1:10001", opts...) if err != nil { panic(err) } userService := service.NewUserServiceClient(userConn) ginRouter := routes.NewRouter(userService) service := &http.Server{ Addr: viper.GetString("server.port"), Handler: ginRouter, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } err = service.ListenAndServe() if err != nil { fmt.Println("绑定失败, 端口可能被占用", err) } }
-
服务未被监听修改
api...main.go
// user/internal/handler/user.go package handler import ( "context" "memo_grpc/internal/repository" "memo_grpc/internal/service" "memo_grpc/pkg/e" ) type UserService struct { } func NewUserService() *UserService { return &UserService{} } // UserLogin 用户登录 token不在服务层,不对数据库进行操作,在网关层操作 func (*UserService) UserLogin(ctx context.Context, req *service.UserRequest) (resp *service.UserDetailResponse, err error) { var user repository.User resp = new(service.UserDetailResponse) resp.Code = e.Success err = user.ShowUserInfo(req) if err != nil { resp.Code = e.Error return resp, err } resp.UserDetail = repository.BuildUser(user) return resp, nil } // 用户注册 func (*UserService) UserRegister(ctx context.Context, req *service.UserRequest) (resp *service.UserDetailResponse, err error) { var user repository.User resp = new(service.UserDetailResponse) resp.Code = e.Success user, err = user.UserCreate(req) if err != nil { resp.Code = e.Error return resp, err } resp.UserDetail = repository.BuildUser(user) return resp, nil }
// user/internal/repository/user.go package repository import ( "errors" "memo_grpc/internal/service" "golang.org/x/crypto/bcrypt" "gorm.io/gorm" ) type User struct { UserId uint `gorm:"primarykey"` UserName string `gorm:"unique"` NickName string PasswordDigest string } const ( PasswordCost = 12 // 密码加密难度 ) // CheckUserExist检查用户是否存在 func (user *User) CheckUserExist(req *service.UserRequest) bool { if err := DB.Where("user_name=?", req.UserName).First(&user).Error; err == gorm.ErrRecordNotFound { return false } return true } // ShowUserInfo获取用户信息 func (user *User) ShowUserInfo(req *service.UserRequest) error { if exist := user.CheckUserExist(req); exist { return nil } return errors.New("UserName Not Exist") } // UserCreate 创建用户 func (*User) UserCreate(req *service.UserRequest) (user User, err error) { var count int64 DB.Where("username=?", req.UserName).Count(&count) if count != 0 { return User{}, errors.New("UserName Exist") } user = User{ UserName: req.UserName, NickName: req.NickName, } // 密码的加密 _ = user.SetPassword(req.Password) err = DB.Create(&user).Error return user, err } // SetPassword 加密密码 func (user *User) SetPassword(password string) error { bytes, err := bcrypt.GenerateFromPassword([]byte(password), PasswordCost) if err != nil { return err } user.PasswordDigest = string(bytes) return nil } // CheckPassword 检验密码 func (user *User) CheckPassword(password string) bool { err := bcrypt.CompareHashAndPassword([]byte(user.PasswordDigest), []byte(password)) return err == nil } // BuildUser序列化User func BuildUser(item User) *service.UserModel { userModel := service.UserModel{ UserID: uint32(item.UserId), UserName: item.UserName, NickName: item.NickName, } return &userModel }
// api-gateway/internal/handler/user.go package handler import ( "api-gateway/internal/service" "api-gateway/pkg/e" "api-gateway/pkg/res" "api-gateway/pkg/util" "context" "net/http" "github.com/gin-gonic/gin" ) // UserRegister 用户注册 func UserRegister(ginCtx *gin.Context) { var userReq service.UserRequest PanicIfUserError(ginCtx.Bind(&userReq)) // 尝试将HTTP请求体中的数据绑定到userReq中 // gin.Key 中获取服务实例 userService := ginCtx.Keys["user"].(service.UserServiceClient) // 断言为service.UserServiceClient类型 userResp, err := userService.UserRegister(context.Background(), &userReq) PanicIfUserError(err) r := res.Response{ Data: userResp, Status: uint(userResp.Code), Msg: e.GetMsg(uint(userResp.Code)), } ginCtx.JSON(http.StatusOK, r) } // UserLogin 用户登录 func UserLogin(ginCtx *gin.Context) { var userReq service.UserRequest PanicIfUserError(ginCtx.Bind(&userReq)) // 尝试将HTTP请求体中的数据绑定到userReq中 // gin.Key 中获取服务实例 userService := ginCtx.Keys["user"].(service.UserServiceClient) // 断言为service.UserServiceClient类型 userResp, err := userService.UserLogin(context.Background(), &userReq) PanicIfUserError(err) token, err := util.GenerateToken(uint(userResp.UserDetail.UserID)) r := res.Response{ Data: res.TokenData{ User: userResp.UserDetail, Token: token, }, Status: uint(userResp.Code), Msg: e.GetMsg(uint(userResp.Code)), } ginCtx.JSON(http.StatusOK, r) }
task
模块_整体开发
-
api-gateway/task/
-
cmd/
-
copy user/config
-
copy user/discovery/
-
internal/
-
handler/
-
repository/
-
copy user/db_init.go
-
copy user/migration.go
-
-
service/
-
pb/
-
taskModels.proto
-
taskService.proto
-
-
-
-
pkg/
-
copy user/pkg/e
-
-
// task/internal/service/pb/taskModels.proto
syntax="proto3";
package pb;
option go_package = "/internal/service;service";

// TaskModel is the task record returned to clients.
// The @inject_tag comments set the JSON tag of each generated Go field; the
// TaskID tag was a copy-paste "favorite_id" and is now "task_id".
message TaskModel {
  // @inject_tag: json:"task_id"
  uint32 TaskID=1;
  // @inject_tag: json:"user_id"
  uint32 UserID=2;
  // @inject_tag: json:"status"
  uint32 Status=3;
  // @inject_tag: json:"title"
  string Title=4;
  // @inject_tag: json:"content"
  string Content=5;
  // @inject_tag: json:"start_time"
  uint32 StartTime=6;
  // @inject_tag: json:"end_time"
  uint32 EndTime=7;
}
// task/internal/service/pb/taskService.proto
syntax="proto3";
package pb;
import "taskModels.proto";
option go_package = "/internal/service;service";

// TaskRequest is the single request message used by every TaskService rpc.
message TaskRequest{
  uint32 TaskID=1;
  uint32 UserID=2;
  uint32 Status=3;
  string Title=4;
  string Content=5;
  uint32 StartTime=6;
  uint32 EndTime=7;
}

// TasksDetailResponse carries a list of tasks and a status code.
message TasksDetailResponse{
  repeated TaskModel TaskDetail=1;
  uint32 Code=2;
}

// CommonResponse carries a status code, a message and a string payload.
message CommonResponse{
  uint32 Code=1;
  string Msg=2;
  string Data=3;
}

// TaskService exposes create, update, show and delete for tasks.
service TaskService{
  rpc TaskCreate(TaskRequest) returns(CommonResponse);
  rpc TaskUpdate(TaskRequest) returns(CommonResponse);
  rpc TaskShow(TaskRequest) returns(TasksDetailResponse);
  rpc TaskDelete(TaskRequest) returns(CommonResponse);
}
// cmd
# enter the task module directory
cd ../task/
# generate Go code (with the grpc plugin) from every .proto under internal/service/pb
protoc -I internal/service/pb internal/service/pb/*.proto --go_out=plugins=grpc:.
# task/config/config.yml
# NOTE(review): this file contains a database password in plain text; keep
# real credentials out of version control.
server:
  domain: task
  version: 1.0
  jwtSecret: FanOne
  # read by task/cmd/main.go as "server.grpcAddress"
  grpcAddress: "127.0.0.1:10002"
mysql:
  driverName: mysql
  host: 127.0.0.1
  port: 3306
  database: grpc_todolist_tmp
  username: root
  password: l20030328
  charset: utf8mb4
etcd:
  # read by task/cmd/main.go as "etcd.address"
  address: 127.0.0.1:2379
redis:
  address: 127.0.0.1:6379
  password:
-
task/internal/handler/
-
task.go
-
// task/internal/handler/task.go package handler import ( "context" "task/internal/repository" "task/internal/service" "task/pkg/e" ) type TaskService struct { } func NewTaskService() *TaskService { return &TaskService{} } func (*TaskService) TaskCreate(ctx context.Context, req *service.TaskRequest) (resp *service.CommonResponse, err error) { var task repository.Task resp = new(service.CommonResponse) resp.Code = e.Success err = task.TaskCreate(req) if err != nil { resp.Code = e.Error resp.Msg = e.GetMsg(e.Error) resp.Data = err.Error() return resp, err } resp.Msg = e.GetMsg(uint(resp.Code)) return resp, nil } func (*TaskService) TaskShow(ctx context.Context, req *service.TaskRequest) (resp *service.TasksDetailResponse, err error) { var task repository.Task resp = new(service.TasksDetailResponse) resp.Code = e.Success taskList, err := task.TaskShow(req) if err != nil { resp.Code = e.Error return resp, err } resp.TaskDetail = repository.BuildTasks(taskList) return resp, nil } func (*TaskService) TaskUpdate(ctx context.Context, req *service.TaskRequest) (resp *service.CommonResponse, err error) { var task repository.Task resp = new(service.CommonResponse) resp.Code = e.Success err = task.TaskUpdate(req) if err != nil { resp.Code = e.Error resp.Msg = e.GetMsg(e.Error) resp.Data = err.Error() return resp, err } resp.Msg = e.GetMsg(uint(resp.Code)) return resp, nil } func (*TaskService) TaskDelete(ctx context.Context, req *service.TaskRequest) (resp *service.CommonResponse, err error) { var task repository.Task resp = new(service.CommonResponse) resp.Code = e.Success err = task.TaskDelete(req) if err != nil { resp.Code = e.Error resp.Msg = e.GetMsg(e.Error) resp.Data = err.Error() return resp, err } resp.Msg = e.GetMsg(uint(resp.Code)) return resp, nil }
-
task/internal/repository/
-
task.go
-
// task/internal/repository/task.go package repository import "task/internal/service" type Task struct { TaskID uint `gorm:"primarykey"` UserID uint `gorm:"index"` // 用户ID Status int `gorm:"default:0"` Title string Content string `gorm:"type:longtext"` StartTime int64 EndTime int64 } func (*Task) TaskCreate(req *service.TaskRequest) error { task := Task{ UserID: uint(req.UserID), Title: req.Title, Content: req.Content, StartTime: int64(req.StartTime), EndTime: int64(req.EndTime), } return DB.Create(&task).Error } func (*Task) TaskShow(req *service.TaskRequest) (taskList []Task, err error) { err = DB.Model(&Task{}).Where("user_id=?", req.UserID).Find(&taskList).Error if err != nil { return nil, err } return taskList, nil } func (*Task) TaskDelete(req *service.TaskRequest) (err error) { return DB.Model(&Task{}).Where("task_id=?", req.TaskID).Delete(&Task{}).Error } func (*Task) TaskUpdate(req *service.TaskRequest) error { t := Task{} err := DB.Where("task_id=?", req.TaskID).First(&t).Error if err != nil { return err } t.Status = int(req.Status) t.Title = req.Title t.Content = req.Content t.StartTime = int64(req.StartTime) t.EndTime = int64(req.EndTime) return DB.Save(&t).Error } func BuildTask(item Task) *service.TaskModel { return &service.TaskModel{ TaskID: uint32(item.TaskID), UserID: uint32(item.UserID), Status: uint32(item.Status), Title: item.Title, Content: item.Content, StartTime: uint32(item.StartTime), EndTime: uint32(item.EndTime), } } func BuildTasks(item []Task) (tList []*service.TaskModel) { for _, v := range item { t := BuildTask(v) tList = append(tList, t) } return tList }
// task/cmd/main.go copy user/cmd/main.go package main import ( "net" "task/config" "task/discovery" "task/internal/handler" "task/internal/repository" "task/internal/service" "github.com/sirupsen/logrus" "github.com/spf13/viper" "google.golang.org/grpc" ) func main() { config.InitConfig() repository.InitDB() // etcd 地址 etcdAddress := []string{viper.GetString("etcd.address")} // 服务的注册 etcdRegister := discovery.NewRegister(etcdAddress, logrus.New()) grpcAddress := viper.GetString("server.grpcAddress") userNode := discovery.Server{ Name: viper.GetString("server.domain"), Addr: grpcAddress, } server := grpc.NewServer() defer server.Stop() // 绑定服务 返回的服务实例绑定到GRPC服务器上->客户端可以通过GRPC调用此服务的方法 service.RegisterTaskServiceServer(server, handler.NewTaskService()) lis, err := net.Listen("tcp", grpcAddress) // 在指定的GRPC地址上监听TCP连接 if err != nil { panic(err) } if _, err := etcdRegister.Register(userNode, 10); err != nil { panic(err) // 将服务器节点注册到etcd上,并指定租约时间为10秒 } if err = server.Serve(lis); err != nil { panic(err) // 启动GRPC服务器,并使其开始监听并处理传入的连接 } }
task
接入网关
// api-gateway/routes/router.go package routes import ( "api-gateway/internal/handler" "api-gateway/middlerware" "github.com/gin-gonic/gin" ) func NewRouter(service ...interface{}) *gin.Engine { // ...表示可变参数或切片参数,允许传入不定量的参数,...interface=>[]interface{} ginRouter := gin.Default() ginRouter.Use(middlerware.Cors(), middlerware.InitMiddleWare(service)) v1 := ginRouter.Group("api/v1") { v1.GET("ping", func(c *gin.Context) { c.JSON(200, "success") }) // 用户服务 v1.POST("/user/register", handler.UserRegister) v1.POST("/user/login", handler.UserLogin) authed := v1.Group("/") authed.Use(middlerware.JWT()) { // 任务模块 authed.GET("task", handler.ListTask) authed.POST("task", handler.CreateTask) authed.PUT("task", handler.UpdateTask) authed.DELETE("task", handler.DeleteTask) } } return ginRouter }
// api-gareway/middlerware/jwt.go package middlerware import ( "api-gateway/pkg/e" "api-gateway/pkg/util" "time" "github.com/gin-gonic/gin" ) func JWT() gin.HandlerFunc { return func(c *gin.Context) { code := 200 token := c.GetHeader("Authorization") if token == "" { code = 404 } else { claim, err := util.ParseToken(token) if err != nil { code = e.ErrorAuthzcheckTokenFail } else if time.Now().Unix() > claim.ExpiresAt { code = e.ErrorAuthzcheckTokenTimeout } } if code != 200 { c.JSON(200, gin.H{ "status": code, "msg": e.GetMsg(uint(code)), }) c.Abort() return } c.Next() } }
// api-gateway/pkg/e/code.go
package e

// Status codes used in gateway responses.
const (
	Success       = 200
	Error         = 500
	InvalidParams = 400

	ErrorAuthzcheckTokenFail    = 20001 // invalid token
	ErrorAuthzcheckTokenTimeout = 20002 // expired token
)
// api-gateway/pkg/e/msg.go package e var MsgFlags = map[uint]string{ Success: "ok", Error: "fail", InvalidParams: "请求的参数错误", ErrorAuthzcheckTokenFail: "token 错误", ErrorAuthzcheckTokenTimeout: "token 过期", } // GetMsg 获取状态码对应的信息 func GetMsg(code uint) string { msg, ok := MsgFlags[code] if ok { return msg } return MsgFlags[Error] }
-
copy task/service/pb
-
里的
proto
和go
到api-gateway
-
api-gateway/internal/handler/
-
task.go
-
// api-gateway/internal/handler/pkg.go package handler import "errors" func PanicIfUserError(err error) { if err != nil { err = errors.New("User Service--" + err.Error()) panic(err) } } func PanicIfTaskError(err error) { if err != nil { err = errors.New("Task Service--" + err.Error()) panic(err) } }
// api-gateway/middlerware/init.go package middlerware import "github.com/gin-gonic/gin" func InitMiddleWare(service []interface{}) gin.HandlerFunc { return func(c *gin.Context) { c.Keys = make(map[string]interface{}) c.Keys["user"] = service[0] c.Keys["task"] = service[1] c.Next() } }
// api-gateway/internal/handler/task.go package handler import ( "api-gateway/internal/service" "api-gateway/pkg/e" "api-gateway/pkg/res" "api-gateway/pkg/util" "context" "net/http" "github.com/gin-gonic/gin" ) func ListTask(ginCtx *gin.Context) { var tReq service.TaskRequest PanicIfTaskError(ginCtx.Bind(&tReq)) claim, _ := util.ParseToken(ginCtx.GetHeader("Authorization")) tReq.UserID = uint32(claim.UserId) taskService := ginCtx.Keys["task"].(service.TaskServiceClient) taskResp, err := taskService.TaskShow(context.Background(), &tReq) PanicIfTaskError(err) r := res.Response{ Status: uint(taskResp.Code), Data: taskResp, Msg: e.GetMsg(uint(taskResp.Code)), } ginCtx.JSON(http.StatusOK, r) } func CreateTask(ginCtx *gin.Context) { var tReq service.TaskRequest PanicIfTaskError(ginCtx.Bind(&tReq)) claim, _ := util.ParseToken(ginCtx.GetHeader("Authorization")) tReq.UserID = uint32(claim.UserId) taskService := ginCtx.Keys["task"].(service.TaskServiceClient) taskResp, err := taskService.TaskCreate(context.Background(), &tReq) PanicIfTaskError(err) r := res.Response{ Status: uint(taskResp.Code), Data: taskResp, Msg: e.GetMsg(uint(taskResp.Code)), } ginCtx.JSON(http.StatusOK, r) } func UpdateTask(ginCtx *gin.Context) { var tReq service.TaskRequest PanicIfTaskError(ginCtx.Bind(&tReq)) claim, _ := util.ParseToken(ginCtx.GetHeader("Authorization")) tReq.UserID = uint32(claim.UserId) taskService := ginCtx.Keys["task"].(service.TaskServiceClient) taskResp, err := taskService.Taskupdate(context.Background(), &tReq) PanicIfTaskError(err) r := res.Response{ Status: uint(taskResp.Code), Data: taskResp, Msg: e.GetMsg(uint(taskResp.Code)), } ginCtx.JSON(http.StatusOK, r) } func DeleteTask(ginCtx *gin.Context) { var tReq service.TaskRequest PanicIfTaskError(ginCtx.Bind(&tReq)) claim, _ := util.ParseToken(ginCtx.GetHeader("Authorization")) tReq.UserID = uint32(claim.UserId) taskService := ginCtx.Keys["task"].(service.TaskServiceClient) taskResp, err := 
taskService.TaskDelete(context.Background(), &tReq) PanicIfTaskError(err) r := res.Response{ Status: uint(taskResp.Code), Data: taskResp, Msg: e.GetMsg(uint(taskResp.Code)), } ginCtx.JSON(http.StatusOK, r) }
// api-gateway/cmd/main.go package main import ( "api-gateway/config" "api-gateway/discovery" "api-gateway/internal/service" "api-gateway/routes" "fmt" "net/http" "os" "os/signal" "syscall" "time" "github.com/sirupsen/logrus" "github.com/spf13/viper" "google.golang.org/grpc" "google.golang.org/grpc/resolver" ) func main() { config.InitConfig() // 服务发现 etcdAddress := []string{viper.GetString("etcd.address")} etcdRegister := discovery.NewResolver(etcdAddress, logrus.New()) resolver.Register(etcdRegister) go startListen() { osSignal := make(chan os.Signal, 1) signal.Notify(osSignal, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) s := <-osSignal fmt.Println("exit!", s) } fmt.Println("gateway listen on :4000") } func startListen() { opts := []grpc.DialOption{ grpc.WithInsecure(), } userConn, err := grpc.Dial("127.0.0.1:10001", opts...) if err != nil { panic(err) } userService := service.NewUserServiceClient(userConn) taskConn, err := grpc.Dial("127.0.0.1:10001", opts...) if err != nil { panic(err) } taskService := service.NewTaskServiceClient(taskConn) ginRouter := routes.NewRouter(userService, taskService) service := &http.Server{ Addr: viper.GetString("server.port"), Handler: ginRouter, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } err = service.ListenAndServe() if err != nil { fmt.Println("绑定失败, 端口可能被占用", err) } }