Go微服务: redis分布式锁在集群中可能遇到的问题及其解决方案

发布于:2024-06-22 ⋅ 阅读:(104) ⋅ 点赞:(0)

概述

  • 我们的 redis 一般都是集群来给我们程序提供服务的,单体的redis现在也不多见
  • 看到上面是主节点redis和下面是6个重节点redis,主节点和重节点的通讯都是畅通没问题的
  • 这个时候,我们有 gorouting 写我们的数据,那它就会用到我们的setNX
  • 写完数据内部是自动同步的,就是你的这个数据通过主节点同步到这些从节点了
  • 下面又有我们的 gorouting 去读我们的从节点,但是,我们是在高并发和网络不确定的情况下可能会遇到一些问题

可能会遇到的一些问题


1 )集群方面

  • 如果,我们上面是一个gorouting,在主节点上,它用setNX写数据,如果主节点挂了
  • 集群就从我们的所有子节点中抽取一个节点,当成主节点顶上去,集群又可以正常工作了
  • 这个时候,有一个gorouting在右下角,它又来读数据了,由于我们上面刚写了数据
  • 还没有来得及同步到最后一个 redis 这个节点上, 但是面临着新的gorouting读取数据或操作
  • 这个时候最后这一台redis它是拿不到那个锁的,是没有同步到的
  • 最后来的 gorouting,就认为你没有锁, 或者说我要的资源,你没锁住
  • 那其他 gorouting 就认为它是无主的, 就可以锁, 这个时候就会造成一些问题

2 )网络方面

  • 正常的时候,写数据还有下面的从节点去获取数据,获取锁,都是没问题的
  • 如果是网络抖动或不通会有一些问题,由于redis它是网络传输的
  • 如果说我们右边的这网络络不通了,相当于右边的 redis 没有和主节点通讯
  • 这个时候我们的一个gorouting就来获取锁进行数据的操作
  • 如果这个时候,我要操作的资源没有上锁,那这个gorouting就认为它是还没有被加锁,就把这个锁锁上了
  • 所以这个地方也是有可能出问题的风险

解决方案


1 )使用 redLock

  • 锁不住资源,有可能因为节点挂了或网络抖动, 我们现在尝试使用 redLock 来解决这一个问题
  • redLock它没有master节点,也没有这个slave从节点,都是独立的
  • 每一个redis,都是有一个 SetNx 这么一个锁, 现在有两个协程来申请锁
  • redis集群的一般是7个,而不是说双数的, 如果双数的那我左边的 gorouting 获得3个
  • 右边的 gorouting 获得3个,他就要重新再做选举投票之类的东西
  • 基于redLock, 当左边的 gorouting 抢到了4个,那右边的只有3个就应该释放掉
  • 为下一次再运行做准备,右边这个锁就消失了

2 ) 源码

  • redlock把原来的master,slave这种模式改成了平等的模式,最终解决了问题

2.1 ) NewMutex

  • 在源码的 NewMutex 函数中
    // NewMutex returns a new distributed mutex with given name.
    func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
    	m := &Mutex{
    		name:   name,
    		expiry: 8 * time.Second,
    		tries:  32, // 重试 32 次
    		delayFunc: func(tries int) time.Duration {
    			return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
    		},
    		genValueFunc:  genValue,
    		driftFactor:   0.01, // 漂移因子
    		timeoutFactor: 0.05, // 超时因子
    		quorum:        len(r.pools)/2 + 1, // 法定数,找一半+1,大多数
    		pools:         r.pools,
    	}
    	for _, o := range options {
    		o.Apply(m)
    	}
    	if m.shuffle {
    		randomPools(m.pools)
    	}
    	return m
    }
    
  • 上面 driftFactor 是说我们的服务器的时钟漂移
    • 比如说我们的A服务器是中午12点,但是B服务器是中午11点59分30秒
    • C服务器是中午的12点0分30秒,相当于它们每台服务器相差30秒
    • 这就是服务器的时间漂移,它不准,那这会导致什么呢?
  • 假如说我们的这个过期时间是8秒,那你差了30秒,肯定就是有的服务器会先释放锁
  • 那先释放锁,其他人就可以拿到锁,所以他就设置了一个因子
  • 关于 quorum 就如同上面的例子,7台服务器拿到了4台就是成功的

