Golang|etcd服务注册与发现 & 策略模式

发布于:2025-05-28 ⋅ 阅读:(20) ⋅ 点赞:(0)
  • etcd 是一个开源的 分布式键值存储系统(Key-Value Store),主要用于配置共享和服务发现。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • ETCD是一个键值(KV)数据库,类似于Redis,支持分布式集群。
  • ETCD也可以看作是一个分布式文件系统,类似于ZooKeeper,可以对文件和目录进行监听。
  • 在服务注册场景下,ETCD中的key是类似于文件路径的字符串,value为空。
  • 每台服务器启动后,会主动将自己的IP地址和提供的服务名称写入ETCD。
  • 为了防止key过期,服务器会每隔一段时间(如9秒)重新写入自己的信息。
  • 通过设置租期(如2秒),ETCD可以在服务器宕机后及时清理过期的key。
  • 客户端通过查询ETCD来获取能够提供服务的服务器IP地址。
  • 客户端可以查询具有特定前缀的key,以获取所有提供相同服务的服务器IP。
  • ETCD支持监听功能,客户端可以监听特定前缀的变化,实时获取新的服务器信息。
package service_hub

import (
	"context"
	"errors"
	"github.com/jmh000527/criker-search/index_service/load_balancer"
	"github.com/jmh000527/criker-search/utils"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	etcdv3 "go.etcd.io/etcd/client/v3"
	"strings"
	"sync"
	"time"
)

// EtcdServiceHub 服务注册中心,使用单例模式构造。
// 该服务用于与etcd进行交互,管理服务的注册、注销以及心跳续约等功能。
type EtcdServiceHub struct {
	client             *etcdv3.Client             // etcd客户端,用于与etcd进行操作
	heartbeatFrequency int64                      // 服务续约的心跳频率,单位:秒
	watched            sync.Map                   // 存储已经监视的服务,以避免重复监视
	loadBalancer       load_balancer.LoadBalancer // 负载均衡策略的接口,支持多种负载均衡实现
}

const (
	ServiceRootPath = "/criker-search" // etcd key的前缀
)

var (
	etcdServiceHub *EtcdServiceHub // 该全局变量包外不可见,包外想使用时通过GetServiceHub()获得
	hubOnce        sync.Once       // 单例模式需要用到一个once
)

// GetServiceHub ServiceHub的构造函数,采用单例模式。
//
// 参数:
//   - etcdServers: 包含etcd服务器地址的字符串切片。
//   - heartbeatFrequency: 心跳频率,表示服务心跳的间隔时间(以秒为单位)。
//
// 返回值:
//   - *EtcdServiceHub: 返回一个初始化好的EtcdServiceHub实例。
func GetServiceHub(etcdServers []string, heartbeatFrequency int64) *EtcdServiceHub {
	// 检查是否已经存在etcdServiceHub实例
	if etcdServiceHub == nil {
		// 使用sync.Once确保单例模式,hubOnce.Do中的代码块只会被执行一次
		hubOnce.Do(func() {
			// 创建一个新的etcd客户端,连接到指定的etcd服务器
			client, err := etcdv3.New(etcdv3.Config{
				Endpoints:   etcdServers,     // etcd 服务器的地址列表
				DialTimeout: 3 * time.Second, // 连接超时时间
			})
			if err != nil {
				// 如果连接etcd服务器失败,记录错误并终止程序
				utils.Log.Fatal("连接etcd失败:", err)
			}

			// 初始化一个新的EtcdServiceHub实例
			etcdServiceHub = &EtcdServiceHub{
				client:             client,                      // 设置etcd客户端
				heartbeatFrequency: heartbeatFrequency,          // 设置心跳频率
				loadBalancer:       &load_balancer.RoundRobin{}, // 使用Round-Robin负载均衡策略
			}
		})
	}

	// 返回已初始化的etcdServiceHub实例
	return etcdServiceHub
}

