- 代理模式(Proxy Pattern)是一种结构型设计模式,其核心思想是通过引入一个代理对象来控制对另一个对象的访问。代理对象在客户端和目标对象之间起到中介的作用,可以在不改变目标对象代码的前提下,增加额外的控制逻辑,例如延迟加载、访问控制、性能优化、安全检测等。
- 通过代理来访问service HUB,所有对service HUB的访问都需经过代理。
- 代理提供的增值服务:限流保护和缓存。
- 类似房产中介:中介作为传话人联系房东,并提供额外功能。
package service_hubimport ("context""github.com/jmh000527/criker-search/utils"etcdv3 "go.etcd.io/etcd/client/v3""golang.org/x/time/rate""strings""sync""time"
)
type HubProxy struct {*EtcdServiceHub endpointCache sync.Map limiter *rate.Limiter
}var (hubProxy *HubProxyproxyOnce sync.Once
)
func GetServiceHubProxy(etcdServers []string, heartbeatFrequency int64, qps int) *HubProxy {if hubProxy == nil {proxyOnce.Do(func() {hubProxy = &HubProxy{EtcdServiceHub: GetServiceHub(etcdServers, heartbeatFrequency),endpointCache: sync.Map{},limiter: rate.NewLimiter(rate.Every(time.Duration(1e9/qps)*time.Nanosecond), qps),}})}return hubProxy
}
func (p *HubProxy) GetServiceEndpoints(service string) []string {if !p.limiter.Allow() {return nil}p.watchEndpointsOfService(service)cachedEndpoints, ok := p.endpointCache.Load(service)if !ok {endpoints := p.EtcdServiceHub.GetServiceEndpoints(service)if len(endpoints) > 0 {p.endpointCache.Store(service, endpoints)}return endpoints}return cachedEndpoints.([]string)
}
func (p *HubProxy) watchEndpointsOfService(service string) {_, ok := p.watched.LoadOrStore(service, true)if ok {return}prefix := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/"watchChan := p.EtcdServiceHub.client.Watch(context.Background(), prefix, etcdv3.WithPrefix())utils.Log.Printf("开始监视服务端点: %s", prefix)go func() {for response := range watchChan {for _, event := range response.Events {utils.Log.Printf("etcd事件类型: %s", event.Type)path := strings.Split(string(event.Kv.Key), "/")if len(path) > 2 {service := path[len(path)-2]endpoints := p.EtcdServiceHub.GetServiceEndpoints(service)if len(endpoints) > 0 {p.endpointCache.Store(service, endpoints)} else {p.endpointCache.Delete(service)}}}}}()
}
- proxy与service HUB具有相同的功能,包括注册、注销和获取服务列表。
- 通过调用底层service HUB:实现注册和注销功能。
- 第一项增值服务——缓存,将获取的服务列表缓存到本地,不用每次都去服务器找endpoint。
- 第二项增值服务——限流保护,allow函数:检查是否允许访问,通过从桶中取令牌实现;wait函数:等待一段时间再尝试访问,通过传入等待时间参数实现。
- 监听终端基本变化:确保本地缓存与实际服务状态一致。
- watch函数:监听etc d的服务变化,通过传入服务名称和前缀实现。
- 处理监听事件:从channel中读取事件,并更新本地缓存。
- 无限循环处理:将监听逻辑放在单独的子集成中,避免主集成阻塞。
- 简化代码:通过匿名成员变量和接口抽象,简化代理的实现。
package service_hubimport ("fmt""testing""time"
)var (serviceName = "test_service"etcdServers = []string{"127.0.0.1:2379"}
)func TestGetServiceEndpointsByProxy(t *testing.T) {const qps = 10 p := GetServiceHubProxy(etcdServers, 3, qps)endpoint := "127.0.0.1:5000"p.RegisterService(serviceName, endpoint, 0)defer p.UnregisterService(serviceName, endpoint)endpoints := p.GetServiceEndpoints(serviceName)fmt.Printf("endpoints %v\n", endpoints)endpoint = "127.0.0.2:5000"p.RegisterService(serviceName, endpoint, 0)defer p.UnregisterService(serviceName, endpoint)endpoints = p.GetServiceEndpoints(serviceName)fmt.Printf("endpoints %v\n", endpoints)endpoint = "127.0.0.3:5000"p.RegisterService(serviceName, endpoint, 0)defer p.UnregisterService(serviceName, endpoint)endpoints = p.GetServiceEndpoints(serviceName)fmt.Printf("endpoints %v\n", endpoints)time.Sleep(1 * time.Second) for i := 0; i < qps+5; i++ { endpoints = p.GetServiceEndpoints(serviceName)fmt.Printf("%d endpoints %v\n", i, endpoints)}time.Sleep(1 * time.Second) for i := 0; i < qps+5; i++ { endpoints = p.GetServiceEndpoints(serviceName)fmt.Printf("%d endpoints %v\n", i, endpoints)}
}