Hello,大家好,我是灰小猿! 在分布式系统中,由于各个服务之间独立部署,各个服务之间依靠远程调用完成通信,再加上面对用户重复点击时的重复请求等情况,所以如何保证消息消费的幂等性是在分布式或微服务项目中必须要考虑的问题。
常见的保证消息幂等性的策略有以下几种,根据具体的使用场景选用不同的幂等策略。
1、数据库唯一索引
这种策略主要通过数据库的唯一索引约束来保证消息的幂等性主要是用于数据插入的场景,防止数据重复插入,
适用场景:数据强一致性的场景,订单创建防重,用户注册时手机号、邮箱号的唯一性校验等,如防止用户重复提交订单,在订单表中设置一个唯一标识的字段,如order表的Business_Key(业务ID)字段,当用户提交重复的订单时,这些重复的订单所对应的Business_Key是相同的,此时插入数据数据库会报索引重复DataIntegrityViolationException
异常,从而避免数据的重复插入。
对于这个Business_Key,可以使用用户ID+商品ID+下单时间来生成,这个下单时间可能由于用户点击的先后顺序有所不同,所以可以对时间进行处理,如五分钟之内使用同一个时间标识,则可以使用下单时间戳除以300000(即5分钟=300000毫秒),这样可以有效保证同一业务订单在一段时间内只会下单一次。
我们以一个商品订单表为例,举例数据表结构如下:
CREATE TABLE payment_order (
order_id VARCHAR(32) PRIMARY KEY,
business_key VARCHAR(64) UNIQUE NOT NULL,
user_id BIGINT NOT NULL,
amount DECIMAL(10,2),
status VARCHAR(20),
create_time DATETIME
);
给订单表的business_key建立唯一索引
ALTER TABLE payment_order ADD UNIQUE INDEX uniq_business_key (business_key);
之后具体的业务实现流程大概如下:
1、用户发起支付请求
2、生成订单业务ID(business_key)
3、判断存在业务ID相同的订单
返回已有订单
生成新的订单
4、拉取第三方支付接口
生成唯一业务标识的方法如下:
public class BusinessKeyGenerator {
// 时间窗口:5分钟(300000毫秒)
private static final long TIME_WINDOW = 300000;
public static String generateKey(Long userId, String packageId) {
long timeSlot = System.currentTimeMillis() / TIME_WINDOW;
String rawKey = userId + ":" + packageId + ":" + timeSlot;
return DigestUtils.md5Hex(rawKey);
}
}
防止订单重复创建的幂等性设计
1、通过先查询订单是否存在的方式插入
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Transactional
public Order createOrder(Long userId, String packageId) {
String businessKey = BusinessKeyGenerator.generateKey(userId, packageId);
// 检查是否存在未支付的相同业务订单
Order existingOrder = orderRepository.findPendingByBusinessKey(businessKey);
if (existingOrder != null) {
return existingOrder;
}
try {
// 创建新订单
Order newOrder = new Order();
newOrder.setOrderId(generateOrderId()); // 生成唯一订单号
newOrder.setBusinessKey(businessKey);
newOrder.setUserId(userId);
newOrder.setPackageId(packageId);
newOrder.setStatus(OrderStatus.PENDING);
return orderRepository.save(newOrder);
} catch (DataIntegrityViolationException ex) {
// 处理唯一键冲突(高并发场景)
return orderRepository.findPendingByBusinessKey(businessKey);
}
}
}
2、通过捕获唯一索引异常的方式插入
上面这种策略在创建新订单之前是先通过业务ID的方式去查询了数据库中是否已经存在了这个业务ID对应的订单,还有一种方式是直接生成订单信息并且执行insert插入,之后通过捕获唯一索引异常(DuplicateKeyException)的方式来返回已经创建的订单信息。
具体的实现代码如下:
@Transactional
public void createOrder(Order order) {
try {
orderDao.insert(order); // 触发唯一约束
} catch (DataIntegrityViolationException ex) {
// 抓取重复提交异常
Order existingOrder = orderRepository.findPendingByBusinessKey(businessKey);
throw new DuplicateOrderException(existingOrder.getOrderId());
}
}
2、乐观锁
通过数据库乐观锁的方式保证幂等性,同样也是基于数据库的一种实现方式,
首先介绍一下乐观锁的概念:
乐观锁:即认为死锁的发生是极小概率的事件,所以在修改数据之前不会对数据进行加锁,只有在修改数据时通过判断本次修改的版本和上一次的版本是否相同,相同则表示数据未被修改,不相同则表示数据已经被修改,此时的数据修改失败。
适用场景:乐观锁机制适用于存在版本属性的更新,这种方式的使用通常需要在数据库表中增加int类型的versionId字段,每次修改数据时versionId=versionId+1,以此来保证每次更新的版本都是新的。
我们同样以商品订单表为例,其中加入version_id字段,用来记录当前的数据版本。
CREATE TABLE payment_order (
order_id VARCHAR(32) PRIMARY KEY,
version_id int NOT NULL,
user_id BIGINT NOT NULL,
amount DECIMAL(10,2),
status VARCHAR(20),
create_time DATETIME
);
当执行更新时,需要判断当前查询到的version和将要更新的version是否相同
#查询数据
SELECT version_id FROM payment_order WHERE order_id = #{order_id}
#更新数据,要求数据当前版本号和已知版本号相同,并且每次更新版本号递增
UPDATE payment_order SET status=PAID, version_id = version_id+1
WHERE order_id = #{order_id} AND version_id = #{version_id}
3、悲观锁
介绍一下悲观锁的概念
悲观锁:即认为死锁总是会发生的,所以在每次更新数据时都会对数据进行加锁,当其他线程想要修改数据时会处于一个阻塞的状态
这种处理方式一般需要我们在更新数据库时使用行级锁的更新方法,即开启事务并先查询出数据,同时对数据进行加锁,更新完成数据之后,再提交事务,从而释放锁。
以获取商品信息并生成订单,之后进行库存扣减为例,具体的sql操作如下:
//0.开始事务
begin
//1.查询出商品信息
select number from payment where id=#{payment_id} for update;
//2.根据商品信息生成订单
insert into payment_order (id,其他字段...) values (?,?,?,...);
//3.修改商品库存
update payment set number=#{number} where id=#{payment_id}
//4.提交事务
commit
4、状态机
状态机的原理是通过状态机的流转控制,确保操作只会被执行一次
适用场景:这种机制适用于订单或工单流程类系统,如订单状态变更,状态机来保证消息幂等性的策略可以说是依据严格的业务执行流程来的,换句话来说就是一条数据的状态只能由一个状态变为指定的另外一种或多种状态,
以订单数据为例,状态可以分为:待支付、已支付、已超时、已取消这几种状态,那么订单的状态流向就是固定的一个状态机制,
以下是一个订单状态的状态机
public enum OrderStateTypeEnum {
PENDING, // 待支付
PAID, // 已支付
EXPIRED, // 已超时
CANCELED; // 已取消
/**
* 状态机
*/
private static final Map<OrderStateTypeEnum, Set<OrderStateTypeEnum>> transitions = new HashMap<>();
static {
//待支付状态可以转换为其他三种
transitions.put(PENDING, EnumSet.of(PAID, EXPIRED, CANCELED));
//已支付状态不能转换为其他状态
transitions.put(PAID, EnumSet.noneOf(OrderStateTypeEnum.class));
}
public boolean canTransitionTo(OrderStateTypeEnum orderStateType) {
return transitions.get(this).contains(orderStateType);
}
}
通过状态机的方式去更新数据时,会先查询订单当前的状态,并且判断当前状态是否可以转化为将要更新后的状态,如果可以再执行数据的更新,否则则认为当前的状态转变是不合理的。
5、Redis
基于Redis的原子操作来实现分布式锁,通过SETNX设置key来标识是否处理过,并且设置过期时间,如果成功则处理,否则则忽略。
适用场景:高并发情况下的快速检查,比如秒杀活动等,这种方式具有高性能低延迟的特点,但是在使用过程中要注意Redis的高可用问题。
以下是一个通过Redis实现分布式锁,来避免订单重复处理的代码逻辑,
// Redis SETNX 实现分布式锁
public class RedisLockService {
public boolean tryLock(String key, String value, long expireSeconds) {
return redis.opsForValue().setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS);
}
}
public class OrderService {
public void addOrder(String orderId) {
String lockKey = "lock:order:" + orderId;
String requestId = UUID.randomUUID().toString();
try {
if (tryLock(lockKey, requestId, 30)) {
// 业务处理
processOrder(orderId);
}
} finally {
// Lua脚本保证原子删除
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
redis.execute(script, Collections.singletonList(lockKey), requestId);
}
}
}
6、token机制
token机制的实现是:客户端先从服务器上获取一个token,提交请求时携带上这个token,服务器端会验证这个token是否已经存在,如果已经存在则删除(因为在客户端获取这个Token时,服务器端已经存起来了)并且继续后面的操作,这样可以防止重复提交的发生,
适用场景:防止请求重复提交,API接口的短时效防重等,如在用户下单时生成token,提交时服务器进行验证,在代码实现中可以用Redis存储token,以此可以防止用户重复提交多个订单,
基于Token的实现的关键代码处理如下:
// Redis + Token 机制
public class TokenService {
@Autowired
private RedisTemplate<String, String> redis;
/**
* 生成Token
*/
public String generateToken(String userId) {
String token = UUID.randomUUID().toString();
//存入Redis
redis.opsForValue().set(userId + ":" + token, "1", 5, TimeUnit.MINUTES);
return token;
}
/**
* 校验Token的有效性
*/
public boolean validateToken(String userId, String token) {
String key = userId + ":" + token;
Long deleted = redis.delete(key); // 原子性删除
return deleted != null && deleted > 0;
}
}
具体选用哪种幂等策略,还需要根据具体的业务功能来确定,在一个项目中,可能同时使用了多种幂等策略,这些都需要结合他们不同的特点和业务需求来分析,原则就是以实现功能的前提下以最小代码成本实现功能。