(3)Seata AT 模式的事务一致性保证机制

发布于:2025-09-06 ⋅ 阅读:(19) ⋅ 点赞:(0)

您说得非常对!方案一(Seata AT模式)我只展示了正向逻辑,确实需要补充完整的反向逻辑和一致性保证机制。让我详细解释并补充完整代码。

Seata AT 模式的事务一致性保证机制

核心原理:二阶段提交 + 回滚日志

  1. 一阶段:执行业务SQL,生成回滚日志(undo_log)
  2. 二阶段提交:删除回滚日志
  3. 二阶段回滚:用回滚日志进行数据恢复

完整的事务流程(包含反向逻辑)

graph TD
    A[全局事务开始] --> B[一阶段: 执行业务SQL]
    B --> C[生成undo_log回滚日志]
    C --> D{所有分支成功?}
    D -- 是 --> E[二阶段提交]
    E --> F[删除undo_log]
    F --> G[事务成功]
    D -- 否 --> H[二阶段回滚]
    H --> I[使用undo_log恢复数据]
    I --> J[删除undo_log]
    J --> K[事务回滚]

完整的代码实现(包含异常处理)

1. 全局事务入口(OrderService)

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private AccountFeignClient accountFeignClient;
    
    @Autowired
    private StorageFeignClient storageFeignClient;
    
    @Override
    @GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class, timeoutMills = 300000)
    public void createOrder(OrderDTO orderDTO) {
        try {
            log.info("=== 开始创建订单全局事务 ===");
            
            // 1. 创建订单(状态为创建中)
            Order order = createOrderRecord(orderDTO);
            
            // 2. 扣减库存(可能会抛出异常)
            decreaseStorage(orderDTO.getProductId(), orderDTO.getCount());
            
            // 3. 扣减余额(可能会抛出异常)
            decreaseAccount(orderDTO.getUserId(), orderDTO.getMoney());
            
            // 4. 更新订单状态为成功
            updateOrderStatus(order.getId(), 1); // 1-成功
            
            log.info("=== 订单创建成功 ===");
            
        } catch (Exception e) {
            log.error("=== 订单创建失败,全局事务回滚 ===", e);
            
            // 这里不需要手动回滚,@GlobalTransactional 会自动触发回滚
            // Seata 会通过undo_log自动回滚已经执行的SQL
            
            throw new RuntimeException("订单创建失败: " + e.getMessage(), e);
        }
    }
    
    private Order createOrderRecord(OrderDTO orderDTO) {
        Order order = new Order();
        order.setUserId(orderDTO.getUserId());
        order.setProductId(orderDTO.getProductId());
        order.setCount(orderDTO.getCount());
        order.setMoney(orderDTO.getMoney());
        order.setStatus(0); // 0-创建中
        orderMapper.insert(order);
        log.info("创建订单记录: {}", order);
        return order;
    }
    
    private void decreaseStorage(Long productId, Integer count) {
        try {
            ResponseEntity<String> result = storageFeignClient.decrease(productId, count);
            if (!result.getStatusCode().is2xxSuccessful()) {
                throw new RuntimeException("库存服务调用失败: " + result.getBody());
            }
            log.info("库存扣减成功: productId={}, count={}", productId, count);
        } catch (Exception e) {
            log.error("库存扣减失败", e);
            throw new RuntimeException("库存不足或服务异常", e);
        }
    }
    
    private void decreaseAccount(Long userId, BigDecimal money) {
        try {
            ResponseEntity<String> result = accountFeignClient.decrease(userId, money);
            if (!result.getStatusCode().is2xxSuccessful()) {
                throw new RuntimeException("账户服务调用失败: " + result.getBody());
            }
            log.info("账户扣减成功: userId={}, money={}", userId, money);
        } catch (Exception e) {
            log.error("账户扣减失败", e);
            throw new RuntimeException("余额不足或服务异常", e);
        }
    }
    
    private void updateOrderStatus(Long orderId, Integer status) {
        Order order = new Order();
        order.setId(orderId);
        order.setStatus(status);
        orderMapper.updateById(order);
    }
}

2. 库存服务异常处理增强

@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
    
    @Autowired
    private StorageMapper storageMapper;
    
    @Override
    @Transactional
    public void decrease(Long productId, Integer count) {
        try {
            Storage storage = storageMapper.selectByProductId(productId);
            if (storage == null) {
                throw new RuntimeException("商品不存在: " + productId);
            }
            
            if (storage.getResidue() < count) {
                // 这里抛出异常会触发全局回滚
                throw new RuntimeException("库存不足: 当前库存=" + storage.getResidue() + ", 需要数量=" + count);
            }
            
            // 执行业务SQL - Seata代理会拦截并生成undo_log
            storage.setResidue(storage.getResidue() - count);
            storage.setUsed(storage.getUsed() + count);
            storageMapper.updateById(storage);
            
            log.info("库存扣减完成: productId={}, 扣减数量={}, 剩余库存={}", 
                    productId, count, storage.getResidue());
                    
        } catch (Exception e) {
            log.error("库存扣减业务异常", e);
            throw e; // 异常继续抛出,触发全局回滚
        }
    }
    
    /**
     * 查询库存(用于前端校验)
     */
    public Storage getStorage(Long productId) {
        return storageMapper.selectByProductId(productId);
    }
}

