ABP VNext + Temporal: Distributed Workflows and Saga

Published: 2025-07-20




TL;DR

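  • Use the Temporal .NET SDK (TemporalClient.ConnectAsync) together with Temporalio.Extensions.Hosting to register and inject the client in ABP 🎯
  • Put [Workflow] on the Workflow interface and on its implementing class, [WorkflowRun] on the entry method, and [Activity] on the activity methods, as the SDK requires ✅
  • Start a Workflow and await its boolean result with ITemporalClient.ExecuteWorkflowAsync, or use StartWorkflowAsync for fire-and-forget 🔄
  • Configure RetryPolicy and timeouts in ActivityOptions, and walk through production-grade details: the Patch API, the OpenTelemetry interceptor, Continue-As-New, child-Workflow compensation, and local testing 🛠️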

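1. Environment and Dependencies 🛠️

  • Platform: .NET 6 + ABP VNext 6.x

  • NuGet packages

    • Temporalio (core SDK; provides the Temporalio.Client and Temporalio.Worker namespaces)
    • Temporalio.Extensions.Hosting (DI and hosted Worker extensions)
    • Temporalio.Extensions.OpenTelemetry (OpenTelemetry support)
    • Volo.Abp.BackgroundJobs.Abstractions (ABP background jobs)

Add these packages to the project file, then register the services in Program.cs or in the module's ConfigureServices.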


2. System Architecture Overview 📊

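[Architecture diagram; only its labels survive. Groups: ABP application, Temporal platform, Worker host. Nodes: OrderAppService, Temporal client, ABP background jobs, custom jobs, ABP admin backend, Workflow instances, Temporal Web UI, Temporal Worker, and the activities ReserveInventory, ChargePayment, RefundPayment, ReleaseInventory. Edge labels: start Workflow, pull and execute, triggered on failure (twice), state sync/compensation, monitoring.]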

3. Wiring Up the Temporal Client & OpenTelemetry 🌐

// Program.cs or Module.ConfigureServices
using Temporalio.Client;
using Temporalio.Extensions.Hosting;
using Temporalio.Extensions.OpenTelemetry;

public override void ConfigureServices(ServiceConfigurationContext context)
{
    var conf = context.Services.GetConfiguration();

    // Register the Temporal client (singleton)
    context.Services
        .AddTemporalClient(options =>
        {
            options.TargetHost = conf["Temporal:Host"];      // e.g. "localhost:7233"
            options.Namespace  = conf["Temporal:Namespace"]; // e.g. "default"
        })
        // Attach the OpenTelemetry tracing interceptor.
        // AddTemporalClient returns an OptionsBuilder<TemporalClientConnectOptions>,
        // so the non-generic Configure overload is the one to call here.
        .Configure(o =>
        {
            o.Interceptors = new[] { new TracingInterceptor() };
        });
}
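
The code above reads two configuration keys, Temporal:Host and Temporal:Namespace. A minimal sketch of the matching configuration section, assuming the values live in appsettings.json (the file name is an assumption; the values are the examples given in the code comments):

```json
{
  "Temporal": {
    "Host": "localhost:7233",
    "Namespace": "default"
  }
}
```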

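4. Defining the Workflow and Activities ✍️

4.1 Workflow Interface

using Temporalio.Workflows;

[Workflow]  // type-level marker
public interface IOrderWorkflow
{
    [WorkflowRun]  // entry method
    Task<bool> RunAsync(Guid orderId, decimal amount);
}
  • Put [Workflow] on the interface so that callers can reference it without depending on the implementation, and mark the entry method with [WorkflowRun]; it returns Task or Task<T>. These attributes are not inherited in .NET, so the implementing class registered with the Worker needs [Workflow] and [WorkflowRun] as well.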

4.2 Activities Interface and Implementation

using Temporalio.Activities;

public interface IOrderActivities
{
    [Activity] Task ReserveInventory(Guid orderId);
    [Activity] Task ChargePayment(Guid orderId, decimal amount);
    [Activity] Task RefundPayment(Guid orderId);
    [Activity] Task ReleaseInventory(Guid orderId);
}

// [Activity] is repeated on the implementation: the Worker discovers activities on
// the registered class, and .NET does not inherit attributes from interface methods.
public class OrderActivities : IOrderActivities
{
    private readonly IInventoryRepository _inv;
    private readonly IPaymentService      _pay;

    public OrderActivities(IInventoryRepository inv, IPaymentService pay)
        => (_inv, _pay) = (inv, pay);

    [Activity]
    public Task ReserveInventory(Guid orderId)
        => _inv.ReserveAsync(orderId);

    [Activity]
    public Task ChargePayment(Guid orderId, decimal amount)
        => _pay.ChargeAsync(orderId, amount);

    [Activity]
    public Task RefundPayment(Guid orderId)
        => _pay.RefundAsync(orderId);

    [Activity]
    public Task ReleaseInventory(Guid orderId)
        => _inv.ReleaseAsync(orderId);
}

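5. Hosting the Worker & DI Mapping 🔧

// Program.cs
using Temporalio.Extensions.Hosting;

// Optional: map the interface for code outside the Worker that injects IOrderActivities
builder.Services.AddScoped<IOrderActivities, OrderActivities>();
builder.Services.AddHostedTemporalWorker(
        builder.Configuration["Temporal:Host"],
        builder.Configuration["Temporal:Namespace"],
        "order-queue"
    )
    .AddScopedActivities<OrderActivities>()   // register the activity implementation
    .AddWorkflow<OrderWorkflow>();            // register the Workflow implementation
  • AddScopedActivities<OrderActivities>() registers OrderActivities in the container itself; the AddScoped<IOrderActivities, OrderActivities>() mapping is only needed when other code resolves the interface.
  • AddWorkflow takes a single type argument, the implementing class.
  • AddHostedTemporalWorker runs the Worker as a hosted service, tying it to the ASP.NET Core host lifetime.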

6. Workflow Implementation: Compensation, Retries/Timeouts, and Continue-As-New ⏱️

using Temporalio.Common;
using Temporalio.Exceptions;
using Temporalio.Workflows;

// The SDK instantiates workflow classes itself, so there is no constructor injection:
// activities are invoked through Workflow.ExecuteActivityAsync.
[Workflow]
public class OrderWorkflow : IOrderWorkflow
{
    [WorkflowRun]
    public async Task<bool> RunAsync(Guid orderId, decimal amount)
    {
        // Activity options (timeout and retry policy)
        var actOpts = new ActivityOptions
        {
            TaskQueue           = "order-queue",
            StartToCloseTimeout = TimeSpan.FromMinutes(1),
            RetryPolicy = new RetryPolicy
            {
                MaximumAttempts    = 3,
                InitialInterval    = TimeSpan.FromSeconds(5),
                BackoffCoefficient = 2,
                MaximumInterval    = TimeSpan.FromMinutes(1),
            }
        };

        // Example Workflow options (passed in by the caller)
        // var wfOpts = new WorkflowOptions
        // {
        //     Id               = orderId.ToString(),
        //     TaskQueue        = "order-queue",
        //     ExecutionTimeout = TimeSpan.FromHours(24),
        //     RunTimeout       = TimeSpan.FromHours(1),
        //     IdReusePolicy    = WorkflowIdReusePolicy.AllowDuplicateFailedOnly,
        //     RetryPolicy      = new() { MaximumAttempts = 1 }
        // };

        try
        {
            await Workflow.ExecuteActivityAsync(
                (IOrderActivities act) => act.ReserveInventory(orderId), actOpts);

            await Workflow.ExecuteActivityAsync(
                (IOrderActivities act) => act.ChargePayment(orderId, amount), actOpts);

            // For very long-running flows, Continue-As-New resets the history:
            // if (needContinue)
            //     throw Workflow.CreateContinueAsNewException(
            //         (IOrderWorkflow wf) => wf.RunAsync(orderId, amount));

            return true;
        }
        catch (ActivityFailureException)
        {
            // Compensate in reverse order (or delegate to a child Workflow).
            // Both compensations run whichever step failed, so they must be idempotent.
            // Every activity call needs a timeout, hence actOpts is reused here.
            await Workflow.ExecuteActivityAsync(
                (IOrderActivities act) => act.RefundPayment(orderId), actOpts);
            await Workflow.ExecuteActivityAsync(
                (IOrderActivities act) => act.ReleaseInventory(orderId), actOpts);

            // Advanced: compensation can also run as a child Workflow
            // await Workflow.ExecuteChildWorkflowAsync(
            //     (ICompensationWorkflow cw) => cw.RunAsync(orderId),
            //     new ChildWorkflowOptions { TaskQueue = "order-queue" });

            return false;
        }
    }
}
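
To make the retry settings above concrete (MaximumAttempts = 3, InitialInterval = 5 s, BackoffCoefficient = 2, MaximumInterval = 1 min), the following sketch lists the waits between attempts under exponential backoff. RetryDelays is a hypothetical helper written for illustration, not part of the Temporal SDK; it assumes that MaximumAttempts counts the first attempt and that each wait is capped at MaximumInterval.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not an SDK API): waits between attempts under exponential backoff.
static List<TimeSpan> RetryDelays(
    TimeSpan initialInterval, double backoffCoefficient,
    TimeSpan maximumInterval, int maximumAttempts)
{
    var delays = new List<TimeSpan>();
    var next = initialInterval;
    // N attempts in total means N - 1 waits.
    for (var retry = 1; retry < maximumAttempts; retry++)
    {
        if (next > maximumInterval) next = maximumInterval; // cap each wait
        delays.Add(next);
        next = TimeSpan.FromTicks((long)(next.Ticks * backoffCoefficient));
    }
    return delays;
}

var schedule = RetryDelays(
    TimeSpan.FromSeconds(5), 2, TimeSpan.FromMinutes(1), maximumAttempts: 3);
Console.WriteLine(string.Join(", ", schedule)); // 00:00:05, 00:00:10
```

With three attempts there are only two waits, so the one-minute cap never comes into play; it would from the fifth wait onward (80 s capped to 60 s).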

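The Workflow above compensates both steps whenever anything fails. A common variation on the Saga pattern records a compensation only after its step has succeeded and then unwinds the recorded ones in reverse. The sketch below shows that idea in plain C# with no Temporal dependency; RunSagaAsync and the step names are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: runs steps in order; on failure, undoes completed steps in reverse.
static async Task<bool> RunSagaAsync(
    IEnumerable<(Func<Task> Action, Func<Task> Compensate)> steps)
{
    var undo = new Stack<Func<Task>>();
    try
    {
        foreach (var (action, compensate) in steps)
        {
            await action();
            undo.Push(compensate); // recorded only once the step succeeded
        }
        return true;
    }
    catch (Exception)
    {
        while (undo.Count > 0)
            await undo.Pop()();    // last completed step is undone first
        return false;
    }
}

var log = new List<string>();
Func<string, Func<Task>> record = name => () => { log.Add(name); return Task.CompletedTask; };
Func<Task> failingCharge = () => Task.FromException(new InvalidOperationException("card declined"));

var steps = new List<(Func<Task> Action, Func<Task> Compensate)>
{
    (record("reserve"), record("release")),
    (failingCharge, record("refund")),
};

var ok = await RunSagaAsync(steps);
Console.WriteLine($"{ok}: {string.Join(" -> ", log)}"); // False: reserve -> release
```

Here the failed charge is not refunded because its compensation was never recorded. Whether to record a compensation before or after its step depends on whether a failed step may still have taken effect (for example, a charge that succeeded but timed out), which is why idempotent compensations remain the safer default.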
7. Starting the Workflow: Awaiting the Result & Fire-and-Forget 🔄

using Microsoft.Extensions.Logging;
using Temporalio.Api.Enums.V1;
using Temporalio.Client;

public class OrderAppService : ApplicationService
{
    private readonly ITemporalClient _client;
    public OrderAppService(ITemporalClient client) => _client = client;

    // Start the Workflow and wait for its boolean result
    public async Task<Guid> CreateOrderAsync(CreateOrderDto dto)
    {
        var orderId = Guid.NewGuid();
        var success = await _client.ExecuteWorkflowAsync(
            (IOrderWorkflow wf) => wf.RunAsync(orderId, dto.Amount),
            new WorkflowOptions
            {
                Id               = orderId.ToString(),
                TaskQueue        = "order-queue",
                ExecutionTimeout = TimeSpan.FromHours(24),
                RunTimeout       = TimeSpan.FromHours(1),
                IdReusePolicy    = WorkflowIdReusePolicy.AllowDuplicateFailedOnly,
                RetryPolicy      = new() { MaximumAttempts = 1 }
            }
        );
        // false means the saga ran its compensation; handle it as the domain requires
        if (!success)
            Logger.LogWarning("Order {OrderId} was rolled back by compensation", orderId);
        return orderId;
    }

    // Fire-and-forget: returns once the server has accepted the start request.
    // The call is awaited so that a failed start is not silently dropped.
    public async Task<Guid> CreateOrderFireAndForgetAsync(CreateOrderDto dto)
    {
        var orderId = Guid.NewGuid();
        await _client.StartWorkflowAsync(
            (IOrderWorkflow wf) => wf.RunAsync(orderId, dto.Amount),
            new WorkflowOptions
            {
                Id        = orderId.ToString(),
                TaskQueue = "order-queue"
            }
        );
        return orderId;
    }
}

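8. Patch API (Versioning) Example 🧩

// Patching happens inside Workflow code (here: OrderWorkflow.RunAsync), not on a client handle.
// "fraud-check-v1" is a hypothetical patch ID chosen for illustration.
if (Workflow.Patched("fraud-check-v1"))
{
    // New code path, e.g. an additional Activity call.
    // Executions that reach this point for the first time take this branch.
}
else
{
    // Original code path, kept so that in-flight executions replay
    // their recorded history without a non-determinism error.
}

// Once no execution that predates the patch remains, drop the old branch
// and call Workflow.DeprecatePatch("fraud-check-v1") in its place.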

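9. Keeping Workflows Deterministic ⚠️

  • Do not use Task.Run, DateTime.Now, Guid.NewGuid(), unseeded random numbers, or external I/O in Workflow code; the SDK offers Workflow.UtcNow, Workflow.NewGuid(), and Workflow.Random as replay-safe replacements.
  • Rely only on the Workflow API, input parameters, and activity calls, so that replay behaves exactly like the original run.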
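
To illustrate why replay needs deterministic inputs, here is a toy sketch in plain C# (not Temporal code). Decide is a hypothetical stand-in for workflow logic that draws random numbers: with a fixed seed the "replay" reproduces the first run, which is the property a seeded generator such as Workflow.Random is meant to provide.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for workflow logic that makes three random decisions.
static int[] Decide(Random rng) =>
    Enumerable.Range(0, 3).Select(_ => rng.Next(100)).ToArray();

// Same seed on the first run and on "replay": the decisions match.
var firstRun = Decide(new Random(42));
var replay   = Decide(new Random(42));
Console.WriteLine(firstRun.SequenceEqual(replay)); // True

// An unseeded Random carries no such guarantee from one run to the next.
var unseeded = Decide(new Random());
Console.WriteLine(string.Join(", ", unseeded));
```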

10. Local Unit Test Example 📦

using Temporalio.Client;
using Temporalio.Testing;
using Temporalio.Worker;
using Xunit;

public class OrderWorkflowTests
{
    [Fact]
    public async Task RunAsync_SuccessfulPath()
    {
        // Start a local time-skipping test server
        await using var env = await WorkflowEnvironment.StartTimeSkippingAsync();

        using var worker = new TemporalWorker(
            env.Client,
            new TemporalWorkerOptions("order-queue")
                .AddWorkflow<OrderWorkflow>()
                .AddAllActivities(new OrderActivities(/* mocks */)));

        // Run the worker only for the duration of the test body
        await worker.ExecuteAsync(async () =>
        {
            var orderId = Guid.NewGuid();
            var result = await env.Client.ExecuteWorkflowAsync(
                (IOrderWorkflow wf) => wf.RunAsync(orderId, 100m),
                new WorkflowOptions(id: $"test-order-{orderId}", taskQueue: "order-queue"));
            Assert.True(result);
        });
    }
}

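11. Monitoring and Observability 👁️

  • Temporal Web UI: inspect Workflow executions and their event histories in real time
  • OpenTelemetry: TracingInterceptor emits spans that an exporter can ship to a tracing backend such as Jaeger (Prometheus collects metrics, not spans)
  • ABP admin backend: Workflow state can be synced to an entity table and displayed there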
