【取消分仓-分布式锁】

发布于:2025-07-20 ⋅ 阅读:(19) ⋅ 点赞:(0)
public JsonResponse updateInOutBoundTaskList(List<InOutBoundTaskReq> cancelBoundList) {
        log.info("【出入库取消分仓】开始处理请求: cancelBoundList={}", cancelBoundList);

        // 1. 参数校验
        if (ObjectUtil.isEmpty(cancelBoundList)) {
            throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_NO_TASK_INFO);
        }
        if (cancelBoundList.size() > 100){
            throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_MAX_SIZE);
        }

        // 校验订单类型是否合法
        List<InOutBoundTaskReq> validCancelBoundList = cancelBoundList.stream()
                .filter(item ->
                        OrderTypeEnum.PI.getCode().equals(item.getEoorOrderTypeCode()) ||
                                OrderTypeEnum.AI.getCode().equals(item.getEoorOrderTypeCode()) ||
                                OrderTypeEnum.PO.getCode().equals(item.getEoorOrderTypeCode()) ||
                                OrderTypeEnum.AO.getCode().equals(item.getEoorOrderTypeCode())
                ).collect(Collectors.toList());

        if (CollectionUtil.isEmpty(validCancelBoundList)) {
            throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_NOT_MEET);
        }

        Map<String, List<InOutBoundTaskReq>> eotaOrderNoMap = validCancelBoundList.stream()
                .filter(item -> TaskStatusEnum.CREATED.getCode().equals(item.getEotaStatus()))
                .filter(item -> StrUtil.isNotBlank(item.getEotaOrderNo()))
                .collect(Collectors.groupingBy(InOutBoundTaskReq::getEotaOrderNo));

        Set<String> orderNoSet = eotaOrderNoMap.keySet().stream()
                .filter(StringUtils::isNotBlank)
                .collect(Collectors.toSet());

        if (orderNoSet.isEmpty()) {
            throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_NOT_NEED_CANCEL);
        }

        int successCount = 0;
        List<String> failedOrderNos = new ArrayList<>();
        List<String> successOrderNos = new ArrayList<>();

        // 3. 遍历每个订单号进行加锁处理
        for (String orderNo : orderNoSet) {
            try {
                processSingleOrder(orderNo, eotaOrderNoMap.get(orderNo));
                successOrderNos.add(orderNo);
                successCount++;
            } catch (BusinessException e) {
                log.warn("业务异常或锁异常,跳过订单 {}", orderNo);
                failedOrderNos.add(orderNo);
            } catch (Exception e) {
                log.error("系统异常,订单 {} 失败", orderNo, e);
                failedOrderNos.add(orderNo);
            }
        }

        // 构建返回结果
        Map<String, Object> result = new HashMap<>();
        result.put("successCount", successCount);
        result.put("failCount", failedOrderNos.size());
        result.put("successOrderNos", successOrderNos);
        result.put("failedOrderNos", failedOrderNos);
        log.info("【取消分仓】处理结果: {}", result);

        JsonResponse<Object> objectJsonResponse = new JsonResponse<>();
        objectJsonResponse.setCode(BaseCodeEnum.SUCCESS.getCode());
        objectJsonResponse.setData(result);
        objectJsonResponse.setMsg(I18nUtils.getI18Msg(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_RESULT_RESPONSE.getCode()));

        return objectJsonResponse;
    }

    /**
     * 每个订单单独处理,使用独立事务
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public void processSingleOrder(String orderNo, List<InOutBoundTaskReq> reqList) {
        boolean lockSuccess = false;

        try {
            // 加锁:基于订单号加锁
            lockSuccess = distributedLockHelper.lock(RedisKeyUtil.LockTypeEnum.WH_ALLOCATE, orderNo, 30);
            if (!lockSuccess) {
                log.warn("【取消分仓】未能获取锁,跳过订单号:{}", orderNo);
                throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_ACQUIRE_LOCK_FAIL);
            }

            List<String> eotaNoList = reqList.stream()
                    .map(InOutBoundTaskReq::getEotaNo)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());

            List<EoTask> tasks = eoTaskMapper.getListByEotaNoList(eotaNoList);
            List<EoTask> validTasks = tasks.stream()
                    .filter(task -> TaskStatusEnum.CREATED.getCode().equals(task.getEotaStatus()))
                    .collect(Collectors.toList());

            if (CollectionUtils.isEmpty(validTasks)) {
                log.warn("【取消分仓】订单号:{} 下无状态为创建的任务", orderNo);
                throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_NOT_NEED_CANCEL);
            }

            List<String> eotaNoList1 = validTasks.stream()
                    .map(EoTask::getEotaNo)
                    .collect(Collectors.toList());

            // 删除子项
            List<EoTaskItem> taskItems = eoTaskItemMapper.getListByEotaNoList(eotaNoList1);
            if (CollectionUtil.isNotEmpty(taskItems)){
                List<Long> itemIds = taskItems.stream()
                        .map(EoTaskItem::getId)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList());
                if (!itemIds.isEmpty()) {
                    eoOrderItemAllocateMapper.deleteBatchByEoTaskItemIdList(itemIds);
                }
                eoTaskItemMapper.deleteBatchByEotaNo(eotaNoList1);
            }

            eoTaskExtendMapper.deleteBatchByEotaNo(eotaNoList1);
            eoTaskMapper.deleteBatchByEotaNo(eotaNoList1);

            // 查询订单下是否还有其他任务
            List<EoTask> remainingTasks = eoTaskMapper.queryListByEoOrderNoList(Collections.singletonList(orderNo));
            if (CollectionUtils.isEmpty(remainingTasks)) {
                eoOrderMapper.updateStatusByOrderNoList(Collections.singletonList(orderNo), OrderCommonConstant.EO_ORDER_ALLOCATE_STATUS_UNSPLIT);
            } else {
                eoOrderMapper.updateStatusByOrderNoList(Collections.singletonList(orderNo), OrderCommonConstant.EO_ORDER_ALLOCATE_STATUS_PARTIAL_SPLITTING);
            }

        } finally {
            if (lockSuccess) {
                distributedLockHelper.unlock(RedisKeyUtil.LockTypeEnum.WH_ALLOCATE, orderNo);
            }
        }
    }

这段 Java 代码是一个 出入库任务取消分仓 的业务逻辑实现,主要功能是:批量取消已经创建但未执行的出入库任务(In/Out Bound Task),并清除相关任务数据,更新订单状态。


🧩 功能概述

1. 接口功能

  • 接收一个出入库任务的列表 List<InOutBoundTaskReq>
  • 对每个任务进行校验、加锁、删除任务数据、更新订单状态等操作。
  • 支持部分成功、部分失败的处理,返回成功和失败的订单号。

2. 适用场景

  • 当订单的出入库任务已创建但未执行时,可以取消这些任务。
  • 适用于分仓系统中,订单取消、任务回滚等场景。

🔍 方法结构详解

public JsonResponse updateInOutBoundTaskList(List<InOutBoundTaskReq> cancelBoundList)

这是对外暴露的主方法,负责整体流程控制。

1. 日志记录
log.info("【出入库取消分仓】开始处理请求: cancelBoundList={}", cancelBoundList);
  • 记录请求开始,方便调试和追踪。
2. 参数校验
if (ObjectUtil.isEmpty(cancelBoundList)) {
    throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_NO_TASK_INFO);
}
if (cancelBoundList.size() > 100){
    throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_MAX_SIZE);
}
  • 判断请求列表是否为空。
  • 限制最大请求数为 100,防止批量操作过大导致性能问题。
3. 校验订单类型是否合法
List<InOutBoundTaskReq> validCancelBoundList = cancelBoundList.stream()
        .filter(item ->
                OrderTypeEnum.PI.getCode().equals(item.getEoorOrderTypeCode()) ||
                        OrderTypeEnum.AI.getCode().equals(item.getEoorOrderTypeCode()) ||
                        OrderTypeEnum.PO.getCode().equals(item.getEoorOrderTypeCode()) ||
                        OrderTypeEnum.AO.getCode().equals(item.getEoorOrderTypeCode())
        ).collect(Collectors.toList());
  • 过滤出合法的订单类型(如 PI、AI、PO、AO)。
  • 非法类型不处理,防止误操作。
if (CollectionUtil.isEmpty(validCancelBoundList)) {
    throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_NOT_MEET);
}
  • 如果没有合法订单,直接抛异常。
4. 按订单号分组,过滤无效订单
Map<String, List<InOutBoundTaskReq>> eotaOrderNoMap = validCancelBoundList.stream()
        .filter(item -> TaskStatusEnum.CREATED.getCode().equals(item.getEotaStatus()))
        .filter(item -> StrUtil.isNotBlank(item.getEotaOrderNo()))
        .collect(Collectors.groupingBy(InOutBoundTaskReq::getEotaOrderNo));

Set<String> orderNoSet = eotaOrderNoMap.keySet().stream()
        .filter(StringUtils::isNotBlank)
        .collect(Collectors.toSet());
  • 只处理状态为“已创建”的任务。
  • 过滤出有效的订单号集合。
if (orderNoSet.isEmpty()) {
    throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_NOT_NEED_CANCEL);
}
  • 如果没有可取消的任务,抛异常。
5. 遍历订单,逐个处理
int successCount = 0;
List<String> failedOrderNos = new ArrayList<>();
List<String> successOrderNos = new ArrayList<>();

for (String orderNo : orderNoSet) {
    try {
        processSingleOrder(orderNo, eotaOrderNoMap.get(orderNo));
        successOrderNos.add(orderNo);
        successCount++;
    } catch (BusinessException e) {
        log.warn("业务异常或锁异常,跳过订单 {}", orderNo);
        failedOrderNos.add(orderNo);
    } catch (Exception e) {
        log.error("系统异常,订单 {} 失败", orderNo, e);
        failedOrderNos.add(orderNo);
    }
}
  • 对每个订单调用 processSingleOrder() 方法进行处理。
  • 捕获异常,记录成功和失败的订单号。
  • 支持部分成功,提升用户体验。
6. 构建返回结果
Map<String, Object> result = new HashMap<>();
result.put("successCount", successCount);
result.put("failCount", failedOrderNos.size());
result.put("successOrderNos", successOrderNos);
result.put("failedOrderNos", failedOrderNos);
  • 返回成功数量、失败数量、成功订单号列表、失败订单号列表。
JsonResponse<Object> objectJsonResponse = new JsonResponse<>();
objectJsonResponse.setCode(BaseCodeEnum.SUCCESS.getCode());
objectJsonResponse.setData(result);
objectJsonResponse.setMsg(I18nUtils.getI18Msg(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_RESULT_RESPONSE.getCode()));
  • 构造统一的 JSON 响应对象。

🔧 @Transactional 方法:processSingleOrder

✅ 方法签名

@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void processSingleOrder(String orderNo, List<InOutBoundTaskReq> reqList)
  • 事务传播模式REQUIRES_NEW,表示每个订单处理都开启一个独立事务。
  • 异常回滚rollbackFor = Exception.class,任何异常都触发回滚。

1. 分布式锁控制

boolean lockSuccess = distributedLockHelper.lock(RedisKeyUtil.LockTypeEnum.WH_ALLOCATE, orderNo, 30);
if (!lockSuccess) {
    log.warn("【取消分仓】未能获取锁,跳过订单号:{}", orderNo);
    throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_ACQUIRE_LOCK_FAIL);
}
  • 使用 Redis 实现分布式锁,保证并发安全。
  • 锁的 key 为 WH_ALLOCATE:orderNo
  • 如果获取锁失败,抛出异常并跳过该订单。
} finally {
    if (lockSuccess) {
        distributedLockHelper.unlock(RedisKeyUtil.LockTypeEnum.WH_ALLOCATE, orderNo);
    }
}
  • 不管是否成功,最后释放锁。

2. 查询任务并过滤

List<String> eotaNoList = reqList.stream()
        .map(InOutBoundTaskReq::getEotaNo)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());

List<EoTask> tasks = eoTaskMapper.getListByEotaNoList(eotaNoList);
List<EoTask> validTasks = tasks.stream()
        .filter(task -> TaskStatusEnum.CREATED.getCode().equals(task.getEotaStatus()))
        .collect(Collectors.toList());

if (CollectionUtils.isEmpty(validTasks)) {
    log.warn("【取消分仓】订单号:{} 下无状态为创建的任务", orderNo);
    throw I18nUtils.failException(IomsErrorCodeEnum.ORDER_ALLOCATE_CANCEL_INOUTBOUND_NOT_NEED_CANCEL);
}
  • 根据任务编号查询数据库中的任务。
  • 只处理状态为“创建”的任务。

3. 删除任务相关数据

List<String> eotaNoList1 = validTasks.stream()
        .map(EoTask::getEotaNo)
        .collect(Collectors.toList());

// 删除子项
List<EoTaskItem> taskItems = eoTaskItemMapper.getListByEotaNoList(eotaNoList1);
if (CollectionUtil.isNotEmpty(taskItems)){
    List<Long> itemIds = taskItems.stream()
            .map(EoTaskItem::getId)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    if (!itemIds.isEmpty()) {
        eoOrderItemAllocateMapper.deleteBatchByEoTaskItemIdList(itemIds);
    }
    eoTaskItemMapper.deleteBatchByEotaNo(eotaNoList1);
}

eoTaskExtendMapper.deleteBatchByEotaNo(eotaNoList1);
eoTaskMapper.deleteBatchByEotaNo(eotaNoList1);
  • 删除任务子项、扩展信息、任务主表数据。

4. 更新订单状态

List<EoTask> remainingTasks = eoTaskMapper.queryListByEoOrderNoList(Collections.singletonList(orderNo));
if (CollectionUtils.isEmpty(remainingTasks)) {
    eoOrderMapper.updateStatusByOrderNoList(Collections.singletonList(orderNo), OrderCommonConstant.EO_ORDER_ALLOCATE_STATUS_UNSPLIT);
} else {
    eoOrderMapper.updateStatusByOrderNoList(Collections.singletonList(orderNo), OrderCommonConstant.EO_ORDER_ALLOCATE_STATUS_PARTIAL_SPLITTING);
}
  • 如果订单下没有其他任务了,则将订单状态设为“未分仓”。
  • 如果还有任务,则设为“部分分仓”。

🧠 总结:这段代码做了什么?

模块 功能
📥 参数校验 校验请求是否为空、订单类型是否合法、订单号是否有效
🔁 分组处理 按订单号分组,逐个处理
🔒 分布式锁 防止并发操作,保证数据一致性
🗑️ 数据删除 删除任务及子项数据
📝 状态更新 更新订单分仓状态
📤 结果返回 返回成功与失败的订单号,支持部分成功

🛠️ 可优化建议(可选)

  1. 异步处理:如果订单量较大,可以考虑使用异步线程池并发处理每个订单。
  2. 缓存更新:删除任务后,建议清理相关缓存。
  3. 事务控制优化:目前是每个订单一个事务,也可以按业务需求合并事务。
  4. 日志级别优化:根据环境(生产/测试)调整日志输出级别。
  5. 异常统一处理:可以提取异常处理逻辑到统一的 AOP 或全局异常处理器中。

如果你需要我帮你:

  • 提取异常处理逻辑为统一处理类
  • 改造为异步处理方式
  • 添加缓存清理逻辑
  • 使用 CompletableFuture 并发处理订单

欢迎继续提问,我可以帮你进一步优化和完善!

分布式锁

package com.midea.logistics.common.helper.redis;

import com.alibaba.ttl.TransmittableThreadLocal;
import com.midea.logistics.common.domain.enums.ModuleEnum;
import com.midea.logistics.common.util.Assert;
import com.midea.logistics.common.util.RedisKeyUtil;
import com.mideaframework.core.constants.BaseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 分布式锁
 * @author Raymond
 * @since 1.0.0
 */
