打造千万级流量秒杀第二十课 etcd 实战:如何使用 etcd 存储配置信息?

发布于:2023-01-16 ⋅ 阅读:(554) ⋅ 点赞:(0)

欢迎来到多级缓存实战模块!

首先请你思考这样一个业务场景:一个有 10 个节点的集群,当你修改了某一项配置后,你希望能立即同步给这 10 个节点,并更新到各节点的缓存中,该怎么做?

一种方法是你获取到这 10 个节点的地址信息,然后逐个调用接口同步给它们。但这会带来新的问题:你如何准确获取这 10 个节点的地址信息,又如何将配置稳定可靠地同步给它们?如果集群部署到多个可用区,且是用容器部署的,你拿到的地址不一定是最新的、完全可用的地址,最后很可能因部分节点同步失败而导致节点数据不一致。

另一种方法是让这 10 个节点自己来取。那怎么让它们知道配置变更了呢?它们又该去哪儿获取最新配置呢?

把这个问题抽象下就是:A 系统修改了数据,需要即时同步给 B、C、D 系统的所有节点。显然,这是非常经典的数据同步问题。想要解决,我们可以引入一个分布式系统协调器,如 etcd来处理。

在秒杀系统中,我们使用 etcd 存储集群信息和活动信息,以此解决分布式系统中的数据同步问题。除此之外,在多级缓存中我们还有一种为 Redis 缓存可用性兜底的手段。具体是怎么做的呢?

etcd 初始化

在使用 etcd 存储集群信息和活动信息前,我们需要先对 etcd 进行初始化。具体来说,在 infrastructure/stores/etcd 目录下的 etcd.go 中,我实现了 Init 和 GetClient 这两个函数。其中,在 Init 函数里,获取 etcd 相关的配置,并初始化 etcd 客户端,用 sync.Once 确保只初始化一次;在 GetClient 函数里,返回 Init 函数中初始化好的客户端。具体代码如下:

var etcdCli *etcd.Client
var etcdOnce = &sync.Once{}
func Init() error {
   var err error
   etcdOnce.Do(
      func() {
         endpoints := viper.GetStringSlice("etcd.endpoints")
         username := viper.GetString("etcd.username")
         password := viper.GetString("etcd.password")
         cfg := etcd.Config{
            Endpoints:   endpoints,
            DialTimeout: time.Second,
            Username:    username,
            Password:    password,
         }
         etcdCli, err = etcd.New(cfg)
      })
   return err
}
func GetClient() *etcd.Client {
   return etcdCli
}

接下来,我在 cmd/root.go 中调用 etcd.Init ,初始化 etcd 客户端。代码如下:

if err := etcd.Init(); err != nil {
   panic(err)
}

存储集群信息

既然要用 etcd 存储集群信息,那么集群信息都有哪些呢?

总的来说,秒杀系统的集群信息主要是秒杀服务节点信息,比如服务地址、版本号、协议类型等。在秒杀系统集群内,主要是将秒杀 API 服务的节点信息提供给 Admin 服务,用于调用 API 服务的 RPC 接口同步配置;在秒杀系统集群外使用节点信息,主要通过服务发现的方式,将节点暴露给监控系统或者其他微服务调用。

在 etcd 中,我们可以将节点信息存放到 /seckill/nodes 这个 key 下。以 IP 为 10.10.11.12 的节点为例,我们用 addr 字段保存服务地址,用 proto 保存协议类型,用 version 字段保存版本号。由于节点信息只会在节点启动时变更,我们用 json 格式来存放,具体的数据示例如下:

{
  "addr": "10.10.11.12:8080",
  "proto": "http",
  "version": "v1.0"
}

我们可以用 Go 结构体来定义一个新类型 Node,用于处理 json 格式的节点信息,字段名跟前面提到的 json 格式保持一致。如下所示:

type Node struct {
   Addr    string `json:"addr"`
   Version string `json:"version"`
   Proto   string `json:"proto"`
}

节点信息的管理需要使用三个函数:Register、Deregister 和 Discover,它们分别用于服务节点的注册、注销和发现。

