您说得非常对!方案一(Seata AT模式)我只展示了正向逻辑,确实需要补充完整的反向逻辑和一致性保证机制。让我详细解释并补充完整代码。
Seata AT 模式的事务一致性保证机制
核心原理:二阶段提交 + 回滚日志
- 一阶段:执行业务SQL,生成回滚日志(undo_log)
- 二阶段提交:删除回滚日志
- 二阶段回滚:用回滚日志进行数据恢复
完整的事务流程(包含反向逻辑)
graph TD A[全局事务开始] --> B[一阶段: 执行业务SQL] B --> C[生成undo_log回滚日志] C --> D{所有分支成功?} D -- 是 --> E[二阶段提交] E --> F[删除undo_log] F --> G[事务成功] D -- 否 --> H[二阶段回滚] H --> I[使用undo_log恢复数据] I --> J[删除undo_log] J --> K[事务回滚]
完整的代码实现(包含异常处理)
1. 全局事务入口(OrderService)
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountFeignClient accountFeignClient;
@Autowired
private StorageFeignClient storageFeignClient;
@Override
@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class, timeoutMills = 300000)
public void createOrder(OrderDTO orderDTO) {
try {
log.info("=== 开始创建订单全局事务 ===");
// 1. 创建订单(状态为创建中)
Order order = createOrderRecord(orderDTO);
// 2. 扣减库存(可能会抛出异常)
decreaseStorage(orderDTO.getProductId(), orderDTO.getCount());
// 3. 扣减余额(可能会抛出异常)
decreaseAccount(orderDTO.getUserId(), orderDTO.getMoney());
// 4. 更新订单状态为成功
updateOrderStatus(order.getId(), 1); // 1-成功
log.info("=== 订单创建成功 ===");
} catch (Exception e) {
log.error("=== 订单创建失败,全局事务回滚 ===", e);
// 这里不需要手动回滚,@GlobalTransactional 会自动触发回滚
// Seata 会通过undo_log自动回滚已经执行的SQL
throw new RuntimeException("订单创建失败: " + e.getMessage(), e);
}
}
private Order createOrderRecord(OrderDTO orderDTO) {
Order order = new Order();
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setCount(orderDTO.getCount());
order.setMoney(orderDTO.getMoney());
order.setStatus(0); // 0-创建中
orderMapper.insert(order);
log.info("创建订单记录: {}", order);
return order;
}
private void decreaseStorage(Long productId, Integer count) {
try {
ResponseEntity<String> result = storageFeignClient.decrease(productId, count);
if (!result.getStatusCode().is2xxSuccessful()) {
throw new RuntimeException("库存服务调用失败: " + result.getBody());
}
log.info("库存扣减成功: productId={}, count={}", productId, count);
} catch (Exception e) {
log.error("库存扣减失败", e);
throw new RuntimeException("库存不足或服务异常", e);
}
}
private void decreaseAccount(Long userId, BigDecimal money) {
try {
ResponseEntity<String> result = accountFeignClient.decrease(userId, money);
if (!result.getStatusCode().is2xxSuccessful()) {
throw new RuntimeException("账户服务调用失败: " + result.getBody());
}
log.info("账户扣减成功: userId={}, money={}", userId, money);
} catch (Exception e) {
log.error("账户扣减失败", e);
throw new RuntimeException("余额不足或服务异常", e);
}
}
private void updateOrderStatus(Long orderId, Integer status) {
Order order = new Order();
order.setId(orderId);
order.setStatus(status);
orderMapper.updateById(order);
}
}
2. 库存服务异常处理增强
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageMapper storageMapper;
@Override
@Transactional
public void decrease(Long productId, Integer count) {
try {
Storage storage = storageMapper.selectByProductId(productId);
if (storage == null) {
throw new RuntimeException("商品不存在: " + productId);
}
if (storage.getResidue() < count) {
// 这里抛出异常会触发全局回滚
throw new RuntimeException("库存不足: 当前库存=" + storage.getResidue() + ", 需要数量=" + count);
}
// 执行业务SQL - Seata代理会拦截并生成undo_log
storage.setResidue(storage.getResidue() - count);
storage.setUsed(storage.getUsed() + count);
storageMapper.updateById(storage);
log.info("库存扣减完成: productId={}, 扣减数量={}, 剩余库存={}",
productId, count, storage.getResidue());
} catch (Exception e) {
log.error("库存扣减业务异常", e);
throw e; // 异常继续抛出,触发全局回滚
}
}
/**
* 查询库存(用于前端校验)
*/
public Storage getStorage(Long productId) {
return storageMapper.selectByProductId(productId);
}
}
3. 账户服务异常处理增强
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;
@Override
@Transactional
public void decrease(Long userId, BigDecimal money) {
try {
Account account = accountMapper.selectByUserId(userId);
if (account == null) {
throw new RuntimeException("账户不存在: " + userId);
}
if (account.getBalance().compareTo(money) < 0) {
// 余额不足,抛出异常触发回滚
throw new RuntimeException("余额不足: 当前余额=" + account.getBalance() + ", 需要金额=" + money);
}
// 执行业务SQL - Seata代理会拦截并生成undo_log
account.setBalance(account.getBalance().subtract(money));
accountMapper.updateById(account);
log.info("账户扣减完成: userId={}, 扣减金额={}, 剩余余额={}",
userId, money, account.getBalance());
} catch (Exception e) {
log.error("账户扣减业务异常", e);
throw e; // 异常继续抛出,触发全局回滚
}
}
}
4. 异常场景测试用例
@SpringBootTest
@Slf4j
public class DistributedTransactionTest {
@Autowired
private OrderService orderService;
@Test
public void testNormalScenario() {
// 正常场景
OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("100.00"));
orderService.createOrder(orderDTO);
// 应该成功,所有数据一致
}
@Test
public void testInsufficientInventory() {
// 库存不足场景
OrderDTO orderDTO = new OrderDTO(1L, 1L, 1000, new BigDecimal("100.00")); // 数量太大
try {
orderService.createOrder(orderDTO);
Assert.fail("应该抛出异常");
} catch (Exception e) {
log.info("预期中的异常: {}", e.getMessage());
// 应该所有数据都回滚:订单不存在或状态为失败,库存和余额不变
}
}
@Test
public void testInsufficientBalance() {
// 余额不足场景
OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("1000000.00")); // 金额太大
try {
orderService.createOrder(orderDTO);
Assert.fail("应该抛出异常");
} catch (Exception e) {
log.info("预期中的异常: {}", e.getMessage());
// 应该所有数据都回滚
}
}
@Test
public void testServiceTimeout() {
// 服务超时场景
OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("100.00"));
// 模拟账户服务超时(需要在AccountService中模拟sleep)
try {
orderService.createOrder(orderDTO);
Assert.fail("应该超时异常");
} catch (Exception e) {
log.info("超时异常: {}", e.getMessage());
// Seata会检测超时并触发回滚
}
}
}
5. 事务状态查询和补偿接口
@RestController
@RequestMapping("/transaction")
@Slf4j
public class TransactionMonitorController {
@Autowired
private DataSource dataSource;
/**
* 查询全局事务状态
*/
@GetMapping("/status/{xid}")
public String getTransactionStatus(@PathVariable String xid) {
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(
"SELECT * FROM undo_log WHERE xid = ? ORDER BY log_created DESC")) {
ps.setString(1, xid);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
return "事务存在,状态: " + rs.getInt("log_status");
} else {
return "事务已完成或不存在";
}
} catch (Exception e) {
return "查询失败: " + e.getMessage();
}
}
/**
* 手动补偿接口(用于异常情况)
*/
@PostMapping("/compensate/{orderId}")
public String manualCompensate(@PathVariable Long orderId) {
log.warn("手动触发事务补偿: orderId={}", orderId);
// 这里可以实现手动补偿逻辑
// 1. 检查订单状态
// 2. 如果处于中间状态,进行补偿操作
// 3. 发送通知等
return "补偿操作已触发";
}
}
一致性保证的关键点
- undo_log 机制:Seata 在执行业务SQL时自动生成回滚日志
- 全局锁:防止脏写,确保数据隔离性
- 超时控制:
@GlobalTransactional(timeoutMills = 300000)
设置事务超时 - 异常传播:任何服务的异常都会触发全局回滚
- 幂等性设计:服务需要支持重试和幂等调用
监控和运维建议
- 监控undo_log表:定期清理已完成的事务日志
- 设置事务超时:避免长时间占用资源
- 日志记录:详细记录事务执行过程
- 告警机制:对失败的事务进行告警
- 人工补偿接口:提供手动干预的能力
这样完整的实现确保了分布式事务的一致性,无论是正向流程还是异常情况都能正确处理。