欢迎来到多级缓存实战模块!
首先请你思考这样一个业务场景:一个有 10 个节点的集群,当你修改了某一项配置后,你希望能立即同步给这 10 个节点,并更新到各节点的缓存中,该怎么做?
一种方法是你获取到这 10 个节点的地址信息,然后逐个调用接口同步给它们。但这会带来新的问题:你如何准确获取这 10 个节点的地址信息,又如何将配置稳定可靠地同步给它们?如果集群部署到多个可用区,且是用容器部署的,你拿到的地址不一定是最新的、完全可用的地址,最后很可能因部分节点同步失败而导致节点数据不一致。
另一种方法是让这 10 个节点自己来取。那怎么让它们知道配置变更了呢?它们又该去哪儿获取最新配置呢?
把这个问题抽象下就是:A 系统修改了数据,需要即时同步给 B、C、D 系统的所有节点。显然,这是非常经典的数据同步问题。想要解决,我们可以引入一个分布式系统协调器,如 etcd来处理。
在秒杀系统中,我们使用 etcd 存储集群信息和活动信息,以此解决分布式系统中的数据同步问题。除此之外,在多级缓存中我们还有一种为 Redis 缓存可用性兜底的手段。具体是怎么做的呢?
etcd 初始化
在使用 etcd 存储集群信息和活动信息前,我们需要先对 etcd 进行初始化。具体来说,在 infrastructure/stores/etcd 目录下的 etcd.go 中,我实现了 Init 和 GetClient 这两个函数。其中,在 Init 函数里,获取 etcd 相关的配置,并初始化 etcd 客户端,用 sync.Once 确保只初始化一次;在 GetClient 函数里,返回 Init 函数中初始化好的客户端。具体代码如下:
var etcdCli *etcd.Client
var etcdOnce = &sync.Once{}
func Init() error {
var err error
etcdOnce.Do(
func() {
endpoints := viper.GetStringSlice("etcd.endpoints")
username := viper.GetString("etcd.username")
password := viper.GetString("etcd.password")
cfg := etcd.Config{
Endpoints: endpoints,
DialTimeout: time.Second,
Username: username,
Password: password,
}
etcdCli, err = etcd.New(cfg)
})
return err
}
func GetClient() *etcd.Client {
return etcdCli
}
接下来,我在 cmd/root.go 中调用 etcd.Init ,初始化 etcd 客户端。代码如下:
if err := etcd.Init(); err != nil {
panic(err)
}
存储集群信息
既然要用 etcd 存储集群信息,那么集群信息都有哪些呢?
总的来说,秒杀系统的集群信息主要是秒杀服务节点信息,比如服务地址、版本号、协议类型等。在秒杀系统集群内,主要是将秒杀 API 服务的节点信息提供给 Admin 服务,用于调用 API 服务的 RPC 接口同步配置;在秒杀系统集群外使用节点信息,主要通过服务发现的方式,将节点暴露给监控系统或者其他微服务调用。
在 etcd 中,我们可以将节点信息存放到 /seckill/nodes 这个 key 下。以 IP 为 10.10.11.12 的节点为例,我们用 addr 字段保存服务地址,用 proto 保存协议类型,用 version 字段保存版本号。由于节点信息只会在节点启动时变更,我们用 json 格式来存放,具体的数据示例如下:
{
"addr": "10.10.11.12:8080",
"proto": "http",
"version": "v1.0"
}
我们可以用 Go 结构体来定义一个新类型 Node,用于处理 json 格式的节点信息,字段名跟前面提到的 json 格式保持一致。如下所示:
type Node struct {
Addr string `json:"addr"`
Version string `json:"version"`
Proto string `json:"proto"`
}
节点信息的管理需要使用三个函数:Register、Deregister 和 Discover,它们分别用于服务节点的注册、注销和发现。
具体怎么实现呢?
第一步,我定义了个 cluster 结构体,用于保存集群管理过程中需要的所有信息,包括锁、etcd 客户端、节点列表等,然后在 Init 函数中初始化它。具体代码如下所示:
type cluster struct {
sync.RWMutex
cli *etcdv3.Client
service string
once *sync.Once
deregCh map[string]chan struct{}
nodes map[string]*Node
v *viper.Viper
}
var defaultCluster *cluster
var once = &sync.Once{
func Init(service string) {
once.Do(func() {
defaultCluster = &cluster{
cli: etcd.GetClient(),
service: service,
once: &sync.Once{},
deregCh: make(map[string]chan struct{}),
nodes: make(map[string]*Node),
}
})
}
第二步,我实现了 Register、Deregister 和 Discover 这三个函数。
具体来说,我在 Register 函数中将节点信息注册到 etcd,设置一个有效期,并定期更新,确保节点信息是最新的。在 Deregister 函数中将注册到 etcd 的节点信息删除。在 Discover 函数中,监控节点变更,并及时更新本地内存缓存中的节点信息,将最新的节点信息返回给调用者。代码如下:
func Register(node *Node, ttl int) error {
const minTTL = 2
c := defaultCluster
key := c.makeKey(node)
if ttl < minTTL {
ttl = minTTL
}
var errCh = make(chan error)
go func() {
kv := etcdv3.NewKV(c.cli)
closeCh := make(chan struct{})
lease := etcdv3.NewLease(c.cli)
val, _ := json.Marshal(node)
var curLeaseId etcdv3.LeaseID = 0
ticker := time.NewTicker(time.Duration(ttl/2) * time.Second)
register := func() error {
if curLeaseId == 0 {
leaseResp, err := lease.Grant(context.TODO(), int64(ttl))
if err != nil {
return err
}
if _, err := kv.Put(context.TODO(), key, string(val), etcdv3.WithLease(leaseResp.ID)); err != nil {
return err
}
curLeaseId = leaseResp.ID
} else {
if _, err := lease.KeepAliveOnce(context.TODO(), curLeaseId); err == rpctypes.ErrLeaseNotFound {
curLeaseId = 0
}
}
return nil
}
if err := register(); err != nil {
logrus.Error("register node failed, error:", err)
errCh <- err
}
close(errCh)
for {
select {
case <-ticker.C:
if err := register(); err != nil {
logrus.Error("register node failed, error:", err)
panic(err)
}
case <-closeCh:
ticker.Stop()
return
}
}
}()
err := <-errCh
return err
}
func Deregister(node *Node) error {
c := defaultCluster
c.Lock()
defer c.Unlock()
key := c.makeKey(node)
if ch, ok := c.deregCh[key]; ok {
close(ch)
delete(c.deregCh, key)
}
_, err := c.cli.Delete(context.Background(), key, etcdv3.WithPrefix())
return err
}
func Discover() (output []*Node, err error) {
c := defaultCluster
key := fmt.Sprintf("/%s/nodes/", c.service)
c.once.Do(func() {
var resp *etcdv3.GetResponse
resp, err = c.cli.Get(context.Background(), key, etcdv3.WithPrefix())
if err != nil {
return
}
for _, kv := range resp.Kvs {
k := string(kv.Key)
if len(k) > len(key) {
var node *Node
json.Unmarshal(kv.Value, &node)
if node != nil {
c.Lock()
c.nodes[k] = node
c.Unlock()
}
}
}
watchCh := c.cli.Watch(context.Background(), key, etcdv3.WithPrefix())
go func() {
for {
select {
case resp := <-watchCh:
for _, evt := range resp.Events {
k := string(evt.Kv.Key)
if len(k) <= len(key) {
continue
}
switch evt.Type {
case etcdv3.EventTypePut:
var node *Node
json.Unmarshal(evt.Kv.Value, &node)
if node != nil {
c.Lock()
c.nodes[k] = node
c.Unlock()
}
case etcdv3.EventTypeDelete:
c.Lock()
if _, ok := c.nodes[k]; ok {
delete(c.nodes, k)
}
c.Unlock()
}
}
}
}
}()
})
if err != nil {
return nil, err
}
c.RLock()
for _, node := range c.nodes {
output = append(output, node)
}
c.RUnlock()
return
}
func (c *cluster) makeKey(node *Node) string {
id := strings.Replace(node.Addr, ".", "-", -1)
id = strings.Replace(id, ":", "-", -1)
return fmt.Sprintf("/%s/nodes/%s", c.service, id)
}
第三步,修改 interfaces/rpc 目录下的 rpc.go,在 Run 函数中加上注册节点信息的代码,其中初始化一个变量 node,如下所示:
//初始化集群
cluster.Init("seckill")
var addr string
if addr, err = utils.Extract(bind); err == nil {
//注册节点信息
version := viper.GetString("api.version")
if version == "" {
version = "v0.1"
}
once.Do(func() {
node = &cluster.Node{
Addr: addr,
Version: version,
Proto: "gRPC",
}
err = cluster.Register(node, 6)
})
}
if err != nil {
return err
}
另外,我还在 Exit 函数中加上了注销节点的代码,主要是调用 cluster.Deregister 函数将节点信息从 etcd 中删除。
第四步,在 Run 函数里初始化集群,并加上一段代码用于输出获取到的节点信息。代码如下:
cluster.Init("seckill")
if nodes, err := cluster.Discover(); err == nil {
log, _ := json.Marshal(nodes)
logrus.Info("discover nodes ", string(log))
} else {
logrus.Error("discover nodes error:", err)
}
注意,这个 Run 函数是位于 interfaces/admin 目录下的 admin.go 里面哦。
第五步,编译成功后,我们先启动 api,然后启动 admin。可以看到 admin 中输出了节点信息日志,说明 admin 服务成功发现了 api 服务的节点。此时,使用 etcdctl 也能获取到节点信息。
为了验证节点注册功能是否正常工作,我用 pkill -9 seckill 命令将 api 和 admin 都杀掉,使用 etcdctl 前两次还能获取到节点信息,但后面就获取不到了。就是因为杀掉 api 服务后,服务停止了更新过期时间,导致 etcd 将过期的节点信息删除。这也就证明了服务节点注册功能是正常工作的。
运行效果如下图所示:
存储集群配置
秒杀集群配置主要有日志等级、限流器速度、熔断条件等。它们存放到 etcd 中 /seckill/config 这个 key 下,其中的 logLevel 保存日志等级, rateLimit 保存中间层和底层限流器的速度, circuitBreaker 保存熔断条件中的 cpu 使用率和请求时延。具体配置示例如下:
{
"logLevel":"info",
"rateLimit": {
"middle": 100000,
"low": 10000,
}
"circuitBreaker":{
"cpu": 80,
"latency": 1000,
}
}
对应到 Go 代码中,大致步骤如下:
将上面的 json 配置用一个 Config 结构体保存,字段名跟 json 中的保持一致,然后把它放在 infrastructure/cluster 目录下的 cluster.go 中;
实现一个 WatchClusterConfig 函数,主要是监控并处理 etcd 中集群配置的 Delete 和 Put 事件,并及时更新内存缓存中的配置数据;
实现一个 GetClusterConfig 函数,用于将缓存中的配置提供给其他模块使用。
注意,在更新和读取缓存中配置数据的时候,都需要加锁,以免获取到的配置数据不是最新的。
代码如下所示:
type Config struct {
LogLevel string `json:"logLevel"`
RateLimit struct {
Middle int `json:"middle"`
Low int `json:"low"`
} `json:"rateLimit"`
CircuitBreaker struct {
Cpu int `json:"cpu"`
Latency int `json:"latency"`
} `json:"circuitBreaker"`
}
var configLock = &sync.RWMutex{}
var config = &Config{}
func WatchClusterConfig() error {
cli := etcd.GetClient()
key := "/seckill/config"
resp, err := cli.Get(context.Background(), key)
if err != nil {
return err
}
update := func(kv *mvccpb.KeyValue) (bool, error) {
if string(kv.Key) == key {
var tmpConfig *Config
err = json.Unmarshal(kv.Value, &tmpConfig)
if err != nil {
logrus.Error("update cluster config failed, error:", err)
return false, err
}
configLock.Lock()
*config = *tmpConfig
logrus.Info("update cluster config ", *config)
configLock.Unlock()
return true, nil
}
return false, nil
}
for _, kv := range resp.Kvs {
if ok, err := update(kv); ok {
break
} else if err != nil {
return err
}
}
go func() {
watchCh := cli.Watch(context.Background(), key)
for resp := range watchCh {
for _, evt := range resp.Events {
if evt.Type == etcdv3.EventTypePut {
if ok, err := update(evt.Kv); ok {
break
} else if err != nil {
break
}
}
}
}
}()
return nil
}
func GetClusterConfig() Config {
configLock.RLock()
defer configLock.RUnlock()
return *config
}
接下来,为了能同时启动两个 api 服务,我先将 config/seckill.toml 配置文件拷贝一份,新的配置文件为 config/seckill1.toml,将里面的端口号和 pid 文件修改成与老的配置不一样,比如端口号改成 8083 和 8084,pid 文件改成 .pid1。
现在我们用这两份配置文件启动两个 api 服务,然后使用下面的 etcdctl 命令将配置数据写入到 etcd 。
ETCDCTL_API=3 etcdctl put /seckill/config '{"logLevel":"info","rateLimit":{"middle":100000,"low":10000},"circuitBreaker":{"cpu":80,"latency":1000}}'
此时我们就能在两个运行 api 服务的命令行终端看到 update cluster config 这样的日志,而且它们的时间戳一样,说明它们几乎能同时接收到配置变更事件,并在日志中输出最新的配置。
效果如下图所示:
小结
这一讲我主要介绍了如何用 etcd 存储集群信息和集群配置。实际上,集群信息和配置管理有很多种方法,比如你还可以用 consul 来实现配置管理、服务注册和发现,那可能会比用 etcd 更简单些。
我之所以在这里用 etcd ,主要是为了给你介绍集群配置管理、服务注册和发现的底层代码原理。当你真正学会了自己去实现一套类似的代码后,你在使用其他方案时将会更容易理解它们的原理。
另外,还需要注意一点,我在这里用的 etcd 版本是 v3,在代码和命令行上与 v2 有些差别。
思考题: 如果用 consul 来实现集群信息和配置管理,该如何做呢?
期待你在留言区回答。
好了,这一讲就到这里了,下一讲我将给你介绍“如何使用 Redis 缓存库存信息”。到时见!
源码地址:https://github.com/lagoueduCol/MiaoSha-Yiletian
精选评论
**耀:
老师我很喜欢你的课程,每次都能学到很多知识,谢谢老师。
**0835:
老师,问一下,这些配置信息,可以用redis来做存储吗?不是redis的性能更好一些吗?
讲师回复:
Redis性能虽然好,但无法保证数据一致和可靠。ETCD是会将数据落磁盘的,会有WAL日志,基于WAL之上用RAFT确保数据一致,不担心数据丢失。