- 代理模式(Proxy Pattern)是一种结构型设计模式,其核心思想是通过引入一个代理对象来控制对另一个对象的访问。代理对象在客户端和目标对象之间起到中介的作用,可以在不改变目标对象代码的前提下,增加额外的控制逻辑,例如延迟加载、访问控制、性能优化、安全检测等。
- 通过代理来访问service HUB,所有对service HUB的访问都需经过代理。
- 代理提供的增值服务:限流保护和缓存。
- 类似房产中介:中介作为传话人联系房东,并提供额外功能。
package service_hub
import (
"context"
"github.com/jmh000527/criker-search/utils"
etcdv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/time/rate"
"strings"
"sync"
"time"
)
type HubProxy struct {
*EtcdServiceHub
endpointCache sync.Map
limiter *rate.Limiter
}
var (
hubProxy *HubProxy
proxyOnce sync.Once
)
func GetServiceHubProxy(etcdServers []string, heartbeatFrequency int64, qps int) *HubProxy {
if hubProxy == nil {
proxyOnce.Do(func() {
hubProxy = &HubProxy{
EtcdServiceHub: GetServiceHub(etcdServers, heartbeatFrequency),
endpointCache: sync.Map{},
limiter: rate.NewLimiter(rate.Every(time.Duration(1e9/qps)*time.Nanosecond), qps),
}
})
}
return hubProxy
}
func (p *HubProxy) GetServiceEndpoints(service string) []string {
if !p.limiter.Allow() {
return nil
}
p.watchEndpointsOfService(service)
cachedEndpoints, ok := p.endpointCache.Load(service)
if !ok {
endpoints := p.EtcdServiceHub.GetServiceEndpoints(service)
if len(endpoints) > 0 {
p.endpointCache.Store(service, endpoints)
}
return endpoints
}
return cachedEndpoints.([]string)
}
func (p *HubProxy) watchEndpointsOfService(service string) {
_, ok := p.watched.LoadOrStore(service, true)
if ok {
return
}
prefix := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/"
watchChan := p.EtcdServiceHub.client.Watch(context.Background(), prefix, etcdv3.WithPrefix())
utils.Log.Printf("开始监视服务端点: %s", prefix)
go func() {
for response := range watchChan {
for _, event := range response.Events {
utils.Log.Printf("etcd事件类型: %s", event.Type)
path := strings.Split(string(event.Kv.Key), "/")
if len(path) > 2 {
service := path[len(path)-2]
endpoints := p.EtcdServiceHub.GetServiceEndpoints(service)
if len(endpoints) > 0 {
p.endpointCache.Store(service, endpoints)
} else {
p.endpointCache.Delete(service)
}
}
}
}
}()
}
- proxy与service HUB具有相同的功能,包括注册、注销和获取服务列表。
- 通过调用底层service HUB:实现注册和注销功能。
- 第一项增值服务——缓存,将获取的服务列表缓存到本地,不用每次都去服务器找endpoint。
- 第二项增值服务——限流保护,allow函数:检查是否允许访问,通过从桶中取令牌实现;wait函数:等待一段时间再尝试访问,通过传入等待时间参数实现。
- 监听终端基本变化:确保本地缓存与实际服务状态一致。
- watch函数:监听etc d的服务变化,通过传入服务名称和前缀实现。
- 处理监听事件:从channel中读取事件,并更新本地缓存。
- 无限循环处理:将监听逻辑放在单独的子集成中,避免主集成阻塞。
- 简化代码:通过匿名成员变量和接口抽象,简化代理的实现。
package service_hub
import (
"fmt"
"testing"
"time"
)
var (
serviceName = "test_service"
etcdServers = []string{"127.0.0.1:2379"}
)
func TestGetServiceEndpointsByProxy(t *testing.T) {
const qps = 10
p := GetServiceHubProxy(etcdServers, 3, qps)
endpoint := "127.0.0.1:5000"
p.RegisterService(serviceName, endpoint, 0)
defer p.UnregisterService(serviceName, endpoint)
endpoints := p.GetServiceEndpoints(serviceName)
fmt.Printf("endpoints %v\n", endpoints)
endpoint = "127.0.0.2:5000"
p.RegisterService(serviceName, endpoint, 0)
defer p.UnregisterService(serviceName, endpoint)
endpoints = p.GetServiceEndpoints(serviceName)
fmt.Printf("endpoints %v\n", endpoints)
endpoint = "127.0.0.3:5000"
p.RegisterService(serviceName, endpoint, 0)
defer p.UnregisterService(serviceName, endpoint)
endpoints = p.GetServiceEndpoints(serviceName)
fmt.Printf("endpoints %v\n", endpoints)
time.Sleep(1 * time.Second)
for i := 0; i < qps+5; i++ {
endpoints = p.GetServiceEndpoints(serviceName)
fmt.Printf("%d endpoints %v\n", i, endpoints)
}
time.Sleep(1 * time.Second)
for i := 0; i < qps+5; i++ {
endpoints = p.GetServiceEndpoints(serviceName)
fmt.Printf("%d endpoints %v\n", i, endpoints)
}
}