@Slf4j
@ConditionalOnProperty(name = {"redis.distributed-lock-helper.enabled"}, havingValue = "true")
@Component
public class DistributedLockHelper {

    private static final TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<>();

    private final ModuleEnum moduleEnum;

    private final DefaultRedisScript<Boolean> unlockScript;

    private final RedisTemplate<String, Object> redisTemplate;

    public DistributedLockHelper(@Value("${redis.distributed-lock-helper.module:COMMON}") String module,
                                 RedisTemplate<String, Object> redisTemplate) {
        Assert.isTrue(redisTemplate != null, BaseCodeEnum.FAILED.getCode(), "redisTemplate不能为空");
        // 加载释放锁的脚本
        this.unlockScript = new DefaultRedisScript<>();
        this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/unlock.lua")));
        this.unlockScript.setResultType(Boolean.class);
        this.redisTemplate = redisTemplate;
        this.moduleEnum = ModuleEnum.getByFullName(module);
    }

    /**
     * 加锁
     * @param lockTypeEnum 锁的分类
     * @param resources 操作的资源
     * @param second 锁的时长
     * @return true or false
     */
    public boolean lock(RedisKeyUtil.LockTypeEnum lockTypeEnum, String resources, long second) {
        Assert.isTrue(StringUtils.isNotBlank(resources), BaseCodeEnum.FAILED.getCode(), "key can not be null");
        Assert.isTrue(Objects.nonNull(resources), BaseCodeEnum.FAILED.getCode(), "lockId can not be null");
        Assert.isTrue(second > 0, BaseCodeEnum.FAILED.getCode(), "second mast be more than zero");
        String redisKey = RedisKeyUtil.getDistributedLockKey(moduleEnum, lockTypeEnum, resources);
        Long currentTimeMillis = getValue();
        Boolean boo = redisTemplate.opsForValue().setIfAbsent(redisKey, currentTimeMillis, second, TimeUnit.SECONDS);
        if (boo == null){
            log.error("lock {} failed, return null", redisKey);
            return false;
        }
        if (!boo) {
            Object o = this.redisTemplate.opsForValue().get(redisKey);
            if (Objects.isNull(o)) {
                log.warn("lock {} failed, it is lock directed", redisKey);
                return false;
            }
            Long val = (Long) o;
            if (currentTimeMillis.equals(val)) {
                log.info("{} lock again", redisKey);
                return true;
            }
            log.warn("lock {} failed, it has locked @ {}", redisKey, currentTimeMillis);
        }
        return boo;
    }

