40分钟学 Go 语言高并发:分布式锁实现

发布于:2024-12-06 ⋅ 阅读:(109) ⋅ 点赞:(0)

分布式锁实现

一、概述

分布式锁是分布式系统中的一个重要组件,用于协调分布式环境下的资源访问和并发控制。我们将从锁设计、死锁预防、性能优化和容错处理四个维度深入学习。

学习目标

维度 重点内容 掌握程度
锁设计 基于Redis/etcd的锁实现原理 必须掌握
死锁预防 超时机制、重入机制 必须掌握
性能优化 锁粒度控制、读写分离 重点掌握
容错处理 节点故障、网络分区 重点掌握

二、实现流程图

在这里插入图片描述

三、基础锁实现

让我们首先实现一个基于Redis的分布式锁基础版本:

package distlock

import (
    "context"
    "crypto/rand"
    "encoding/base64"
    "errors"
    "time"

    "github.com/go-redis/redis/v8"
)

type DistributedLock struct {
    client     *redis.Client
    key        string
    value      string
    expiration time.Duration
}

// NewDistributedLock 创建一个新的分布式锁实例
func NewDistributedLock(client *redis.Client, key string, expiration time.Duration) (*DistributedLock, error) {
    // 生成随机值作为锁的标识
    b := make([]byte, 16)
    _, err := rand.Read(b)
    if err != nil {
        return nil, err
    }
    value := base64.StdEncoding.EncodeToString(b)

    return &DistributedLock{
        client:     client,
        key:        key,
        value:      value,
        expiration: expiration,
    }, nil
}

// TryLock 尝试获取锁
func (dl *DistributedLock) TryLock(ctx context.Context) (bool, error) {
    return dl.client.SetNX(ctx, dl.key, dl.value, dl.expiration).Result()
}

// Unlock 释放锁
func (dl *DistributedLock) Unlock(ctx context.Context) error {
    script := `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `
    result, err := dl.client.Eval(ctx, script, []string{dl.key}, dl.value).Result()
    if err != nil {
        return err
    }
    if result == 0 {
        return errors.New("lock not held")
    }
    return nil
}

// RefreshLock 刷新锁的过期时间
func (dl *DistributedLock) RefreshLock(ctx context.Context) error {
    script := `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("pexpire", KEYS[1], ARGV[2])
        else
            return 0
        end
    `
    result, err := dl.client.Eval(
        ctx,
        script,
        []string{dl.key},
        dl.value,
        dl.expiration.Milliseconds(),
    ).Result()
    
    if err != nil {
        return err
    }
    if result == 0 {
        return errors.New("lock not held")
    }
    return nil
}

// IsLocked 检查锁是否被持有
func (dl *DistributedLock) IsLocked(ctx context.Context) (bool, error) {
    exists, err := dl.client.Exists(ctx, dl.key).Result()
    if err != nil {
        return false, err
    }
    return exists == 1, nil
}

四、增强版锁实现(带可重入特性)

下面是一个支持可重入的分布式锁实现:

package distlock

import (
    "context"
    "encoding/json"
    "errors"
    "sync"
    "time"

    "github.com/go-redis/redis/v8"
)

type LockInfo struct {
    Owner     string `json:"owner"`
    Count     int    `json:"count"`
    Timestamp int64  `json:"timestamp"`
}

type ReentrantLock struct {
    client     *redis.Client
    key        string
    owner      string
    expiration time.Duration
    mu         sync.Mutex
}

// NewReentrantLock 创建可重入锁
func NewReentrantLock(client *redis.Client, key string, owner string, expiration time.Duration) *ReentrantLock {
    return &ReentrantLock{
        client:     client,
        key:        key,
        owner:      owner,
        expiration: expiration,
    }
}

