当前位置: 首页 > news >正文

gRPC学习笔记

微服务

  • 一旦某个服务器宕机,会引起整个应用不可用,隔离性差

  • 只能整体应用进行伸缩,浪费资源,可伸缩性差

  • 代码耦合在一起,可维护性差

微服务架构:解决了单体架构的弊端

可以按照服务进行单独扩容

各个服务之间可以独立开发,独立部署

同时引入了新的问题

  • 代码冗余

  • 服务和服务之间存在调用关系

grpc的优点

  • 生态好:Google

  • 跨语言

  • 性能好

  • 强类型

  • 流式处理

安装gRPC

go get google.golang.org/grpc
go get google.golang.org/protobuf

安装protor

protor是可用于通信协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式

链接:百度网盘 请输入提取码 提取码:jw8a

把解压出来的bin目录放在系统变量中

D:demo_rely\protoc-3.15.5-win64\bin

// cmd中测试
protoc --version

etcd 链接:百度网盘 请输入提取码 提取码:7777 etcdkeeper 链接:百度网盘 请输入提取码 提取码:7777

Protobuf Support插件可以让.proto文件有高光

// 下载安装protoc-gen-go
git clone https://gitcode.com/golang/protobuf.git
// 进入到 protobuf/protoc-gen-go 目录执行以下命令:
go build -o protoc-gen-go main.go
// 复制 protoc-gen-go.exe 到 GOROOT的 bin 目录
// 我的protoc-gen-go.exe文件在GO/bin目录下
// cmd 转换
cd ./user/
protoc -I internal/service/pb internal/service/pb/*.proto --go_out=plugins=grpc:.
// internal/service/pb pb所在的文件 internal/service/pb/*.proto是需要生成的文件
// 跨语言:protoc-gen-go是go语言的
go install github.com/golang/protobuf/protoc-gen-go
// 需要配环境变量, 或者把exe文件放到gobin目录下

原生rpc

// server/ser.go
package main

import (
	"fmt"
	"net"
	"net/http"
	"net/rpc"
)

type Server struct{}

type Req struct {
	Num1 int
	Num2 int
}

type Res struct {
	Num int
}

func (s Server) Add(req Req, res *Res) error {
	res.Num = req.Num1 + req.Num2
	fmt.Println("请求来了", req)
	return nil
}

func main() {
	// 注册rpc服务
	rpc.Register(new(Server))
	rpc.HandleHTTP()
	listen, err := net.Listen("tcp", ":8080")
	if err != nil {
		fmt.Println(err)
		return
	}
	http.Serve(listen, nil)
}
// client/cli.go
package main

import (
	"fmt"
	"net/rpc"
)

type Req struct {
	Num1 int
	Num2 int
}

type Res struct {
	Num int
}

func main() {
	req := Req{1, 2}
	client, err := rpc.DialHTTP("tcp", ":8080")
	if err != nil {
		fmt.Println(err)
		return
	}
	var res Res
	err = client.Call("Server.Add", req, &res)
	fmt.Println(res, err)
}

原生rpc的问题:

  • 编写相对复杂,需要自己去关注实现过程

  • 没有代码提示,容易写错

hello word服务器

protobuf文件相当于接口文件(或者约束)服务端客户端可以都有

// grpc_proto/hello.proto
syntax = "proto3"; // 指定proto版本
package hello_grpc; // 指定默认包名

// go_package表示在当前目录生成go文件,指定生成.go文件时的包名
// ".:service"=>:前是最终生成的位置:后是包名
option go_package = "/hello_grpc";

// 定义rpc服务
service HelloService {
    // 定义函数 rpc方法
    rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

// HelloRequest 请求内容  >>>结构体
message HelloRequest {
    string name = 1;
    string message = 2;
}

// HelloResponse 响应内容
message HelloResponse{
    string name = 1;
    string message = 2;
}

若想让proto文件有高光提示,就在插件中下载gRPC

// .porto转换.go    // -I . 指定了包含 .proto 文件的目录
protoc -I . --go_out=plugins=grpc:. grpc_proto\hello.proto
protoc --go-grpc_out=. hello.proto  // 前两者都有 gRPC 插件
protoc --go_out=. hello.proto  // 没grpc插件

服务端

  • 编写一个结构体

  • 实现protobuf中的所有方法

  • 监听端口

  • 注册服务

// server/grpc_server.go
package main

import (
	"context"
	"fmt"
	hello_grpc "grpc_study/grpc_proto"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
)

type HelloService struct {
}

func (HelloService) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (res *hello_grpc.HelloResponse, err error) {
	fmt.Println(request)
	return &hello_grpc.HelloResponse{
		Name:    "江小年",
		Message: "ok",
	}, nil
}

func main() {
	// 监听端口
	listen, err := net.Listen("tcp", ":8080")
	if err != nil {
		grpclog.Fatalf("Failed to listen: %v", err)
	}
	// 创建一个grpc服务器实例
	s := grpc.NewServer()
	server := HelloService{}
	// 将server结构体注册为grpc服务
	hello_grpc.RegisterHelloServiceServer(s, &server)
	fmt.Println("grpc server running :8080")
	// 开始处理客户端请求
	err = s.Serve(listen)
}

客户端

// client/grpc_client.go
package main

import (
	"context"
	"fmt"
	hello_grpc "grpc_study/grpc_proto"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
	client := hello_grpc.NewHelloServiceClient(conn)
	result, err := client.SayHello(context.Background(), &hello_grpc.HelloRequest{
		Name:    "江小年",
		Message: "ok",
	})
	fmt.Println(result, err)
}

proto基本数据类型

// grpc_proto/hello.proto
syntax = "proto3"; // 指定proto版本
package hello_grpc; // 指定默认包名

// go_package表示在当前目录生成go文件,指定生成.go文件时的包名
option go_package = "/hello_grpc";

// 定义rpc服务
service HelloService {
    // 定义函数
    rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

// HelloRequest 请求内容
message HelloRequest {
    string name = 1;
    string message = 2;
}

// HelloResponse 响应内容
message HelloResponse{
    string name = 1;
    string message = 2;
}

proto语法

service对应的就是go里面的接口,可以作为服务端,客户端

rpc对应的就是结构体中的方法

message对应的也是结构体

数据类型

基本数据类型

message Request {
    double a1 = 1;
    float a2 = 2;
    int32 a3 = 3;
    uint32 a4 = 4;
    uint64 a5 = 5;
    sint32 a6 = 6;
    sint64 a7 = 7;
    fixed32 a8 = 8;
    fixed64 a9 = 9;
    sfixed32 a10 = 10;
    sfixed64 a11 = 11;
    bool a12 = 12;
    string a13 = 13;
    bytes a14 = 14;
    repeated string name = 15; // 字符串切片
}

数组类型 repeated关键字

// grpc_study/grpc_proto/type_grpc/type_grpc.proto
syntax = "proto3"; // 指定proto版本
// 指定golang包名
option go_package = "/type_grpc";

service TypeService {
    rpc Say(Request)returns(Response){}
}

message Request{  
    double a1 = 1;
    float a2 = 2;
    int32 a3 = 3;
    uint32 a4 = 4;
    uint64 a5 = 5;
    sint32 a6 = 6;
    sint64 a7 = 7;
    fixed32 a8 = 8;
    fixed64 a9 = 9;
    sfixed32 a10 = 10;
    sfixed64 a11 = 11;
    bool a12 = 12;
    string a13 = 13;
    bytes a14 = 14;
}

message Item {  // 声明可供远程调用的方法
    string name = 1;
    fixed32 code = 2;
}

message ArrayRequest {  // 声明可供远程调用的方法
    repeated int64 i6_list = 1;  // 列表
    repeated string s_list = 2;
    repeated Item item_list = 3;
}

message Response{
    
}

map类型

键只能是基本类型

message MapRequest {
	map<int64, string> i_s = 1;
	map<string, bool> s_b = 2;
	map<string, Item> s_item = 3;
}

嵌套类型

message q1 {
	message q2{
		string name = 1;
	}
	string name = 1;
	Q2 q2 = 2;
}

多服务

// grpc_study/grpc_proto/type_grpc/duo.proto
syntax = "proto3"; // 指定proto版本
// 指定golang包名
option go_package = "/duo_proto";

service VideoService {
    rpc Look(Request)returns(Response){}
}

message Request{
	string name = 1;
}

message Response{
	string name = 1;
}

service OrderService {
    rpc Buy(Request)returns(Response){}
}

服务端

// grpc_study/server/duo_server.go
package main

type VideoServer struct {
}

func (VideoS erver)Look(ctx context.Context, request *duo_proto.Request) (res *duo_proto.Response, err error) {
    fmt.Println("video:", request)
    return &duo_proto.Response{
        Name: "江小年",
    }, nil
}

type OrderServer struct {
}

func (OrderServer)Buy(ctx context.Context, request *duo_proto.Request) (res *duo_proto.Response, err error) {
    fmt.Println("video:", request)
    return &duo_proto.Response{
        Name: "江小年",
    }, nil
}

func main() {
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    s := grpc.NewServer()
    duo_proto.RegisterVideoServiceServer(s, &VideoServer{})
    duo_proto.RegisterOrderServiceServer(s, &OrderServer{})
    
    fmt.Println("grpc server程序运行在:8080")
    err = s.Serve(listen)
}

客户端

// grpc_study/client/duo_client.go
package main

import (
	"context"
	"fmt"
	"grpc_test/grpc_proto/hello_grpc"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
    
	orderClient := duo_proto.NewOrderServiceClient(conn)
    res, err := orderClient.Buy(context.Background(), &duo_proto.Request{
        Name: "江小年",
    })
	fmt.Println(res, err)
    
    videoClient := duo_proto.NewVideoServiceClient(conn)
    res, err = videoClient.Look(context.Background(), &duo_proto.Request{
        Name: "江小年",
    })
	fmt.Println(res, err)
}

proto文件

当项目大起来之后,会有很多个service,rpc,message

我们会将不同的服务放在不同的proto文件中

还可以放一些公共的proto文件

其实本质就是生成go文件,需要在一个包内

头部最好一样,package必须一样

// grpc_study/service_proto/video.proto
syntax = "proto3"; 
package proto;
option go_package = "/proto";
import "common.proto";

service VideoService {
	rpc Look(Request)returns(Response){}
}

// grpc_study/service_proto/order.proto
syntax = "proto3"; 
package proto;
option go_package = "/proto";
import "common.proto";

service OrderService {
	rpc Buy(Request)returns(Response){}
}

// grpc_study/service_proto/common.proto
syntax = "proto3"; 
package proto;
option go_package = "/proto";

message Request{
	string name = 1;
}

message Response{
	string name = 1;
}

// grpc_study/service_proto/XXX.proto
protoc -I .\service_proto --go_out=plugins=grpc:./service_proto .\service_proto\video.proto
protoc -I .\service_proto --go_out=plugins=grpc:./service_proto .\service_proto\order.proto
protoc -I .\service_proto --go_out=plugins=grpc:./service_proto .\service_proto\common.proto

服务端流式传输

// grpc_study/stream_proto/stream.proto
syntax = "proto3";
option go_package = "/proto";

service Simple{
	rpc Fun(Request)returns(Response){}
}

message Request {
	string name = 1;
}

message Response {
	string Text = 1;
}
// 服务端,客户端代码同上
// 服务端流式
service ServiceStream{
	rpc Fun(Request)returns(stream Response){}
}
// grpc_study/server/服务端流式serve.go
package main

import "grpc_study/stream_proto/proto"

type ServiceStream struct {}

func (ServiceStream)Fun(request *proto.Request, stream proto.ServiceStream_FunServer) error {
    fmt.Println(request)
    for i := 0; i < 10; i++ {
        stream.Send(&proto.Response{
            Text: fmt.Sprintf("第%d轮数据", i)
        }) // 响应
    }
    return nil
}

func main() {
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    server := grpc.NewServer()
    proto.RegisterServiceStreamServer(server, &ServiceStream)
    
    server.Server(listen)
}
// grpc_study/client/服务端流式client.go
func main() {
	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
	client := proto.NewServiceStreamClient(conn)
	stream, err := client.Fun(context.Background(), &proto.Request{
		Name:    "江小年",
	})
    for i := 0; i < 10; i++ {
        response, err := stream.Recv()
        fmt.Println(response, err)
    }
}

客户端不知道服务端什么时候结束

服务端流式案例_下载文件

客户端不知道服务端什么时候结束

// grpc_study/client/服务端流式client.go
func main() {
	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
	client := proto.NewServiceStreamClient(conn)
	stream, err := client.Fun(context.Background(), &proto.Request{
		Name:    "江小年",
	})
    for {
        response, err := stream.Recv()
        if err == io.EOF {
            // EOF就是结束的err报错
            break
        }
        fmt.Println(response)
    }
}

下载文件,文件大时需要多次传输,这时不知道多会儿结束怎么办

// grpc_study/stream_proto/stream.proto
syntax = "proto3";
option go_package = "/proto";

service Simple{
	rpc Fun(Request)returns(Response){}
}

message Request {
	string name = 1;
}

message Response {
	string Text = 1;
}

message FileResponse {
	string file_name = 1;
    bytes content = 2;
}

// 服务端,客户端代码同上
// 服务端流式
service ServiceStream{
	rpc Fun(Request)returns(stream Response){}
    rpc DownLoadFile(Request)returns(stream FileResponse){}
}
// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\stream.proto

创建一个文件夹static放一个大文件

package main

// grpc_study/server/服务端流式serve.go

import (
	"fmt"
	proto "grpc_study/grpc_proto"
	"io"
	"log"
	"net"
	"os"

	"google.golang.org/grpc"
)

type ServiceStream struct{}

func (ServiceStream) Fun(request *proto.Request, stream proto.ServiceStream_FunServer) error {
	fmt.Println(request)
	for i := 0; i < 10; i++ {
		stream.Send(&proto.Response{
			Text: fmt.Sprintf("第%d轮数据", i),
		}) // 响应
	}
	return nil
}

func (ServiceStream) DownLoadFile(request *proto.Request, stream proto.ServiceStream_DownLoadFileServer) error {
	fmt.Println(request)
	file, err := os.Open("abc.txt")
	if err != nil {
		panic(err)
	}
	defer file.Close()

	for {
		buf := make([]byte, 2048)
		_, err = file.Read(buf)
		if err == io.EOF {
			break
		}

		if err != nil {
			break
		}
		stream.Send(&proto.FileResponse{
			Content: buf,
		})
	}
	return nil
}

func main() {
	listen, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal(err)
	}
	server := grpc.NewServer()
	proto.RegisterServiceStreamServer(server, &ServiceStream{})

	server.Serve(listen)
}
package main

import (
	"bufio"
	"context"
	"fmt"
	proto "grpc_study/grpc_proto"
	"io"
	"log"
	"os"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// grpc_study/client/服务端流式client.go
func main() {
	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
	client := proto.NewServiceStreamClient(conn)
	stream, err := client.DownLoadFile(context.Background(), &proto.Request{
		Name: "江小年",
	})

	file, err := os.OpenFile("abc.txt", os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		log.Fatalln(err)
	}
	defer file.Close()

	writer := bufio.NewWriter(file)
	var index int
	for {
		index++
		response, err := stream.Recv()
		if err == io.EOF {
			// EOF就是结束的err报错
			break
		}
		fmt.Printf("第%d次 写入 %d 数据", index, len(response.Content))
		writer.Write(response.Content)
	}
	writer.Flush()
}

客户端流式传输

// grpc_study/stream_proto/stream.proto
syntax = "proto3";
option go_package = "/proto";

service Simple{
	rpc Fun(Request)returns(Response){}
}

message Request {
	string name = 1;
}

message Response {
	string Text = 1;
}

message FileResponse {
	string file_name = 1;
    bytes content = 2;
}

// 服务端,客户端代码同上
// 服务端流式
service ServiceStream{
	rpc Fun(Request)returns(stream Response){}
    rpc DownLoadFile(Request)returns(stream FileResponse){}
}

message FileRequest {
	string file_name = 1;
    bytes content = 2;
}

// 客户端流式
service ClientStream {
	rpc UploadFile(stream FileRequest)returns(Response){}
}

// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\stream.proto
// grpc_study/server/客户端流式serve.go
package main

import "grpc_study/stream_proto/proto"

type ClientStream struct {}


func (ClientStream)UpLoadFile(stream proto.ClientStream_UploadFileServer) error {
    for i := 0; i < 10; i++ {
        response, err := stream.Recv()
        fmt.Println(response, err)
    }
    stream.SendAndClose(&proto.Response{Text: "完毕"})
    return nil
}

func main() {
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    server := grpc.NewServer()
    proto.RegisterClientStreamServer(server, &ClientStream)
    
    server.Server(listen)
}
// grpc_study/client/客户端流式client.go
func main() {
	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
    client := proto.NewClientStreamClient(conn)
    stream, err := client.UploadFile(context.Background())
    for i := 0; i < 10; i++ {
        stream.Send(&proto.FileRequest{FileName: fmt.Sprintf("第%d次", i)})
    }
    response, err := stream.CloseAndRecv()
    fmt.Println(response, err)
}

客户端流式案例_上传文件

// grpc_study/client/客户端流式client.go
func main() {
	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
    client := proto.NewClientStreamClient(conn)
    stream, err := client.UploadFile(context.Background())
    
    file, err := os.Open("abc.txt")
    if err != nil {
        log.Fatalln(err)
    }
    defer file.Close()

    for {
        buf := make([]byte, 2048)
        _, err = file.Read(buf)
        if err == io.EOF {
            break
        }

        if err != nil {
            break
        }
        stream.Send(&proto.FileRequest{
            FileName: "x.png"
            Content: buf,
        })
    }

    response, err := stream.CloseAndRecv()
    fmt.Println(response, err)
}
// grpc_study/server/客户端流式serve.go
package main

import "grpc_study/stream_proto/proto"

type ClientStream struct {}


func (ClientStream)UpLoadFile(stream proto.ClientStream_UploadFileServer) error {
    file, err := os.OpenFile("abc.txt", os.O_CREATE|os.O_WRONLY, 0600)
    if err != nil {
        log.Fatalln(err)
    }
    defer file.Close() 

    writer := bufio.NewWriter(file)
    var index int
    for{
        index ++
        response, err := stream.Recv()
        if err == io.EOF {
            break
        }
        writer.Writer(response.Context)
        fmt.Printf("第%d次", index)
    }
    writer.Flush()
    stream.SendAndClose(&proto.Response{Text: "完毕"})
    return nil
}

func main() {
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    server := grpc.NewServer()
    proto.RegisterClientStreamServer(server, &ClientStream)
    
    server.Server(listen)
}

双向流传输

// grpc_study/stream_proto/stream.proto
syntax = "proto3";
option go_package = "/proto";

service Simple{
	rpc Fun(Request)returns(Response){}
}

message Request {
	string name = 1;
}

message Response {
	string Text = 1;
}

message FileResponse {
	string file_name = 1;
    bytes content = 2;
}

// 服务端,客户端代码同上
// 服务端流式
service ServiceStream{
	rpc Fun(Request)returns(stream Response){}
    rpc DownLoadFile(Request)returns(stream FileResponse){}
}

message FileRequest {
	string file_name = 1;
    bytes content = 2;
}

// 客户端流式
service ClientStream {
	rpc UploadFile(stream FileRequest)returns(Response){}
}

// 双向流传输
service BothStream{
	rpc Chat(stream Request)returns(stream Response){}
}

// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\stream.proto
// grpc_study/server/双向流式serve.go
package main
type BothStream struct {}


func (BothStream)Chat(stream proto.BothStream_ChatServer) error {
    for i := 0; i < 10; i++ {
        request, _ := stream.Recv()
        fmt.Println(request)
        stream.Send(&proto.Response{
            Text: "你好",
        })
    }
    return nil
}

func main() {
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    server := grpc.NewServer()
    proto.RegisterBothStreamServer(server, &BothStream)
    
    server.Server(listen)
}
// grpc_study/client/双向流式client.go
func main() {
	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
    client := proto.NewBothStreamClient(conn)
    stream, err := client.Chat(context.Background())
    
    for i:=0;i<10;i++{
        stream.Send(&proto.Request{
            Name: fmt.Sprintf("第%d次", i)
        })
        response, err := stream.Recv()
        fmt.Println(response, err)
    }
}

认证及安全传输说明

认证不是用户的身份认证,而是指多个server和client之间,如何识别对方是谁,并且可以安全的进行数据传输

  • SSL/TLS认证方式(采用http2协议)

  • 基于Token的认证方式(基于安全连接)

TLS协议主要解决如下三个网络安全问题

  1. 保密,通过加密encryption实现,所有信息都加密传输,第三方无法嗅探

  2. 完整版,通过MAC校验机制,一旦被篡改,通信双方会立刻发现

  3. 认证,双方认证,双方都可以配备证书,放置身份被冒充

生产环境可以购买证书或使用一些平台发放的免费证书

key:服务器上的私钥文件,用于对发送给客户端数据的加密,以及对客户端接收到数据的解密

csr:证书签名请求文件,用于提交给证书颁发机构(CA)对证书签名

crt:由证书颁发机机构(CA)签名后的证书,或者是开发者亲自签名的证书,包含持证人的信息,持有人的公钥,以及签署者的签名等信息

pem:是基于Base64编码的证书,扩展名包括PEMCRTCER

TLS认证实现

首先通过openssl生成证书和私钥

  • 官网下载,Downloads | OpenSSL Library

  • 其他人做的便携安装包,Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions

  • 配置环境变量 D:\...\OpenSSL-Win64\bin

  • 命令行测试 openssl

生成证书

# cd ./key/
# 1、生成私钥
openssl genrsa -out server.key 2048
# 2、生成证书 全部回车即可,可以不填
openssl req -new -x509 -key server.key -out server.crt -days 36500
# 国家名称
Country Name (2 letter code) [Au]:CN
# 省名称
State or Province Name (full name) [Some-State]:GuangDong
# 城市名称
Locality Name (eg, city) []:Meizhou
# 公司组织名称
Organization Name (eg, company) [Internet widgits Pty Ltd]:Xuexiangban
# 部门名称
Organizational Unit Name (eg, section) []:go
# 服务器or网站名称
Common Name (e.g. server FQDN or YOUR name) []:kuangstudy
# 邮件
Email Address []:3509309412@qq.com

# 3、生成csr
openssl req -new -key server.key -out server.csr
# 更改openssl.cnf (Linux是openssl.cfg)
# 1)复制一份安装的openssl的bin目录下的openssl.cnf文件到你项目所在的证书目录下
# 2)找到 [ CA_default ],打开 copy_extensions = copy (就是把前面的#去掉)
# 3)找到 [ req ],打开 req_extensions = v3_req # The extensions to add to a certificate request
# 4)找到 [ v3_req ],添加subjectAltName = @alt_names
# 5)添加新的标签 [ alt_names ],和标签字段 DNS.1 = *.huajiangsll.com    # 可设置多个域名,只有通过域名才能访问到代码
# 步骤5)是设置能访问的网站
# 生成证书私钥test.key
openssl genpkey -algorithm RSA -out test.key

# 通过私钥test.key生成证书请求文件test.csr(cnf, 注意windows系统是cfg)
openssl req -new -nodes -key test.key -out test.csr -days 3650 -subj "/C=cn/OU=myorg/O=mycomp/CN=myname" -config ./openssl.cfg -extensions v3_req
# test.csr是上面生成的证书请求文件。 ca.crt/server.key是CA证书文件和key,用来对test.csr进行签名认证。这两个文件在第一部分生成

# 生成SAN证书 pem
openssl x509 -req -days 365 -in test.csr -out test.pem -CA server.crt -CAkey server.key -CAcreateserial -extfile ./openssl.cfg -extensions v3_req

// server/grpc_server.go
package main

import (
	"context"
	"fmt"
	"grpc_test/grpc_proto/hello_grpc"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
)

type HelloService struct {
}

func (HelloService) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (res *hello_grpc.HelloResponse, err error) {
	fmt.Println(request)
	return &hello_grpc.HelloResponse{
		Name:    "江小年",
		Message: "ok",
	}, nil
}

func main() {
	creds, _ := credentials.NewServerTLSFromFile("D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.pem",
		"D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.key")
	// 监听端口
	listen, err := net.Listen("tcp", ":8080")
	if err != nil {
		grpclog.Fatalf("Failed to listen: %v", err)
	}
	// 创建一个grpc服务器实例
	s := grpc.NewServer(grpc.Creds(creds))
	server := HelloService{}
	// 将server结构体注册为grpc服务
	hello_grpc.RegisterHelloServiceServer(s, &server)
	fmt.Println("grpc server running :8080")
	// 开始处理客户端请求
	err = s.Serve(listen)
}
// grpc_client.go
package main

import (
	"context"
	"fmt"
	"grpc_test/grpc_proto/hello_grpc"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func main() {
	creds, _ := credentials.NewClientTLSFromFile("D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.pem",
		"*.helloword.com")

	addr := ":8080"
	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	}
	defer conn.Close()
	// 初始化客户端
	client := hello_grpc.NewHelloServiceClient(conn)
	result, err := client.SayHello(context.Background(), &hello_grpc.HelloRequest{
		Name:    "江小年",
		Message: "ok",
	})
	fmt.Println(result, err)
}

Token认证

gRPC提供的一个接口,接口中有两个方法,接口位于credentials包下,这个接口需要客户端来实现

type PerRPCCredentials interface {
    GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
    RequireTransportSecurity() bool
}

第一个方法就是获取元数据信息,也就是客户端提供的key.value对,context用于控制超时和取消,uri是请求入口处的uri

第二个方法的作用是否需要基于TLS认证进行安全传输,如果返回值是true,则必须加上TLS认证,返回值是false则不用

// grpc_client.go
package main

import (
	"context"
	"fmt"
	"grpc_test/grpc_proto/hello_grpc"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

 // type PerRPCCredentials interface {
	// GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
	// RequireTransportSecurity() bool
// }

type ClientTokenAuth struct {
}

func (c ClientTokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"appId":  "jiangxiaonian",
		"appKey": "123456",
	}, nil
}

func (c ClientTokenAuth) RequireTransportSecurity() bool {
	return false    // 不开启安全认证 是否需要基于`TLS`认证进行安全传输
}

func main() {
	addr := ":8080"
	// creds, _ := credentials.NewClientTLSFromFile("D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.pem",
	// 	"*.helloword.com")

	var opts []grpc.DialOption
	opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
	opts = append(opts, grpc.WithPerRPCCredentials(new(ClientTokenAuth)))
	conn, err := grpc.Dial(addr, opts...)

	// 使用 grpc.Dial 创建一个到指定地址的 gRPC 链接
	// 此处使用不安全的证书来实现 SSL/TLS
	// conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
	// if err != nil {
	// 	log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
	// }
	defer conn.Close()
	// 初始化客户端
	client := hello_grpc.NewHelloServiceClient(conn)
	result, err := client.SayHello(context.Background(), &hello_grpc.HelloRequest{
		Name:    "江小年",
		Message: "ok",
	})
	fmt.Println(result, err)
}

<!-- 服务端应该写一个拦截器 -->

// server/grpc_server.go
package main

import (
	"context"
	"errors"
	"fmt"
	"grpc_test/grpc_proto/hello_grpc"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/metadata"
)

type HelloService struct {
}

func (HelloService) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (res *hello_grpc.HelloResponse, err error) {

	// 获取元数据的信息
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, errors.New("未传输token")
	}
	var appId string
	var appKey string
	if v, ok := md["appid"]; ok {
		appId = v[0]
	}
	if v, ok := md["appkey"]; ok {
		appKey = v[0]
	}
	// 用户id 查appid
	if appId != "jiangxiaonian" || appKey != "123456" {
		return nil, errors.New("token不正确")
	}

	fmt.Println(request)
	return &hello_grpc.HelloResponse{
		Name:    "江小年",
		Message: "ok",
	}, nil
}

func main() {
	// creds, _ := credentials.NewServerTLSFromFile("D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.pem",
	// 	"D:\\Godemo\\end_demo\\练习\\grpc_test\\key\\test.key")
	// 监听端口
	listen, err := net.Listen("tcp", ":8080")
	if err != nil {
		grpclog.Fatalf("Failed to listen: %v", err)
	}
	// 创建一个grpc服务器实例
	s := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
	server := HelloService{}
	// 将server结构体注册为grpc服务
	hello_grpc.RegisterHelloServiceServer(s, &server)
	fmt.Println("grpc server running :8080")
	// 开始处理客户端请求
	err = s.Serve(listen)
}

<!-- `etcd` -->

etcd

  • etcd是一个go语言编写的分布式、高可用的一致性键值存储系统,用于提供可靠的分布式键值对存储,配置共享和服务发现功能

  • etcd 链接:百度网盘 请输入提取码 提取码:7777 etcdkeeper 链接:百度网盘 请输入提取码 提取码:7777


  • etcd文档

  • etcd源码

  • etcd不同系统安装

Etcd实现服务注册和发现

  • go语言客户端etcd基本操作

  • etcd消息发布订阅逻辑 不如redis数据类型丰富

  • 服务注册和服务发现的具体实现逻辑

etcd配置文件

# etcdconf/etcd0.yaml
# 节点名称
name: "etcdnode0"
# 数据存储目录
data-dir: "/etcd-data/data"
# 预写式日志存储目录
wal-dir: "/etcd-data/wal"
# 集群成员之间通讯使用URL
listen-peer-urls: "http://0.0.0.0:2380"
# 集群提供给外部客户端访问的URL,外部客户端必须通过指定IP:端口访问etcd
listen-client-urls: "http://0.0.0.0:2379"

# 集群配置
initial-advertise-peer-urls: "http://192.168.239.149:2380"
# 集群初始成员配置,是etcd静态部署的核心初始化配置,他说明了当前集群由哪些URLs组成,此处default为节点名称
initial-cluster: "etcdnode0=http://192.168.239.149:2380,etcdnode1=http://192.168.239.149:12380,etcdnode2=http://192.168.239.149:22380"
# 初始化集群状态(new或existing)  existing为已有集群增加节点
initial-cluster-state: "new"
# 引导期间etcd集群的初始集群令牌,防止不同集群之间产生交互
initial-cluster-token: "etcd-cluster"
# 向客户端发布的服务端点
advertise-client-urls: "http://192.168.239.149:2379"
logger: "zap"
# 配置日志级别,仅支持debug, indo, warn, error, panic, or fatal
log-level: "warn"
log-outputs: "stderr"
docker run -d -p 2379:2379 -p 2380:2380 -v /tmp/etcd0-data:/etcd-data -v /home/nick/work/etcdconf:/etcd-conf --name etcd0 quay.io/coreos/etcd:v3.5.5 /usr/local/bin/etcd --config-file=/etcd-conf/etcd0.yaml
docker run: 这是 Docker 的命令行工具,用于运行一个新的容器
-d: 表示以守护进程模式运行容器,即在后台运行
-p 2379:2379 -p 2380:2380: 这部分是端口映射,将容器的 2379 和 2380 端口映射到宿主机的相同端口。2379 端口通常用于客户端通信,2380 端口用于集群节点间的通信
-v /tmp/etcd0-data:/etcd-data: 这是卷(volume)挂载,将宿主机的 /tmp/etcd0-data 目录挂载到容器内的 /etcd-data 目录,用于存储 etcd 的数据
-v /home/nick/work/etcdconf:/etcd-conf: 这是另一个卷挂载,将宿主机的 /home/nick/work/etcdconf 目录挂载到容器内的 /etcd-conf 目录,用于存储 etcd 的配置文件
--name etcd0: 为容器指定一个名称,这里是 etcd0
quay.io/coreos/etcd:v3.5.5: 指定要运行的 Docker 镜像,这里是 etcd 的官方镜像,版本为 3.5.5
/usr/local/bin/etcd --config-file=/etcd-conf/etcd0.yaml: 这是容器启动后要执行的命令,这里指定使用 etcd0.yaml 配置文件启动 etcd 服务
docker exec -it etcd0 bash
etcdctl endpoint status  // 访问当前etcd节点
etcdctl endpoint status --cluster  // 访问所有节点
etcdctl endpoint status --cluster -w table  // 可读性差,输出为表格
etcdctl member add -h  // 增加节点的help命令

基本操作

  • 下载依赖

    // 下划看到Install  和代码
    go get go.etcd.io/etcd/client/v3
    // D:/etcd_study/main.go
    package main
    import clientv3 "go.etcd.io/etcd/client/v3"
    
    func main() {
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints:   []string{"localhost:2379", "localhost:12379", "localhost:22379"},
    		DialTimeout: 5 * time.Second,
    	})
    	if err != nil {
    		// handle error!
    	}
    	defer cli.Close()
    }
  • 新建etcd/ etcd/etcd.go

    // etcd/etcd.go
    package etcd
    
    import "time"
    
    const DialTimeout = time.Second * 5
    
    func GetEtcdEndpoints() []string {
    	// localhost换成etcd集群的节点IP
    	return []string{"localhost:2379", "localhost:12379", "localhost:22379"}
    }
  • 新建etcd/kv.go

    putRes, err := cli.Put(context.Background(), "key1", "value1", clientv3.WithPrevKV()) // 保存key-value,并返回上一个值, 一般来说命令都有对应的函数etcdctl put -h | PrevKv>>>WithPrevKV()
    	if err != nil {
    		log.Fatalln(err)
    	}
    	log.Panicln(putRes.PrevKv) // 打印上一个值
    // etcd/kv.go
    package etcd
    
    import (
    	"context"
    	"fmt"
    	"log"
    
    	clientv3 "go.etcd.io/etcd/client/v3"
    )
    
    // put/get/delete key-value
    func KvDemo() {
    	// 没有连接池,每次请求都会新建连接,所以性能不高
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints:   GetEtcdEndpoints(),
    		DialTimeout: DialTimeout,
    	})
    	if err != nil {
    		log.Fatalln(err)
    	}
    	defer cli.Close()
    
    	_, err = cli.Put(context.Background(), "key1", "value1")
    	if err != nil {
    		log.Fatalln(err)
    	}
    
    	putRes, err := cli.Put(context.Background(), "key1", "value11", clientv3.WithPrevKV())
    	if err != nil {
    		log.Fatalln(err)
    	}
    	if putRes.PrevKv != nil {
    		fmt.Println(putRes.PrevKv)
    	}
    
    	getRes, err := cli.Get(context.Background(), "key", clientv3.WithPrefix())
    	if err != nil {
    		log.Fatalln(err)
    	}
    	fmt.Println(getRes)
    
    	delRes, err := cli.Delete(context.Background(), "key", clientv3.WithPrefix())
    	if err != nil {
    		log.Fatalln(err)
    	}
    	fmt.Println(delRes.Deleted)
    }
    // etcd_study/main.go
    package main
    
    import (
    	"etcdstudy/etcd"
    )
    
    func main() {
    	etcd.KvDemo()
    }

权限部分

  • 官方文档

  • Auth 通过命令行做操作

    export ETCDCTL_API=3
    ENDPOINTS=localhost:2379
    
    etcdctl --endpoints=${ENDPOINTS} role add root
    etcdctl --endpoints=${ENDPOINTS} role get root
    
    etcdctl --endpoints=${ENDPOINTS} user add root
    etcdctl --endpoints=${ENDPOINTS} user grant-role root root
    etcdctl --endpoints=${ENDPOINTS} user get root
    
    etcdctl --endpoints=${ENDPOINTS} role add role0
    etcdctl --endpoints=${ENDPOINTS} role grant-permission role0 readwrite foo
    etcdctl --endpoints=${ENDPOINTS} user add user0
    etcdctl --endpoints=${ENDPOINTS} user grant-role user0 role0
    
    etcdctl --endpoints=${ENDPOINTS} auth enable
    # 所有客户端请求都要经过验证
    
    etcdctl --endpoints=${ENDPOINTS} --user=user0:123 put foo bar
    etcdctl --endpoints=${ENDPOINTS} get foo
    # 权限被拒绝,用户名为空,因为请求未发出身份验证请求
    etcdctl --endpoints=${ENDPOINTS} --user=user0:123 get foo
    # User0可以读取键foo
    etcdctl --endpoints=${ENDPOINTS} --user=user0:123 get foo1

服务注册发现

  • 服务提供者向etcd注册(key值写入etcd

  • 客户端向etcd做查询

  • 做完之后,客户端就能访问到服务了


为什么要服务注册与服务发现?

  • 其实就是一个解耦的过程(服务的地址和客户端做了一个解耦)


server端流程:

  • 启动服务

  • etcd注册服务信息

  • 声明租约并续约

client端流程:

  • 第一次,获取服务信息

  • 创建监听,被动接收

  • 更新本地的服务信息


  • 新建 discovery/proto/hello.proto

    // discovery\proto\hello.proto
    syntax = "proto3";
    option go_package = "discovery/proto";
    package hello;
    
    service Greeter {
        rpc SayHello(HelloRequest) returns (HelloReply) {}
    }
    
    message HelloRequest {
        string msg = 1;
    }
    message HelloReply {
        string msg = 1;
    }
    go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
    go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
    protoc --go_out=plugins=grpc:. discovery/proto/hello.proto
    protoc --go-grpc_out=. discovery/proto/hello.proto
    protoc --go_out=. discovery/proto/hello.proto
protoc --proto_path=discovery/proto --go_out=discovery/proto --go_out=paths=source_relative --go-grpc_out=discovery/proto --go-grpc_out=paths=source_relative discovery/proto/hello.proto
----------
protoc: 这是 Protocol Buffers 编译器的命令
--proto_path=discovery/proto: 指定 .proto 文件所在的目录,编译器会在这个目录下查找所有 import 语句中引用的 .proto 文件
--go_out=discovery/proto: 指定生成的 Go 代码的输出目录
--go_out=paths=source_relative: 指示 protoc 生成的 Go 代码中的 import 路径是相对于源 .proto 文件的路径
--go-grpc_out=discovery/proto: 指定生成的 Go gRPC 代码的输出目录
--go-grpc_out=paths=source_relative: 指示 protoc 生成的 Go gRPC 代码中的 import 路径也是相对于源 .proto 文件的路径
discovery/proto/hello.proto: 指定要编译的 .proto 文件的路径
  • 新建discovery/server/server.go discovery/client/client.go

    // discovery/server/server.go
    package main
    
    import (
    	"context"
    	"flag"
    	"fmt"
    	"log"
    	"net"
    
    	pb "etcdstudy/discovery/proto"
    
    	"google.golang.org/grpc"
    )
    
    var (
    	port = flag.Int("port", 50051, "")
    )
    
    type server struct {
    	pb.UnimplementedDiscoveryServer
    }
    
    func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    	fmt.Printf("Recv Client msg: %v \n", in.Msg)
    	return &pb.HelloReply{
    		Msg: "Hello Client",
    	}, nil
    }
    
    func main() {
    	flag.Parse()
    	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    	if err != nil {
    		log.Fatalln(err)
    	}
    	s := grpc.NewServer()
    	pb.RegisterGreeterServer(s, &server{})
    	if err := s.Serve(lis); err != nil {
    		log.Fatalln(err)
    	}
    }
    // discovery/client/client.go
    package main
    
    import (
    	"context"
    	pb "etcdstudy/discovery/proto"
    	"fmt"
    	"log"
    
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    )
    
    func getServerAddr() string {
    	return "localhost:50051"
    }
    
    func sayHello() {
    	addr := getServerAddr()
    	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		log.Fatalln(err)
    	}
    	defer conn.Close()
    
    	c := pb.NewGreeterClient(conn)
    	in := &pb.HelloRequest{
    		Msg: "hello server",
    	}
    	r, err := c.SayHello(context.Background(), in)
    	if err != nil {
    		log.Fatalln(err)
    	}
    	fmt.Println("Recv server msg:", r.Msg)
    }
    
    func main() {
    	sayHello()
    }
  • 服务发现 新建 discovery.go

    // discovery/discovery.go
    package discovery
    
    import (
    	"context"
    	"etcdstudy/etcd"
    	"fmt"
    	"log"
    	"sync"
    
    	"go.etcd.io/etcd/api/v3/mvccpb"
    	clientv3 "go.etcd.io/etcd/client/v3"
    )
    
    type Service struct {
    	Name     string
    	IP       string
    	Port     string
    	Protocol string
    }
    
    func ServiceRegister(s *Service) {
    	// 没有连接池,每次请求都会新建连接,所以性能不高
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints:   etcd.GetEtcdEndpoints(),
    		DialTimeout: etcd.DialTimeout,
    	})
    	if err != nil {
    		log.Fatalln(err)
    	}
    	defer cli.Close()
    
    	// 注册服务
    	var grantLease bool
    	var leaseID clientv3.LeaseID
    	ctx := context.Background()
    	// 判断key是否存在
    	getRes, err := cli.Get(ctx, s.Name, clientv3.WithCountOnly())
    	if err != nil {
    		log.Fatalln(err)
    	}
    	if getRes.Count == 0 {
    		grantLease = true
    	}
    	// 租约声明
    	if grantLease {
    		leaseResp, err := cli.Grant(ctx, 10) // 10秒租约
    		if err != nil {
    			log.Fatalln(err)
    		}
    		leaseID = leaseResp.ID // 拿到租约
    	}
    	// 事务
    	kv := clientv3.NewKV(cli)
    	txn := kv.Txn(ctx)
    	_, err = txn.If(clientv3.Compare(clientv3.CreateRevision(s.Name), "=", 0)).
    		Then(
    			clientv3.OpPut(s.Name, s.Name, clientv3.WithLease(leaseID)),
    			clientv3.OpPut(s.Name+".ip", s.IP, clientv3.WithLease(leaseID)),
    			clientv3.OpPut(s.Name+".port", s.Port, clientv3.WithLease(leaseID)),
    			clientv3.OpPut(s.Name+".protocol", s.Protocol, clientv3.WithLease(leaseID)),
    		).
    		Else(
    			clientv3.OpPut(s.Name, s.Name, clientv3.WithIgnoreLease()),
    			clientv3.OpPut(s.Name+".ip", s.IP, clientv3.WithIgnoreLease()),
    			clientv3.OpPut(s.Name+".port", s.Port, clientv3.WithIgnoreLease()),
    			clientv3.OpPut(s.Name+".protocol", s.Protocol, clientv3.WithIgnoreLease()),
    		).
    		Commit()
    	if err != nil {
    		log.Fatalln(err)
    	}
    	if grantLease {
    		leaseKeepalive, err := cli.KeepAlive(ctx, leaseID)
    		if err != nil {
    			log.Fatalln(err)
    		}
    		for lease := range leaseKeepalive {
    			fmt.Printf("leaseID: %d, ttl: %d\n", lease.ID, lease.TTL)
    		}
    	}
    }
    
    // 服务发现
    type Services struct {
    	services map[string]*Service
    	sync.RWMutex
    }
    
    var myServices = &Services{
    	services: map[string]*Service{},
    }
    
    func ServiceDiscovery(svcName string) *Service {
    	var s *Service = nil
    	myServices.RLock()
    	s, _ = myServices.services[svcName]
    	myServices.RUnlock()
    	return s
    }
    
    func WatchServiceName(svcName string) {
    	// 没有连接池,每次请求都会新建连接,所以性能不高
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints:   etcd.GetEtcdEndpoints(),
    		DialTimeout: etcd.DialTimeout,
    	})
    	if err != nil {
    		log.Fatalln(err)
    	}
    	defer cli.Close()
    	// 监听服务
    	getRes, err := cli.Get(context.Background(), svcName, clientv3.WithPrefix())
    	if err != nil {
    		log.Fatalln(err)
    	}
    	// 解析服务
    	if getRes.Count > 0 {
    		mp := sliceToMap(getRes.Kvs)
    		s := &Service{}
    		if kv, ok := mp[svcName]; ok {
    			s.Name = string(kv.Value)
    		}
    		if kv, ok := mp[svcName+".ip"]; ok {
    			s.IP = string(kv.Value)
    		}
    		if kv, ok := mp[svcName+".port"]; ok {
    			s.Port = string(kv.Value)
    		}
    		if kv, ok := mp[svcName+".protocol"]; ok {
    			s.Protocol = string(kv.Value)
    		}
    		myServices.Lock()
    		myServices.services[svcName] = s
    		myServices.Unlock()
    	}
    	rch := cli.Watch(context.Background(), svcName, clientv3.WithPrefix())
    	for wres := range rch {
    		for _, ev := range wres.Events {
    			if ev.Type == clientv3.EventTypeDelete {
    				myServices.Lock()
    				delete(myServices.services, svcName)
    				myServices.Unlock()
    			}
    			if ev.Type == clientv3.EventTypePut {
    				myServices.Lock()
    				if _, ok := myServices.services[svcName]; !ok {
    					myServices.services[svcName] = &Service{}
    				}
    				switch string(ev.Kv.Key) {
    				case svcName:
    					myServices.services[svcName].Name = string(ev.Kv.Value)
    				case svcName + ".ip":
    					myServices.services[svcName].IP = string(ev.Kv.Value)
    				case svcName + ".port":
    					myServices.services[svcName].Port = string(ev.Kv.Value)
    				case svcName + ".protocol":
    					myServices.services[svcName].Protocol = string(ev.Kv.Value)
    				}
    				myServices.Unlock()
    			}
    		}
    	}
    }
    
    func sliceToMap(list []*mvccpb.KeyValue) map[string]*mvccpb.KeyValue {
    	mp := make(map[string]*mvccpb.KeyValue, 0)
    	for _, item := range list {
    		mp[string(item.Key)] = item
    	}
    	return mp
    }
    // server/server.go
    package main
    
    import (
    	"context"
    	"flag"
    	"fmt"
    	"log"
    	"net"
    	"strconv"
    
    	"etcdstudy/discovery"
    	pb "etcdstudy/discovery/proto"
    
    	"google.golang.org/grpc"
    )
    
    var (
    	port = flag.Int("port", 50051, "")
    )
    
    type server struct {
    	pb.UnimplementedDiscoveryServer
    }
    
    func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    	fmt.Printf("Recv Client msg: %v \n", in.Msg)
    	return &pb.HelloReply{
    		Msg: "Hello Client",
    	}, nil
    }
    
    func main() {
    	flag.Parse()
    	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    	if err != nil {
    		log.Fatalln(err)
    	}
    	s := grpc.NewServer()
    	serverRegister(s, &server{})
    	if err := s.Serve(lis); err != nil {
    		log.Fatalln(err)
    	}
    }
    
    func serverRegister(s grpc.ServiceRegistrar, srv pb.GreeterServer) {
    	pb.RegisterGreeterServer(s, srv)
    	s1 := &discovery.Service{
    		Name:     "hello Greeter",
    		Port:     strconv.Itoa(*port),
    		IP:       "localhost",
    		Protocol: "grpc",
    	}
    	go discovery.ServiceRegister(s1)
    }
    // client/client.go
    package main
    
    import (
    	"context"
    	"etcdstudy/discovery"
    	pb "etcdstudy/discovery/proto"
    	"fmt"
    	"log"
    	"time"
    
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    )
    
    func getServerAddr(svcName string) string {
    	s := discovery.ServiceDiscovery(svcName)
    	if s == nil {
    		return ""
    	}
    	if s.IP == "" || s.Port == "" {
    		return ""
    	}
    	return s.IP + ":" + s.Port
    }
    
    func sayHello() {
    	addr := getServerAddr("hello.Greeter")
    	if addr == "" {
    		log.Println("未发现可用服务")
    		return
    	}
    	log.Println("连接服务:", addr)
    	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		log.Println(err)
    		return
    	}
    	defer conn.Close()
    
    	c := pb.NewGreeterClient(conn)
    	in := &pb.HelloRequest{
    		Msg: "hello server",
    	}
    	r, err := c.SayHello(context.Background(), in)
    	if err != nil {
    		log.Println(err)
    		return
    	}
    	fmt.Println("Recv server msg:", r.Msg)
    }
    
    func main() {
    	log.SetFlags(log.Llongfile)
    	go discovery.WatchServiceName("hello.Greeter")
    	for {
    		sayHello()
    		time.Sleep(time.Second * 2)
    	}
    }

    bili

<!-- -->


gin+grpc大型项目管理/协同系统

原则:高内聚,低耦合,更适合大团队协同开发

公认的工程目录

  • cmd:可执行文件,可能有多个main文件

  • internal:内部代码,不希望外部访问

  • pkg:公开代码,外部可以访问

  • config/configs/etc:配置文件

  • scripts:脚本

  • docs:文档

  • third_party:三方辅助工具

  • bin:编译的二进制文件

  • build:持续集成相关

  • deploy:部署相关

  • test:测试文件

  • api:开放的api接口

  • init:初始化函数

创建数据库表操作

搭建项目

  • mkdir ms_project
    cd ms_project
    go work init
    mkdir project-user
    cd project-user
    go mod init test.com/project-user
    cd ..
    go work use ./project-user
    go get -u github.com/gin-gonic/gin
  • // ms_project/project-user/main.go
    package main
    
    import (
    	"context"
    	"log"
    	"net/http"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    
    	"github.com/gin-gonic/gin"
    )
    
    func main() {
    	r := gin.Default()
    
    	srv := &http.Server{
    		Addr:    ":80",
    		Handler: r,
    	}
    	// 保证下面的启停,go携程,不阻塞直接执行完
    	go func() {
    		log.Printf("web server running in %s \n", srv.Addr)
    		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
    			log.Fatalln(err)
    		}
    	}()
    
    	//
    	quit := make(chan os.Signal)
    	// SIGINT 用户发送INRT字符(Ctrl+C)触发
    	// SIGTERM 结束程序(可以被捕获、阻塞或忽略)
    	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    	<-quit
    	log.Println("Shutting Down project web server...")
    
    	//
    	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    	defer cancel()
    	if err := srv.Shutdown(ctx); err != nil {
    		log.Fatalln("web server Shutdown, cause by :", err)
    	}
    	select {
    	case <-ctx.Done():
    		log.Println("wait timeout...")
    	}
    	log.Println("web server stop success...")
    }
  • 由于有其他模块都需要用到启停, 将其抽取到公共模块common

  • // ms_project/project-common/go.mod
    module test.com/project-common
    
    go 1.22.1
  • // ms_project
    go work use ./project-common
  • // ms_project/project-common/run.go
    package common
    
    import (
    	"context"
    	"log"
    	"net/http"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    
    	"github.com/gin-gonic/gin"
    )
    
    func Run(r *gin.Engine, srvName string, addr string) {
    	srv := &http.Server{
    		Addr:    addr,
    		Handler: r,
    	}
    	// 保证下面的启停,go携程,不阻塞直接执行完
    	go func() {
    		log.Printf("%s running in %s \n", srvName, srv.Addr)
    		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
    			log.Fatalln(err)
    		}
    	}()
    
    	//
    	quit := make(chan os.Signal)
    	// SIGINT 用户发送INRT字符(Ctrl+C)触发
    	// SIGTERM 结束程序(可以被捕获、阻塞或忽略)
    	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    	<-quit
    	log.Println("Shutting Down project %s...\n", srvName)
    
    	//
    	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    	defer cancel()
    	if err := srv.Shutdown(ctx); err != nil {
    		log.Fatalf("%s Shutdown, cause by :%v\n", srvName, err)
    	}
    	select {
    	case <-ctx.Done():
    		log.Println("wait timeout...")
    	}
    	log.Printf("%s stop success...\n", srvName)
    }
  • // ms_project/project-user/main.go
    package main
    
    import (
    	"github.com/gin-gonic/gin"
    	srv "test.com/project-common"
    )
    
    func main() {
    	r := gin.Default()
    	srv.Run(r, "project-user", ":80")
    }

路由

// ms_project\project-user\router\router.go
package router

import (
	"github.com/gin-gonic/gin"
	"test.com/project-user/api/user"
)

type Router interface {
	Route(r *gin.Engine)
}

type RouterRegister struct {
}

func New() *RouterRegister {
	return &RouterRegister{}
}

func (*RouterRegister) Route(ro Router, r *gin.Engine) {
	ro.Route(r)
}

func InitRouter(r *gin.Engine) {
	rg := New()
	rg.Route(&user.RouterUser{}, r)
}
// ms_project\project-user\api\user\route.go
package user

import "github.com/gin-gonic/gin"

type RouterUser struct {
}

func (*RouterUser) Route(r *gin.Engine) {
	h := HandlerUser{}
	r.POST("/project/login/getCaptcha", h.getCaptcha)
	// localhost/project/login/getCaptcha
}
// ms_project\project-user\api\user\user.go
package user

import "github.com/gin-gonic/gin"

type HandlerUser struct {
}

func (*HandlerUser) getCaptcha(ctx *gin.Context) {
	ctx.JSON(200, "getCaptcha success")
}

第二种方式(在第一种的基础上更改)

// ms_project\project-user\router\router.go
package router

import (
	"github.com/gin-gonic/gin"
)

type Router interface {
	Route(r *gin.Engine)
}

type RouterRegister struct {
}

func New() *RouterRegister {
	return &RouterRegister{}
}

func (*RouterRegister) Route1(ro Router, r *gin.Engine) {
	ro.Route(r)
}

var routers []Router

func InitRouter(r *gin.Engine) {
	// rg := New()
	// rg.Route1(&user.RouterUser{}, r)

	for _, ro := range routers {
		ro.Route(r)
	}
}

func Register(ro ...Router) {
	routers = append(routers, ro...)
}
// ms_project\project-user\api\user\route.go
package user

import (
	"log"

	"github.com/gin-gonic/gin"
	"test.com/project-user/router"
)

func init() {
	log.Println("init user router")
	router.Register(&RouterUser{})
}

type RouterUser struct {
}

func (*RouterUser) Route(r *gin.Engine) {
	h := HandlerUser{}
	r.POST("/project/login/getCaptcha", h.getCaptcha)
	// localhost/project/login/getCaptcha
}
// ms_project\project-user\api\api.go
package api

import (
	_ "test.com/project-user/api/user"
)
// ms_project/project-user/main.go
package main

import (
	"github.com/gin-gonic/gin"
	srv "test.com/project-common"
	_ "test.com/project-user/api"
	"test.com/project-user/router"
)

func main() {
	r := gin.Default()
	router.InitRouter(r)
	srv.Run(r, "project-user", ":80")
}

发送验证码接口

// ms_project\project-common\model.go
package common

type BusinessCode int

type Result struct {
	Code BusinessCode `json:"code"`
	Msg  string       `json:"msg"`
	Data any          `json:"data"`
}

func (r *Result) Success(data any) *Result {
	r.Code = 200
	r.Msg = "success"
	r.Data = data
	return r
}

func (r *Result) Fail(code BusinessCode, msg string) *Result {
	r.Code = code
	r.Msg = msg
	return r
}
// ms_project\project-user\api\user\user.go
package user

import (
	"github.com/gin-gonic/gin"
	common "test.com/project-common"
)

type HandlerUser struct {
}

func (*HandlerUser) getCaptcha(ctx *gin.Context) {
	rsp := &common.Result{}
	ctx.JSON(200, rsp.Success("123456"))
}

代码思路:

获取参数

校验参数

生成验证码(随机四位1000~9999或者六位100000~999999)

调用短信平台(三方 放入go协程中执行 接口可以快速响应)

存储验证码redis中 过期时间15分钟

// ms_project\project-user\api\user\user.go
package user

import (
	"log"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
	common "test.com/project-common"
	"test.com/project-user/pkg/model"
)

type HandlerUser struct {
}

func (*HandlerUser) getCaptcha(ctx *gin.Context) {
	rsp := &common.Result{}
	// 获取参数
	mobile := ctx.PostForm("mobile")
	// 校验参数
	if !common.VerifyMobile(mobile) {
		ctx.JSON(http.StatusOK, rsp.Fail(model.NoLegalMobile, "手机号码不合法"))
		return
	}
	// 生成验证码(随机四位1000~9999或者六位100000~999999)
	code := "123456"
	// 调用短信平台(三方 放入go协程中执行 接口可以快速响应)
	go func() {
		time.Sleep(2 * time.Second)
		log.Println("短信平台调用成功,发送短信")
		// 存储验证码`redis`中 过期时间15分钟
		log.Printf("将手机号和验证码存入Redis成功: REGISTER_%v : %v", mobile, code)
	}()

	ctx.JSON(http.StatusOK, rsp.Success(code))
}
// ms_project\project-common\validate.go
package common

import "regexp"

// VerifyMobile 验证手机号合法性
func VerifyMobile(mobile string) bool {
	if mobile == "" {
		return false
	}
	regular := "^((13[0-9])|(14[5,7])|(15[0-3,5-9])|(17[0,3,5-8])|(18[0-9])|166|198|199|(147))\\d{8}$"
	reg := regexp.MustCompile(regular)
	return reg.MatchString(mobile)
}
// ms_project\project-user\pkg\model\code.go
package model

import common "test.com/project-common"

const (
	NoLegalMobile common.BusinessCode = 2001 // 手机号不合法
)

导入redis支持_接口应用

启动Redis

go get github.com/go-redis/redis/v8
// ms_project\project-user\api\user\user.go
package user

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
	common "test.com/project-common"
	"test.com/project-user/pkg/dao"
	"test.com/project-user/pkg/model"
	"test.com/project-user/pkg/repo"
)

type HandlerUser struct {
	cache repo.Cache
}

func New() *HandlerUser {
	return &HandlerUser{
		cache: dao.Rc,
	}
}

func (h *HandlerUser) getCaptcha(ctx *gin.Context) {
	rsp := &common.Result{}
	// 获取参数
	mobile := ctx.PostForm("mobile")
	// 校验参数
	if !common.VerifyMobile(mobile) {
		ctx.JSON(http.StatusOK, rsp.Fail(model.NoLegalMobile, "手机号码不合法"))
		return
	}
	// 生成验证码(随机四位1000~9999或者六位100000~999999)
	code := "123456"
	// 调用短信平台(三方 放入go协程中执行 接口可以快速响应)
	go func() {
		time.Sleep(2 * time.Second)
		log.Println("短信平台调用成功,发送短信")
		// redis设置后续缓存可能存在mysql当中,也可能存在mongo,也可能存在memcache中
		c, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		err := h.cache.Put(c, "REGISTER_"+mobile, code, 15*time.Second)
		if err != nil {
			log.Printf("验证码存入redis出错,cause by: %v\n", err)
		}
		// 存储验证码`redis`中 过期时间15分钟
	}()

	ctx.JSON(http.StatusOK, rsp.Success(code))
}
// ms_project\project-user\pkg\repo\cache.go
package repo

import (
	"context"
	"time"
)

type Cache interface {
	Put(ctx context.Context, key, value string, expire time.Duration) error
	Get(ctx context.Context, key string) (string, error)
}
// ms_project\project-user\pkg\dao\redis.go
package dao

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

var Rc *RedisCache

type RedisCache struct {
	rdb *redis.Client
}

func init() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})
	Rc = &RedisCache{
		rdb: rdb,
	}
}

func (rc *RedisCache) Put(ctx context.Context, key, value string, expire time.Duration) error {
	err := rc.rdb.Set(ctx, key, value, expire).Err()
	return err
}

func (rc *RedisCache) Get(ctx context.Context, key string) (string, error) {
	result, err := rc.rdb.Get(ctx, key).Result()
	return result, err
}
// ms_project\project-user\api\user\route.go
package user

import (
	"log"

	"github.com/gin-gonic/gin"
	"test.com/project-user/router"
)

func init() {
	log.Println("init user router")
	router.Register(&RouterUser{})
}

type RouterUser struct {
}

func (*RouterUser) Route(r *gin.Engine) {
	h := New()
	r.POST("/project/login/getCaptcha", h.getCaptcha)
	// localhost/project/login/getCaptcha
}

日志

原生日志,并不能很好的区分日志级别,所以需要集成一个流行的日志库进来

uber开源的zap日志库:https://github.com/uber-go/zap

// 安装zap库
go get -u go.uber.org/zap
// 安装日志分割库
go get -u github.com/natefinch/lumberjack

日志是公用的,所以可以放到common进行多次使用

日志存储有多种方式:按日志级别存储到不同文件,按业务逻辑来分别记录不同级别的日志,按照包结构划分比同级别日志

项目庞大时,日志记录越细分越能便捷定位

一般按业务记录即可

  • 这里按日志级别:debug以上记录一个,info以上记录一个,warn以上记录一个

// ms_project\project-common\logs\logs.go
package logs

import (
	"net"
	"net/http"
	"net/http/httputil"
	"os"
	"runtime/debug"
	"strings"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/natefinch/lumberjack"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

var LG *zap.Logger

type LogConfig struct {
	DebugFileName string `json:"debugFileName"`
	InfoFileName  string `json:"infoFileName"`
	WarnFileName  string `json:"warnFileName"`
	MaxSize       int    `json:"maxsize"`
	MaxAge        int    `json:"max_age"`
	MaxBackups    int    `json:"max_backups"`
}

// InitLogger 初始化Logger
func InitLogger(cfg *LogConfig) (err error) {
	writeSyncerDebug := getLogWriter(cfg.DebugFileName, cfg.MaxSize, cfg.MaxBackups, cfg.MaxAge)
	writeSyncerInfo := getLogWriter(cfg.InfoFileName, cfg.MaxSize, cfg.MaxBackups, cfg.MaxAge)
	writeSyncerWarn := getLogWriter(cfg.WarnFileName, cfg.MaxSize, cfg.MaxBackups, cfg.MaxAge)
	encoder := getEncoder()
	// 文件输出
	debugCore := zapcore.NewCore(encoder, writeSyncerDebug, zapcore.DebugLevel)
	infoCore := zapcore.NewCore(encoder, writeSyncerInfo, zap.InfoLevel)
	warnCore := zapcore.NewCore(encoder, writeSyncerWarn, zap.WarnLevel)
	// 标准输出
	consoleEncoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig())
	std := zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), zapcore.DebugLevel)
	core := zapcore.NewTee(debugCore, infoCore, warnCore, std)
	LG = zap.New(core, zap.AddCaller())
	zap.ReplaceGlobals(LG) // 替换zap包中全局的logger实例, 后续在其他包中只需要zap.L()调用即可
	return
}

func getEncoder() zapcore.Encoder {
	encoderConfig := zap.NewProductionEncoderConfig()
	encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	encoderConfig.TimeKey = "time"
	encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
	encoderConfig.EncodeDuration = zapcore.SecondsDurationEncoder
	encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
	return zapcore.NewJSONEncoder(encoderConfig)

}

func getLogWriter(filename string, maxSize, maxBackup, maxAge int) zapcore.WriteSyncer {
	lumberJackLogger := &lumberjack.Logger{
		Filename:   filename,
		MaxSize:    maxSize,
		MaxBackups: maxBackup,
		MaxAge:     maxAge,
	}
	return zapcore.AddSync(lumberJackLogger)
}

// Ginlogger 接收gin框架默认的日志
func Ginlogger() gin.HandlerFunc {
	return func(c *gin.Context) {
		start := time.Now()
		path := c.Request.URL.Path
		query := c.Request.URL.RawQuery
		c.Next()

		cost := time.Since(start)
		LG.Info(path,
			zap.Int("status", c.Writer.Status()),
			zap.String("method", c.Request.Method),
			zap.String("path", path),
			zap.String("query", query),
			zap.String("ip", c.ClientIP()),
			zap.String("user-agent", c.Request.UserAgent()),
			zap.String("errors", c.Errors.ByType(gin.ErrorTypePrivate).String()),
			zap.Duration("cost", cost),
		)
	}
}

// GinRecovery recover 掉项目可能出现的panic,并使用zap记录相关日志
func GinRecovery(stack bool) gin.HandlerFunc {
	return func(c *gin.Context) {
		defer func() {
			if err := recover(); err != nil {
				// Check for a broken connecttion, as it is not really a
				// condition that warrants a panic stack trace.
				var brokenPipe bool
				if ne, ok := err.(*net.OpError); ok {
					if se, ok := ne.Err.(*os.SyscallError); ok {
						// if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "broken pipe") {
						// 	brokenPipe = true
						// }
						if strings.Contains(strings.ToLower(se.Error()), "broken pipe") {
							brokenPipe = true
						}
					}
				}

				httpRequest, _ := httputil.DumpRequest(c.Request, false)
				if brokenPipe {
					LG.Error(c.Request.URL.Path,
						zap.Any("error", err),
						zap.String("request", string(httpRequest)),
					)
					// If the connection is dead, we can't write a status to it.
					c.Error(err.(error)) // nolint: errcheck
					c.Abort()
					return
				}

				if stack {
					LG.Error("[Recovery from panic]",
						zap.Any("error", err),
						zap.String("request", string(httpRequest)),
						zap.String("stack", string(debug.Stack())),
					)
				} else {
					LG.Error("[Recovery from panic]",
						zap.Any("error", err),
						zap.String("request", string(httpRequest)),
					)
				}
				c.AbortWithStatus(http.StatusInternalServerError)
			}

		}()
		c.Next()
	}
}
// ms_project/project-user/main.go
package main

import (
	"log"

	"github.com/gin-gonic/gin"
	srv "test.com/project-common"
	"test.com/project-common/logs"
	_ "test.com/project-user/api"
	"test.com/project-user/router"
)

func main() {
	r := gin.Default()

	lc := &logs.LogConfig{
		DebugFileName: "D:\\Godemo\\gRPC_Gin项目\\ms_project\\project-common\\logs\\debug\\project-debug.log",
		InfoFileName:  "D:\\Godemo\\gRPC_Gin项目\\ms_project\\project-common\\logs\\info\\project-info.log",
		WarnFileName:  "D:\\Godemo\\gRPC_Gin项目\\ms_project\\project-common\\logs\\error\\project-error.log",
		MaxSize:       500,
		MaxAge:        28,
		MaxBackups:    3,
	}
	err := logs.InitLogger(lc)
	if err != nil {
		log.Fatalln(err)
	}

	router.InitRouter(r)
	srv.Run(r, "project-user", ":80")
}
// ms_project\project-user\api\user\user.go
package user

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
	"go.uber.org/zap"
	common "test.com/project-common"
	"test.com/project-common/logs"
	"test.com/project-user/pkg/dao"
	"test.com/project-user/pkg/model"
	"test.com/project-user/pkg/repo"
)

type HandlerUser struct {
	cache repo.Cache
}

func New() *HandlerUser {
	return &HandlerUser{
		cache: dao.Rc,
	}
}

func (h *HandlerUser) getCaptcha(ctx *gin.Context) {
	rsp := &common.Result{}
	// 获取参数
	mobile := ctx.PostForm("mobile")
	// 校验参数
	if !common.VerifyMobile(mobile) {
		ctx.JSON(http.StatusOK, rsp.Fail(model.NoLegalMobile, "手机号码不合法"))
		return
	}
	// 生成验证码(随机四位1000~9999或者六位100000~999999)
	code := "123456"
	// 调用短信平台(三方 放入go协程中执行 接口可以快速响应)
	go func() {
		time.Sleep(2 * time.Second)
		zap.L().Info("短信平台调用成功,发送短信 Info")
		logs.LG.Debug("短信平台调用成功,发送短信 Debug")
		zap.L().Error("短信平台调用成功,发送短信 Error")
		// redis设置后续缓存可能存在mysql当中,也可能存在mongo,也可能存在memcache中
		c, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		err := h.cache.Put(c, "REGISTER_"+mobile, code, 15*time.Second)
		if err != nil {
			log.Printf("验证码存入redis出错,cause by: %v\n", err)
		}
		// 存储验证码`redis`中 过期时间15分钟
	}()

	ctx.JSON(http.StatusOK, rsp.Success(code))
}

配置 |gRPC| etcd

go get github.com/spf13/viper
# ms_project\project-user\config\app.yaml

读配置


<!-- 备忘录gRPC -->

主要依赖

  • gin

  • gorm

  • grpc

  • mysql

安装gRPC

go get gogle.golang.org/grpcs
go get gogle.golang.org/protobuf

安装protor

proyor是可用于通信协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式

链接:百度网盘 请输入提取码 提取码:jw8a

把解压出来的bin目录放在系统变量中

D:demo_rely\protoc-3.15.5-win64\bin

// cmd中测试
protoc --version

项目结构

grpc_todolist项目总体

grpc-todolist/
├── app                   // 各个微服务
│   ├── gateway           // 网关
│   ├── task              // 任务模块微服务
│   └── user              // 用户模块微服务
├── bin                   // 编译后的二进制文件模块
├── config                // 配置文件
├── consts                // 定义的常量
├── doc                   // 接口文档
├── idl                   // protoc文件
│   └── pb                // 放置生成的pb文件
├── logs                  // 放置打印日志模块
├── pkg                   // 各种包
│   ├── e                 // 统一错误状态码
│   ├── discovery         // etcd服务注册、keep-alive、获取服务信息等等
│   ├── res               // 统一response接口返回
│   └── util              // 各种工具、JWT、Logger等等..
└── types                 // 定义各种结构体

proto文件定义gateway 网关部分

sgateway/
├── cmd                   // 启动入口
├── internal              // 业务逻辑(不对外暴露)
│   ├── handler           // 视图层
│   └── service           // 服务层
│       └── pb            // 放置生成的pb文件
├── logs                  // 放置打印日志模块
├── middleware            // 中间件
├── routes                // http 路由模块
└── rpc                   // rpc 调用

user && task 用户与任务模块

user/
├── cmd                   // 启动入口
└──internal               // 业务逻辑(不对外暴露)
   ├── service            // 业务服务
   └── repository         // 持久层
       └── db             // 视图层
           ├── dao        // 对数据库进行操作
           └── model      // 定义数据库的模型

etcd 链接:百度网盘 请输入提取码 提取码:7777 etcdkeeper 链接:百度网盘 请输入提取码 提取码:7777

  • api-gateway/

  • user/

    • cmd/

      • main.go

    • config/

      • config.go

      • config.yml

    • discovery/

    • internal/

      • hander/

      • repository/

        • db_init.go

      • service/

    • pkg/

      • e/

      • res/

      • util/

user模块创建数据库

// config/config.yml
server:
  domain: user
  version: 1.0
  jwtSecret: FanOne
  grpcAddress: "127.0.0.1:10001"

mysql:
  driverName: mysql
  host: 127.0.0.1
  port: 3306
  database: grpc_todolist_tmp
  username: root
  password: l20030328
  charset: utf8mb4

etcd:
  address: 127.0.0.1:2379

redis:
  address: 127.0.0.1:6379
  password: 

// cmd
cd user
// /user>
go get github.com/spf13/viper

// config/config.go
package config

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/spf13/viper"
)

func InitConfig() {
	wDir, _ := os.Getwd()         // 获取当前工作目录
	workDir := filepath.Dir(wDir) // 获取到文件的上层目录
	fmt.Println(workDir)
	fmt.Println("-------------------")
	viper.SetConfigName("config")            // 配置文件的文件名
	viper.SetConfigType("yml")               // 配置文件的后缀
	viper.AddConfigPath(workDir + "/config") // 获取到配置文件的路径
	err := viper.ReadInConfig()
	if err != nil {
		fmt.Println("err:", err)
		return
	}
}

// user/internal/repository/db_init.go
package repository

import (
	"strings"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/spf13/viper"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	"gorm.io/gorm/schema"
)

var DB *gorm.DB

func InitDB() {
	host := viper.GetString("mysql.host")
	port := viper.GetString("mysql.port")
	database := viper.GetString("mysql.database")
	username := viper.GetString("mysql.username")
	password := viper.GetString("mysql.password")
	charset := viper.GetString("mysql.charset")
	dsn := strings.Join([]string{username, ":", password, "@tcp(", host, ":", port, ")/", database, "?charset=" + charset + "&parseTime=true"}, "")
	err := Database(dsn)
	if err != nil {
		panic(err)
	}
}

// gorm的定义
func Database(dsn string) error {
	var ormLogger logger.Interface
	if gin.Mode() == "debug" {
		ormLogger = logger.Default.LogMode(logger.Info)
	} else {
		ormLogger = logger.Default
	}
	db, err := gorm.Open(mysql.New(mysql.Config{
		DSN:                       dsn,
		DefaultStringSize:         256,
		DisableDatetimePrecision:  true,  // 禁止datatime的精度, mysql5.6之前的数据是不支持的
		DontSupportRenameIndex:    true,  // 重命名索引的时候采取删除并新建的方式
		DontSupportRenameColumn:   true,  // 用change重命名列
		SkipInitializeWithVersion: false, // 根据版本自动配置
		// 使用gorm.Open和MySQL的Config结构来初始化GORM连接
	}), &gorm.Config{
		// &gorm.Config设置GORM的额外配置,如日志记录器、命名策略
		Logger: ormLogger,
		NamingStrategy: schema.NamingStrategy{
			SingularTable: true, // 设置表名不自动加s
		},
	})
	if err != nil {
		return err
	}
	sqlDB, _ := db.DB()
	sqlDB.SetMaxIdleConns(20)  // 设置连接池,空闲
	sqlDB.SetMaxOpenConns(100) // 设置最大连接数
	sqlDB.SetConnMaxLifetime(time.Second * 30)
	DB = db
	migration()
	return err
}

// cmd
go get github.com/gin-gonic/gin
go get gorm.io/driver/mysql

  • user/internal/repository/

    • migration.go

// user/internal/repository/migration.go
package repository

import "fmt"

func migration() {
	err := DB.Set("gorm:table_options", "charset=utf8mb4").
		AutoMigrate(
			&User{},
		) //GORM提供的一个非常方便的函数,用于自动迁移或创建表结构
		// Set为即将迁移的表设置一些额外的选项, 设置了字符集为utf8mb4
	if err != nil {
		fmt.Println("migration err", err)
	}
}

  • user/internal/repository/

    • user.go

// user/internal/repository/user.go
package repository

type User struct {
	UserId         uint   `gorm:"primarykey"`
	UserName       string `gorm:"unique"`
	NickName       string
	PasswordDigest string
}

// cmd
net start mysql
mysql -u root -p
create database grpc_todolist_tmp charset=utf8mb4;
net stop mysql

  • user/cmd/

    • main.go

// user/cmd/main.go
package main

import (
	"memo_grpc/user/config"
	"memo_grpc/user/internal/repository"
)

func main() {
	config.InitConfig()
	repository.InitDB()
}

user模块dao

  • user/internal/service/

    • pb/userModels.proto

// user/internal/service/pb/userModels.proto
syntax="proto3";
package pb;
option go_package="/internal/service;service";
// /internal/service 生成文件所在的路径 ;service属于service包

message UserModel{
    // @inject_tag: json:"user_id"
    uint32 UserID=1;
    // @inject_tag: json:"user_name"
    string UserName=2;
    // @inject_tag: json:"nick_name"
    string NickName=3;
}
  • user/internal/service/pb/

    • userService.proto

// user/internal/service/pb/userService.proto
syntax = "proto3";
package pb;
import "userModels.proto";
option go_package="/internal/service;service";
// /internal/service 生成文件所在的路径 ;service属于service包

message UserRequest {
    //@inject_tag: json:"nick_name" form:"nick_name"
    string NickName=1;
    //@inject_tag: json:"user_name" form:"user_name"
    string UserName=2;
    //@inject_tag: json:"password" form:"password"
    string password=3;
    //@inject_tag: json:"password_confirm" form:"password_confirm"
    string PasswordConfirm=4;
}

message UserDetailResponse{
    UserModel UserDetail=1;
    uint32 Code=2;
}

service UserService {
    rpc UserLogin(UserRequest) returns(UserDetailResponse);
    rpc UserRegister(UserRequest) returns(UserDetailResponse);
}

// 下载安装protoc-gen-go
git clone https://gitcode.com/golang/protobuf.git
进入到 protobuf/protoc-gen-go 目录执行以下命令:
go build -o protoc-gen-go main.go
复制 protoc-gen-go.exe 到 GOROOT的 bin 目录
// 我的protoc-gen-go.exe文件在GO/bin目录下

// cmd
cd ./user/
protoc -I internal/service/pb internal/service/pb/*.proto --go_out=plugins=grpc:.
// internal/service/pb pb所在的文件 internal/service/pb/*.proto是需要生成的文件

  • user/pkg/e/

    • code.go

    • msg.go

// user/pkg/e/code.go
package e

const (
	Success       = 200
	Error         = 500
	InvalidParams = 400
)
// user/pkg/e/msg.go
package e

var MsgFlags = map[uint]string{
	Success:       "ok",
	Error:         "fail",
	InvalidParams: "请求的参数错误",
}

// GetMsg 获取状态码对应的信息
func GetMsg(code uint) string {
	msg, ok := MsgFlags[code]
	if ok {
		return msg
	}
	return MsgFlags[Error]
}

// cmd 加密包
go get golang.org/x/crypto/bcrypt

// user/internal/repository/user.go
package repository

import (
	"errors"
	"memo_grpc/user/internal/service"

	"golang.org/x/crypto/bcrypt"
	"gorm.io/gorm"
)

type User struct {
	UserId         uint   `gorm:"primarykey"`
	UserName       string `gorm:"unique"`
	NickName       string
	PasswordDigest string
}

const (
	PasswordCost = 12 // 密码加密难度
)

// CheckUserExist检查用户是否存在
func (user *User) CheckUserExist(req *service.UserRequest) bool {
	if err := DB.Where("user_name=?", req.UserName).First(&user).Error; err == gorm.ErrRecordNotFound {
		return false
	}
	return true
}

// ShowUserInfo获取用户信息
func (user *User) ShowUserInfo(req *service.UserRequest) error {
	if exist := user.CheckUserExist(req); exist {
		return nil
	}
	return errors.New("UserName Not Exist")
}

// UserCreate 创建用户
func (*User) UserCreate(req *service.UserRequest) error {
	var count int64
	DB.Where("username=?", req.UserName).Count(&count)
	if count != 0 {
		return errors.New("UserName Exist")
	}
	user := User{
		UserName: req.UserName,
		NickName: req.NickName,
	}
	// 密码的加密
	_ = user.SetPassword(req.Password)
	err := DB.Create(&user).Error;
	return err
}

// SetPassword 加密密码
func (user *User) SetPassword(password string) error {
	bytes, err := bcrypt.GenerateFromPassword([]byte(password), PasswordCost)
	if err != nil {
		return err
	}
	user.PasswordDigest = string(bytes)
	return nil
}

// CheckPassword 检验密码
func (user *User) CheckPassword(password string) bool {
	err := bcrypt.CompareHashAndPassword([]byte(user.PasswordDigest), []byte(password))
	return err == nil
}

// BuildUser序列化User
func BuildUser(item User) *service.UserModel {
	userModel := service.UserModel{
		UserID:   uint32(item.UserId),
		UserName: item.UserName,
		NickName: item.NickName,
	}
	return &userModel
}

  • user/internal/handler/

    • user.go

// user/internal/handler/user.go
package handler

import (
	"context"
	"memo_grpc/user/internal/repository"
	"memo_grpc/user/internal/service"
	"memo_grpc/user/pkg/e"
)

type UserService struct {
}

func NewUserService() *UserService {
	return &UserService{}
}

// UserLogin 用户登录 token不在服务层,不对数据库进行操作,在网关层操作
func (*UserService) UserLogin(ctx context.Context, req *service.UserRequest) (resp *service.UserDetailResponse, err error) {
	var user repository.User
	resp = new(service.UserDetailResponse)
	resp.Code = e.Success
	err = user.ShowUserInfo(req)
	if err != nil {
		resp.Code = e.Error
		return resp, err
	}
	resp.UserDetail = repository.BuildUser(user)
	return resp, nil
}

// 用户注册
func (*UserService) UserRegister(ctx context.Context, req *service.UserRequest) (resp *service.UserDetailResponse, err error) {
	var user repository.User
	resp = new(service.UserDetailResponse)
	resp.Code = e.Success
	err = user.UserCreate(req)
	if err != nil {
		resp.Code = e.Error
		return resp, err
	}
	resp.UserDetail = repository.BuildUser(user)
	return resp, nil
}

etcd模块1

// cmd
go get go.etcd.io/etcd/client/v3
go get github.com/sirupsen/logrus
go get google.golang.org/grpc/resolver

  • user/discovery/

    • register.go

    • resolver.go

    • instance.go

// user/discovery/register.go
package discovery

import (
	"github.com/sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
)

type Register struct {
	EtcdAddrs   []string
	DialTimeout int // 超时时间

	closeCh     chan struct{}
	leasesID    clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

	srvInfo Server
	srvTTL  int64
	cli     *clientv3.Client
	logger  *logrus.Logger
}
// user/discovery/resolver.go
package discovery

import (
	"github.com/sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/resolver"
)

type Resolver struct {
	schema      string
	EtcdAddrs   []string
	DialTimeout int

	closeCh      chan struct{}
	watchCh      clientv3.WatchChan
	cli          *clientv3.Client
	keyPrifix    string
	srvAddrsList []resolver.Address

	cc     resolver.ClientConn
	logger *logrus.Logger
}
// user/discovery/instance.go
package discovery

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

type Server struct {
	Name    string `json:"name"`
	Addr    string `json:"addr"`
	Version string `json:"version"` // 版本
	Weight  int64  `json:"weight"`  // 权重
}

func BuildPrefix(server Server) string {
	if server.Version == "" {
		return fmt.Sprintf("/%s/", server.Name)
	}
	return fmt.Sprintf("/%s/%s/", server.Name, server.Version)
}

func BuildRegisterPath(server Server) string {
	return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr)
}

// ParseValue 将value值反序列化到一个Server实例当中
func ParseValue(value []byte) (Server, error) {
	server := Server{}
	if err := json.Unmarshal(value, &server); err != nil {
		return server, err
	}
	return server, nil
}

func SplitPath(path string) (Server, error) {
	server := Server{}
	strs := strings.Split(path, "/")
	if len(strs) == 0 {
		return server, errors.New("invalid path")
	}
	server.Addr = strs[len(strs)-1]
	return server, nil
}

etcd模块2

// user/discovery/instance.go
package discovery

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"google.golang.org/grpc/resolver"
)

type Server struct {
	Name    string `json:"name"`
	Addr    string `json:"addr"`
	Version string `json:"version"` // 版本
	Weight  int64  `json:"weight"`  // 权重
}

func BuildPrefix(server Server) string {
	if server.Version == "" {
		return fmt.Sprintf("/%s/", server.Name)
	}
	return fmt.Sprintf("/%s/%s/", server.Name, server.Version)
}

func BuildRegisterPath(server Server) string {
	return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr)
}

// ParseValue 将value值反序列化到一个Server实例当中
func ParseValue(value []byte) (Server, error) {
	server := Server{}
	if err := json.Unmarshal(value, &server); err != nil {
		return server, err
	}
	return server, nil
}

func SplitPath(path string) (Server, error) {
	server := Server{}
	strs := strings.Split(path, "/")
	if len(strs) == 0 {
		return server, errors.New("invalid path")
	}
	server.Addr = strs[len(strs)-1]
	return server, nil
}

func Exist(l []resolver.Address, addr resolver.Address) bool {
	for i := range l {
		if l[i].Addr == addr.Addr {
			return true
		}
	}
	return false
}
// user/discovery/register.go
package discovery

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
)

type Register struct {
	EtcdAddrs   []string
	DialTimeout int // 超时时间

	closeCh     chan struct{} // 通常用于通知或同步,关闭与etcd的连接或停止某些操作
	leasesID    clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

	srvInfo Server
	srvTTL  int64
	cli     *clientv3.Client
	logger  *logrus.Logger
}

// NewRegister 基于ETCD创建一个register
func NewRegister(etcdAddrs []string, logger *logrus.Logger) *Register {
	return &Register{
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

// 初始化自己的register
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
	var err error
	if strings.Split(srvInfo.Addr, ":")[0] == "" {
		return nil, errors.New("invalid ip address")
	}
	// 初始化     新建一个连接
	if r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	}); err != nil {
		return nil, err
	}

	r.srvInfo = srvInfo
	r.srvTTL = ttl

	if err = r.register(); err != nil {
		return nil, err
	} // 注册
	r.closeCh = make(chan struct{})
	go r.keepAlive()
	return r.closeCh, nil
}

// 创建etcd自带的那些实例
func (r *Register) register() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
	defer cancel()
    // defer cancel() 的作用是确保无论函数何时返回(正常返回或因为错误而返回),cancel 函数都会被调用

	leaseResp, err := r.cli.Grant(ctx, r.srvTTL)
	if err != nil {
		return err
	}

	r.leasesID = leaseResp.ID

	if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), r.leasesID); err != nil {
		return err
	}
	data, err := json.Marshal(r.srvInfo)
	if err != nil {
		return err
	}

	_, err = r.cli.Put(context.Background(), BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))

	return err
}

func (r *Register) keepAlive() {
	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)
	for {
		select {
		case <-r.closeCh:
			if err := r.unregister(); err != nil {
				// 注销服务器信息
				fmt.Println("unregister failed error", err)
			}
			if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
				fmt.Println("revoke fail")
				// 撤销租约(调用Etcd客户端的Revoke方法)
			}
		case res := <-r.keepAliveCh:
			if res == nil {
				// 尝试重新注册服务器信息
				if err := r.register(); err != nil {
					fmt.Println("register err")
				}
			}
		case <-ticker.C:
			if r.keepAliveCh == nil {
				if err := r.register(); err != nil {
					fmt.Println("register err")
				}
			}
		}
	}
}

func (r *Register) unregister() error {
	_, err := r.cli.Delete(context.Background(), BuildRegisterPath(r.srvInfo))
	return err
}

etcd模块3

// user/cmd/main.go
package main

import (
	"memo_grpc/user/config"
	"memo_grpc/user/discovery"
	"memo_grpc/user/internal/handler"
	"memo_grpc/user/internal/repository"
	"memo_grpc/user/internal/service"
	"net"

	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
)

func main() {
	config.InitConfig()
	repository.InitDB()

	// etcd 地址
	etcdAddress := []string{viper.GetString("etcd.address")}
	// 服务的注册
	etcdRegister := discovery.NewRegister(etcdAddress, logrus.New())
	grpcAddress := viper.GetString("server.grpcAddress")
	userNode := discovery.Server{
		Name: viper.GetString("server.domain"),
		Addr: grpcAddress,
	}
	server := grpc.NewServer()
	defer server.Stop()
	// 绑定服务      返回的服务实例绑定到GRPC服务器上->客户端可以通过GRPC调用此服务的方法
	service.RegisterUserServiceServer(server, handler.NewUserService())
	lis, err := net.Listen("tcp", grpcAddress) // 在指定的GRPC地址上监听TCP连接
	if err != nil {
		panic(err)
	}
	if _, err := etcdRegister.Register(userNode, 10); err != nil {
		panic(err)
		// 将服务器节点注册到etcd上,并指定租约时间为10秒
	}
	if err = server.Serve(lis); err != nil {
		panic(err)
		// 启动GRPC服务器,并使其开始监听并处理传入的连接
	}
}

网关模块_user接入网关1

  • api-gateway/

    • cmd/

    • config/

    • internal/

      • handler/

      • service/

        • pb/

          • 复制userpb下的内容

        • 复制userservice下的内容

    • middlerware/

    • pkg/

      • e/

        • copy

      • res/

      • util/

    • routes/

    • wrappers/

// cmd   
cd api-gateway
go mod init api-gateway
go mod tidy

<!-- ...表示可变参数或切片参数,允许传入不定量的参数,...interface=>[]interface{} -->

  • api-gateway/internal/hander/

    • pkg.go

    • user.go

// api-gateway/internal/hander/pkg.go
package handler

import "errors"

func PanicIfUserError(err error) {
	if err != nil {
		err = errors.New("User Service--" + err.Error())
		panic(err)
	}
}

// api-gateway/internal/hander/user.go
package handler

import (
	"api-gateway/internal/service"
	"api-gateway/pkg/e"
	"api-gateway/pkg/res"
	"api-gateway/pkg/util"
	"context"
	"net/http"

	"github.com/gin-gonic/gin"
)

// UserRegister 用户注册
func UserRegister(ginCtx *gin.Context) {
	var userReq service.UserRequest
	PanicIfUserError(ginCtx.Bind(&userReq)) // 尝试将HTTP请求体中的数据绑定到userReq中
	// gin.Key 中获取服务实例
	userService := ginCtx.Keys["user"].(service.UserServiceClient) // 断言为service.UserServiceClient类型
	userResp, err := userService.UserRegister(context.Background(), &userReq)
	PanicIfUserError(err)
	r := res.Response{
		Data:   userResp,
		Status: uint(userResp.Code),
		Msg:    e.GetMsg(uint(userResp.Code)),
		Error:  err.Error(),
	}
	ginCtx.JSON(http.StatusOK, r)
}

// UserLogin 用户登录
func UserLogin(ginCtx *gin.Context) {
	var userReq service.UserRequest
	PanicIfUserError(ginCtx.Bind(&userReq)) // 尝试将HTTP请求体中的数据绑定到userReq中
	// gin.Key 中获取服务实例
	userService := ginCtx.Keys["user"].(service.UserServiceClient) // 断言为service.UserServiceClient类型
	userResp, err := userService.UserLogin(context.Background(), &userReq)
	PanicIfUserError(err)
	token, err := util.GenerateToken(uint(userResp.UserDetail.UserID))
	r := res.Response{
		Data: res.TokenData{
			User:  userResp.UserDetail,
			Token: token,
		},
		Status: uint(userResp.Code),
		Msg:    e.GetMsg(uint(userResp.Code)),
		Error:  err.Error(),
	}
	ginCtx.JSON(http.StatusOK, r)
}

  • // api-gateway/routes/

    • router.go

// api-gateway/routes/router.go
package routes

import (
	"api-gateway/internal/handler"
	"api-gateway/middlerware"

	"github.com/gin-gonic/gin"
)

func NewRouter(service ...interface{}) *gin.Engine {
	// ...表示可变参数或切片参数,允许传入不定量的参数,...interface=>[]interface{}
	ginRouter := gin.Default()
	ginRouter.Use(middlerware.Cors(), middlerware.InitMiddleWare(service))
	v1 := ginRouter.Group("api/v1")
	{
		v1.GET("ping", func(c *gin.Context) {
			c.JSON(200, "success")
		})
		// 用户服务
		v1.POST("/user/register", handler.UserRegister)
		v1.POST("/user/login", handler.UserLogin)

	}
	return ginRouter
}

  • api-gateway/middlerware/

    • cors.go

    • jwt.go

// api-gateway/middlerware/cors.go
// copy
// api-gateway/middlerware/jwt.go
// 之后写

  • api-gateway/pkg/res/

    • response.go

// api-gateway/pkg/res/response.go
package res

import (
	"api-gateway/pkg/e"

	"github.com/gin-gonic/gin"
)

// Response 基础序列化器
type Response struct {
	Status uint        `json:"Status"`
	Data   interface{} `json:"Data"`
	Msg    string      `json:"Msg"`
	Error  string      `json:"Error"`
}

// DataList 带有总数的Data结构
type DataList struct {
	Item  interface{} `json:"Item"`
	Total uint        `json:"Total"`
}

// TokenData 带有token的Data结构
type TokenData struct {
	User  interface{} `json:"User"`
	Token string      `json:"Token"`
}

func ginH(msgCode int, data interface{}) gin.H {
	return gin.H{
		"code": msgCode,
		"msg":  e.GetMsg(uint(msgCode)),
		"data": data,
	}
}

  • api-gateway/pkg/util/

    • token.go

// api-gateway/pkg/util/token.go
package util

import (
	"time"

	"github.com/dgrijalva/jwt-go"
	"github.com/spf13/viper"
)

var jwtSecret = []byte(viper.GetString("service.jwtSecret"))

type Claims struct {
	UserId uint `json:"user_id"`
	jwt.StandardClaims
}

// GenerateToken 签发用户Token
func GenerateToken(userID uint) (string, error) {
	nowTime := time.Now()
	expireTime := nowTime.Add(24 * time.Hour)
	claims := Claims{
		UserId: userID,
		StandardClaims: jwt.StandardClaims{
			ExpiresAt: expireTime.Unix(),
			Issuer:    "38384-SearchEngine",
		},
	}
	tokenClaims := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) // 加密
	token, err := tokenClaims.SignedString(jwtSecret)                // 签名
	return token, err
}

// ParseToken 验证用户token
func ParseToken(token string) (*Claims, error) {
	tokenClaims, err := jwt.ParseWithClaims(token, &Claims{}, func(token *jwt.Token) (interface{}, error) {
		return jwtSecret, nil
	})
	if tokenClaims != nil {
		if claims, ok := tokenClaims.Claims.(*Claims); ok && tokenClaims.Valid {
			return claims, nil
		}
	}
	return nil, err
}

  • api-gateway/middlerware/

    • init.go

// api-gateway/middlerware/init.go
package middlerware

import "github.com/gin-gonic/gin"

func InitMiddleWare(service []interface{}) gin.HandlerFunc {
	return func(c *gin.Context) {
		c.Keys = make(map[string]interface{})
		c.Keys["user"] = service[0]
		c.Next()
	}
}

  • api-gateway/config/

    • config.yml

    • config.go

// api-gateway/config/config.yml
service:
  domain: api-gateway
  prot: :4000
  jwtSecret: slllbyxinniiuniu
  version: 1.0
// api-gateway/config/config.go
package config

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/spf13/viper"
)

func InitConfig() {
	wDir, _ := os.Getwd()         // 获取当前工作目录
	workDir := filepath.Dir(wDir) // 获取到文件的上层目录

	viper.SetConfigName("config")            // 配置文件的文件名
	viper.SetConfigType("yml")               // 配置文件的后缀
	viper.AddConfigPath(workDir + "/config") // 获取到配置文件的路径
	err := viper.ReadInConfig()
	if err != nil {
		fmt.Println("err:", err)
		return
	}
}

  • api-gateway/cmd/

    • main.go

// api-gateway/cmd/main.go
package main

import "api-gateway/config"

func main() {
	config.InitConfig()
}

网关模块_user接入网关2

  • copy->user/discovery

// api-gateway/discovery/resolver.go
package discovery

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/resolver"
)

const (
	schema = "etcd"
)

// Resolver for grpc client
type Resolver struct {
	schema      string
	EtcdAddrs   []string
	DialTimeout int

	closeCh      chan struct{}
	watchCh      clientv3.WatchChan
	cli          *clientv3.Client
	keyPrifix    string
	srvAddrsList []resolver.Address

	cc     resolver.ClientConn
	logger *logrus.Logger
}

// NewResolver create a new resolver.Builder base on etcd
func NewResolver(etcdAddrs []string, logger *logrus.Logger) *Resolver {
	return &Resolver{
		schema:      schema,
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

// Scheme returns the scheme supported by this resolver.
func (r *Resolver) Scheme() string {
	return r.schema
}

// Build creates a new resolver.Resolver for the given target
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r.cc = cc

	r.keyPrifix = BuildPrefix(Server{Name: target.Endpoint, Version: target.Authority})
	if _, err := r.start(); err != nil {
		return nil, err
	}
	return r, nil
}

// ResolveNow resolver.Resolver interface
func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {}

// Close resolver.Resolver interface
func (r *Resolver) Close() {
	r.closeCh <- struct{}{}
}

// start
func (r *Resolver) start() (chan<- struct{}, error) {
	var err error
	r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	})
	if err != nil {
		return nil, err
	}
	resolver.Register(r)

	r.closeCh = make(chan struct{})

	if err = r.sync(); err != nil {
		return nil, err
	}

	go r.watch()

	return r.closeCh, nil
}

// watch update events
func (r *Resolver) watch() {
	ticker := time.NewTicker(time.Minute)
	r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix())

	for {
		select {
		case <-r.closeCh:
			return
		case res, ok := <-r.watchCh:
			if ok {
				r.update(res.Events)
			}
		case <-ticker.C:
			if err := r.sync(); err != nil {
				r.logger.Error("sync failed", err)
			}
		}
	}
}

// update
func (r *Resolver) update(events []*clientv3.Event) {
	for _, ev := range events {
		var info Server
		var err error

		switch ev.Type {
		case clientv3.EventTypePut:
			info, err = ParseValue(ev.Kv.Value)
			if err != nil {
				continue
			}
			addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
			if !Exist(r.srvAddrsList, addr) {
				r.srvAddrsList = append(r.srvAddrsList, addr)
				r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
			}
		case clientv3.EventTypeDelete:
			info, err = SplitPath(string(ev.Kv.Key))
			if err != nil {
				continue
			}
			addr := resolver.Address{Addr: info.Addr}
			if s, ok := Remove(r.srvAddrsList, addr); ok {
				r.srvAddrsList = s
				r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
			}
		}
	}
}

// sync 同步获取所有地址信息
func (r *Resolver) sync() error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	r.srvAddrsList = []resolver.Address{}

	for _, v := range res.Kvs {
		info, err := ParseValue(v.Value)
		if err != nil {
			continue
		}
		addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
		r.srvAddrsList = append(r.srvAddrsList, addr)
	}
	r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
	return nil
}
// api-gateway/discovery/instance.go
package discovery

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"google.golang.org/grpc/resolver"
)

type Server struct {
	Name    string `json:"name"`
	Addr    string `json:"addr"`
	Version string `json:"version"` // 版本
	Weight  int64  `json:"weight"`  // 权重
}

func BuildPrefix(server Server) string {
	if server.Version == "" {
		return fmt.Sprintf("/%s/", server.Name)
	}
	return fmt.Sprintf("/%s/%s/", server.Name, server.Version)
}

func BuildRegisterPath(server Server) string {
	return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr)
}

// ParseValue 将value值反序列化到一个Server实例当中
func ParseValue(value []byte) (Server, error) {
	server := Server{}
	if err := json.Unmarshal(value, &server); err != nil {
		return server, err
	}
	return server, nil
}

func SplitPath(path string) (Server, error) {
	server := Server{}
	strs := strings.Split(path, "/")
	if len(strs) == 0 {
		return server, errors.New("invalid path")
	}
	server.Addr = strs[len(strs)-1]
	return server, nil
}

func Exist(l []resolver.Address, addr resolver.Address) bool {
	for i := range l {
		if l[i].Addr == addr.Addr {
			return true
		}
	}
	return false
}

func Remove(s []resolver.Address, addr resolver.Address) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr == addr.Addr {
			s[i] = s[len(s)-1]
			return s[:len(s)-1], true
		}
	}
	return nil, false
}

// api-gateway/cmd/main.go
package main

import (
	"api-gateway/config"
	"api-gateway/discovery"
	"api-gateway/internal/service"
	"api-gateway/routes"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"
)

func main() {
	config.InitConfig()
	// 服务发现
	etcdAddress := []string{viper.GetString("etcd.address")}
	etcdRegister := discovery.NewResolver(etcdAddress, logrus.New())
	resolver.Register(etcdRegister)
	go startListen()
	{
		osSignal := make(chan os.Signal, 1)
		signal.Notify(osSignal, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
		s := <-osSignal
		fmt.Println("exit!", s)
	}
	fmt.Println("gateway listen on :3000")
}

func startListen() {
	opts := []grpc.DialOption{}
	userConn, _ := grpc.Dial("127.0.0.1:10001", opts...)
	userService := service.NewUserServiceClient(userConn)

	ginRouter := routes.NewRouter(userService)
	service := &http.Server{
		Addr:           viper.GetString("server.port"),
		Handler:        ginRouter,
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}
	err := service.ListenAndServe()
	if err != nil {
		fmt.Println("绑定失败, 端口可能被占用", err)
	}
}

user模块接口测试

// cmd
 net start mysql
 mysql -u root -p
 net stop mysql
// api-gateway/cmd/main.go
package main

import (
	"api-gateway/config"
	"api-gateway/discovery"
	"api-gateway/internal/service"
	"api-gateway/routes"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"
)

func main() {
	config.InitConfig()
	// 服务发现
	etcdAddress := []string{viper.GetString("etcd.address")}
	etcdRegister := discovery.NewResolver(etcdAddress, logrus.New())
	resolver.Register(etcdRegister)
	go startListen()
	{
		osSignal := make(chan os.Signal, 1)
		signal.Notify(osSignal, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
		s := <-osSignal
		fmt.Println("exit!", s)
	}
	fmt.Println("gateway listen on :4000")
}

func startListen() {
	opts := []grpc.DialOption{
		grpc.WithInsecure(),
	}
	userConn, err := grpc.Dial("127.0.0.1:10001", opts...)
	if err != nil {
		panic(err)
	}
	userService := service.NewUserServiceClient(userConn)

	ginRouter := routes.NewRouter(userService)
	service := &http.Server{
		Addr:           viper.GetString("server.port"),
		Handler:        ginRouter,
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}
	err = service.ListenAndServe()
	if err != nil {
		fmt.Println("绑定失败, 端口可能被占用", err)
	}
}
  • 服务未被监听修改api...main.go

// user/internal/handler/user.go
package handler

import (
	"context"
	"memo_grpc/internal/repository"
	"memo_grpc/internal/service"
	"memo_grpc/pkg/e"
)

type UserService struct {
}

func NewUserService() *UserService {
	return &UserService{}
}

// UserLogin 用户登录 token不在服务层,不对数据库进行操作,在网关层操作
func (*UserService) UserLogin(ctx context.Context, req *service.UserRequest) (resp *service.UserDetailResponse, err error) {
	var user repository.User
	resp = new(service.UserDetailResponse)
	resp.Code = e.Success
	err = user.ShowUserInfo(req)
	if err != nil {
		resp.Code = e.Error
		return resp, err
	}
	resp.UserDetail = repository.BuildUser(user)
	return resp, nil
}

// 用户注册
func (*UserService) UserRegister(ctx context.Context, req *service.UserRequest) (resp *service.UserDetailResponse, err error) {
	var user repository.User
	resp = new(service.UserDetailResponse)
	resp.Code = e.Success
	user, err = user.UserCreate(req)
	if err != nil {
		resp.Code = e.Error
		return resp, err
	}
	resp.UserDetail = repository.BuildUser(user)
	return resp, nil
}
// user/internal/repository/user.go
package repository

import (
	"errors"
	"memo_grpc/internal/service"

	"golang.org/x/crypto/bcrypt"
	"gorm.io/gorm"
)

type User struct {
	UserId         uint   `gorm:"primarykey"`
	UserName       string `gorm:"unique"`
	NickName       string
	PasswordDigest string
}

const (
	PasswordCost = 12 // 密码加密难度
)

// CheckUserExist检查用户是否存在
func (user *User) CheckUserExist(req *service.UserRequest) bool {
	if err := DB.Where("user_name=?", req.UserName).First(&user).Error; err == gorm.ErrRecordNotFound {
		return false
	}
	return true
}

// ShowUserInfo获取用户信息
func (user *User) ShowUserInfo(req *service.UserRequest) error {
	if exist := user.CheckUserExist(req); exist {
		return nil
	}
	return errors.New("UserName Not Exist")
}

// UserCreate 创建用户
func (*User) UserCreate(req *service.UserRequest) (user User, err error) {
	var count int64
	DB.Where("username=?", req.UserName).Count(&count)
	if count != 0 {
		return User{}, errors.New("UserName Exist")
	}
	user = User{
		UserName: req.UserName,
		NickName: req.NickName,
	}
	// 密码的加密
	_ = user.SetPassword(req.Password)
	err = DB.Create(&user).Error
	return user, err
}

// SetPassword 加密密码
func (user *User) SetPassword(password string) error {
	bytes, err := bcrypt.GenerateFromPassword([]byte(password), PasswordCost)
	if err != nil {
		return err
	}
	user.PasswordDigest = string(bytes)
	return nil
}

// CheckPassword 检验密码
func (user *User) CheckPassword(password string) bool {
	err := bcrypt.CompareHashAndPassword([]byte(user.PasswordDigest), []byte(password))
	return err == nil
}

// BuildUser序列化User
func BuildUser(item User) *service.UserModel {
	userModel := service.UserModel{
		UserID:   uint32(item.UserId),
		UserName: item.UserName,
		NickName: item.NickName,
	}
	return &userModel
}
// api-gateway/internal/handler/user.go
package handler

import (
	"api-gateway/internal/service"
	"api-gateway/pkg/e"
	"api-gateway/pkg/res"
	"api-gateway/pkg/util"
	"context"
	"net/http"

	"github.com/gin-gonic/gin"
)

// UserRegister 用户注册
func UserRegister(ginCtx *gin.Context) {
	var userReq service.UserRequest
	PanicIfUserError(ginCtx.Bind(&userReq)) // 尝试将HTTP请求体中的数据绑定到userReq中
	// gin.Key 中获取服务实例
	userService := ginCtx.Keys["user"].(service.UserServiceClient) // 断言为service.UserServiceClient类型
	userResp, err := userService.UserRegister(context.Background(), &userReq)
	PanicIfUserError(err)
	r := res.Response{
		Data:   userResp,
		Status: uint(userResp.Code),
		Msg:    e.GetMsg(uint(userResp.Code)),
	}
	ginCtx.JSON(http.StatusOK, r)
}

// UserLogin 用户登录
func UserLogin(ginCtx *gin.Context) {
	var userReq service.UserRequest
	PanicIfUserError(ginCtx.Bind(&userReq)) // 尝试将HTTP请求体中的数据绑定到userReq中
	// gin.Key 中获取服务实例
	userService := ginCtx.Keys["user"].(service.UserServiceClient) // 断言为service.UserServiceClient类型
	userResp, err := userService.UserLogin(context.Background(), &userReq)
	PanicIfUserError(err)
	token, err := util.GenerateToken(uint(userResp.UserDetail.UserID))
	r := res.Response{
		Data: res.TokenData{
			User:  userResp.UserDetail,
			Token: token,
		},
		Status: uint(userResp.Code),
		Msg:    e.GetMsg(uint(userResp.Code)),
	}
	ginCtx.JSON(http.StatusOK, r)
}

task模块_整体开发

  • api-gateway/task/

    • cmd/

    • copy user/config

    • copy user/discovery/

    • internal/

      • handler/

      • repository/

        • copy user/db_init.go

        • copy user/migration.go

      • service/

        • pb/

          • taskModels.proto

          • taskService.proto

    • pkg/

      • copy user/pkg/e

// task/internal/service/pb/taskModels.proto
syntax="proto3";
package pb;
option go_package = "/internal/service;service";

message TaskModel {
    // @inject_tag: json:"favorite_id"
    uint32 TaskID=1;
    // @inject_tag: json:"user_id"
    uint32 UserID=2;
    // @inject_tag: json:"status"
    uint32 Status=3;
    // @inject_tag: json:"title"
    string Title=4;
    // @inject_tag: json:"content"
    string Content=5;
    // @inject_tag: json:"start_time"
    uint32 StartTime=6;
    // @inject_tag: json:"end_time"
    uint32 EndTime=7;
}
// task/internal/service/pb/taskService.proto
syntax="proto3";
package pb;
import "taskModels.proto";
option go_package = "/internal/service;service";

message TaskRequest{
    uint32 TaskID=1;
    uint32 UserID=2;
    uint32 Status=3;
    string Title=4;
    string Content=5;
    uint32 StartTime=6;
    uint32 EndTime=7;
}

message TasksDetailResponse{
    repeated TaskModel TaskDetail=1;
    uint32 Code=2;
}

message CommonResponse{
    uint32 Code=1;
    string Msg=2;
    string Data=3;
}

service TaskService{
    rpc TaskCreate(TaskRequest) returns(CommonResponse);
    rpc TaskUpdate(TaskRequest) returns(CommonResponse);
    rpc TaskShow(TaskRequest) returns(TasksDetailResponse);
    rpc TaskDelete(TaskRequest) returns(CommonResponse);
}
// cmd
cd ../task/
protoc -I internal/service/pb internal/service/pb/*.proto --go_out=plugins=grpc:.
// task/config/config,yml
server:
  domain: task
  version: 1.0
  jwtSecret: FanOne
  grpcAddress: "127.0.0.1:10002"

mysql:
  driverName: mysql
  host: 127.0.0.1
  port: 3306
  database: grpc_todolist_tmp
  username: root
  password: l20030328
  charset: utf8mb4

etcd:
  address: 127.0.0.1:2379

redis:
  address: 127.0.0.1:6379
  password: 
  • task/internal/handler/

    • task.go

// task/internal/handler/task.go
package handler

import (
	"context"
	"task/internal/repository"
	"task/internal/service"
	"task/pkg/e"
)

type TaskService struct {
}

func NewTaskService() *TaskService {
	return &TaskService{}
}

func (*TaskService) TaskCreate(ctx context.Context, req *service.TaskRequest) (resp *service.CommonResponse, err error) {
	var task repository.Task
	resp = new(service.CommonResponse)
	resp.Code = e.Success
	err = task.TaskCreate(req)
	if err != nil {
		resp.Code = e.Error
		resp.Msg = e.GetMsg(e.Error)
		resp.Data = err.Error()
		return resp, err
	}
	resp.Msg = e.GetMsg(uint(resp.Code))
	return resp, nil
}

func (*TaskService) TaskShow(ctx context.Context, req *service.TaskRequest) (resp *service.TasksDetailResponse, err error) {
	var task repository.Task
	resp = new(service.TasksDetailResponse)
	resp.Code = e.Success
	taskList, err := task.TaskShow(req)
	if err != nil {
		resp.Code = e.Error
		return resp, err
	}
	resp.TaskDetail = repository.BuildTasks(taskList)
	return resp, nil
}

func (*TaskService) TaskUpdate(ctx context.Context, req *service.TaskRequest) (resp *service.CommonResponse, err error) {
	var task repository.Task
	resp = new(service.CommonResponse)
	resp.Code = e.Success
	err = task.TaskUpdate(req)
	if err != nil {
		resp.Code = e.Error
		resp.Msg = e.GetMsg(e.Error)
		resp.Data = err.Error()
		return resp, err
	}
	resp.Msg = e.GetMsg(uint(resp.Code))
	return resp, nil
}

func (*TaskService) TaskDelete(ctx context.Context, req *service.TaskRequest) (resp *service.CommonResponse, err error) {
	var task repository.Task
	resp = new(service.CommonResponse)
	resp.Code = e.Success
	err = task.TaskDelete(req)
	if err != nil {
		resp.Code = e.Error
		resp.Msg = e.GetMsg(e.Error)
		resp.Data = err.Error()
		return resp, err
	}
	resp.Msg = e.GetMsg(uint(resp.Code))
	return resp, nil
}
  • task/internal/repository/

    • task.go

// task/internal/repository/task.go
package repository

import "task/internal/service"

type Task struct {
	TaskID    uint `gorm:"primarykey"`
	UserID    uint `gorm:"index"` // 用户ID
	Status    int  `gorm:"default:0"`
	Title     string
	Content   string `gorm:"type:longtext"`
	StartTime int64
	EndTime   int64
}

func (*Task) TaskCreate(req *service.TaskRequest) error {
	task := Task{
		UserID:    uint(req.UserID),
		Title:     req.Title,
		Content:   req.Content,
		StartTime: int64(req.StartTime),
		EndTime:   int64(req.EndTime),
	}
	return DB.Create(&task).Error
}

func (*Task) TaskShow(req *service.TaskRequest) (taskList []Task, err error) {
	err = DB.Model(&Task{}).Where("user_id=?", req.UserID).Find(&taskList).Error
	if err != nil {
		return nil, err
	}
	return taskList, nil
}

func (*Task) TaskDelete(req *service.TaskRequest) (err error) {
	return DB.Model(&Task{}).Where("task_id=?", req.TaskID).Delete(&Task{}).Error
}

func (*Task) TaskUpdate(req *service.TaskRequest) error {
	t := Task{}
	err := DB.Where("task_id=?", req.TaskID).First(&t).Error
	if err != nil {
		return err
	}
	t.Status = int(req.Status)
	t.Title = req.Title
	t.Content = req.Content
	t.StartTime = int64(req.StartTime)
	t.EndTime = int64(req.EndTime)
	return DB.Save(&t).Error
}

func BuildTask(item Task) *service.TaskModel {
	return &service.TaskModel{
		TaskID:    uint32(item.TaskID),
		UserID:    uint32(item.UserID),
		Status:    uint32(item.Status),
		Title:     item.Title,
		Content:   item.Content,
		StartTime: uint32(item.StartTime),
		EndTime:   uint32(item.EndTime),
	}
}

func BuildTasks(item []Task) (tList []*service.TaskModel) {
	for _, v := range item {
		t := BuildTask(v)
		tList = append(tList, t)
	}
	return tList
}

// task/cmd/main.go copy user/cmd/main.go
package main

import (
	"net"
	"task/config"
	"task/discovery"
	"task/internal/handler"
	"task/internal/repository"
	"task/internal/service"

	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
)

func main() {
	config.InitConfig()
	repository.InitDB()

	// etcd 地址
	etcdAddress := []string{viper.GetString("etcd.address")}
	// 服务的注册
	etcdRegister := discovery.NewRegister(etcdAddress, logrus.New())
	grpcAddress := viper.GetString("server.grpcAddress")
	userNode := discovery.Server{
		Name: viper.GetString("server.domain"),
		Addr: grpcAddress,
	}
	server := grpc.NewServer()
	defer server.Stop()
	// 绑定服务      返回的服务实例绑定到GRPC服务器上->客户端可以通过GRPC调用此服务的方法
	service.RegisterTaskServiceServer(server, handler.NewTaskService())
	lis, err := net.Listen("tcp", grpcAddress) // 在指定的GRPC地址上监听TCP连接
	if err != nil {
		panic(err)
	}
	if _, err := etcdRegister.Register(userNode, 10); err != nil {
		panic(err)
		// 将服务器节点注册到etcd上,并指定租约时间为10秒
	}
	if err = server.Serve(lis); err != nil {
		panic(err)
		// 启动GRPC服务器,并使其开始监听并处理传入的连接
	}
}

task接入网关

// api-gateway/routes/router.go
package routes

import (
	"api-gateway/internal/handler"
	"api-gateway/middlerware"

	"github.com/gin-gonic/gin"
)

func NewRouter(service ...interface{}) *gin.Engine {
	// ...表示可变参数或切片参数,允许传入不定量的参数,...interface=>[]interface{}
	ginRouter := gin.Default()
	ginRouter.Use(middlerware.Cors(), middlerware.InitMiddleWare(service))
	v1 := ginRouter.Group("api/v1")
	{
		v1.GET("ping", func(c *gin.Context) {
			c.JSON(200, "success")
		})
		// 用户服务
		v1.POST("/user/register", handler.UserRegister)
		v1.POST("/user/login", handler.UserLogin)

		authed := v1.Group("/")
		authed.Use(middlerware.JWT())
		{
			// 任务模块
			authed.GET("task", handler.ListTask)
			authed.POST("task", handler.CreateTask)
			authed.PUT("task", handler.UpdateTask)
			authed.DELETE("task", handler.DeleteTask)
		}

	}
	return ginRouter
}

// api-gareway/middlerware/jwt.go
package middlerware

import (
	"api-gateway/pkg/e"
	"api-gateway/pkg/util"
	"time"

	"github.com/gin-gonic/gin"
)

func JWT() gin.HandlerFunc {
	return func(c *gin.Context) {
		code := 200
		token := c.GetHeader("Authorization")
		if token == "" {
			code = 404
		} else {
			claim, err := util.ParseToken(token)
			if err != nil {
				code = e.ErrorAuthzcheckTokenFail
			} else if time.Now().Unix() > claim.ExpiresAt {
				code = e.ErrorAuthzcheckTokenTimeout
			}
		}
		if code != 200 {
			c.JSON(200, gin.H{
				"status": code,
				"msg":    e.GetMsg(uint(code)),
			})
			c.Abort()
			return
		}
		c.Next()
	}
}

// api-gateway/pkg/e/code.go
package e

const (
	Success       = 200
	Error         = 500
	InvalidParams = 400

	ErrorAuthzcheckTokenFail    = 20001 // token 错误
	ErrorAuthzcheckTokenTimeout = 20002 // token 过期
)
// api-gateway/pkg/e/msg.go
package e

var MsgFlags = map[uint]string{
	Success:                     "ok",
	Error:                       "fail",
	InvalidParams:               "请求的参数错误",
	ErrorAuthzcheckTokenFail:    "token 错误",
	ErrorAuthzcheckTokenTimeout: "token 过期",
}

// GetMsg 获取状态码对应的信息
func GetMsg(code uint) string {
	msg, ok := MsgFlags[code]
	if ok {
		return msg
	}
	return MsgFlags[Error]
}

  • copy task/service/pb

  • 里的protogoapi-gateway

  • api-gateway/internal/handler/

    • task.go

// api-gateway/internal/handler/pkg.go
package handler

import "errors"

func PanicIfUserError(err error) {
	if err != nil {
		err = errors.New("User Service--" + err.Error())
		panic(err)
	}
}

func PanicIfTaskError(err error) {
	if err != nil {
		err = errors.New("Task Service--" + err.Error())
		panic(err)
	}
}
// api-gateway/middlerware/init.go
package middlerware

import "github.com/gin-gonic/gin"

func InitMiddleWare(service []interface{}) gin.HandlerFunc {
	return func(c *gin.Context) {
		c.Keys = make(map[string]interface{})
		c.Keys["user"] = service[0]
		c.Keys["task"] = service[1]
		c.Next()
	}
}
// api-gateway/internal/handler/task.go
package handler

import (
	"api-gateway/internal/service"
	"api-gateway/pkg/e"
	"api-gateway/pkg/res"
	"api-gateway/pkg/util"
	"context"
	"net/http"

	"github.com/gin-gonic/gin"
)

func ListTask(ginCtx *gin.Context) {
	var tReq service.TaskRequest
	PanicIfTaskError(ginCtx.Bind(&tReq))
	claim, _ := util.ParseToken(ginCtx.GetHeader("Authorization"))
	tReq.UserID = uint32(claim.UserId)
	taskService := ginCtx.Keys["task"].(service.TaskServiceClient)
	taskResp, err := taskService.TaskShow(context.Background(), &tReq)
	PanicIfTaskError(err)
	r := res.Response{
		Status: uint(taskResp.Code),
		Data:   taskResp,
		Msg:    e.GetMsg(uint(taskResp.Code)),
	}
	ginCtx.JSON(http.StatusOK, r)
}

func CreateTask(ginCtx *gin.Context) {
	var tReq service.TaskRequest
	PanicIfTaskError(ginCtx.Bind(&tReq))
	claim, _ := util.ParseToken(ginCtx.GetHeader("Authorization"))
	tReq.UserID = uint32(claim.UserId)
	taskService := ginCtx.Keys["task"].(service.TaskServiceClient)
	taskResp, err := taskService.TaskCreate(context.Background(), &tReq)
	PanicIfTaskError(err)
	r := res.Response{
		Status: uint(taskResp.Code),
		Data:   taskResp,
		Msg:    e.GetMsg(uint(taskResp.Code)),
	}
	ginCtx.JSON(http.StatusOK, r)
}

func UpdateTask(ginCtx *gin.Context) {
	var tReq service.TaskRequest
	PanicIfTaskError(ginCtx.Bind(&tReq))
	claim, _ := util.ParseToken(ginCtx.GetHeader("Authorization"))
	tReq.UserID = uint32(claim.UserId)
	taskService := ginCtx.Keys["task"].(service.TaskServiceClient)
	taskResp, err := taskService.Taskupdate(context.Background(), &tReq)
	PanicIfTaskError(err)
	r := res.Response{
		Status: uint(taskResp.Code),
		Data:   taskResp,
		Msg:    e.GetMsg(uint(taskResp.Code)),
	}
	ginCtx.JSON(http.StatusOK, r)
}

func DeleteTask(ginCtx *gin.Context) {
	var tReq service.TaskRequest
	PanicIfTaskError(ginCtx.Bind(&tReq))
	claim, _ := util.ParseToken(ginCtx.GetHeader("Authorization"))
	tReq.UserID = uint32(claim.UserId)
	taskService := ginCtx.Keys["task"].(service.TaskServiceClient)
	taskResp, err := taskService.TaskDelete(context.Background(), &tReq)
	PanicIfTaskError(err)
	r := res.Response{
		Status: uint(taskResp.Code),
		Data:   taskResp,
		Msg:    e.GetMsg(uint(taskResp.Code)),
	}
	ginCtx.JSON(http.StatusOK, r)
}

// api-gateway/cmd/main.go
package main
​
import (
    "api-gateway/config"
    "api-gateway/discovery"
    "api-gateway/internal/service"
    "api-gateway/routes"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
​
    "github.com/sirupsen/logrus"
    "github.com/spf13/viper"
    "google.golang.org/grpc"
    "google.golang.org/grpc/resolver"
)
​
func main() {
    config.InitConfig()
    // 服务发现
    etcdAddress := []string{viper.GetString("etcd.address")}
    etcdRegister := discovery.NewResolver(etcdAddress, logrus.New())
    resolver.Register(etcdRegister)
    go startListen()
    {
        osSignal := make(chan os.Signal, 1)
        signal.Notify(osSignal, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
        s := <-osSignal
        fmt.Println("exit!", s)
    }
    fmt.Println("gateway listen on :4000")
}
​
func startListen() {
    opts := []grpc.DialOption{
        grpc.WithInsecure(),
    }
    userConn, err := grpc.Dial("127.0.0.1:10001", opts...)
    if err != nil {
        panic(err)
    }
    userService := service.NewUserServiceClient(userConn)
​
    taskConn, err := grpc.Dial("127.0.0.1:10001", opts...)
    if err != nil {
        panic(err)
    }
    taskService := service.NewTaskServiceClient(taskConn)
​
    ginRouter := routes.NewRouter(userService, taskService)
    service := &http.Server{
        Addr:           viper.GetString("server.port"),
        Handler:        ginRouter,
        ReadTimeout:    10 * time.Second,
        WriteTimeout:   10 * time.Second,
        MaxHeaderBytes: 1 << 20,
    }
    err = service.ListenAndServe()
    if err != nil {
        fmt.Println("绑定失败, 端口可能被占用", err)
    }
}

相关文章:

  • 双指针算法——配合例题讲解
  • 镭神C32测试LEGO-LOAM
  • IntelliJ IDEA 2021版创建springboot项目的五种方式
  • 深度解析前端页面性能优化
  • Python与SQL深度融合实战案例:打造你的数据处理秘籍
  • C++后端服务器开发技术栈有哪些?有哪些资源或开源库拿来用?
  • 嵌入式八股C语言---指针与函数篇
  • ESP8266 入门(第 2 部分):使用 AT 命令
  • c#面试题整理7
  • JavaScript系列07-事件委托:深入剖析与实践技术
  • LeetCode 1876长度为三且各字符不同的子字符串
  • 【数据结构】-- LinkedList与链表(1)
  • Docker 实践与应用举例
  • CCF-CSP第27次认证第1题 --《如此编码》
  • 大模型量化技术原理总结 [吃果冻不吐果冻皮]
  • 自然语言处理:最大期望值算法
  • 从案例分析看微型工业计算机在智能社区中的卓越表现
  • Springboot redis bitMap实现用户签到以及统计,保姆级教程
  • SpringBoot全栈开发:从数据库到Markdown文件导出的终极实践指南
  • TCP协议与包头格式
  • 佛山市南海区交通建设网站/小红书推广策略
  • 代理注册公司靠谱吗?/系统优化的意义
  • 有域名后怎么建网站/今晚赛事比分预测
  • 正邦设计面试/南宁求介绍seo软件
  • 怎么把网站建设推广出去/阿里云搜索
  • 国外做地铁设计的公司网站/网络营销专业是干嘛的