Golang | 代理模式

发布于:2025-05-29 ⋅ 阅读:(26) ⋅ 点赞:(0)
  • 代理模式(Proxy Pattern)是一种结构型设计模式,其核心思想是通过引入一个代理对象来控制对另一个对象的访问。代理对象在客户端和目标对象之间起到中介的作用,可以在不改变目标对象代码的前提下,增加额外的控制逻辑,例如延迟加载、访问控制、性能优化、安全检测等。
  • 通过代理来访问service HUB,所有对service HUB的访问都需经过代理。
  • 代理提供的增值服务:限流保护和缓存。
  • 类似房产中介:中介作为传话人联系房东,并提供额外功能。
package service_hub

import (
	"context"
	"github.com/jmh000527/criker-search/utils"
	etcdv3 "go.etcd.io/etcd/client/v3"
	"golang.org/x/time/rate"
	"strings"
	"sync"
	"time"
)

// HubProxy 代理模式,实现了ServiceHub接口。
// 该代理为ServiceHub实例提供了一层中间访问,增加了缓存和限流功能。
//
// 成员变量:
//   - EtcdServiceHub: 真实的ServiceHub实例,用于实际的服务发现和注册。
//   - endpointCache: 用于缓存服务端点的同步映射。
//   - limiter: 限流器,用于控制每秒请求的最大次数。
type HubProxy struct {
	*EtcdServiceHub               // 真实的ServiceHub实例
	endpointCache   sync.Map      // 缓存服务端点
	limiter         *rate.Limiter // 限流器
}

var (
	hubProxy  *HubProxy
	proxyOnce sync.Once
)

// GetServiceHubProxy HubProxy的构造函数,采用单例模式创建实例。
//
// 参数:
//   - etcdServers: etcd服务器的地址列表。
//   - heartbeatFrequency: 心跳频率,用于创建租约。
//   - qps: 每秒请求的最大次数,用于限流器的配置。
//
// 返回值:
//   - *HubProxy: 返回HubProxy的单例实例。
func GetServiceHubProxy(etcdServers []string, heartbeatFrequency int64, qps int) *HubProxy {
	if hubProxy == nil {
		proxyOnce.Do(func() {
			// 初始化HubProxy实例
			hubProxy = &HubProxy{
				EtcdServiceHub: GetServiceHub(etcdServers, heartbeatFrequency),
				endpointCache:  sync.Map{},
				// 配置限流器:每秒产生qps个令牌
				limiter: rate.NewLimiter(rate.Every(time.Duration(1e9/qps)*time.Nanosecond), qps),
			}

		})
	}
	return hubProxy
}

// 以下方法由EtcdServiceHub匿名变量提供

 RegisterService 注册服务
//func (p *HubProxy) RegisterService(service, endpoint string, leaseId etcdv3.LeaseID) (etcdv3.LeaseID, error) {
//	return p.EtcdServiceHub.RegisterService(service, endpoint, leaseId)
//}
//
 UnregisterService 注销服务
//func (p *HubProxy) UnregisterService(service, endpoint string) error {
//	return p.EtcdServiceHub.UnregisterService(service, endpoint)
//}
//
 GetServiceEndpoint 根据负载均衡策略,从众多endpoint里选择一个
//func (p *HubProxy) GetServiceEndpoint(service string) string {
//	return p.EtcdServiceHub.GetServiceEndpoint(service)
//}

// GetServiceEndpoints 服务发现。把第一次查询etcd的结果缓存起来,然后安装一个Watcher,仅etcd数据变化时更新本地缓存,这样可以降低etcd的访问压力,同时加上限流保护。
//
// 参数:
//   - service: 需要获取端点的服务名称。
//
// 返回值:
//   - []string: 返回服务端点的列表。如果限流未通过或发生错误,则返回nil。
func (p *HubProxy) GetServiceEndpoints(service string) []string {
	// 限流检查:如果限流器不允许请求,则直接返回nil
	if !p.limiter.Allow() {
		return nil
	}

	// 更新服务端点缓存的Watcher
	p.watchEndpointsOfService(service)

	// 尝试从缓存中加载服务端点
	cachedEndpoints, ok := p.endpointCache.Load(service)
	if !ok {
		// 如果缓存中没有服务端点,查询etcd获取最新端点
		endpoints := p.EtcdServiceHub.GetServiceEndpoints(service)
		if len(endpoints) > 0 {
			// 如果查询到端点,将其存入缓存
			p.endpointCache.Store(service, endpoints)
		}
		return endpoints
	}
	// 如果缓存中已有服务端点,直接返回缓存结果。缓存的一致性由watchEndpointsOfService()函数保证。
	return cachedEndpoints.([]string)
}