// Lock 获取可重入锁
func (rl *ReentrantLock) Lock(ctx context.Context) error {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    script := `
        local lockInfo = redis.call('get', KEYS[1])
        if not lockInfo then
            -- 锁不存在,创建新锁
            redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])
            return 1
        end
        
        local info = cjson.decode(lockInfo)
        if info.owner == ARGV[3] then
            -- 重入锁
            info.count = info.count + 1
            info.timestamp = tonumber(ARGV[4])
            redis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])
            return 1
        end
        
        return 0
    `

    lockInfo := LockInfo{
        Owner:     rl.owner,
        Count:     1,
        Timestamp: time.Now().UnixNano(),
    }

    lockInfoJSON, err := json.Marshal(lockInfo)
    if err != nil {
        return err
    }

    result, err := rl.client.Eval(
        ctx,
        script,
        []string{rl.key},
        string(lockInfoJSON),
        rl.expiration.Milliseconds(),
        rl.owner,
        time.Now().UnixNano(),
    ).Result()

    if err != nil {
        return err
    }

    if result.(int64) == 0 {
        return errors.New("failed to acquire lock")
    }

    return nil
}

// Unlock 释放可重入锁
func (rl *ReentrantLock) Unlock(ctx context.Context) error {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    script := `
        local lockInfo = redis.call('get', KEYS[1])
        if not lockInfo then
            return 0
        end
        
        local info = cjson.decode(lockInfo)
        if info.owner ~= ARGV[1] then
            return -1
        end
        
        info.count = info.count - 1
        if info.count <= 0 then
            redis.call('del', KEYS[1])
            return 1
        else
            redis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])
            return 1
        end
    `

    result, err := rl.client.Eval(
        ctx,
        script,
        []string{rl.key},
        rl.owner,
        rl.expiration.Milliseconds(),
    ).Result()

    if err != nil {
        return err
    }

    switch result.(int64) {
    case -1:
        return errors.New("lock held by another owner")
    case 0:
        return errors.New("lock not held")
    default:
        return nil
    }
}

// RefreshLock 刷新锁的过期时间
func (rl *ReentrantLock) RefreshLock(ctx context.Context) error {
    script := `
        local lockInfo = redis.call('get', KEYS[1])
        if not lockInfo then
            return 0
        end
        
        local info = cjson.decode(lockInfo)
        if info.owner ~= ARGV[1] then
            return 0
        end
        
        info.timestamp = tonumber(ARGV[3])
        redis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])
        return 1
    `

    result, err := rl.client.Eval(
        ctx,
        script,
        []string{rl.key},
        rl.owner,
        rl.expiration.Milliseconds(),
        time.Now().UnixNano(),
    ).Result()

    if err != nil {
        return err
    }

    if result.(int64) == 0 {
        return errors.New("lock not held")
    }

    return nil
}

五、死锁预防机制

1. 超时机制

  • 所有锁操作都设置了过期时间
  • 使用看门狗机制自动续期
  • 防止客户端崩溃导致的死锁

2. 死锁检测

检测项 处理方式 实现难度
循环等待 资源有序分配 中等
持有等待 一次性申请所有资源 简单
不可剥夺 超时自动释放 简单
互斥访问 读写分离 较难

六、性能优化策略

1. 锁粒度优化

  • 降低锁粒度,提高并发度
  • 使用多粒度锁机制
  • 实现分段锁

2. 读写分离

package distlock

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

type RWLock struct {
    client     *redis.Client
    key        string
    owner      string
    expiration time.Duration
}

func NewRWLock(client *redis.Client, key string, owner string, expiration time.Duration) *RWLock {
    return &RWLock{
        client:     client,
        key:        key,
        owner:      owner,
        expiration: expiration,
    }
}

// RLock 获取读锁
func (rwl *RWLock) RLock(ctx context.Context) error {
    script := `
        -- 检查是否存在写锁
        if redis.call('exists', KEYS[1] .. ':write') == 1 then
            return 0
        end
        
        -- 增加读锁计数
        local count = redis.call('incr', KEYS[1] .. ':read')
        redis.call('pexpire', KEYS[1] .. ':read', ARGV[1])
        
        -- 记录读锁持有者
        redis.call('hset', KEYS[1] .. ':readers', ARGV[2], '1')
        redis.call('pexpire', KEYS[1] .. ':readers', ARGV[1])
        
        return 1
    `

    result, err := rwl.client.Eval(
        ctx,
        script,
        []string{rwl.key},
        rwl.expiration.Milliseconds(),
        rwl.owner,
    ).Result()

    if err != nil {
        return fmt.Errorf("failed to acquire read lock: %v", err)
    }

    if result.(int64) == 0 {
        return fmt.Errorf("write lock exists")
    }

    return nil
}

