ABP VNext + Dapr Workflows:轻量级分布式工作流

发布于:2025-08-03 ⋅ 阅读:(16) ⋅ 点赞:(0)

🚀 ABP VNext + Dapr Workflows:轻量级分布式工作流



一、引言 ✨

TL;DR 🔥

  • 在 ABP VNext 应用中,只需一行 services.AddDaprWorkflow(...) 即可无侵入集成 Dapr Workflow SDK,开启长运行分布式工作流编排 🎉 (Dapr Docs)
  • 通过 state.redis 或 CosmosDB 等可插拔 State Store 实现跨服务状态持久化与恢复,支持 Saga 补偿模式 🔄 (Dapr Docs)
  • 定义继承自 Workflow<TInput, TOutput> 的工作流类与 WorkflowActivity<TArg, TResult> 的活动类,使用 context.CallActivityAsync 保证确定性重放 🛠️ (Diagrid)
  • 演示“下单—保留库存—扣款—失败补偿”全流程,涵盖高性能、高可用、易复现实践 ✅

背景
在微服务架构中,分布式事务难以扩展,“最终一致性”与 Saga 模式已成主流。Dapr Workflows 提供代码化工作流,基于 DurableTask 引擎在 State Store 中持久化状态,结合补偿与定时器,简化复杂业务的可靠编排。


二、环境与依赖 🛠️

  • .NET 平台:.NET 9,ABP vNext v9.x

  • Dapr 运行时:Dapr CLI ≥1.10;Workflow Runtime v1.15.4

  • NuGet 包

    dotnet add package Dapr.Workflow --version 1.15.4
    
  • State Store 组件 (components/statestore.yaml):

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: statestore
    spec:
      type: state.redis
      version: v1
      metadata:
        - name: redisHost
          value: "localhost:6379"
      # 生产环境推荐使用支持事务的后端,如 Azure Cosmos DB 或 SQL Server
    

    (Dapr Docs)

  • 基础设施:Redis / Azure Cosmos DB;Dapr Sidecar


三、系统架构与流程图 🏗️

State_Store
Workflow_Runtime
ABP_App
ScheduleNewWorkflowAsync
Redis/CosmosDB
OrderWorkflow 实例
ReserveInventoryActivity
ChargePaymentActivity
RefundPaymentActivity
ReleaseInventoryActivity
Dapr Sidecar
OrderService API
  • OrderService API 通过 Dapr Sidecar 调用 Workflow 管理 API
  • Workflow Runtime 调度活动并将状态写入 State Store,支持断点重放
  • Saga 补偿:在失败场景通过补偿活动保证最终一致性

四、在 ABP 模块中注册 Dapr Workflows 📦

using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;

public class MyAppModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        // 可选:显式注册 DaprClient
        context.Services.AddDaprClient();

        // 一行集成 Dapr Workflows,自动注册 Client 与 Worker
        context.Services.AddDaprWorkflow(options =>
        {
            options.RegisterWorkflow<OrderWorkflow>();
            options.RegisterActivity<ReserveInventoryActivity>();
            options.RegisterActivity<ChargePaymentActivity>();
            options.RegisterActivity<RefundPaymentActivity>();
            options.RegisterActivity<ReleaseInventoryActivity>();
        });
    }
}

AddDaprWorkflow 会自动注册 DaprWorkflowClientDaprClient(若未注册)及后台 HostedService,无需额外中间件调用 (Dapr Docs)


五、定义 Workflow 与 Activities 🎯

using Dapr.Workflow;
using Dapr.Workflow.Models;

5.1 定义活动(Activity)

继承自 WorkflowActivity<TArg, TResult> 并重写 RunAsync,实现幂等逻辑:

public record PaymentInput(Guid OrderId, decimal Amount);

public class ReserveInventoryActivity 
    : WorkflowActivity<Guid, bool>
{
    public override Task<bool> RunAsync(
        WorkflowActivityContext context,
        Guid orderId)
    {
        // 调用库存服务,保证幂等
        return Task.FromResult(true);
    }
}

public class ChargePaymentActivity 
    : WorkflowActivity<PaymentInput, bool>
{
    public override Task<bool> RunAsync(
        WorkflowActivityContext context,
        PaymentInput input)
    {
        // 调用支付服务,保证幂等
        return Task.FromResult(true);
    }
}