// watchEndpointsOfService 监视服务端点的变化,确保本地缓存与etcd中的数据保持同步。
//
// 参数:
//   - service: 需要监视的服务名称。
//
// 该函数将设置一个Watcher来监听etcd中对应服务的变化,并在检测到变化时更新本地缓存。
func (p *HubProxy) watchEndpointsOfService(service string) {
	// 检查当前服务是否已经被监听
	_, ok := p.watched.LoadOrStore(service, true)
	if ok {
		// 如果已经监听过,直接返回
		return
	}

	// 构建服务的前缀路径
	prefix := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/"
	// 设置etcd Watcher,监视指定前缀的所有键值对的变化
	watchChan := p.EtcdServiceHub.client.Watch(context.Background(), prefix, etcdv3.WithPrefix())
	utils.Log.Printf("开始监视服务端点: %s", prefix)

	// 启动一个 goroutine 来异步处理 Watcher 事件
	go func() {
		for response := range watchChan {
			for _, event := range response.Events {
				// 记录事件类型(PUT或DELETE)
				utils.Log.Printf("etcd事件类型: %s", event.Type)

				// 提取服务名称
				path := strings.Split(string(event.Kv.Key), "/")
				if len(path) > 2 {
					service := path[len(path)-2]
					// 从etcd中获取最新的服务端点列表
					endpoints := p.EtcdServiceHub.GetServiceEndpoints(service)
					if len(endpoints) > 0 {
						// 如果获取到服务端点,更新本地缓存
						p.endpointCache.Store(service, endpoints)
					} else {
						// 如果服务下没有端点,删除本地缓存
						p.endpointCache.Delete(service)
					}
				}
			}
		}
	}()
}
  • proxy与service HUB具有相同的功能,包括注册、注销和获取服务列表。
  • 通过调用底层service HUB:实现注册和注销功能。
  • 第一项增值服务——缓存,将获取的服务列表缓存到本地,不用每次都去服务器找endpoint。
  • 第二项增值服务——限流保护,allow函数:检查是否允许访问,通过从桶中取令牌实现;wait函数:等待一段时间再尝试访问,通过传入等待时间参数实现。
  • 监听终端基本变化:确保本地缓存与实际服务状态一致。
  • watch函数:监听etc d的服务变化,通过传入服务名称和前缀实现。
  • 处理监听事件:从channel中读取事件,并更新本地缓存。
  • 无限循环处理:将监听逻辑放在单独的子集成中,避免主集成阻塞。
  • 简化代码:通过匿名成员变量和接口抽象,简化代理的实现。

package service_hub

import (
	"fmt"
	"testing"
	"time"
)

var (
	serviceName = "test_service"
	etcdServers = []string{"127.0.0.1:2379"} // etcd集群的地址
)

func TestGetServiceEndpointsByProxy(t *testing.T) {
	const qps = 10 // qps限制为10
	p := GetServiceHubProxy(etcdServers, 3, qps)

	endpoint := "127.0.0.1:5000"
	p.RegisterService(serviceName, endpoint, 0)
	defer p.UnregisterService(serviceName, endpoint)
	endpoints := p.GetServiceEndpoints(serviceName)
	fmt.Printf("endpoints %v\n", endpoints)

	endpoint = "127.0.0.2:5000"
	p.RegisterService(serviceName, endpoint, 0)
	defer p.UnregisterService(serviceName, endpoint)
	endpoints = p.GetServiceEndpoints(serviceName)
	fmt.Printf("endpoints %v\n", endpoints)

	endpoint = "127.0.0.3:5000"
	p.RegisterService(serviceName, endpoint, 0)
	defer p.UnregisterService(serviceName, endpoint)
	endpoints = p.GetServiceEndpoints(serviceName)
	fmt.Printf("endpoints %v\n", endpoints)

	time.Sleep(1 * time.Second)  // 暂停1秒钟,把令牌桶的容量打满
	for i := 0; i < qps+5; i++ { // 桶里面有10个令牌,从第11次开始就拒绝访问了
		endpoints = p.GetServiceEndpoints(serviceName)
		fmt.Printf("%d endpoints %v\n", i, endpoints)
	}

	time.Sleep(1 * time.Second)  // 暂停1秒钟,把令牌桶的容量打满
	for i := 0; i < qps+5; i++ { // 桶里面有10个令牌,从第11次开始就拒绝访问了
		endpoints = p.GetServiceEndpoints(serviceName)
		fmt.Printf("%d endpoints %v\n", i, endpoints)
	}
}

网站公告

今日签到

点亮在社区的每一天
去签到