// RegisterService 注册服务。
// 第一次注册时,会向etcd写入一个key,并创建一个租约;后续注册仅进行续约。
//
// 参数:
//   - service: 微服务的名称。
//   - endpoint: 微服务服务器的地址。
//   - leaseId: 租约ID,第一次注册时应置为0。
//
// 返回值:
//   - etcdv3.LeaseID: 返回租约ID。
//   - error: 返回错误信息,如果操作成功则为nil。
func (hub *EtcdServiceHub) RegisterService(service, endpoint string, leaseId etcdv3.LeaseID) (etcdv3.LeaseID, error) {
	// 检查是否为首次注册(租约ID是否小于等于0)
	if leaseId <= 0 {
		// 首次注册: 创建一个新的租约,租约的有效期为heartbeatFrequency秒
		leaseGrantResponse, err := hub.client.Grant(context.Background(), hub.heartbeatFrequency)
		if err != nil {
			// 如果创建租约失败,记录错误并返回
			utils.Log.Printf("创建租约失败: %v", err)
			return 0, err
		}
		// 构建服务在etcd中的key,路径形如: /{ServiceRootPath}/{service}/{endpoint}
		key := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/" + endpoint
		// 将服务注册到etcd中,并将租约与该服务绑定
		_, err = hub.client.Put(context.Background(), key, "", etcdv3.WithLease(leaseGrantResponse.ID))
		if err != nil {
			// 如果注册服务失败,记录错误并返回
			utils.Log.Printf("服务注册失败: %v", err)
			return leaseGrantResponse.ID, err
		}
		utils.Log.Printf("成功注册服务: %v", key)
		// 返回新的租约ID
		return leaseGrantResponse.ID, nil
	} else {
		// 续约: 通过租约ID进行续租操作
		_, err := hub.client.KeepAliveOnce(context.Background(), leaseId)
		if errors.Is(err, rpctypes.ErrLeaseNotFound) {
			// 如果续租时发现租约不存在,则重新注册服务,将leaseID置为0重新进行注册
			utils.Log.Printf("未找到租约,重新注册服务")
			return hub.RegisterService(service, endpoint, 0)
		} else if err != nil {
			// 如果续租过程中发生其他错误,记录错误并返回
			utils.Log.Printf("续租失败: %v", err)
			return 0, err
		}
		// 如果续租成功,则返回现有的租约ID
		return leaseId, nil
	}
}

// UnregisterService 主动注销服务。
// 从etcd中删除服务的注册信息。
//
// 参数:
//   - service: 微服务的名称。
//   - endpoint: 微服务服务器的地址。
//
// 返回值:
//   - error: 返回错误信息,如果操作成功则为nil。
func (hub *EtcdServiceHub) UnregisterService(service string, endpoint string) error {
	// 构建服务在etcd中的key,路径形如: /{ServiceRootPath}/{service}/{endpoint}
	key := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/" + endpoint

	// 从etcd中删除服务注册信息
	_, err := hub.client.Delete(context.Background(), key)
	if err != nil {
		// 如果删除操作失败,记录错误并返回
		utils.Log.Printf("注销服务失败: %v", err)
		return err
	}

	// 成功注销服务,记录日志
	utils.Log.Printf("成功注销服务: %v", key)
	return nil
}

// GetServiceEndpoints 服务发现。
// 从etcd中查询指定服务的所有endpoint,并返回这些endpoint的列表。
// 参数:
//   - service: 微服务的名称。
//
// 返回值:
//   - []string: 包含所有服务endpoint的列表。如果查询失败,则返回nil。
func (hub *EtcdServiceHub) GetServiceEndpoints(service string) []string {
	// 构造服务的key前缀,用于获取服务的所有endpoint
	prefix := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/"

	// 从etcd中获取以指定前缀为开头的所有key-value对
	getResponse, err := hub.client.Get(context.Background(), prefix, etcdv3.WithPrefix())
	if err != nil {
		// 如果获取服务endpoint失败,记录错误并返回nil
		utils.Log.Printf("从etcd获取服务端点失败: %v", err)
		return nil
	}

	// 构造返回的endpoint列表
	endpoints := make([]string, 0, len(getResponse.Kvs))
	for _, kv := range getResponse.Kvs {
		// 从key中提取endpoint
		path := strings.Split(string(kv.Key), "/")
		endpoints = append(endpoints, path[len(path)-1])
	}

	// 记录获取到的服务endpoint
	utils.Log.Printf("最新的服务端点: %v", endpoints)
	return endpoints
}