    /**
     * 解锁
     * @param lockTypeEnum 锁的分类
     * @param resources 操作的资源
     * @return true or false
     */
    public boolean unlock(RedisKeyUtil.LockTypeEnum lockTypeEnum, String resources) {
        Assert.isTrue(StringUtils.isNotBlank(resources), BaseCodeEnum.FAILED.getCode(), "key can not be null");
        String redisKey = RedisKeyUtil.getDistributedLockKey(moduleEnum, lockTypeEnum, resources);
        Long currentTimeMillis = getValue();
        Boolean flag = redisTemplate.execute(
                unlockScript,
                Collections.singletonList(redisKey),
                currentTimeMillis);
        return flag != null && flag;
    }

    /**
     * 获取线程唯一标识
     * @return 线程唯一标识
     */
    private static Long getValue() {
        Long currentTimeMillis = THREAD_LOCAL.get();
        if (Objects.isNull(currentTimeMillis)) {
            currentTimeMillis = System.currentTimeMillis();
            THREAD_LOCAL.set(currentTimeMillis);
        }
        return currentTimeMillis;
    }
}
详细解释一下

这段代码定义了一个 分布式锁助手类 DistributedLockHelper,用于在分布式系统中实现基于 Redis 的分布式锁机制。它允许对特定资源加锁和解锁,确保在同一时间只有一个客户端可以操作该资源。