// RUnlock 释放读锁
func (rwl *RWLock) RUnlock(ctx context.Context) error {
    script := `
        -- 检查读锁是否存在
        if redis.call('exists', KEYS[1] .. ':read') == 0 then
            return 0
        end
        
        -- 检查当前客户端是否持有读锁
        if redis.call('hexists', KEYS[1] .. ':readers', ARGV[1]) == 0 then
            return -1
        end
        
        -- 移除读锁持有者记录
        redis.call('hdel', KEYS[1] .. ':readers', ARGV[1])
        
        -- 减少读锁计数
        local count = redis.call('decr', KEYS[1] .. ':read')
        if count <= 0 then
            redis.call('del', KEYS[1] .. ':read')
            redis.call('del', KEYS[1] .. ':readers')
        end
        
        return 1
    `

    result, err := rwl.client.Eval(
        ctx,
        script,
        []string{rwl.key},
        rwl.owner,
    ).Result()

    if err != nil {
        return fmt.Errorf("failed to release read lock: %v", err)
    }

    switch result.(int64) {
    case -1:
        return fmt.Errorf("read lock not held by this client")
    case 0:
        return fmt.Errorf("read lock not exists")
    default:
        return nil
    }
}

// Lock 获取写锁
func (rwl *RWLock) Lock(ctx context.Context) error {
    script := `
        -- 检查是否存在读锁或写锁
        if redis.call('exists', KEYS[1] .. ':read') == 1 or
           redis.call('exists', KEYS[1] .. ':write') == 1 then
            return 0
        end
        
        -- 设置写锁
        redis.call('set', KEYS[1] .. ':write', ARGV[1], 'PX', ARGV[2])
        return 1
    `

    result, err := rwl.client.Eval(
        ctx,
        script,
        []string{rwl.key},
        rwl.owner,
        rwl.expiration.Milliseconds(),
    ).Result()

    if err != nil {
        return fmt.Errorf("failed to acquire write lock: %v", err)
    }

    if result.(int64) == 0 {
        return fmt.Errorf("lock exists")
    }

    return nil
}

// Unlock 释放写锁
func (rwl *RWLock) Unlock(ctx context.Context) error {
    script := `
        -- 检查写锁是否存在且属于当前客户端
        local value = redis.call('get', KEYS[1] .. ':write')
        if not value then
            return 0
        end
        if value ~= ARGV[1] then
            return -1
        end
        
        -- 删除写锁
        redis.call('del', KEYS[1] .. ':write')
        return 1
    `

    result, err := rwl.client.Eval(
        ctx,
        script,
        []string{rwl.key},
        rwl.owner,
    ).Result()

    if err != nil {
        return fmt.Errorf("failed to release write lock: %v", err)
    }

    switch result.(int64) {
    case -1:
        return fmt.Errorf("write lock not held by this client")
    case 0:
        return fmt.Errorf("write lock not exists")
    default:
        return nil
    }
}

七、容错处理

1. 容错机制设计

在这里插入图片描述

2. 故障处理实现

package distlock

import (
    "context"
    "errors"
    "sync"
    "time"

    "github.com/go-redis/redis/v8"
)

type FaultTolerantLock struct {
    master     *redis.Client
    slaves     []*redis.Client
    localLock  sync.Mutex
    key        string
    owner      string
    expiration time.Duration
}

func NewFaultTolerantLock(
    master *redis.Client,
    slaves []*redis.Client,
    key string,
    owner string,
    expiration time.Duration,
) *FaultTolerantLock {
    return &FaultTolerantLock{
        master:     master,
        slaves:     slaves,
        key:        key,
        owner:      owner,
        expiration: expiration,
    }
}

// Lock 获取容错锁
func (ftl *FaultTolerantLock) Lock(ctx context.Context) error {
    // 1. 尝试在主节点获取锁
    if err := ftl.tryLockOnMaster(ctx); err == nil {
        return nil
    }

    // 2. 主节点失败,尝试在从节点获取锁
    if err := ftl.tryLockOnSlaves(ctx); err == nil {
        return nil
    }

    // 3. 所有Redis节点都失败,降级使用本地锁
    ftl.localLock.Lock()
    
    // 4. 启动后台协程尝试恢复到Redis锁
    go ftl.tryRecoverToRedis(context.Background())
    
    return nil
}

