长流程、复杂业务流程分布式事务管理实战

发布于:2025-09-05 ⋅ 阅读:(19) ⋅ 点赞:(0)

基于 Spring Boot + RocketMQ 实现一个完整的 Saga Orchestrator 模式案例,确保“创建订单 -> 添加积分 -> 创建物流单”这三个操作的最终一致性。

一、架构概览

我们将创建四个微服务:

  1. Order Service (订单服务):核心入口,创建订单并启动Saga流程。
  2. Points Service (积分服务):负责为用户增加积分。
  3. Shipping Service (物流服务):负责创建物流运单。
  4. Saga Orchestrator (协调器服务)核心大脑,集中管理整个Saga流程的状态和顺序,触发正向操作和补偿操作。

交互流程如下图所示:

在这里插入图片描述

二、实现步骤

第1步:环境与依赖

1. 公共依赖 (pom.xml)

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

2. 公共配置 (application.yml)

rocketmq:
  name-server: localhost:9876
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/saga_example?useSSL=false
    username: root
    password: password
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
第2步:定义公共事件与命令

创建公共模块或直接在各个服务中定义这些类。所有消息体必须可序列化(实现 Serializable

// ==== 事件 (Events) - 表示某事已发生 ====
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderCreatedEvent implements Serializable {
    private Long orderId;
    private Long userId;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PointsAddedEvent implements Serializable {
    private Long orderId;
    private Long userId;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ShippingCreatedEvent implements Serializable {
    private Long orderId;
    private String shippingNo;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderCompletedEvent implements Serializable {
    private Long orderId;
}

// ==== 命令 (Commands) - 指示要执行的操作 ====
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AddPointsCommand implements Serializable {
    private Long orderId;
    private Long userId;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CreateShippingCommand implements Serializable {
    private Long orderId;
    private Long userId;
    private String address;
}

// ==== 补偿命令 (Compensation Commands) ====
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CompensateOrderCommand implements Serializable {
    private Long orderId;
    private String reason;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CompensatePointsCommand implements Serializable {
    private Long orderId;
}
第3步:实现订单服务 (Order Service)

1. Entity 和 Repository

@Entity
@Data
public class Order {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long userId;
    private BigDecimal amount;
    private String status; // "CREATING", "SUCCESS", "FAILED", "COMPENSATED"
    private String address;
}
public interface OrderRepository extends JpaRepository<Order, Long> {
}

2. Controller & Service (Saga流程起点)

@RestController
@RequiredArgsConstructor
public class OrderController {
    private final OrderService orderService;

    @PostMapping("/orders")
    public Order createOrder(@RequestBody OrderCreateRequest request) {
        return orderService.createOrder(request);
    }
}

@Service
@Transactional
@Slf4j
@RequiredArgsConstructor
public class OrderService {
    private final OrderRepository orderRepository;
    private final RocketMQTemplate rocketMQTemplate;

    public Order createOrder(OrderCreateRequest request) {
        // 1. 本地事务:创建订单记录,状态为“CREATING”
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setAmount(request.getAmount());
        order.setAddress(request.getAddress());
        order.setStatus("CREATING");
        order = orderRepository.save(order);
        log.info("订单创建成功, orderId: {}", order.getId());

        // 2. 发送事件,启动Saga流程
        // 使用事务消息确保订单创建和事件发送的原子性
        OrderCreatedEvent event = new OrderCreatedEvent(order.getId(), order.getUserId());
        rocketMQTemplate.sendMessageInTransaction(
            "order-created-topic",
            MessageBuilder.withPayload(event).build(),
            null
        );
        return order;
    }

    // 3. 监听Saga完成事件,更新订单状态
    @RocketMQMessageListener(topic = "order-completed-topic", consumerGroup = "order-complete-group")
    public void onOrderCompleted(OrderCompletedEvent event) {
        Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
        order.setStatus("SUCCESS");
        orderRepository.save(order);
        log.info("订单流程完成,订单状态更新为SUCCESS. orderId: {}", event.getOrderId());
    }

    // 4. 监听补偿命令,取消订单
    @RocketMQMessageListener(topic = "compensate-order-topic", consumerGroup = "order-compensate-group")
    public void onCompensateOrder(CompensateOrderCommand command) {
        Order order = orderRepository.findById(command.getOrderId()).orElseThrow();
        if (!"COMPENSATED".equals(order.getStatus())) { // 幂等性检查
            order.setStatus("FAILED"); // 或 "COMPENSATED"
            orderRepository.save(order);
            log.warn("收到补偿命令,订单已取消. orderId: {}, reason: {}", command.getOrderId(), command.getReason());
        }
    }
}
第4步:实现Saga协调器 (Saga Orchestrator)

这是整个流程的核心。

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderSagaOrchestrator {

    private final RocketMQTemplate rocketMQTemplate;

    // ========== 正向流程 ==========
    // 1. 监听订单创建事件,发起Saga第一步
    @RocketMQMessageListener(topic = "order-created-topic", consumerGroup = "saga-starter-group")
    public void onOrderCreated(OrderCreatedEvent event) {
        log.info("Saga开始: 收到订单创建事件, orderId: {}", event.getOrderId());
        // 发送命令:添加积分
        AddPointsCommand command = new AddPointsCommand(event.getOrderId(), event.getUserId());
        rocketMQTemplate.convertAndSend("add-points-command-topic", command);
    }

    // 2. 监听积分添加成功事件,发起Saga第二步
    @RocketMQMessageListener(topic = "points-added-topic", consumerGroup = "saga-shipping-trigger-group")
    public void onPointsAdded(PointsAddedEvent event) {
        log.info("积分已加: 开始创建物流单, orderId: {}", event.getOrderId());
        // 发送命令:创建物流单 (这里需要更多信息,可以从DB查,或事件里带更多数据)
        CreateShippingCommand command = new CreateShippingCommand(event.getOrderId(), event.getUserId(), "上海仓库");
        rocketMQTemplate.convertAndSend("create-shipping-command-topic", command);
    }

    // 3. 监听物流单创建成功事件,完成Saga
    @RocketMQMessageListener(topic = "shipping-created-topic", consumerGroup = "saga-completer-group")
    public void onShippingCreated(ShippingCreatedEvent event) {
        log.info("物流单已创建: Saga流程成功结束, orderId: {}", event.getOrderId());
        // 发送事件:订单完成
        OrderCompletedEvent completedEvent = new OrderCompletedEvent(event.getOrderId());
        rocketMQTemplate.convertAndSend("order-completed-topic", completedEvent);
    }

    // ========== 补偿流程 ==========
    // 4. 监听积分服务失败事件
    @RocketMQMessageListener(topic = "points-failed-topic", consumerGroup = "saga-compensate-points-group")
    public void onPointsFailed(PointsFailedEvent event) {
        log.error("积分添加失败: 触发Saga补偿, orderId: {}", event.getOrderId());
        // 只需要补偿订单
        CompensateOrderCommand command = new CompensateOrderCommand(event.getOrderId(), "积分添加失败: " + event.getReason());
        rocketMQTemplate.convertAndSend("compensate-order-topic", command);
    }

    // 5. 监听物流服务失败事件
    @RocketMQMessageListener(topic = "shipping-failed-topic", consumerGroup = "saga-compensate-shipping-group")
    public void onShippingFailed(ShippingFailedEvent event) {
        log.error("物流单创建失败: 触发Saga补偿, orderId: {}", event.getOrderId());
        // 需要先补偿积分,再补偿订单
        CompensatePointsCommand compPointsCmd = new CompensatePointsCommand(event.getOrderId());
        rocketMQTemplate.convertAndSend("compensate-points-topic", compPointsCmd);

        CompensateOrderCommand compOrderCmd = new CompensateOrderCommand(event.getOrderId(), "物流单创建失败: " + event.getReason());
        rocketMQTemplate.convertAndSend("compensate-order-topic", compOrderCmd);
    }
}
第5步:实现积分服务 (Points Service)
@Service
@Slf4j
@RequiredArgsConstructor
public class PointsService {

    private final PointsRepository pointsRepository;

    // 1. 监听添加积分命令
    @RocketMQMessageListener(topic = "add-points-command-topic", consumerGroup = "points-service-group")
    public void addPoints(AddPointsCommand command) {
        try {
            // 幂等性检查:通过订单ID判断是否已处理
            if (pointsRepository.existsByOrderId(command.getOrderId())) {
                log.info("积分已添加过,跳过. orderId: {}", command.getOrderId());
                return;
            }

            // 本地事务:添加积分记录
            Points points = new Points();
            points.setUserId(command.getUserId());
            points.setOrderId(command.getOrderId());
            points.setPoints(100); // 固定100积分
            pointsRepository.save(points);
            log.info("积分添加成功. orderId: {}, userId: {}", command.getOrderId(), command.getUserId());

            // 2. 发布“积分添加成功”事件
            PointsAddedEvent event = new PointsAddedEvent(command.getOrderId(), command.getUserId());
            rocketMQTemplate.convertAndSend("points-added-topic", event);

        } catch (Exception e) {
            log.error("积分添加失败: orderId: {}", command.getOrderId(), e);
            // 3. 发布“积分添加失败”事件,触发补偿
            PointsFailedEvent event = new PointsFailedEvent(command.getOrderId(), e.getMessage());
            rocketMQTemplate.convertAndSend("points-failed-topic", event);
        }
    }

    // 4. 监听补偿命令(来自物流失败后的补偿)
    @RocketMQMessageListener(topic = "compensate-points-topic", consumerGroup = "points-compensate-group")
    public void compensatePoints(CompensatePointsCommand command) {
        // 逆操作:扣除积分
        pointsRepository.findByOrderId(command.getOrderId()).ifPresent(points -> {
            pointsRepository.delete(points); // 或标记为已回滚
            log.info("积分补偿完成,已回滚. orderId: {}", command.getOrderId());
        });
    }
}
第6步:实现物流服务 (Shipping Service)
@Service
@Slf4j
@RequiredArgsConstructor
public class ShippingService {

    private final ShippingRepository shippingRepository;

    // 1. 监听创建物流单命令
    @RocketMQMessageListener(topic = "create-shipping-command-topic", consumerGroup = "shipping-service-group")
    public void createShipping(CreateShippingCommand command) {
        try {
            // 幂等性检查
            if (shippingRepository.existsByOrderId(command.getOrderId())) {
                log.info("物流单已创建过,跳过. orderId: {}", command.getOrderId());
                return;
            }

            // 本地事务:创建物流单
            Shipping shipping = new Shipping();
            shipping.setOrderId(command.getOrderId());
            shipping.setUserId(command.getUserId());
            shipping.setShippingNo("SH" + System.currentTimeMillis()); // 生成运单号
            shipping.setAddress(command.getAddress());
            shippingRepository.save(shipping);
            log.info("物流单创建成功. orderId: {}, shippingNo: {}", command.getOrderId(), shipping.getShippingNo());

            // 2. 发布“物流单创建成功”事件
            ShippingCreatedEvent event = new ShippingCreatedEvent(command.getOrderId(), shipping.getShippingNo());
            rocketMQTemplate.convertAndSend("shipping-created-topic", event);

        } catch (Exception e) {
            log.error("物流单创建失败: orderId: {}", command.getOrderId(), e);
            // 3. 发布“物流单创建失败”事件,触发补偿
            ShippingFailedEvent event = new ShippingFailedEvent(command.getOrderId(), e.getMessage());
            rocketMQTemplate.convertAndSend("shipping-failed-topic", event);
        }
    }
    // 注:物流创建失败本身是“无操作”,所以通常不需要补偿接口,由上游服务负责回滚。
}

三、关键保障措施

  1. 幂等性 (Idempotence):所有消费者都必须通过业务唯一ID(如orderId)检查是否已处理过,防止消息重复消费。
  2. 原子性 (Atomicity):订单服务使用RocketMQ事务消息确保“创建订单”与“启动Saga”的原子性。
  3. 补偿机制 (Compensation):为每个正向操作设计对应的逆操作,并在协调器中串联调用,确保失败时能回溯。
  4. 可观测性 (Observability):在每个关键节点打印日志,方便追踪Saga流程和排查问题。可以引入全局事务ID(如sagaId)串联所有日志。

这个实现提供了一个健壮的、生产可用的分布式事务解决方案,能够很好地处理高并发场景下的最终一致性需求。


网站公告

今日签到

点亮在社区的每一天
去签到