基于 Spring Boot + RocketMQ 实现一个完整的 Saga Orchestrator 模式案例,确保“创建订单 -> 添加积分 -> 创建物流单”这三个操作的最终一致性。
一、架构概览
我们将创建四个微服务:
- Order Service (订单服务):核心入口,创建订单并启动Saga流程。
- Points Service (积分服务):负责为用户增加积分。
- Shipping Service (物流服务):负责创建物流运单。
- Saga Orchestrator (协调器服务):核心大脑,集中管理整个Saga流程的状态和顺序,触发正向操作和补偿操作。
交互流程如下图所示:
二、实现步骤
第1步:环境与依赖
1. 公共依赖 (pom.xml
)
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
2. 公共配置 (application.yml
)
rocketmq:
name-server: localhost:9876
spring:
datasource:
url: jdbc:mysql://localhost:3306/saga_example?useSSL=false
username: root
password: password
jpa:
hibernate:
ddl-auto: update
show-sql: true
第2步:定义公共事件与命令
创建公共模块或直接在各个服务中定义这些类。所有消息体必须可序列化(实现 Serializable
)。
// ==== 事件 (Events) - 表示某事已发生 ====
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderCreatedEvent implements Serializable {
private Long orderId;
private Long userId;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PointsAddedEvent implements Serializable {
private Long orderId;
private Long userId;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ShippingCreatedEvent implements Serializable {
private Long orderId;
private String shippingNo;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderCompletedEvent implements Serializable {
private Long orderId;
}
// ==== 命令 (Commands) - 指示要执行的操作 ====
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AddPointsCommand implements Serializable {
private Long orderId;
private Long userId;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CreateShippingCommand implements Serializable {
private Long orderId;
private Long userId;
private String address;
}
// ==== 补偿命令 (Compensation Commands) ====
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CompensateOrderCommand implements Serializable {
private Long orderId;
private String reason;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CompensatePointsCommand implements Serializable {
private Long orderId;
}
第3步:实现订单服务 (Order Service)
1. Entity 和 Repository
@Entity
@Data
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
private BigDecimal amount;
private String status; // "CREATING", "SUCCESS", "FAILED", "COMPENSATED"
private String address;
}
public interface OrderRepository extends JpaRepository<Order, Long> {
}
2. Controller & Service (Saga流程起点)
@RestController
@RequiredArgsConstructor
public class OrderController {
private final OrderService orderService;
@PostMapping("/orders")
public Order createOrder(@RequestBody OrderCreateRequest request) {
return orderService.createOrder(request);
}
}
@Service
@Transactional
@Slf4j
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final RocketMQTemplate rocketMQTemplate;
public Order createOrder(OrderCreateRequest request) {
// 1. 本地事务:创建订单记录,状态为“CREATING”
Order order = new Order();
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setAddress(request.getAddress());
order.setStatus("CREATING");
order = orderRepository.save(order);
log.info("订单创建成功, orderId: {}", order.getId());
// 2. 发送事件,启动Saga流程
// 使用事务消息确保订单创建和事件发送的原子性
OrderCreatedEvent event = new OrderCreatedEvent(order.getId(), order.getUserId());
rocketMQTemplate.sendMessageInTransaction(
"order-created-topic",
MessageBuilder.withPayload(event).build(),
null
);
return order;
}
// 3. 监听Saga完成事件,更新订单状态
@RocketMQMessageListener(topic = "order-completed-topic", consumerGroup = "order-complete-group")
public void onOrderCompleted(OrderCompletedEvent event) {
Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
order.setStatus("SUCCESS");
orderRepository.save(order);
log.info("订单流程完成,订单状态更新为SUCCESS. orderId: {}", event.getOrderId());
}
// 4. 监听补偿命令,取消订单
@RocketMQMessageListener(topic = "compensate-order-topic", consumerGroup = "order-compensate-group")
public void onCompensateOrder(CompensateOrderCommand command) {
Order order = orderRepository.findById(command.getOrderId()).orElseThrow();
if (!"COMPENSATED".equals(order.getStatus())) { // 幂等性检查
order.setStatus("FAILED"); // 或 "COMPENSATED"
orderRepository.save(order);
log.warn("收到补偿命令,订单已取消. orderId: {}, reason: {}", command.getOrderId(), command.getReason());
}
}
}
第4步:实现Saga协调器 (Saga Orchestrator)
这是整个流程的核心。
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderSagaOrchestrator {
private final RocketMQTemplate rocketMQTemplate;
// ========== 正向流程 ==========
// 1. 监听订单创建事件,发起Saga第一步
@RocketMQMessageListener(topic = "order-created-topic", consumerGroup = "saga-starter-group")
public void onOrderCreated(OrderCreatedEvent event) {
log.info("Saga开始: 收到订单创建事件, orderId: {}", event.getOrderId());
// 发送命令:添加积分
AddPointsCommand command = new AddPointsCommand(event.getOrderId(), event.getUserId());
rocketMQTemplate.convertAndSend("add-points-command-topic", command);
}
// 2. 监听积分添加成功事件,发起Saga第二步
@RocketMQMessageListener(topic = "points-added-topic", consumerGroup = "saga-shipping-trigger-group")
public void onPointsAdded(PointsAddedEvent event) {
log.info("积分已加: 开始创建物流单, orderId: {}", event.getOrderId());
// 发送命令:创建物流单 (这里需要更多信息,可以从DB查,或事件里带更多数据)
CreateShippingCommand command = new CreateShippingCommand(event.getOrderId(), event.getUserId(), "上海仓库");
rocketMQTemplate.convertAndSend("create-shipping-command-topic", command);
}
// 3. 监听物流单创建成功事件,完成Saga
@RocketMQMessageListener(topic = "shipping-created-topic", consumerGroup = "saga-completer-group")
public void onShippingCreated(ShippingCreatedEvent event) {
log.info("物流单已创建: Saga流程成功结束, orderId: {}", event.getOrderId());
// 发送事件:订单完成
OrderCompletedEvent completedEvent = new OrderCompletedEvent(event.getOrderId());
rocketMQTemplate.convertAndSend("order-completed-topic", completedEvent);
}
// ========== 补偿流程 ==========
// 4. 监听积分服务失败事件
@RocketMQMessageListener(topic = "points-failed-topic", consumerGroup = "saga-compensate-points-group")
public void onPointsFailed(PointsFailedEvent event) {
log.error("积分添加失败: 触发Saga补偿, orderId: {}", event.getOrderId());
// 只需要补偿订单
CompensateOrderCommand command = new CompensateOrderCommand(event.getOrderId(), "积分添加失败: " + event.getReason());
rocketMQTemplate.convertAndSend("compensate-order-topic", command);
}
// 5. 监听物流服务失败事件
@RocketMQMessageListener(topic = "shipping-failed-topic", consumerGroup = "saga-compensate-shipping-group")
public void onShippingFailed(ShippingFailedEvent event) {
log.error("物流单创建失败: 触发Saga补偿, orderId: {}", event.getOrderId());
// 需要先补偿积分,再补偿订单
CompensatePointsCommand compPointsCmd = new CompensatePointsCommand(event.getOrderId());
rocketMQTemplate.convertAndSend("compensate-points-topic", compPointsCmd);
CompensateOrderCommand compOrderCmd = new CompensateOrderCommand(event.getOrderId(), "物流单创建失败: " + event.getReason());
rocketMQTemplate.convertAndSend("compensate-order-topic", compOrderCmd);
}
}
第5步:实现积分服务 (Points Service)
@Service
@Slf4j
@RequiredArgsConstructor
public class PointsService {
private final PointsRepository pointsRepository;
// 1. 监听添加积分命令
@RocketMQMessageListener(topic = "add-points-command-topic", consumerGroup = "points-service-group")
public void addPoints(AddPointsCommand command) {
try {
// 幂等性检查:通过订单ID判断是否已处理
if (pointsRepository.existsByOrderId(command.getOrderId())) {
log.info("积分已添加过,跳过. orderId: {}", command.getOrderId());
return;
}
// 本地事务:添加积分记录
Points points = new Points();
points.setUserId(command.getUserId());
points.setOrderId(command.getOrderId());
points.setPoints(100); // 固定100积分
pointsRepository.save(points);
log.info("积分添加成功. orderId: {}, userId: {}", command.getOrderId(), command.getUserId());
// 2. 发布“积分添加成功”事件
PointsAddedEvent event = new PointsAddedEvent(command.getOrderId(), command.getUserId());
rocketMQTemplate.convertAndSend("points-added-topic", event);
} catch (Exception e) {
log.error("积分添加失败: orderId: {}", command.getOrderId(), e);
// 3. 发布“积分添加失败”事件,触发补偿
PointsFailedEvent event = new PointsFailedEvent(command.getOrderId(), e.getMessage());
rocketMQTemplate.convertAndSend("points-failed-topic", event);
}
}
// 4. 监听补偿命令(来自物流失败后的补偿)
@RocketMQMessageListener(topic = "compensate-points-topic", consumerGroup = "points-compensate-group")
public void compensatePoints(CompensatePointsCommand command) {
// 逆操作:扣除积分
pointsRepository.findByOrderId(command.getOrderId()).ifPresent(points -> {
pointsRepository.delete(points); // 或标记为已回滚
log.info("积分补偿完成,已回滚. orderId: {}", command.getOrderId());
});
}
}
第6步:实现物流服务 (Shipping Service)
@Service
@Slf4j
@RequiredArgsConstructor
public class ShippingService {
private final ShippingRepository shippingRepository;
// 1. 监听创建物流单命令
@RocketMQMessageListener(topic = "create-shipping-command-topic", consumerGroup = "shipping-service-group")
public void createShipping(CreateShippingCommand command) {
try {
// 幂等性检查
if (shippingRepository.existsByOrderId(command.getOrderId())) {
log.info("物流单已创建过,跳过. orderId: {}", command.getOrderId());
return;
}
// 本地事务:创建物流单
Shipping shipping = new Shipping();
shipping.setOrderId(command.getOrderId());
shipping.setUserId(command.getUserId());
shipping.setShippingNo("SH" + System.currentTimeMillis()); // 生成运单号
shipping.setAddress(command.getAddress());
shippingRepository.save(shipping);
log.info("物流单创建成功. orderId: {}, shippingNo: {}", command.getOrderId(), shipping.getShippingNo());
// 2. 发布“物流单创建成功”事件
ShippingCreatedEvent event = new ShippingCreatedEvent(command.getOrderId(), shipping.getShippingNo());
rocketMQTemplate.convertAndSend("shipping-created-topic", event);
} catch (Exception e) {
log.error("物流单创建失败: orderId: {}", command.getOrderId(), e);
// 3. 发布“物流单创建失败”事件,触发补偿
ShippingFailedEvent event = new ShippingFailedEvent(command.getOrderId(), e.getMessage());
rocketMQTemplate.convertAndSend("shipping-failed-topic", event);
}
}
// 注:物流创建失败本身是“无操作”,所以通常不需要补偿接口,由上游服务负责回滚。
}
三、关键保障措施
- 幂等性 (Idempotence):所有消费者都必须通过业务唯一ID(如
orderId
)检查是否已处理过,防止消息重复消费。 - 原子性 (Atomicity):订单服务使用RocketMQ事务消息确保“创建订单”与“启动Saga”的原子性。
- 补偿机制 (Compensation):为每个正向操作设计对应的逆操作,并在协调器中串联调用,确保失败时能回溯。
- 可观测性 (Observability):在每个关键节点打印日志,方便追踪Saga流程和排查问题。可以引入全局事务ID(如
sagaId
)串联所有日志。
这个实现提供了一个健壮的、生产可用的分布式事务解决方案,能够很好地处理高并发场景下的最终一致性需求。