3. 账户服务异常处理增强

@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    
    @Autowired
    private AccountMapper accountMapper;
    
    @Override
    @Transactional
    public void decrease(Long userId, BigDecimal money) {
        try {
            Account account = accountMapper.selectByUserId(userId);
            if (account == null) {
                throw new RuntimeException("账户不存在: " + userId);
            }
            
            if (account.getBalance().compareTo(money) < 0) {
                // 余额不足,抛出异常触发回滚
                throw new RuntimeException("余额不足: 当前余额=" + account.getBalance() + ", 需要金额=" + money);
            }
            
            // 执行业务SQL - Seata代理会拦截并生成undo_log
            account.setBalance(account.getBalance().subtract(money));
            accountMapper.updateById(account);
            
            log.info("账户扣减完成: userId={}, 扣减金额={}, 剩余余额={}", 
                    userId, money, account.getBalance());
                    
        } catch (Exception e) {
            log.error("账户扣减业务异常", e);
            throw e; // 异常继续抛出,触发全局回滚
        }
    }
}

4. 异常场景测试用例

@SpringBootTest
@Slf4j
public class DistributedTransactionTest {
    
    @Autowired
    private OrderService orderService;
    
    @Test
    public void testNormalScenario() {
        // 正常场景
        OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("100.00"));
        orderService.createOrder(orderDTO);
        // 应该成功,所有数据一致
    }
    
    @Test
    public void testInsufficientInventory() {
        // 库存不足场景
        OrderDTO orderDTO = new OrderDTO(1L, 1L, 1000, new BigDecimal("100.00")); // 数量太大
        try {
            orderService.createOrder(orderDTO);
            Assert.fail("应该抛出异常");
        } catch (Exception e) {
            log.info("预期中的异常: {}", e.getMessage());
            // 应该所有数据都回滚:订单不存在或状态为失败,库存和余额不变
        }
    }
    
    @Test
    public void testInsufficientBalance() {
        // 余额不足场景
        OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("1000000.00")); // 金额太大
        try {
            orderService.createOrder(orderDTO);
            Assert.fail("应该抛出异常");
        } catch (Exception e) {
            log.info("预期中的异常: {}", e.getMessage());
            // 应该所有数据都回滚
        }
    }
    
    @Test
    public void testServiceTimeout() {
        // 服务超时场景
        OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("100.00"));
        // 模拟账户服务超时(需要在AccountService中模拟sleep)
        try {
            orderService.createOrder(orderDTO);
            Assert.fail("应该超时异常");
        } catch (Exception e) {
            log.info("超时异常: {}", e.getMessage());
            // Seata会检测超时并触发回滚
        }
    }
}

5. 事务状态查询和补偿接口

@RestController
@RequestMapping("/transaction")
@Slf4j
public class TransactionMonitorController {
    
    @Autowired
    private DataSource dataSource;
    
    /**
     * 查询全局事务状态
     */
    @GetMapping("/status/{xid}")
    public String getTransactionStatus(@PathVariable String xid) {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                 "SELECT * FROM undo_log WHERE xid = ? ORDER BY log_created DESC")) {
            ps.setString(1, xid);
            ResultSet rs = ps.executeQuery();
            
            if (rs.next()) {
                return "事务存在,状态: " + rs.getInt("log_status");
            } else {
                return "事务已完成或不存在";
            }
        } catch (Exception e) {
            return "查询失败: " + e.getMessage();
        }
    }
    
    /**
     * 手动补偿接口(用于异常情况)
     */
    @PostMapping("/compensate/{orderId}")
    public String manualCompensate(@PathVariable Long orderId) {
        log.warn("手动触发事务补偿: orderId={}", orderId);
        // 这里可以实现手动补偿逻辑
        // 1. 检查订单状态
        // 2. 如果处于中间状态,进行补偿操作
        // 3. 发送通知等
        
        return "补偿操作已触发";
    }
}

一致性保证的关键点

  1. undo_log 机制:Seata 在执行业务SQL时自动生成回滚日志
  2. 全局锁:防止脏写,确保数据隔离性
  3. 超时控制@GlobalTransactional(timeoutMills = 300000) 设置事务超时
  4. 异常传播:任何服务的异常都会触发全局回滚
  5. 幂等性设计:服务需要支持重试和幂等调用

监控和运维建议

  1. 监控undo_log表:定期清理已完成的事务日志
  2. 设置事务超时:避免长时间占用资源
  3. 日志记录:详细记录事务执行过程
  4. 告警机制:对失败的事务进行告警
  5. 人工补偿接口:提供手动干预的能力

这样完整的实现确保了分布式事务的一致性,无论是正向流程还是异常情况都能正确处理。


网站公告

今日签到

点亮在社区的每一天
去签到