类概述

  • 包名: com.midea.logistics.common.helper.redis
  • 作者: Raymond
  • 版本: 1.0.0
  • 日志框架: 使用了 SLF4J 进行日志记录
  • 条件加载: 通过 @ConditionalOnProperty 注解,只有当配置项 redis.distributed-lock-helper.enabled 设置为 true 时,才会加载这个 Bean。

核心功能

  1. 加锁 (lock 方法)
  2. 解锁 (unlock 方法)

📝 详细解释

1. 类成员变量

private static final TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<>();
private final ModuleEnum moduleEnum;
private final DefaultRedisScript<Boolean> unlockScript;
private final RedisTemplate<String, Object> redisTemplate;
  • THREAD_LOCAL: 线程局部变量,存储当前线程的唯一标识(通常是当前时间戳),用于区分不同的线程。
  • moduleEnum: 模块枚举,用于生成 Redis 锁的键前缀。
  • unlockScript: Lua 脚本,用于安全地释放锁。
  • redisTemplate: Spring Data Redis 提供的模板类,用于与 Redis 进行交互。

2. 构造函数

public DistributedLockHelper(@Value("${redis.distributed-lock-helper.module:COMMON}") String module,
                             RedisTemplate<String, Object> redisTemplate) {
    Assert.isTrue(redisTemplate != null, BaseCodeEnum.FAILED.getCode(), "redisTemplate不能为空");
    this.unlockScript = new DefaultRedisScript<>();
    this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/unlock.lua")));
    this.unlockScript.setResultType(Boolean.class);
    this.redisTemplate = redisTemplate;
    this.moduleEnum = ModuleEnum.getByFullName(module);
}
  • 参数注入:

    • module: 从配置文件中读取模块名称,默认值为 COMMON
    • redisTemplate: 注入 Redis 操作模板。
  • 初始化:

    • 检查 redisTemplate 是否为空。
    • 加载 Lua 脚本 unlock.lua,用于解锁操作。
    • 初始化 moduleEnum,根据传入的模块名称获取对应的枚举值。

