ABP VNext + Temporal:分布式工作流与 Saga 🚀
📚 目录
- ABP VNext + Temporal:分布式工作流与 Saga 🚀
TL;DR
- 使用 Temporal .NET SDK
TemporalClient.ConnectAsync
配合Temporalio.Extensions.Hosting
插件,在 ABP 中注册并注入客户端 🎯 - 在 Workflow 接口 上加
[Workflow]
,在入口方法上加[WorkflowRun]
,在活动方法上加[Activity]
,符合 SDK 要求 ✅ - 通过
ITemporalClient.ExecuteWorkflowAsync<bool>
启动并获取布尔结果,或用StartWorkflowAsync
Fire-and-Forget 🔄 - 在
ActivityOptions
中配置RetryOptions
与TimeoutOptions
,并示范 Patch API、OpenTelemetry 拦截器、Continue-As-New、子 Workflow 补偿及本地测试等生产级细节 🛠️
1. 环境与依赖 🛠️
平台:.NET 6 + ABP VNext 6.x
NuGet 包:
Temporalio.Client
、Temporalio.Worker
(核心 SDK)Temporalio.Extensions.Hosting
(DI 与 Worker 托管扩展)Temporalio.Extensions.OpenTelemetry
(OpenTelemetry 支持)Volo.Abp.BackgroundJobs.Abstractions
(ABP 后台作业)
确保在
Program.cs
或模块的ConfigureServices
中添加以上包与引用。
2. 系统架构概览 📊
3. 接入 Temporal 客户端 & OpenTelemetry 🌐
// Program.cs 或 Module.ConfigureServices
using Temporalio.Client;
using Temporalio.Extensions.Hosting;
using Temporalio.Extensions.OpenTelemetry;
public override void ConfigureServices(ServiceConfigurationContext context)
{
var conf = context.Services.GetConfiguration();
// 注册 Temporal Client(Singleton)
context.Services
.AddTemporalClient(options =>
{
options.TargetHost = conf["Temporal:Host"]; // e.g. "localhost:7233"
options.Namespace = conf["Temporal:Namespace"]; // e.g. "default"
})
// 注入 OpenTelemetry 拦截器
.Configure<TemporalClientConnectOptions>(o =>
{
o.Interceptors = new[] { new TracingInterceptor() };
});
}
4. 定义 Workflow 与 Activities ✍️
4.1 Workflow 接口
using Temporalio.Workflows;
[Workflow] // 类型级标记
public interface IOrderWorkflow
{
[WorkflowRun] // 入口方法
Task<bool> RunAsync(Guid orderId, decimal amount);
}
- 仅在接口(或类)上使用
[Workflow]
,入口方法标注[WorkflowRun]
,返回Task<T>
。
4.2 Activities 接口与实现
using Temporalio.Activities;
public interface IOrderActivities
{
[Activity] Task ReserveInventory(Guid orderId);
[Activity] Task ChargePayment(Guid orderId, decimal amount);
[Activity] Task RefundPayment(Guid orderId);
[Activity] Task ReleaseInventory(Guid orderId);
}
public class OrderActivities : IOrderActivities
{
private readonly IInventoryRepository _inv;
private readonly IPaymentService _pay;
public OrderActivities(IInventoryRepository inv, IPaymentService pay)
=> (_inv, _pay) = (inv, pay);
public Task ReserveInventory(Guid orderId)
=> _inv.ReserveAsync(orderId);
public Task ChargePayment(Guid orderId, decimal amount)
=> _pay.ChargeAsync(orderId, amount);
public Task RefundPayment(Guid orderId)
=> _pay.RefundAsync(orderId);
public Task ReleaseInventory(Guid orderId)
=> _inv.ReleaseAsync(orderId);
}
5. Worker 宿主托管 & DI 映射 🔧
// Program.cs
using Temporalio.Extensions.Hosting;
builder.Services.AddScoped<IOrderActivities, OrderActivities>(); // 明确映射接口
builder.Services.AddHostedTemporalWorker(
builder.Configuration["Temporal:Host"],
builder.Configuration["Temporal:Namespace"],
"order-queue"
)
.AddScopedActivities<OrderActivities>() // 注册活动实现
.AddWorkflow<IOrderWorkflow, OrderWorkflow>(); // 注册 Workflow
- 必须先
AddScoped<IOrderActivities, OrderActivities>()
,再AddScopedActivities<OrderActivities>()
。 AddHostedTemporalWorker
会将 Worker 与 ASP.NET 生命周期绑定。
6. Workflow 实现:补偿、重试/超时与 Continue-As-New ⏱️
using Temporalio.Workflows;
public class OrderWorkflow : IOrderWorkflow
{
private readonly IOrderActivities _act;
public OrderWorkflow(IOrderActivities act) => _act = act;
public async Task<bool> RunAsync(Guid orderId, decimal amount)
{
// 活动选项(含超时与重试)
var actOpts = new ActivityOptions
{
TaskQueue = "order-queue",
StartToCloseTimeout = TimeSpan.FromMinutes(1),
RetryOptions = new RetryOptions
{
MaximumAttempts = 3,
InitialInterval = TimeSpan.FromSeconds(5),
BackoffCoefficient = 2,
MaximumInterval = TimeSpan.FromMinutes(1),
}
};
// 工作流选项示例(在调用端传入)
// var wfOpts = new WorkflowOptions
// {
// WorkflowId = orderId.ToString(),
// TaskQueue = "order-queue",
// WorkflowExecutionTimeout = TimeSpan.FromHours(24),
// WorkflowRunTimeout = TimeSpan.FromHours(1),
// WorkflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicateFailedOnly,
// RetryPolicy = new() { MaximumAttempts = 1 }
// };
try
{
await Workflow.ExecuteActivityAsync(
() => _act.ReserveInventory(orderId), actOpts);
await Workflow.ExecuteActivityAsync(
() => _act.ChargePayment(orderId, amount), actOpts);
// 对于非常长流程,可 Continue-As-New 重置历史
// if (needContinue)
// await Workflow.ContinueAsNewAsync(orderId, amount);
return true;
}
catch
{
// 逆序补偿(或使用子 Workflow)
await Workflow.ExecuteActivityAsync(
() => _act.RefundPayment(orderId),
new() { TaskQueue = "order-queue" });
await Workflow.ExecuteActivityAsync(
() => _act.ReleaseInventory(orderId),
new() { TaskQueue = "order-queue" });
// 高级:也可执行子 Workflow
// await Workflow.ExecuteChildWorkflowAsync<ICompensationWorkflow>(
// cw => cw.RunAsync(orderId),
// new() { TaskQueue = "order-queue" });
return false;
}
}
}
7. 启动 Workflow:同步结果 & Fire-and-Forget 🔄
public class OrderAppService : ApplicationService
{
private readonly ITemporalClient _client;
public OrderAppService(ITemporalClient client) => _client = client;
// 同步获取布尔结果
public async Task<Guid> CreateOrderAsync(CreateOrderDto dto)
{
var orderId = Guid.NewGuid();
var success = await _client.ExecuteWorkflowAsync<bool>(
wf => wf.RunAsync(orderId, dto.Amount),
new WorkflowOptions
{
WorkflowId = orderId.ToString(),
TaskQueue = "order-queue",
WorkflowExecutionTimeout = TimeSpan.FromHours(24),
WorkflowRunTimeout = TimeSpan.FromHours(1),
WorkflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicateFailedOnly,
RetryPolicy = new() { MaximumAttempts = 1 }
}
);
return orderId;
}
// Fire-and-Forget
public Task<Guid> CreateOrderFireAndForgetAsync(CreateOrderDto dto)
{
var orderId = Guid.NewGuid();
_ = _client.StartWorkflowAsync(
(IOrderWorkflow wf) => wf.RunAsync(orderId, dto.Amount),
new WorkflowOptions
{
WorkflowId = orderId.ToString(),
TaskQueue = "order-queue"
}
);
return Task.FromResult(orderId);
}
}
8. Patch API(版本管理)示例 🧩
var handle = _client.GetWorkflowHandle(orderId.ToString());
await handle.PatchAsync(
new WorkflowPatchingOptions(),
patch =>
{
patch.MigrationCallback = () =>
{
// 新版本逻辑,例如增加新 Activity 调用
};
}
);
9. 保持 Workflow 确定性 ⚠️
- 禁止在 Workflow 代码中使用
Task.Run
、DateTime.Now
、Guid.NewGuid()
(用 Workflow 提供的 ID)、随机数、外部 I/O 等。 - 必须只依赖
Workflow
API、Workflow.Now
、参数、活动调用,确保重放时行为一致。
10. 本地单元测试示例 📦
using Temporalio.Testing;
using Xunit;
public class OrderWorkflowTests
{
[Fact]
public async Task RunAsync_SuccessfulPath()
{
await using var env = await TestWorkflowEnvironment.CreateAsync();
var worker = env.NewWorker("order-queue", w =>
w.AddWorkflow<OrderWorkflow>()
.AddActivityImplementation(new OrderActivities(/* mocks */)));
await worker.StartAsync();
var client = env.GetTestWorkflowClient();
var handle = client.GetWorkflowHandle("test-order", TaskQueue: "order-queue");
var result = await handle.ExecuteAsync<bool>(wf => wf.RunAsync(Guid.Parse("test-order"), 100m));
Assert.True(result);
}
}
11. 监控与可观测 👁️
- Temporal Web UI:实时查看 Workflow 实例及历史事件
- OpenTelemetry:通过
TracingInterceptor
导出 Span 至 Jaeger/Prometheus - ABP 管理后台:可将 Workflow 状态同步到实体表并展示
参考资料