func (ftl *FaultTolerantLock) tryLockOnMaster(ctx context.Context) error {
    script := `
        if redis.call('exists', KEYS[1]) == 0 then
            redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])
            return 1
        end
        return 0
    `

    result, err := ftl.master.Eval(
        ctx,
        script,
        []string{ftl.key},
        ftl.owner,
        ftl.expiration.Milliseconds(),
    ).Result()

    if err != nil {
        return err
    }

    if result.(int64) == 0 {
        return errors.New("lock exists")
    }

    return nil
}

func (ftl *FaultTolerantLock) tryLockOnSlaves(ctx context.Context) error {
    // 需要在多数从节点上获取锁才算成功
    successCount := 0
    majorityCount := (len(ftl.slaves) / 2) + 1

    for _, slave := range ftl.slaves {
        if err := ftl.tryLockOnNode(ctx, slave); err == nil {
            successCount++
            if successCount >= majorityCount {
                return nil
            }
        }
    }

    return errors.New("failed to acquire lock on majority of slaves")
}

func (ftl *FaultTolerantLock) tryLockOnNode(ctx context.Context, node *redis.Client) error {
    script := `
        if redis.call('exists', KEYS[1]) == 0 then
            redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])
            return 1
        end
        return 0
    `

    result, err := node.Eval(
        ctx,
        script,
        []string{ftl.key},
        ftl.owner,
        ftl.expiration.Milliseconds(),
    ).Result()

    if err != nil {
        return err
    }

    if result.(int64) == 0 {
        return errors.New("lock exists")
    }

    return nil
}

func (ftl *FaultTolerantLock) tryRecoverToRedis(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // 尝试恢复到Redis主节点
            if err := ftl.tryLockOnMaster(ctx); err == nil {
                ftl.localLock.Unlock()
                return
            }

            // 尝试恢复到Redis从节点
            if err := ftl.tryLockOnSlaves(ctx); err == nil {
                ftl.localLock.Unlock()
                return
            }
        }
    }
}

// Unlock 释放锁
func (ftl *FaultTolerantLock) Unlock(ctx context.Context) error {
    // 尝试释放Redis锁
    if err := ftl.unlockRedis(ctx); err == nil {
        return nil
    }

    // Redis释放失败,释放本地锁
    ftl.localLock.Unlock()
    return nil
}

func (ftl *FaultTolerantLock) unlockRedis(ctx context.Context) error {
    script := `
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        end
        return 0
    `

    // 先尝试在主节点释放
    result, err := ftl.master.Eval(
        ctx,
        script,
        []string{ftl.key},
        ftl.owner,
    ).Result()

    if err == nil && result.(int64) == 1 {
        return nil
    }

    // 主节点释放失败,尝试在从节点释放
    for _, slave := range ftl.slaves {
        result, err = slave.Eval(
            ctx,
            script,
            []string{ftl.key},
            ftl.owner,
        ).Result()

        if err == nil && result.(int64) == 1 {
            return nil
        }
    }

    return errors.New("failed to release lock on all nodes")
}

八、性能测试与监控

1. 性能指标

指标 说明 目标值
获取锁延迟 从发起请求到获取锁的时间 <50ms
释放锁延迟 从发起释放到完成的时间 <30ms
锁冲突率 获取锁失败的比例 <10%
QPS 每秒处理的锁请求数 >1000

2. 监控指标

  1. 系统监控

    • CPU使用率
    • 内存使用
    • 网络延迟
    • 磁盘IO
  2. 业务监控

    • 锁获取成功率
    • 锁超时次数
    • 死锁检测次数
    • 降级次数

九、最佳实践总结

  1. 锁设计

    • 使用唯一标识确保锁的归属
    • 合理设置超时时间
    • 实现可重入机制
    • 使用Lua脚本保证原子性
  2. 死锁预防

    • 实现超时自动释放
    • 避免循环等待
    • 实现锁的重入
    • 定期检测死锁
  3. 性能优化

    • 使用读写锁分离
    • 控制锁粒度
    • 批量处理
    • 使用本地缓存
  4. 容错处理

    • 实现主从切换
    • 支持优雅降级
    • 异步恢复机制
    • 多副本数据同步

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


网站公告

今日签到

点亮在社区的每一天
去签到