3. 加锁方法 (lock)

public boolean lock(RedisKeyUtil.LockTypeEnum lockTypeEnum, String resources, long second) {
    Assert.isTrue(StringUtils.isNotBlank(resources), BaseCodeEnum.FAILED.getCode(), "key can not be null");
    Assert.isTrue(Objects.nonNull(resources), BaseCodeEnum.FAILED.getCode(), "lockId can not be null");
    Assert.isTrue(second > 0, BaseCodeEnum.FAILED.getCode(), "second mast be more than zero");

    String redisKey = RedisKeyUtil.getDistributedLockKey(moduleEnum, lockTypeEnum, resources);
    Long currentTimeMillis = getValue();
    Boolean boo = redisTemplate.opsForValue().setIfAbsent(redisKey, currentTimeMillis, second, TimeUnit.SECONDS);

    if (boo == null){
        log.error("lock {} failed, return null", redisKey);
        return false;
    }
    if (!boo) {
        Object o = this.redisTemplate.opsForValue().get(redisKey);
        if (Objects.isNull(o)) {
            log.warn("lock {} failed, it is lock directed", redisKey);
            return false;
        }
        Long val = (Long) o;
        if (currentTimeMillis.equals(val)) {
            log.info("{} lock again", redisKey);
            return true;
        }
        log.warn("lock {} failed, it has locked @ {}", redisKey, currentTimeMillis);
    }
    return boo;
}
主要步骤:
  1. 参数校验:

    • 检查 resourcessecond 参数是否合法。
  2. 生成 Redis 键:

    • 使用 RedisKeyUtil.getDistributedLockKey 方法生成唯一的 Redis 键。
  3. 尝试加锁:

    • 使用 setIfAbsent 方法尝试设置 Redis 键值对,如果键不存在则设置成功并返回 true,否则返回 false
    • 如果设置失败,检查是否存在其他线程已经持有该锁。
    • 如果锁已经被持有并且不是由当前线程持有,则返回 false 表示加锁失败。
  4. 日志记录:

    • 记录加锁失败的原因。