2.2 ) Lock

  • 回到锁定的函数 Lock 中,进入其 lockContext
    // lockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
    func (m *Mutex) lockContext(ctx context.Context, tries int) error {
    	if ctx == nil {
    		ctx = context.Background()
    	}
    	// 获取 base64 的值
    	value, err := m.genValueFunc()
    	if err != nil {
    		return err
    	}
    
    	var timer *time.Timer
    	// 对默认32次循环的操作
    	for i := 0; i < tries; i++ {
    		if i != 0 {
    			if timer == nil {
    				timer = time.NewTimer(m.delayFunc(i))
    			} else {
    				timer.Reset(m.delayFunc(i))
    			}
    
    			select {
    			// 如果 完成 状态,则返回 ErrFailed
    			case <-ctx.Done():
    				timer.Stop()
    				// Exit early if the context is done.
    				return ErrFailed
    			// 没有完成,则不动
    			case <-timer.C:
    				// Fall-through when the delay timer completes.
    			}
    		}
    
    		start := time.Now()
    		// 拿到计数器和错误信息
    		n, err := func() (int, error) {
    			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
    			defer cancel()
    			/// 注意这里,最终的函数就是执行的这里
    			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
    				return m.acquire(ctx, pool, value)
    			})
    		}()
    
    		now := time.Now()
    		// 下面是核心算法: 过期时间 - 拿锁的时间 - 漂移因子可能的时间
    		until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
    		// 判断是否是大多数并且没有过期,则直接进行赋值
    		if n >= m.quorum && now.Before(until) {
    			m.value = value
    			m.until = until
    			return nil
    		}
    		// 否则 release 进行释放
    		_, _ = func() (int, error) {
    			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
    			defer cancel()
    			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
    				return m.release(ctx, pool, value)
    			})
    		}()
    		if i == tries-1 && err != nil {
    			return err
    		}
    	}
    
    	return ErrFailed
    }
    
  • 进入 actOnPoolsAsync 这里参数是一个函数
    func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
    	type result struct {
    		node     int
    		statusOK bool
    		err      error
    	}
    	// 创建 channel
    	ch := make(chan result, len(m.pools))
    	// 循环 pools
    	for node, pool := range m.pools {
    		// 开协程提速
    		go func(node int, pool redis.Pool) {
    			r := result{node: node}
    			r.statusOK, r.err = actFn(pool)
    			ch <- r
    		}(node, pool)
    	}
    
    	var (
    		n     = 0 // 计数器
    		taken []int
    		err   error // 错误
    	)
    	
    	// 循环 pools
    	for range m.pools {
    		// 从 channel 中拿到结果
    		r := <-ch
    		if r.statusOK {
    			n++ // 计数器加加
    		} else if r.err == ErrLockAlreadyExpired {
    			err = multierror.Append(err, ErrLockAlreadyExpired)
    		} else if r.err != nil {
    			err = multierror.Append(err, &RedisError{Node: r.node, Err: r.err})
    		} else {
    			taken = append(taken, r.node)
    			err = multierror.Append(err, &ErrNodeTaken{Node: r.node})
    		}
    
    		if m.failFast {
    			// fast retrun
    			if n >= m.quorum {
    				return n, err
    			}
    
    			// fail fast
    			if len(taken) >= m.quorum {
    				return n, &ErrTaken{Nodes: taken}
    			}
    		}
    	}
    
    	if len(taken) >= m.quorum {
    		return n, &ErrTaken{Nodes: taken}
    	}
    	return n, err
    }
    
  • 以上就是 redLock 源码锁的机制,通过源代码可以更好的理解框架
  • 即使上面一些细节点看不懂,跳过即可,前期可以先看大的实现脉络帮助我们理解

网站公告

今日签到

点亮在社区的每一天
去签到