ETCD学习笔记

发布于:2025-09-02 ⋅ 阅读:(22) ⋅ 点赞:(0)

etcd是什么

etcd是一个分布式、高可用的键值存储系统,常用于配置中心、服务注册、Leader选举等场景。

分布式系统

采用etcd作为分布式一致性KV存储,它基于Raft算法,可以保证在主从节点之间数据一致性。我们常把服务注册信息、配置中心的数据保存在etcd中,多个服务节点通过watch等机制保持一致状态。

分布式锁

数据一致性

Raft算法

代码实例

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// 服务注册
func registerService(client *clientv3.Client, serviceName, serviceAddr string, ttl int64) error {
	// 创建租约
	leaseResp, err := client.Grant(context.TODO(), ttl)
	if err != nil {
		return err
	}

	// 服务路径
	key := "/services/" + serviceName + "/" + serviceAddr

	// 将服务地址注册到etcd,并与租约关联
	_, err = client.Put(context.TODO(), key, serviceAddr, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		return err
	}

	// 自动续约
	ch, err := client.KeepAlive(context.TODO(), leaseResp.ID)
	if err != nil {
		return err
	}

	// 处理续约应答
	go func() {
		for range ch {
			// 续约成功
			log.Printf("Service %s renewed", serviceAddr)
		}
		log.Printf("Service %s lease expired", serviceAddr)
	}()

	return nil
}

// 服务发现
func discoverServices(client *clientv3.Client, serviceName string) ([]string, error) {
	// 前缀查询
	resp, err := client.Get(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	var services []string
	for _, kv := range resp.Kvs {
		services = append(services, string(kv.Value))
	}

	return services, nil
}

// 监听服务变化
func watchServices(client *clientv3.Client, serviceName string) {
	rch := client.Watch(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())
	
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case clientv3.EventTypePut:
				log.Printf("Service added: %s", ev.Kv.Value)
			case clientv3.EventTypeDelete:
				log.Printf("Service removed: %s", ev.Kv.Key)
			}
		}
	}
}

func main() {
	// 连接etcd
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // etcd服务器地址
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed to connect to etcd: %v", err)
	}
	defer client.Close()

	// 注册服务
	serviceName := "user-service"
	serviceAddr := "127.0.0.1:8080"
	err = registerService(client, serviceName, serviceAddr, 10) // 10秒租约
	if err != nil {
		log.Fatalf("Failed to register service: %v", err)
	}
	log.Printf("Service %s registered successfully", serviceAddr)

	// 启动服务监听
	go watchServices(client, serviceName)

	// 发现服务
	services, err := discoverServices(client, serviceName)
	if err != nil {
		log.Fatalf("Failed to discover services: %v", err)
	}
	log.Printf("Discovered services: %v", services)

	// 保持程序运行
	select {}
}

代码功能

  • 服务注册
    1. 使用etcd的租约机制注册(为了避免死锁,使用租约)
    2. 设置服务的存活时间(租约有效时间,到期前如果未续费就自动断开)
    3. 通过自动续约机制保持服务在线状态
    4. 服务路径格式为/services/服务名/服务地址
  • 服务发现
    1. 通过前缀查询获取指定服务名的所有实例
    2. 可以获取当前可用的服务列表
  • 服务监听
    1. 实时监控服务的变化(新增或移除)
    2. 当服务上下线时能及时收到通知

上线与注册流程说明

  1. 连接etcd集群
client, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"localhost:2379"}, // etcd服务器地址
    DialTimeout: 5 * time.Second,
})

创建了一个 etcd 客户端,通过指定的端点 (Endpoints) 连接到 etcd 集群。

  1. 服务注册实现
// 创建租约
leaseResp, err := client.Grant(context.TODO(), ttl)

// 写入服务信息并关联租约
_, err = client.Put(context.TODO(), key, serviceAddr, clientv3.WithLease(leaseResp.ID))

// 自动续约
ch, err := client.KeepAlive(context.TODO(), leaseResp.ID)

// 处理续约应答
go func() {
    for range ch {
        // 续约成功
    }
}()
  • Grant() 创建租约,设置服务存活时间
  • Put() 将服务信息写入 etcd,并通过WithLease()关联租约
  • KeepAlive() 启动自动续约机制
  • 用 goroutine 处理续约响应,保持服务活跃
  1. 服务发现实现
// 前缀查询获取服务实例
resp, err := client.Get(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())

// 提取服务地址
var services []string
for _, kv := range resp.Kvs {
    services = append(services, string(kv.Value))
}
  • Get() 方法配合WithPrefix()参数实现前缀查询
  • 遍历查询结果,提取所有服务实例地址
  1. 监听服务变化
rch := client.Watch(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())

for wresp := range rch {
    for _, ev := range wresp.Events {
        switch ev.Type {
        case clientv3.EventTypePut:    // 服务新增
        case clientv3.EventTypeDelete: // 服务移除
        }
    }
}
  • Watch() 方法建立对指定前缀路径的监听
  • 通过循环读取事件通道,处理服务新增和移除事件
  1. 服务下线