4. 解锁方法 (unlock)

public boolean unlock(RedisKeyUtil.LockTypeEnum lockTypeEnum, String resources) {
    Assert.isTrue(StringUtils.isNotBlank(resources), BaseCodeEnum.FAILED.getCode(), "key can not be null");

    String redisKey = RedisKeyUtil.getDistributedLockKey(moduleEnum, lockTypeEnum, resources);
    Long currentTimeMillis = getValue();

    Boolean flag = redisTemplate.execute(
            unlockScript,
            Collections.singletonList(redisKey),
            currentTimeMillis);

    return flag != null && flag;
}
主要步骤:
  1. 参数校验:

    • 检查 resources 参数是否为空。
  2. 生成 Redis 键:

    • 使用 RedisKeyUtil.getDistributedLockKey 方法生成唯一的 Redis 键。
  3. 执行 Lua 脚本解锁:

    • 使用 redisTemplate.execute 方法执行 Lua 脚本 unlock.lua 来解锁。
    • Lua 脚本的作用是原子性地检查并删除锁,避免竞态条件。
  4. 返回结果:

    • 返回解锁是否成功的布尔值。

5. 获取线程唯一标识 (getValue)

private static Long getValue() {
    Long currentTimeMillis = THREAD_LOCAL.get();
    if (Objects.isNull(currentTimeMillis)) {
        currentTimeMillis = System.currentTimeMillis();
        THREAD_LOCAL.set(currentTimeMillis);
    }
    return currentTimeMillis;
}
  • 作用:
    • 为每个线程生成一个唯一的标识(通常使用当前时间戳)。
    • 使用 TransmittableThreadLocal 确保在线程池中也能正确传递线程上下文信息。

