- etcd 是一个开源的 分布式键值存储系统(Key-Value Store),主要用于配置共享和服务发现。



- ETCD是一个键值(KV)数据库,类似于Redis,支持分布式集群。
- ETCD也可以看作是一个分布式文件系统,类似于ZooKeeper,可以对文件和目录进行监听。
- 在服务注册场景下,ETCD中的key是类似于文件路径的字符串,value为空。
- 每台服务器启动后,会主动将自己的IP地址和提供的服务名称写入ETCD。
- 为了防止key过期,服务器会每隔一段时间(如9秒)重新写入自己的信息。
- 通过设置租期(如2秒),ETCD可以在服务器宕机后及时清理过期的key。
- 客户端通过查询ETCD来获取能够提供服务的服务器IP地址。
- 客户端可以查询具有特定前缀的key,以获取所有提供相同服务的服务器IP。
- ETCD支持监听功能,客户端可以监听特定前缀的变化,实时获取新的服务器信息。
package service_hub
import (
"context"
"errors"
"github.com/jmh000527/criker-search/index_service/load_balancer"
"github.com/jmh000527/criker-search/utils"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
etcdv3 "go.etcd.io/etcd/client/v3"
"strings"
"sync"
"time"
)
type EtcdServiceHub struct {
client *etcdv3.Client
heartbeatFrequency int64
watched sync.Map
loadBalancer load_balancer.LoadBalancer
}
const (
ServiceRootPath = "/criker-search"
)
var (
etcdServiceHub *EtcdServiceHub
hubOnce sync.Once
)
func GetServiceHub(etcdServers []string, heartbeatFrequency int64) *EtcdServiceHub {
if etcdServiceHub == nil {
hubOnce.Do(func() {
client, err := etcdv3.New(etcdv3.Config{
Endpoints: etcdServers,
DialTimeout: 3 * time.Second,
})
if err != nil {
utils.Log.Fatal("连接etcd失败:", err)
}
etcdServiceHub = &EtcdServiceHub{
client: client,
heartbeatFrequency: heartbeatFrequency,
loadBalancer: &load_balancer.RoundRobin{},
}
})
}
return etcdServiceHub
}
func (hub *EtcdServiceHub) RegisterService(service, endpoint string, leaseId etcdv3.LeaseID) (etcdv3.LeaseID, error) {
if leaseId <= 0 {
leaseGrantResponse, err := hub.client.Grant(context.Background(), hub.heartbeatFrequency)
if err != nil {
utils.Log.Printf("创建租约失败: %v", err)
return 0, err
}
key := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/" + endpoint
_, err = hub.client.Put(context.Background(), key, "", etcdv3.WithLease(leaseGrantResponse.ID))
if err != nil {
utils.Log.Printf("服务注册失败: %v", err)
return leaseGrantResponse.ID, err
}
utils.Log.Printf("成功注册服务: %v", key)
return leaseGrantResponse.ID, nil
} else {
_, err := hub.client.KeepAliveOnce(context.Background(), leaseId)
if errors.Is(err, rpctypes.ErrLeaseNotFound) {
utils.Log.Printf("未找到租约,重新注册服务")
return hub.RegisterService(service, endpoint, 0)
} else if err != nil {
utils.Log.Printf("续租失败: %v", err)
return 0, err
}
return leaseId, nil
}
}
func (hub *EtcdServiceHub) UnregisterService(service string, endpoint string) error {
key := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/" + endpoint
_, err := hub.client.Delete(context.Background(), key)
if err != nil {
utils.Log.Printf("注销服务失败: %v", err)
return err
}
utils.Log.Printf("成功注销服务: %v", key)
return nil
}
func (hub *EtcdServiceHub) GetServiceEndpoints(service string) []string {
prefix := strings.TrimRight(ServiceRootPath, "/") + "/" + service + "/"
getResponse, err := hub.client.Get(context.Background(), prefix, etcdv3.WithPrefix())
if err != nil {
utils.Log.Printf("从etcd获取服务端点失败: %v", err)
return nil
}
endpoints := make([]string, 0, len(getResponse.Kvs))
for _, kv := range getResponse.Kvs {
path := strings.Split(string(kv.Key), "/")
endpoints = append(endpoints, path[len(path)-1])
}
utils.Log.Printf("最新的服务端点: %v", endpoints)
return endpoints
}
func (hub *EtcdServiceHub) GetServiceEndpoint(service string) string {
endpoints := hub.GetServiceEndpoints(service)
return hub.loadBalancer.Take(endpoints)
}
func (hub *EtcdServiceHub) Close() {
err := hub.client.Close()
if err != nil {
utils.Log.Printf("关闭etcd客户端连接失败: %v", err)
}
}
- ETCD提供API用于服务的注册与发现。
- 服务中心的核心是client,用于连接到ETCD。
- 服务注册后,需要定期上报心跳以保持存活状态。
- service worker单独部署在服务器上,连接service hub使用单例模式。
- 通过once实现单例模式,判断是否已创建实例。
- 使用ETCD new方法连接,传入endpoints和配置信息。
- 配置中核心是endpoints,需要提供ETCD集群的多个IP。
- 连接超时设置为3秒,确保连接可靠性。
- 服务启动时,首先申请租约并获取租约ID。
- 将服务信息(service name + ip:port)注册到ETCD中,并设置租约。
- 定期续租以保持服务存活状态。
- 如果租约ID不存在,则重新注册服务。
- 提供注销函数,传入service name和endpoint IP。
- 从ETCD中删除对应的key,完成注销。
- 服务调用方通过get service函数获取服务列表。
- 传入service name作为前缀,查询满足前缀的key。
- 返回所有匹配key的IP列表,供调用方选择。
func (hub *EtcdServiceHub) GetServiceEndpoint(service string) string {
endpoints := hub.GetServiceEndpoints(service)
return hub.loadBalancer.Take(endpoints)
}
- 由于调用方希望直接获取一台服务器进行接口通信,服务中心通过策略模式,将负载均衡算法的实现交给外部,采用接口方式定义负载均衡策略,并展示了轮询和随机两种简单的负载均衡策略实现,强调了在并发环境下确保累加操作的线程安全性。

package load_balancer
type LoadBalancer interface {
Take(endpoints []string) string
}
import "math/rand"
type RandomSelect struct{}
func (b *RandomSelect) Take(endpoints []string) string {
if len(endpoints) == 0 {
return ""
}
index := rand.Intn(len(endpoints))
return endpoints[index]
}
import "sync/atomic"
type RoundRobin struct {
acc int64
}
func (b *RoundRobin) Take(endpoints []string) string {
if len(endpoints) == 0 {
return ""
}
n := atomic.AddInt64(&b.acc, 1)
index := int(n % int64(len(endpoints)))
return endpoints[index]
}