(Diagrid)

5.2 定义工作流(Workflow)

继承自 Workflow<OrderDto, object>,在 RunAsync 中编排活动并处理补偿:

public class OrderWorkflow 
    : Workflow<OrderDto, object>
{
    public override async Task<object> RunAsync(
        WorkflowContext context,
        OrderDto order)
    {
        var logger = context.CreateReplaySafeLogger<OrderWorkflow>();
        logger.LogInformation("Order {OrderId} 开始", order.Id);

        try
        {
            await context.CallActivityAsync<bool>(
                nameof(ReserveInventoryActivity),
                order.Id);
            await context.CallActivityAsync<bool>(
                nameof(ChargePaymentActivity),
                new PaymentInput(order.Id, order.Amount));
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "执行失败,开始补偿");
            await context.CallActivityAsync<bool>(
                nameof(RefundPaymentActivity),
                order.Id);
            await context.CallActivityAsync<bool>(
                nameof(ReleaseInventoryActivity),
                order.Id);
            throw;
        }

        logger.LogInformation("Order {OrderId} 完成", order.Id);
        return null!;
    }
}

(Diagrid)


六、触发与查询工作流 🔍

6.1 启动 ABP 应用

dapr run \
  --app-id order-api \
  --app-port 5000 \
  --dapr-http-port 3500 \
  --components-path ./components \
  dotnet run

6.2 发起工作流

using Dapr.Workflow;

public class OrderAppService : ApplicationService
{
    public async Task<string> CreateOrderAsync(CreateOrderDto dto)
    {
        var client = ServiceProvider
            .GetRequiredService<DaprWorkflowClient>();

        string instanceId = Guid.NewGuid().ToString();
        await client.ScheduleNewWorkflowAsync(
            workflowName: nameof(OrderWorkflow),
            instanceId: instanceId,
            input: dto);
        return instanceId;
    }

    // 新增:查询工作流状态
    public async Task<WorkflowState> GetWorkflowStateAsync(string instanceId)
    {
        var client = ServiceProvider
            .GetRequiredService<DaprWorkflowClient>();
        return await client.GetWorkflowStateAsync(
            instanceId, includeInputsAndOutputs: true);
    }
}

使用 ScheduleNewWorkflowAsync 启动实例 (Dapr Docs)

6.3 暴露查询端点

using Dapr.Workflow;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("api/workflows")]
public class WorkflowController : ControllerBase
{
    private readonly DaprWorkflowClient _client;
    public WorkflowController(DaprWorkflowClient client) => _client = client;

    [HttpGet("{instanceId}")]
    public async Task<IActionResult> Get(string instanceId)
    {
        var state = await _client.GetWorkflowStateAsync(
            instanceId, includeInputsAndOutputs: true);
        return Ok(state);
    }
}
curl http://localhost:5000/api/workflows/{instanceId}
  • 返回 JSON 包含 RuntimeStatus、输入输出、历史事件等信息。

七、示例演示 🎬

  1. 基础设施

    docker run -d --name redis -p 6379:6379 redis
    dapr init --runtime-version v1.10
    
  2. 运行应用并发起订单

    curl -X POST http://localhost:5000/api/orders \
      -H "Content-Type: application/json" \
      -d '{"productId":"123","quantity":1,"amount":100}'
    
  3. 查询状态

    curl http://localhost:5000/api/workflows/{instanceId}
    
  4. 模拟失败:在 ChargePaymentActivity 抛出异常,验证补偿活动自动执行 💥


八、最佳实践与优化 💡

  • 幂等性:活动内部调用尽量幂等,防止重试产生副作用。
  • 超时与重试:结合 Durable Timers 及 RetryOptions 控制超时与重试。
  • 并行与分支:可在工作流中使用 Task.WhenAll(...) 或动态 CallActivityAsync 实现并行。
  • 版本兼容:升级工作流时,通过前缀或迁移逻辑兼容老实例。
  • 生产环境:推荐使用支持事务回滚的 State Store(Cosmos DB、SQL Server)替代 Redis (Dapr Docs)