🔧 关键点分析

1. Lua 脚本解锁

  • 使用 Lua 脚本解锁是为了保证操作的原子性,防止出现竞态条件。
  • 脚本内容位于 lua/unlock.lua 文件中,通常会包含类似以下逻辑:
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end
  • 这个脚本首先检查 Redis 键的值是否与传入的线程唯一标识匹配,如果匹配则删除该键,否则不进行任何操作。

2. 加锁机制

  • 使用 setIfAbsent 方法来实现加锁,确保同一时间只有一个客户端能够成功加锁。
  • 如果加锁失败(即键已存在),会进一步检查是否是由当前线程持有的锁,如果是则允许重入,否则加锁失败。

3. 线程安全

  • 使用 TransmittableThreadLocal 确保每个线程都有自己的唯一标识,并且在线程池中也能正确传递上下文信息。
  • 这对于并发环境下的锁管理非常重要,确保不同线程不会互相干扰。

4. 超时机制

  • 在加锁时指定了锁的超时时间(以秒为单位),这样即使客户端崩溃或忘记释放锁,锁也会在超时后自动释放,避免死锁问题。

🛠️ 可能的改进

  1. 增加锁的续约机制:对于长时间运行的任务,可以在任务执行期间定期续约锁,防止锁因超时而被误释放。
  2. 异常处理优化:目前的代码中没有对 Redis 操作的异常进行特别处理,可以考虑添加更详细的异常捕获和日志记录。
  3. 支持更多的锁类型:除了简单的互斥锁外,还可以扩展支持读写锁、公平锁等高级锁类型。
  4. 增加锁的竞争策略:例如排队等待锁释放,而不是立即失败返回。

如果你需要我帮你实现上述建议中的任意一项,或者有其他问题,请随时告诉我!


网站公告

今日签到

点亮在社区的每一天
去签到