具体怎么实现呢?

第一步,我定义了个 cluster 结构体,用于保存集群管理过程中需要的所有信息,包括锁、etcd 客户端、节点列表等,然后在 Init 函数中初始化它。具体代码如下所示:

type cluster struct {
   sync.RWMutex
   cli     *etcdv3.Client
   service string
   once    *sync.Once
   deregCh map[string]chan struct{}
   nodes   map[string]*Node
   v       *viper.Viper
}
var defaultCluster *cluster
var once = &sync.Once{
func Init(service string) {
   once.Do(func() {
      defaultCluster = &cluster{
         cli:     etcd.GetClient(),
         service: service,
         once:    &sync.Once{},
         deregCh: make(map[string]chan struct{}),
         nodes:   make(map[string]*Node),
      }
   })
}

第二步,我实现了 Register、Deregister 和 Discover 这三个函数。

具体来说,我在 Register 函数中将节点信息注册到 etcd,设置一个有效期,并定期更新,确保节点信息是最新的。在 Deregister 函数中将注册到 etcd 的节点信息删除。在 Discover 函数中,监控节点变更,并及时更新本地内存缓存中的节点信息,将最新的节点信息返回给调用者。代码如下:

func Register(node *Node, ttl int) error {
   const minTTL = 2
   c := defaultCluster
   key := c.makeKey(node)
   if ttl < minTTL {
      ttl = minTTL
   }
   var errCh = make(chan error)
   go func() {
      kv := etcdv3.NewKV(c.cli)
      closeCh := make(chan struct{})
      lease := etcdv3.NewLease(c.cli)
      val, _ := json.Marshal(node)
      var curLeaseId etcdv3.LeaseID = 0
      ticker := time.NewTicker(time.Duration(ttl/2) * time.Second)
      register := func() error {
         if curLeaseId == 0 {
            leaseResp, err := lease.Grant(context.TODO(), int64(ttl))
            if err != nil {
               return err
            }
            if _, err := kv.Put(context.TODO(), key, string(val), etcdv3.WithLease(leaseResp.ID)); err != nil {
               return err
            }
            curLeaseId = leaseResp.ID
         } else {
            if _, err := lease.KeepAliveOnce(context.TODO(), curLeaseId); err == rpctypes.ErrLeaseNotFound {
               curLeaseId = 0
            }
         }
         return nil
      }
      if err := register(); err != nil {
         logrus.Error("register node failed, error:", err)
         errCh <- err
      }
      close(errCh)
      for {
         select {
         case <-ticker.C:
            if err := register(); err != nil {
               logrus.Error("register node failed, error:", err)
               panic(err)
            }
         case <-closeCh:
            ticker.Stop()
            return
         }
      }
   }()
   err := <-errCh
   return err
}
func Deregister(node *Node) error {
   c := defaultCluster
   c.Lock()
   defer c.Unlock()
   key := c.makeKey(node)
   if ch, ok := c.deregCh[key]; ok {
      close(ch)
      delete(c.deregCh, key)
   }
   _, err := c.cli.Delete(context.Background(), key, etcdv3.WithPrefix())
   return err
}
func Discover() (output []*Node, err error) {
   c := defaultCluster
   key := fmt.Sprintf("/%s/nodes/", c.service)
   c.once.Do(func() {
      var resp *etcdv3.GetResponse
      resp, err = c.cli.Get(context.Background(), key, etcdv3.WithPrefix())
      if err != nil {
         return
      }
      for _, kv := range resp.Kvs {
         k := string(kv.Key)
         if len(k) > len(key) {
            var node *Node
            json.Unmarshal(kv.Value, &node)
            if node != nil {
               c.Lock()
               c.nodes[k] = node
               c.Unlock()
            }
         }
      }
      watchCh := c.cli.Watch(context.Background(), key, etcdv3.WithPrefix())
      go func() {
         for {
            select {
            case resp := <-watchCh:
               for _, evt := range resp.Events {
                  k := string(evt.Kv.Key)
                  if len(k) <= len(key) {
                     continue
                  }
                  switch evt.Type {
                  case etcdv3.EventTypePut:
                     var node *Node
                     json.Unmarshal(evt.Kv.Value, &node)
                     if node != nil {
                        c.Lock()
                        c.nodes[k] = node
                        c.Unlock()
                     }
                  case etcdv3.EventTypeDelete:
                     c.Lock()
                     if _, ok := c.nodes[k]; ok {
                        delete(c.nodes, k)
                     }
                     c.Unlock()
                  }
               }
            }
         }
      }()
   })
   if err != nil {
      return nil, err
   }
   c.RLock()
   for _, node := range c.nodes {
      output = append(output, node)
   }
   c.RUnlock()
   return
}
func (c *cluster) makeKey(node *Node) string {
   id := strings.Replace(node.Addr, ".", "-", -1)
   id = strings.Replace(id, ":", "-", -1)
   return fmt.Sprintf("/%s/nodes/%s", c.service, id)
}

第三步,修改 interfaces/rpc 目录下的 rpc.go,在 Run 函数中加上注册节点信息的代码,其中初始化一个变量 node,如下所示:

//初始化集群
cluster.Init("seckill")
var addr string
if addr, err = utils.Extract(bind); err == nil {
   //注册节点信息
   version := viper.GetString("api.version")
   if version == "" {
      version = "v0.1"
   }
   once.Do(func() {
      node = &cluster.Node{
         Addr:    addr,
         Version: version,
         Proto:   "gRPC",
      }
      err = cluster.Register(node, 6)
   })
}
if err != nil {
   return err
}

另外,我还在 Exit 函数中加上了注销节点的代码,主要是调用 cluster.Deregister 函数将节点信息从 etcd 中删除。

第四步,在 Run 函数里初始化集群,并加上一段代码用于输出获取到的节点信息。代码如下:

cluster.Init("seckill")
if nodes, err := cluster.Discover(); err == nil {
   log, _ := json.Marshal(nodes)
   logrus.Info("discover nodes ", string(log))
} else {
   logrus.Error("discover nodes error:", err)
}

注意,这个 Run 函数是位于 interfaces/admin 目录下的 admin.go 里面哦。

第五步,编译成功后,我们先启动 api,然后启动 admin。可以看到 admin 中输出了节点信息日志,说明 admin 服务成功发现了 api 服务的节点。此时,使用 etcdctl 也能获取到节点信息。

为了验证节点注册功能是否正常工作,我用 pkill -9 seckill 命令将 api 和 admin 都杀掉,使用 etcdctl 前两次还能获取到节点信息,但后面就获取不到了。就是因为杀掉 api 服务后,服务停止了更新过期时间,导致 etcd 将过期的节点信息删除。这也就证明了服务节点注册功能是正常工作的。

运行效果如下图所示:

Drawing 0.png

存储集群配置

秒杀集群配置主要有日志等级、限流器速度、熔断条件等。它们存放到 etcd 中 /seckill/config 这个 key 下,其中的 logLevel 保存日志等级, rateLimit 保存中间层和底层限流器的速度, circuitBreaker 保存熔断条件中的 cpu 使用率和请求时延。具体配置示例如下:

{
  "logLevel":"info",
  "rateLimit": {
    "middle": 100000,
    "low": 10000,
  }
  "circuitBreaker":{
    "cpu": 80,
    "latency": 1000,
  }
}

对应到 Go 代码中,大致步骤如下:

  1. 将上面的 json 配置用一个 Config 结构体保存,字段名跟 json 中的保持一致,然后把它放在 infrastructure/cluster 目录下的 cluster.go 中;

  2. 实现一个 WatchClusterConfig 函数,主要是监控并处理 etcd 中集群配置的 Delete 和 Put 事件,并及时更新内存缓存中的配置数据;

  3. 实现一个 GetClusterConfig 函数,用于将缓存中的配置提供给其他模块使用。

注意,在更新和读取缓存中配置数据的时候,都需要加锁,以免获取到的配置数据不是最新的。

代码如下所示:

type Config struct {
   LogLevel  string `json:"logLevel"`
   RateLimit struct {
      Middle int `json:"middle"`
      Low    int `json:"low"`
   } `json:"rateLimit"`
   CircuitBreaker struct {
      Cpu     int `json:"cpu"`
      Latency int `json:"latency"`
   } `json:"circuitBreaker"`
}
var configLock = &sync.RWMutex{}
var config = &Config{}
func WatchClusterConfig() error {
   cli := etcd.GetClient()
   key := "/seckill/config"
   resp, err := cli.Get(context.Background(), key)
   if err != nil {
      return err
   }
   update := func(kv *mvccpb.KeyValue) (bool, error) {
      if string(kv.Key) == key {
         var tmpConfig *Config
         err = json.Unmarshal(kv.Value, &tmpConfig)
         if err != nil {
            logrus.Error("update cluster config failed, error:", err)
            return false, err
         }
         configLock.Lock()
         *config = *tmpConfig
         logrus.Info("update cluster config ", *config)
         configLock.Unlock()
         return true, nil
      }
      return false, nil
   }
   for _, kv := range resp.Kvs {
      if ok, err := update(kv); ok {
         break
      } else if err != nil {
         return err
      }
   }
   go func() {
      watchCh := cli.Watch(context.Background(), key)
      for resp := range watchCh {
         for _, evt := range resp.Events {
            if evt.Type == etcdv3.EventTypePut {
               if ok, err := update(evt.Kv); ok {
                  break
               } else if err != nil {
                  break
               }
            }
         }
      }
   }()
   return nil
}
func GetClusterConfig() Config {
   configLock.RLock()
   defer configLock.RUnlock()
   return *config
}

接下来,为了能同时启动两个 api 服务,我先将 config/seckill.toml 配置文件拷贝一份,新的配置文件为 config/seckill1.toml,将里面的端口号和 pid 文件修改成与老的配置不一样,比如端口号改成 8083 和 8084,pid 文件改成 .pid1。

现在我们用这两份配置文件启动两个 api 服务,然后使用下面的 etcdctl 命令将配置数据写入到 etcd 。

ETCDCTL_API=3 etcdctl put /seckill/config '{"logLevel":"info","rateLimit":{"middle":100000,"low":10000},"circuitBreaker":{"cpu":80,"latency":1000}}'

此时我们就能在两个运行 api 服务的命令行终端看到 update cluster config 这样的日志,而且它们的时间戳一样,说明它们几乎能同时接收到配置变更事件,并在日志中输出最新的配置。

效果如下图所示:

Drawing 1.png

图片1.png

小结

这一讲我主要介绍了如何用 etcd 存储集群信息和集群配置。实际上,集群信息和配置管理有很多种方法,比如你还可以用 consul 来实现配置管理、服务注册和发现,那可能会比用 etcd 更简单些。

我之所以在这里用 etcd ,主要是为了给你介绍集群配置管理、服务注册和发现的底层代码原理。当你真正学会了自己去实现一套类似的代码后,你在使用其他方案时将会更容易理解它们的原理。

另外,还需要注意一点,我在这里用的 etcd 版本是 v3,在代码和命令行上与 v2 有些差别。

思考题: 如果用 consul 来实现集群信息和配置管理,该如何做呢?

期待你在留言区回答。

好了,这一讲就到这里了,下一讲我将给你介绍“如何使用 Redis 缓存库存信息”。到时见!

源码地址:https://github.com/lagoueduCol/MiaoSha-Yiletian


精选评论

**耀:

老师我很喜欢你的课程,每次都能学到很多知识,谢谢老师。

**0835:

老师,问一下,这些配置信息,可以用redis来做存储吗?不是redis的性能更好一些吗?

    讲师回复:

    Redis性能虽然好,但无法保证数据一致和可靠。ETCD是会将数据落磁盘的,会有WAL日志,基于WAL之上用RAFT确保数据一致,不担心数据丢失。

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到