// GetServiceEndpoint 根据负载均衡策略从服务端点中选择一个。
// 通过调用负载均衡策略的Take方法,从获取的服务端点列表中选择一个。
//
// 参数:
//   - service: 微服务的名称。
//
// 返回值:
//   - string: 选择的服务端点地址。
func (hub *EtcdServiceHub) GetServiceEndpoint(service string) string {
	// 获取指定服务的所有端点
	endpoints := hub.GetServiceEndpoints(service)
	// 使用负载均衡策略选择一个端点
	return hub.loadBalancer.Take(endpoints)
}

// Close 关闭etcd客户端连接。
// 释放etcd客户端占用的资源,并记录关闭连接的状态。
//
// 返回值:
//   - 无
func (hub *EtcdServiceHub) Close() {
	// 尝试关闭etcd客户端连接
	err := hub.client.Close()
	if err != nil {
		// 如果关闭连接失败,记录错误日志
		utils.Log.Printf("关闭etcd客户端连接失败: %v", err)
	}
}
  • ETCD提供API用于服务的注册与发现。
  • 服务中心的核心是client,用于连接到ETCD。
  • 服务注册后,需要定期上报心跳以保持存活状态。
  • service worker单独部署在服务器上,连接service hub使用单例模式。
  • 通过once实现单例模式,判断是否已创建实例。
  • 使用ETCD new方法连接,传入endpoints和配置信息。
  • 配置中核心是endpoints,需要提供ETCD集群的多个IP。
  • 连接超时设置为3秒,确保连接可靠性。
  • 服务启动时,首先申请租约并获取租约ID。
  • 将服务信息(service name + ip:port)注册到ETCD中,并设置租约。
  • 定期续租以保持服务存活状态。
  • 如果租约ID不存在,则重新注册服务。
  • 提供注销函数,传入service name和endpoint IP。
  • 从ETCD中删除对应的key,完成注销。
  • 服务调用方通过get service函数获取服务列表。
  • 传入service name作为前缀,查询满足前缀的key。
  • 返回所有匹配key的IP列表,供调用方选择。

  • 策略模式
// GetServiceEndpoint 根据负载均衡策略从服务端点中选择一个。
// 通过调用负载均衡策略的Take方法,从获取的服务端点列表中选择一个。
//
// 参数:
//   - service: 微服务的名称。
//
// 返回值:
//   - string: 选择的服务端点地址。
func (hub *EtcdServiceHub) GetServiceEndpoint(service string) string {
	// 获取指定服务的所有端点
	endpoints := hub.GetServiceEndpoints(service)
	// 使用负载均衡策略选择一个端点
	return hub.loadBalancer.Take(endpoints)
}
  • 由于调用方希望直接获取一台服务器进行接口通信,服务中心通过策略模式,将负载均衡算法的实现交给外部,采用接口方式定义负载均衡策略,并展示了轮询和随机两种简单的负载均衡策略实现,强调了在并发环境下确保累加操作的线程安全性。

在这里插入图片描述

package load_balancer

// LoadBalancer 负载均衡接口,定义选择Endpoint的方法
type LoadBalancer interface {
	// Take 从给定的端点列表中选择一个
	Take(endpoints []string) string
}



import "math/rand"

// RandomSelect 负载均衡算法:随机选择法
// 随机选择算法从列表中随机选择一个端点
type RandomSelect struct{}

// Take 选择一个Endpoint,根据随机选择算法
func (b *RandomSelect) Take(endpoints []string) string {
	if len(endpoints) == 0 {
		return ""
	}
	// 从端点列表中随机选择一个索引
	index := rand.Intn(len(endpoints))
	return endpoints[index]
}




import "sync/atomic"

// RoundRobin 负载均衡算法:轮询法
// 轮询法确保每个请求轮流被分配到列表中的每个端点
type RoundRobin struct {
	acc int64 // 记录累计请求次数
}

// Take 选择一个Endpoint,根据轮询算法
func (b *RoundRobin) Take(endpoints []string) string {
	if len(endpoints) == 0 {
		return ""
	}
	// 线程安全地增加请求次数
	n := atomic.AddInt64(&b.acc, 1)
	// 计算要选择的Endpoint的索引
	index := int(n % int64(len(endpoints)))
	return endpoints[index]
}

网站公告

今日签到

点亮在社区的每一天
去签到