当前位置: 首页 > news >正文

《Go语言高级编程》玩转RPC

《Go语言高级编程》玩转RPC

一、客户端 RPC 实现原理:异步调用机制

Go 的 RPC 客户端支持同步和异步调用,核心在于 Client.Go 方法的实现:

1. 同步调用(Client.Call)的本质
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {// 通过 Client.Go 发起异步调用,阻塞等待结果call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error
}

同步调用本质是封装了异步流程:创建调用请求后,通过通道阻塞等待结果返回。

2. 异步调用(Client.Go)的流程
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {call := &Call{ServiceMethod: serviceMethod,Args: args,Reply: reply,Done: make(chan *Call, 10), // 带缓冲通道,避免阻塞}client.send(call) // 线程安全地发送调用请求return call
}

异步调用返回 Call 对象,调用完成后通过 call.Done 通道通知结果:

func (call *Call) done() {select {case call.Done <- call: // 结果写入通道default: // 通道满时不阻塞(由调用方保证缓冲区足够)}
}
3. 异步调用示例
func doClientWork(client *rpc.Client) {// 发起异步调用,不阻塞当前 goroutinecall := client.Go("HelloService.Hello", "hello", new(string), nil)// 执行其他任务...// 等待调用结果call = <-call.Doneif call.Error != nil {log.Fatal(call.Error)}fmt.Println("参数:", call.Args.(string), "响应:", *call.Reply.(*string))
}

核心优势:异步调用允许客户端在等待 RPC 结果时处理其他任务,提升并发性能。

二、基于 RPC 实现 Watch 监控功能

通过 RPC 实现实时监控(类似订阅-发布模式),以 KV 存储为例:

1. 服务端设计(KVStoreService
type KVStoreService struct {m      map[string]string       // KV 数据存储filter map[string]func(string) // 监控过滤器列表mu     sync.Mutex              // 互斥锁保护共享资源
}// 获取 KV 值
func (p *KVStoreService) Get(key string, value *string) error {p.mu.Lock(); defer p.mu.Unlock()if v, ok := p.m[key]; ok {*value = vreturn nil}return errors.New("not found")
}// 设置 KV 值,并触发监控回调
func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {p.mu.Lock(); defer p.mu.Unlock()key, value := kv[0], kv[1]if oldVal := p.m[key]; oldVal != value {for _, fn := range p.filter {fn(key) // 调用所有监控过滤器}}p.m[key] = valuereturn nil
}// 监控方法:注册过滤器,等待 key 变化或超时
func (p *KVStoreService) Watch(timeout int, keyChanged *string) error {id := "watch-" + time.Now().Format("150405") + "-" + strconv.Itoa(rand.Intn(1000))ch := make(chan string, 10)p.mu.Lock()p.filter[id] = func(key string) { ch <- key } // 注册过滤器p.mu.Unlock()select {case <-time.After(time.Duration(timeout) * time.Second):return errors.New("timeout")case key := <-ch:*keyChanged = keyreturn nil}
}
2. 客户端调用
func doClientWork(client *rpc.Client) {// 启动独立 goroutine 执行监控,阻塞等待 key 变化go func() {var key stringif err := client.Call("KVStoreService.Watch", 30, &key); err != nil {log.Fatal(err)}fmt.Println("监控到变化的 key:", key)}()// 修改 KV 值,触发监控回调if err := client.Call("KVStoreService.Set", [2]string{"abc", "new-value"}, new(struct{})); err != nil {log.Fatal(err)}time.Sleep(3 * time.Second)
}

核心原理

  • 服务端为每个 Watch 调用生成唯一 ID,绑定过滤器函数到 filter 列表。
  • Set 方法修改数据时,遍历调用所有过滤器,通过通道通知监控方。
  • 客户端通过异步 goroutine 阻塞监听,实现实时监控。
三、反向 RPC:内网服务主动连接外网

传统 RPC 是客户端连接服务端,反向 RPC 则相反,适用于内网服务无法被外网直接访问的场景:

1. 内网服务端(主动连接外网)
func main() {rpc.Register(new(HelloService)) // 注册服务for {// 主动连接外网服务器conn, err := net.Dial("tcp", "外网IP:1234")if err != nil {time.Sleep(1 * time.Second)continue}// 基于连接提供 RPC 服务rpc.ServeConn(conn)conn.Close()}
}
2. 外网客户端(监听连接)
func main() {listener, err := net.Listen("tcp", ":1234")if err != nil {log.Fatal(err)}clientChan := make(chan *rpc.Client)// 后台 goroutine 接受连接并创建客户端go func() {for {conn, err := listener.Accept()if err != nil {log.Fatal(err)}clientChan <- rpc.NewClient(conn) // 将客户端放入通道}}()doClientWork(clientChan) // 从通道获取客户端并调用
}func doClientWork(clientChan <-chan *rpc.Client) {client := <-clientChandefer client.Close()var reply stringif err := client.Call("HelloService.Hello", "hello", &reply); err != nil {log.Fatal(err)}fmt.Println(reply)
}

核心逻辑

  • 内网服务主动拨号外网服务器,建立连接后提供 RPC 服务。
  • 外网客户端监听端口,接收连接并转换为 RPC 客户端,通过通道传递给业务逻辑。
  • 适用于内网服务需被外网访问,但内网无法暴露端口的场景(如防火墙限制)。
四、上下文信息:基于连接的定制化服务

为每个 RPC 连接添加上下文(如认证状态、客户端信息),提升服务安全性和灵活性:

1. 服务端改造(包含连接和状态)
type HelloService struct {conn    net.Conn    // 连接对象,可获取客户端地址等信息isLogin bool        // 登录状态
}// 登录方法
func (p *HelloService) Login(request string, reply *string) error {if request != "user:password" {return errors.New("认证失败")}log.Println("登录成功")p.isLogin = true*reply = "登录成功"return nil
}// 需要认证的 Hello 方法
func (p *HelloService) Hello(request string, reply *string) error {if !p.isLogin {return errors.New("请先登录")}*reply = "hello:" + request + ", from " + p.conn.RemoteAddr().String()return nil
}
2. 服务端启动逻辑(为每个连接创建独立服务)
func main() {listener, err := net.Listen("tcp", ":1234")if err != nil {log.Fatal(err)}for {conn, err := listener.Accept()if err != nil {log.Fatal(err)}// 为每个连接启动独立 goroutine,绑定 HelloService 实例go func(c net.Conn) {defer c.Close()server := rpc.NewServer()server.Register(&HelloService{conn: c}) // 传入连接对象server.ServeConn(c)}(conn)}
}
3. 客户端调用流程
func main() {client, err := rpc.Dial("tcp", "localhost:1234")if err != nil {log.Fatal(err)}// 先登录var loginReply stringif err := client.Call("HelloService.Login", "user:password", &loginReply); err != nil {log.Fatal("登录失败:", err)}// 再调用 Hello 方法var helloReply stringif err := client.Call("HelloService.Hello", "world", &helloReply); err != nil {log.Fatal("调用失败:", err)}fmt.Println(helloReply) // 输出包含客户端地址的响应
}

核心优势

  • 通过 net.Conn 获取客户端上下文(如 IP 地址、连接状态)。
  • 基于连接状态实现认证逻辑(如登录验证),确保服务安全性。
  • 每个连接独立维护状态,避免多客户端数据混淆。
五、关键概念总结
  1. 异步调用:通过通道机制实现非阻塞 RPC 调用,提升客户端并发能力。
  2. Watch 机制:利用函数回调和通道,实现服务端数据变化的实时通知。
  3. 反向 RPC:打破传统 C/S 模式,适用于内网服务主动对外提供能力的场景。
  4. 上下文管理:基于连接绑定状态(如认证信息),实现定制化服务逻辑。

相关文章:

  • axure基础操作
  • Rust高效编程实战指南
  • c++学习(五、函数高级)
  • GEO(生成式引擎优化)—— 内容创作者与企业的生死新战场
  • 掌握 MySQL 的基石:全面解读数据类型及其影响
  • HarmonyOS NEXT仓颉开发语言实战案例:动态广场
  • 单调栈一文深度解析
  • Flutter基础(路由页面跳转)
  • 【Cursor黑科技AI编程实战】
  • PMO 与IPD、CMMI、项目管理什么区别和联系
  • C++扩展 - 关键字应用 - decltype
  • 中级统计师-经济学基础知识-第三章 市场失灵与分配不公及其公共治理
  • Uni-App 小程序面试题高频问答汇总
  • 【QT】QT的发展历史与介绍
  • 机器学习配置环境
  • Python实现对WPS协作群进行群消息自动推送
  • 在单片机中如何实现一个shell控制台
  • 机器学习框架(1)
  • crawl4ai crawler.arun( 超时问题
  • 安卓中静态和动态添加子 View 到容器