Systematic Outbox/Inbox in ABP vNext (without CAP / without Kafka): Idempotent Writes, Deduplication, Late Arrivals, and Dead-Letter Replay

Published: 2025-08-18




1. Problem and Goals 🧭

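No DTC (distributed transactions). A local transaction plus an Outbox achieves "effectively once" processing, meaning at-least-once delivery combined with idempotent consumers. The core problems to solve are duplicate delivery, delayed or out-of-order arrival, idempotent updates, and unrecoverable failures (dead letters). 💡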


2. Overall Architecture 🏗️

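Components:
  • Application layer (write path): AppService → aggregate / domain events → Outbox table
  • Database: business tables, Outbox table, Inbox table
  • Outbox Dispatcher (background service): publish to the broker → write back status / retry / dead letter
  • Message broker: Exchange/Topic
  • Consumer (read path): Inbox placeholder dedup (claim) → business processing (idempotent Upsert) → commit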
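  • Write path: AppService → aggregate operation → SaveChanges() writes the business tables and the Outbox table in the same transaction. After commit, the Dispatcher publishes to the broker asynchronously.
  • Read path: the Consumer receives a message → claims an Inbox placeholder keyed by (messageId, consumer) to deduplicate → processes idempotently → commits.
  • Observability: record commit→publish and publish→consume latency, plus retry and dead-letter metrics. 🔍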

3. Data Model and Table Schema 🗃️

The schema supports multi-tenancy, versioning and replay. It also includes fields that help operators locate problems. ✅

OutboxMessages (PostgreSQL)

CREATE TABLE outbox_messages (
  id               uuid PRIMARY KEY,
  aggregate_id     uuid        NOT NULL,
  aggregate_type   text        NOT NULL,
  aggregate_version bigint     NOT NULL DEFAULT 0,  -- for out-of-order / concurrent merging
  type             text        NOT NULL,            -- event type / routing key
  payload          jsonb       NOT NULL,
  headers          jsonb       NOT NULL,
  tenant_id        text,
  occurred_at      timestamptz NOT NULL,
  visible_at       timestamptz NOT NULL,            -- delay / retry scheduling
  attempts         int         NOT NULL DEFAULT 0,
  status           smallint    NOT NULL DEFAULT 0,  -- 0:new 1:sent 3:dead 9:processing
  last_error       text,
  routing_key      text,
  partition_key    text
);

CREATE INDEX idx_outbox_visibility ON outbox_messages(status, visible_at);
CREATE INDEX idx_outbox_tenant     ON outbox_messages(tenant_id);
CREATE INDEX idx_outbox_type       ON outbox_messages(type);

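SQL Server: map jsonb → nvarchar(max), optionally adding a CHECK (ISJSON(payload) = 1) constraint. Map uuid → uniqueidentifier and timestamptz → datetimeoffset, and use SYSUTCDATETIME() instead of NOW().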

Inbox (deduplication / idempotency)

CREATE TABLE inbox (
  message_id   uuid        NOT NULL,
  consumer     text        NOT NULL,
  processed_at timestamptz NOT NULL,
  tenant_id    text,
  PRIMARY KEY (message_id, consumer)
);

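(message_id, consumer) is the idempotency key. Old rows can be expired by TTL or archived. The retention period must be longer than the longest possible redelivery delay, otherwise a very late duplicate would be processed again. 🧹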
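The sketch below models the inbox semantics in memory, using a hypothetical `InMemoryInbox` class in place of the table. `TryClaim` behaves like `INSERT ... ON CONFLICT DO NOTHING` on the composite key. `Purge` plays the role of the TTL cleanup.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the inbox table: (MessageId, Consumer) is the idempotency key,
// and the stored ProcessedAt drives TTL purging.
public sealed class InMemoryInbox
{
    private readonly Dictionary<(Guid MessageId, string Consumer), DateTimeOffset> _rows = new();

    // Mirrors INSERT ... ON CONFLICT DO NOTHING: true only for the first claim of a key.
    public bool TryClaim(Guid messageId, string consumer, DateTimeOffset processedAt) =>
        _rows.TryAdd((messageId, consumer), processedAt);

    // TTL purge: removes rows processed before the cutoff and returns how many were removed.
    public int Purge(DateTimeOffset cutoff)
    {
        var expired = _rows.Where(kv => kv.Value < cutoff).Select(kv => kv.Key).ToList();
        foreach (var key in expired) _rows.Remove(key);
        return expired.Count;
    }

    public int Count => _rows.Count;
}
```

After a purge, the same key can be claimed again. This is why retention has to outlast any redelivery.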


4. ABP Integration Points 🧩

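Use a self-built event-collection interface rather than mixing this with ABP's built-in Outbox. Aggregate roots implement IHasDomainEvents, and a SaveChanges interceptor writes the Outbox rows in the same transaction.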

4.1 Domain Event Interface and Base Class

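using System;
using System.Collections.Generic;
using Volo.Abp.Domain.Entities; // Entity<TKey>

public interface IDomainEvent
{
    Guid AggregateId { get; }
    string AggregateType { get; }
    long AggregateVersion { get; }
    string? TenantId { get; } // null for host-side (non-tenant) events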
}

public interface IHasDomainEvents
{
    IReadOnlyCollection<IDomainEvent> DomainEvents { get; }
    void ClearDomainEvents();
}

public abstract class AggregateRootBase : Entity<Guid>, IHasDomainEvents
{
    private readonly List<IDomainEvent> _domainEvents = new();
    public IReadOnlyCollection<IDomainEvent> DomainEvents => _domainEvents.AsReadOnly();
    protected void RaiseDomainEvent(IDomainEvent @event) => _domainEvents.Add(@event);
    public void ClearDomainEvents() => _domainEvents.Clear();
}
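To make AggregateVersion concrete, here is a hypothetical `Order` aggregate with an `OrderPaid` event; neither comes from the original. Each event is stamped with the version that its change produced. The interface is redeclared with a nullable `TenantId` so the sketch compiles on its own. The aggregate does not derive from ABP's `Entity<Guid>`.

```csharp
using System;
using System.Collections.Generic;

// Redeclared so this sketch compiles alone; same shape as IDomainEvent above.
public interface IDomainEvent
{
    Guid AggregateId { get; }
    string AggregateType { get; }
    long AggregateVersion { get; }
    string? TenantId { get; }
}

// Hypothetical event: one record per business fact, carrying the version it produced.
public sealed record OrderPaid(Guid AggregateId, long AggregateVersion, string? TenantId, decimal Amount)
    : IDomainEvent
{
    public string AggregateType => "Order";
}

// Hypothetical aggregate: every state change bumps Version and raises an event stamped with it,
// so consumers can later discard stale (lower-version) events.
public sealed class Order
{
    private readonly List<IDomainEvent> _events = new();
    public Guid Id { get; } = Guid.NewGuid();
    public string? TenantId { get; init; }
    public long Version { get; private set; }
    public bool IsPaid { get; private set; }
    public IReadOnlyList<IDomainEvent> DomainEvents => _events;

    public void MarkPaid(decimal amount)
    {
        if (IsPaid) return;   // business-level idempotency: paying twice raises nothing
        IsPaid = true;
        Version++;
        _events.Add(new OrderPaid(Id, Version, TenantId, amount));
    }
}
```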

4.2 Outbox Entity and Factory

using System.Text.Json;

public enum OutboxStatus { New = 0, Sent = 1, Dead = 3, Processing = 9 }

public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public Guid AggregateId { get; init; }
    public string AggregateType { get; init; } = default!;
    public long AggregateVersion { get; init; }
    public string Type { get; init; } = default!;
    public string? TenantId { get; init; }   // nullable, like the tenant_id column
    public string Payload { get; init; } = default!;
    public string Headers { get; init; } = "{}";
    public DateTimeOffset OccurredAt { get; init; }
    public DateTimeOffset VisibleAt { get; set; }
    public int Attempts { get; set; }
    public string? LastError { get; set; }
    public string? RoutingKey { get; set; }
    public string? PartitionKey { get; set; }
    public OutboxStatus Status { get; set; } = OutboxStatus.New;

    public static OutboxMessage From(IDomainEvent ev, DateTimeOffset? visibleAt = null) => new()
    {
        Id = Guid.NewGuid(),
        AggregateId = ev.AggregateId,
        AggregateType = ev.AggregateType,
        AggregateVersion = ev.AggregateVersion,
        Type = ev.GetType().Name,
        TenantId = ev.TenantId,
        Payload = JsonSerializer.Serialize(ev, ev.GetType()), // runtime type, so all event fields are written
        OccurredAt = DateTimeOffset.UtcNow,
        VisibleAt = visibleAt ?? DateTimeOffset.UtcNow
    };
}
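The factory has one pitfall. Calling `JsonSerializer.Serialize(ev)` with `ev` declared as `IDomainEvent` serializes only the interface's members, because System.Text.Json uses the declared type. Passing `ev.GetType()` serializes the concrete event instead. The check below uses a hypothetical one-property interface and event.

```csharp
using System;
using System.Text.Json;

// Hypothetical minimal types for the check.
public interface IDomainEvent { Guid AggregateId { get; } }
public sealed record OrderPaid(Guid AggregateId, decimal Amount) : IDomainEvent;

public static class PayloadDemo
{
    // Declared type IDomainEvent: only AggregateId is written.
    public static string ByInterface(IDomainEvent ev) => JsonSerializer.Serialize(ev);

    // Runtime type OrderPaid: AggregateId and Amount are written.
    public static string ByRuntimeType(IDomainEvent ev) => JsonSerializer.Serialize(ev, ev.GetType());
}
```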

4.3 SaveChanges Interceptor

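using System.Linq;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;

public sealed class OutboxSaveChangesInterceptor : SaveChangesInterceptor
{
    // Sync path too, so SaveChanges() (not only SaveChangesAsync) writes the Outbox.
    public override InterceptionResult<int> SavingChanges(
        DbContextEventData eventData, InterceptionResult<int> result)
    {
        EnqueueOutbox(eventData.Context);
        return base.SavingChanges(eventData, result);
    }

    public override ValueTask<InterceptionResult<int>> SavingChangesAsync(
        DbContextEventData eventData,
        InterceptionResult<int> result,
        CancellationToken ct = default)
    {
        EnqueueOutbox(eventData.Context);
        return base.SavingChangesAsync(eventData, result, ct);
    }

    // Turns pending domain events into Outbox rows within the same SaveChanges (same transaction).
    private static void EnqueueOutbox(DbContext? context)
    {
        if (context is not MyDbContext ctx) return;

        var aggregates = ctx.ChangeTracker.Entries<IHasDomainEvents>()
                            .Select(e => e.Entity)
                            .Where(e => e.DomainEvents.Count > 0)
                            .ToList();

        var now = DateTimeOffset.UtcNow;
        foreach (var agg in aggregates)
        {
            foreach (var ev in agg.DomainEvents)
            {
                ctx.Outbox.Add(OutboxMessage.From(ev, now));
            }
            // Prevents duplicate Outbox rows on a second SaveChanges. If this save then fails,
            // the events are already cleared, so retry the whole unit of work.
            agg.ClearDomainEvents();
        }
    }
}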
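The interceptor's collect-then-clear step can be checked without EF Core. This sketch uses simplified, hypothetical types: `FakeAggregate`, and events as plain `object`. It shows that a second save finds nothing to enqueue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified version of IHasDomainEvents for this sketch (events as object).
public interface IHasDomainEvents
{
    IReadOnlyCollection<object> DomainEvents { get; }
    void ClearDomainEvents();
}

public sealed class FakeAggregate : IHasDomainEvents
{
    private readonly List<object> _events = new();
    public IReadOnlyCollection<object> DomainEvents => _events.AsReadOnly();
    public void Raise(object ev) => _events.Add(ev);
    public void ClearDomainEvents() => _events.Clear();
}

public static class OutboxCollector
{
    // Same steps as the interceptor: copy events out of aggregates that have any, then clear them.
    public static List<object> Drain(IEnumerable<IHasDomainEvents> tracked)
    {
        var drained = new List<object>();
        foreach (var agg in tracked.Where(a => a.DomainEvents.Count > 0))
        {
            drained.AddRange(agg.DomainEvents); // copied before clearing
            agg.ClearDomainEvents();
        }
        return drained;
    }
}
```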

5. OutboxDispatcher ⚡

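The dispatcher claims rows in a short transaction, so locks are held briefly and deadlocks are less likely. Multiple instances can dispatch concurrently. 🧵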

5.1 Dispatch Sequence

Participants: Dispatcher, OutboxDB, Broker. Loop, every IntervalMs:
  1. Dispatcher → OutboxDB: claim N rows (CTE + SKIP LOCKED / READPAST); OutboxDB returns the claimed rows.
  2. Dispatcher → Broker: publish the batch (headers / routing key); Broker → Dispatcher: confirm / ack.
  3. Dispatcher → OutboxDB, depending on the result:
     • [success] UPDATE status=sent, attempts=attempts+1
     • [transient failure] attempts=attempts+1, status=new, visible_at=now+backoff+jitter
     • [permanent failure, or attempts>=max] UPDATE status=dead, last_error=ex
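The three write-back branches can be kept as a pure function and unit-tested apart from SQL. The sketch below assumes that approach; `PublishOutcome` and `DispatchRules` are hypothetical names. As in the 5.3 code, a permanent failure does not increment `attempts`.

```csharp
using System;

public enum OutboxStatus { New = 0, Sent = 1, Dead = 3, Processing = 9 }
public enum PublishOutcome { Success, Transient, Permanent }

public static class DispatchRules
{
    // Returns the new status, attempt count and (for retries only) the next visible_at.
    public static (OutboxStatus Status, int Attempts, DateTimeOffset? VisibleAt) Next(
        int attempts, PublishOutcome outcome, int maxAttempts,
        DateTimeOffset now, Func<int, TimeSpan> backoff)
    {
        DateTimeOffset? unchanged = null;   // visible_at is not touched for Sent / Dead
        var next = attempts + 1;
        return outcome switch
        {
            PublishOutcome.Success => (OutboxStatus.Sent, next, unchanged),
            PublishOutcome.Permanent => (OutboxStatus.Dead, attempts, unchanged),
            _ when next >= maxAttempts => (OutboxStatus.Dead, next, unchanged),  // retry budget spent
            _ => (OutboxStatus.New, next, now + backoff(next)),                   // retry later
        };
    }
}
```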

5.2 Transport Abstraction

public interface IEventPublisher
{
    Task PublishAsync(string type, string payload,
        IReadOnlyDictionary<string,string> headers, // message-id/aggregate-id/aggregate-type/aggregate-version/tenant-id/traceparent...
        string? routingKey,
        CancellationToken ct);
}
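The dispatcher code below calls a `Headers(m)` helper that is not shown. The hypothetical version here flattens the envelope fields listed in the comment above into string headers. Header names follow that comment, and optional values are left out rather than sent as empty strings.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class OutboxHeaders
{
    // Hypothetical helper: builds the transport headers for one outbox row.
    public static IReadOnlyDictionary<string, string> Build(
        Guid messageId, Guid aggregateId, string aggregateType, long aggregateVersion,
        string? tenantId, string? traceparent)
    {
        var h = new Dictionary<string, string>
        {
            ["message-id"] = messageId.ToString(),   // consumers use this as the Inbox key
            ["aggregate-id"] = aggregateId.ToString(),
            ["aggregate-type"] = aggregateType,
            ["aggregate-version"] = aggregateVersion.ToString(CultureInfo.InvariantCulture),
        };
        // Optional headers are omitted when absent.
        if (!string.IsNullOrEmpty(tenantId)) h["tenant-id"] = tenantId;
        if (!string.IsNullOrEmpty(traceparent)) h["traceparent"] = traceparent;
        return h;
    }
}
```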

5.3 PostgreSQL: Claiming with a CTE ✅

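// Step 1: claim a batch in a short transaction. Row locks are held only while status flips
// to 9 (Processing); SKIP LOCKED lets other dispatcher instances claim different rows.
// RETURNING o.* must match the entity mapping (assumes snake_case column names).
List<OutboxMessage> claimed;
await using (var tx = await _db.Database.BeginTransactionAsync(ct))
{
    claimed = await _db.Outbox
        .FromSqlInterpolated($@"
            WITH cte AS (
              SELECT id
              FROM outbox_messages
              WHERE status = 0 AND visible_at <= NOW()
              ORDER BY visible_at
              LIMIT {batchSize}
              FOR UPDATE SKIP LOCKED
            )
            UPDATE outbox_messages o
            SET status = 9
            FROM cte
            WHERE o.id = cte.id
            RETURNING o.*")
        .AsNoTracking() // rows are updated with raw SQL below, not via change tracking
        .ToListAsync(ct);

    await tx.CommitAsync(ct);
}

var maxAttempts = _options.Value.MaxAttempts;

// Step 2: publish outside any transaction, then write back each row's outcome.
// Headers(m) builds message-id/aggregate-*/tenant-id/traceparent from the row.
// On cancellation, unpublished rows stay in status 9 until a lease/timeout reclaims them.
foreach (var m in claimed)
{
    try
    {
        await _publisher.PublishAsync(m.Type, m.Payload, Headers(m), m.RoutingKey, ct);

        await _db.Database.ExecuteSqlInterpolatedAsync($@"
            UPDATE outbox_messages
            SET status = 1, attempts = {m.Attempts + 1}, last_error = NULL
            WHERE id = {m.Id}", ct);
    }
    catch (PermanentException ex) // application-defined: retrying cannot help
    {
        await _db.Database.ExecuteSqlInterpolatedAsync($@"
            UPDATE outbox_messages
            SET status = 3, last_error = {ex.Message}
            WHERE id = {m.Id}", ct);
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // TransientException and any unclassified error: retry with backoff until maxAttempts,
        // so no row is left stuck in status 9 (Processing).
        var nextAttempts = m.Attempts + 1;
        if (nextAttempts >= maxAttempts)
        {
            await _db.Database.ExecuteSqlInterpolatedAsync($@"
                UPDATE outbox_messages
                SET status = 3, attempts = {nextAttempts}, last_error = {ex.Message}
                WHERE id = {m.Id}", ct);
        }
        else
        {
            var next = Backoff(nextAttempts); // TimeSpan, sent as a PostgreSQL interval
            await _db.Database.ExecuteSqlInterpolatedAsync($@"
                UPDATE outbox_messages
                SET status = 0,
                    attempts = {nextAttempts},
                    visible_at = NOW() + {next},
                    last_error = {ex.Message}
                WHERE id = {m.Id}", ct);
        }
    }
}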
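The claim semantics can be modeled in memory to reason about several dispatchers running at once. In this sketch a single lock stands in for row locks. Concurrent `Claim` calls therefore get disjoint rows, which is what `SKIP LOCKED` / `READPAST` provide in the database.

`LeaseUntil` is an assumed extension, not part of the schema above. It lets rows stranded in Processing by a crashed instance, or by cancellation, be claimed again. In SQL, one option is to reuse `visible_at` as the lease:
  • set it to now + lease when claiming;
  • also accept `status = 9 AND visible_at <= NOW()` as claimable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row shape: only the fields the claim step looks at.
public sealed class OutboxRow
{
    public Guid Id { get; } = Guid.NewGuid();
    public int Status { get; set; }                 // 0 New, 1 Sent, 3 Dead, 9 Processing
    public DateTimeOffset VisibleAt { get; set; }
    public DateTimeOffset? LeaseUntil { get; set; } // assumed extension, not in the schema above
}

public sealed class InMemoryOutbox
{
    private readonly object _gate = new();          // stands in for the database row locks
    public List<OutboxRow> Rows { get; } = new();

    // Atomically picks up to batchSize claimable rows (oldest VisibleAt first) and marks them Processing.
    public List<OutboxRow> Claim(int batchSize, DateTimeOffset now, TimeSpan lease)
    {
        lock (_gate)
        {
            var batch = Rows
                .Where(r => (r.Status == 0 && r.VisibleAt <= now)
                         || (r.Status == 9 && r.LeaseUntil <= now))  // expired lease: owner presumed gone
                .OrderBy(r => r.VisibleAt)
                .Take(batchSize)
                .ToList();

            foreach (var r in batch)
            {
                r.Status = 9;
                r.LeaseUntil = now + lease;
            }
            return batch;
        }
    }
}
```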

5.4 SQL Server: Claiming (READPAST / UPDLOCK / ROWLOCK)

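List<OutboxMessage> claimed;
await using (var tx = await _db.Database.BeginTransactionAsync(ct))
{
    // UPDLOCK + READPAST: lock the selected rows and skip rows already locked by other instances.
    claimed = await _db.Outbox.FromSqlRaw(@"
        ;WITH cte AS (
           SELECT TOP ({0}) *
           FROM outbox_messages WITH (ROWLOCK, UPDLOCK, READPAST)
           WHERE status = 0 AND visible_at <= SYSUTCDATETIME()
           ORDER BY visible_at
        )
        UPDATE cte SET status = 9
        OUTPUT inserted.*;",
        batchSize).AsNoTracking().ToListAsync(ct);

    await tx.CommitAsync(ct);
}
// Publishing and write-back are the same as above; mind the dialect differences, e.g.
// visible_at = DATEADD(millisecond, @delayMs, SYSUTCDATETIME()) instead of NOW() + interval.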

5.5 Exponential Backoff with Jitter ⏱️ (compatible with a Mermaid timeline)

static TimeSpan Backoff(int attempt)
{
    var baseSec = Math.Min(300, Math.Pow(3, attempt)); // capped at 5 minutes; jitter adds up to 2.5 s more
    var jitterMs = Random.Shared.Next(0, 2500);
    return TimeSpan.FromSeconds(baseSec) + TimeSpan.FromMilliseconds(jitterMs);
}

[Figure: backoff visualization (image not reproduced)]
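Since the figure is missing, the schedule can be computed from the `Backoff` formula itself. This sketch takes the jitter as a parameter so the numbers are deterministic. `BackoffTable` is a hypothetical name.

```csharp
using System;
using System.Linq;

public static class BackoffTable
{
    // Same formula as Backoff() above: min(300, 3^attempt) seconds, plus a jitter that the
    // dispatcher draws from 0..2499 ms. Here the jitter is a parameter so results are deterministic.
    public static TimeSpan Delay(int attempt, int jitterMs)
    {
        var baseSec = Math.Min(300, Math.Pow(3, attempt));
        return TimeSpan.FromSeconds(baseSec) + TimeSpan.FromMilliseconds(jitterMs);
    }

    // Delays for attempts 1..maxAttempts with zero jitter, in seconds.
    public static double[] Schedule(int maxAttempts) =>
        Enumerable.Range(1, maxAttempts).Select(a => Delay(a, 0).TotalSeconds).ToArray();
}
```

With zero jitter, the delays for attempts 1 to 8 are 3, 9, 27, 81, 243, 300, 300 and 300 seconds. Take `MaxAttempts = 8` with the rule in 5.3, where a row goes Dead when `attempts + 1 >= MaxAttempts`. Retries then use attempts 1 to 7. A message that keeps failing transiently is marked Dead after about 963 seconds (roughly 16 minutes) of backoff, plus under 17.5 s of jitter, not counting polling delay.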


6. Consumer Side: Inbox "Claim First, Then Run Business Logic" + Idempotent Wrapper 🧱

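This solves the concurrency race. The consumer first claims a placeholder (INSERT ... ON CONFLICT or MERGE) to obtain processing ownership, then runs the business logic. The claim and the business writes commit in one transaction, so duplicate deliveries cannot repeat side effects in that database. External calls such as HTTP or email still need their own idempotency keys. 🛡️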

PostgreSQL

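// IMessageHandler<T> and MessageContext are the consumer abstractions used in this article.
public sealed class InboxHandler<T> : IMessageHandler<T>
{
    private readonly MyDbContext _db;
    private readonly string _consumerName;
    private readonly IMessageHandler<T> _inner; // must write through the same _db to share the transaction

    public InboxHandler(MyDbContext db, string consumerName, IMessageHandler<T> inner)
    {
        _db = db;
        _consumerName = consumerName;
        _inner = inner;
    }

    public async Task HandleAsync(T msg, MessageContext ctx, CancellationToken ct)
    {
        await using var tx = await _db.Database.BeginTransactionAsync(ct);

        // 1) Claim first: the inserted row grants processing ownership. A concurrent duplicate
        //    waits on the unique key until this transaction ends, then inserts 0 rows.
        var inserted = await _db.Database.ExecuteSqlInterpolatedAsync($@"
            INSERT INTO inbox(message_id, consumer, processed_at, tenant_id)
            VALUES ({ctx.MessageId}, {_consumerName}, NOW(), {ctx.TenantId})
            ON CONFLICT (message_id, consumer) DO NOTHING", ct);

        if (inserted == 0)
        {
            await tx.RollbackAsync(ct);
            return; // already processed (concurrently or earlier): ack without side effects
        }

        // 2) Business logic (Upsert / business idempotency key) in the same transaction.
        //    If it throws, disposing tx rolls back the claim too, so a redelivery retries.
        await _inner.HandleAsync(msg, ctx, ct);

        // 3) Optional: update processed_at / audit columns here.
        await tx.CommitAsync(ct);
    }
}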
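The ownership rule can be simulated without a database. In this hypothetical `InboxSimulator`, the claim and the side effect are kept or discarded together. A failing handler releases the claim, like the rolled-back transaction, so a redelivery retries it. A committed claim turns later copies into no-ops.

```csharp
using System;
using System.Collections.Generic;

public sealed class InboxSimulator
{
    private readonly HashSet<(Guid MessageId, string Consumer)> _inbox = new();
    public int SideEffects { get; private set; }

    // Returns true if this delivery did the work, false if it was a duplicate.
    public bool Handle(Guid messageId, string consumer, Action business)
    {
        if (!_inbox.Add((messageId, consumer)))
            return false;                          // already claimed: skip
        try
        {
            business();
            SideEffects++;
            return true;                           // "commit": claim and effect both kept
        }
        catch
        {
            _inbox.Remove((messageId, consumer));  // "rollback": claim released for redelivery
            throw;
        }
    }
}
```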

SQL Server (claim with MERGE)

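MERGE dbo.inbox WITH (HOLDLOCK) AS T
USING (SELECT @message_id AS message_id, @consumer AS consumer) AS S
ON (T.message_id = S.message_id AND T.consumer = S.consumer)
WHEN NOT MATCHED THEN
  INSERT(message_id, consumer, processed_at, tenant_id)
  VALUES(@message_id, @consumer, SYSUTCDATETIME(), @tenant_id);
-- If the row already exists, nothing is inserted: @@ROWCOUNT = 0 means "already processed".
-- HOLDLOCK keeps two concurrent MERGEs from both taking the NOT MATCHED branch.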

7. Late Arrival and Out-of-Order Delivery ⏳

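  • Aggregate version: events carry AggregateVersion. When a consumer detects an older version, it can either do a read-modify-write merge or ignore the event, depending on the business rules.
  • Time window: where strict ordering matters, build a 1–5 minute reordering buffer. Events arriving after the window enter a compensation flow (replay / merge / alert). 🔁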
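Here is a minimal sketch of the version rule. It assumes the projection stores the last applied `AggregateVersion` per aggregate; `VersionedProjection` is hypothetical, and the original `order_projection` table has no version column. Equal or older versions are dropped, which covers both redelivered duplicates and late arrivals.

```csharp
using System;
using System.Collections.Generic;

public sealed class VersionedProjection
{
    private readonly Dictionary<Guid, (long Version, string Status)> _rows = new();

    // Applies the event only if it is newer than what is stored; returns whether it was applied.
    public bool Apply(Guid aggregateId, long version, string status)
    {
        if (_rows.TryGetValue(aggregateId, out var current) && current.Version >= version)
            return false;   // stale or duplicate: drop, or route to compensation
        _rows[aggregateId] = (version, status);
        return true;
    }

    public string? StatusOf(Guid aggregateId) =>
        _rows.TryGetValue(aggregateId, out var row) ? row.Status : null;
}
```

In PostgreSQL, the same guard could go on the upsert, assuming an added `version` column: `ON CONFLICT (order_id) DO UPDATE SET ... WHERE order_projection.version < EXCLUDED.version`.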

8. Dead Letters and Replay 🩹

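  • Producer side: rows that exceed the attempt limit or fail permanently → status=Dead (3), with the root cause recorded in last_error.
  • Consumer side: unrecoverable exceptions → DLQ (or a dead_consume table).
  • Replay tool: filter by id / time window / tenant / aggregate / type. Replay relies on Inbox idempotency, so messages that were already applied are not applied again, as long as the original message id is reused. The tool reports success / failure / duration / coverage. ✅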
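The producer-side replay filter can be sketched in memory; `DeadRow`, `ReplayFilter` and `Replayer` are hypothetical names. Requeueing resets `status`, `attempts` and `visible_at` but keeps the row `Id`. That `Id` becomes the `message-id` header, so consumers that already applied a message skip it via the Inbox.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class DeadRow
{
    public Guid Id { get; init; }
    public string? TenantId { get; init; }
    public string Type { get; init; } = "";
    public DateTimeOffset OccurredAt { get; init; }
    public int Status { get; set; } = 3;      // 3 = Dead
    public int Attempts { get; set; }
    public DateTimeOffset VisibleAt { get; set; }
}

// Every criterion is optional; From is inclusive, To is exclusive.
public sealed record ReplayFilter(string? TenantId = null, string? Type = null,
    DateTimeOffset? From = null, DateTimeOffset? To = null);

public static class Replayer
{
    // Resets matching dead rows to New with a fresh attempt budget; returns how many were requeued.
    public static int Requeue(IEnumerable<DeadRow> rows, ReplayFilter f, DateTimeOffset now)
    {
        var matched = rows.Where(r => r.Status == 3
            && (f.TenantId is null || r.TenantId == f.TenantId)
            && (f.Type is null || r.Type == f.Type)
            && (f.From is null || r.OccurredAt >= f.From)
            && (f.To is null || r.OccurredAt < f.To)).ToList();

        foreach (var r in matched)
        {
            r.Status = 0;        // New: the dispatcher picks it up again
            r.Attempts = 0;
            r.VisibleAt = now;
        }
        return matched.Count;
    }
}
```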

State transitions:
  • New → Processing: claim
  • Processing → Sent: publish ok
  • Processing → New: transient fail (visible_at += backoff)
  • Processing → Dead: permanent fail, or attempts >= max
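The transitions can also be enforced in code, for example in the write-back layer or in tests. The sketch uses a hypothetical `OutboxTransitions` class. The `Dead → New` edge is an assumption added for the replay tool in section 8; it is not drawn in the diagram.

```csharp
using System;
using System.Collections.Generic;

public enum OutboxStatus { New = 0, Sent = 1, Dead = 3, Processing = 9 }

public static class OutboxTransitions
{
    private static readonly HashSet<(OutboxStatus From, OutboxStatus To)> Allowed = new()
    {
        (OutboxStatus.New, OutboxStatus.Processing),   // claim
        (OutboxStatus.Processing, OutboxStatus.Sent),  // publish ok
        (OutboxStatus.Processing, OutboxStatus.New),   // transient failure, visible_at += backoff
        (OutboxStatus.Processing, OutboxStatus.Dead),  // permanent failure or attempts >= max
        (OutboxStatus.Dead, OutboxStatus.New),         // assumed: operator-driven replay
    };

    public static bool IsAllowed(OutboxStatus from, OutboxStatus to) => Allowed.Contains((from, to));
}
```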

9. Reconciliation Scripts and Health Checks 🧾

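  • Reconciliation: each day, compare the business-table change count with the count of successfully consumed events, grouped by tenant and type, and report the differences.
  • Health checks / metrics: outbox_queue_lag_seconds, outbox_attempt_rate, outbox_dead_ratio, consumer_latency_p95, inbox_growth_per_day, and so on.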
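Here is a sketch of the reconciliation diff. It assumes both sides have already been aggregated into daily counts per (tenant, type). `Reconciliation.Diff` is a hypothetical helper, and a key missing on one side counts as zero.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Reconciliation
{
    // Returns every (tenant, type) whose business-change count differs from its consumed-event count,
    // ordered by tenant then type.
    public static List<(string Tenant, string Type, int Changes, int Consumed)> Diff(
        IReadOnlyDictionary<(string Tenant, string Type), int> businessChanges,
        IReadOnlyDictionary<(string Tenant, string Type), int> consumed) =>
        businessChanges.Keys.Union(consumed.Keys)
            .Select(k => (k.Tenant, k.Type,
                          Changes: businessChanges.GetValueOrDefault(k),
                          Consumed: consumed.GetValueOrDefault(k)))
            .Where(r => r.Changes != r.Consumed)
            .OrderBy(r => r.Tenant, StringComparer.Ordinal)
            .ThenBy(r => r.Type, StringComparer.Ordinal)
            .ToList();
}
```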

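Metrics, and the action each one triggers when it crosses its threshold:
  • Outbox Lag p95 → latency alert
  • Retry Rate → tune retry parameters
  • Dead Ratio → dead-letter replay job
  • Consumer Latency p95 → scale out / adjust batch size
  • Inbox Size / Day → TTL / archiving policy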
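The metric-to-action mapping can be expressed as data. This sketch uses a hypothetical `MetricRule` record. The thresholds in the test are placeholders only; the SLO note below recommends deriving them from load tests.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical rule: when Metric exceeds Threshold, Remedy should be triggered.
public sealed record MetricRule(string Metric, double Threshold, string Remedy);

public static class HealthEvaluator
{
    // Returns the remedies whose metric is present and above its threshold, in rule order.
    public static List<string> Remedies(IReadOnlyDictionary<string, double> metrics, IEnumerable<MetricRule> rules) =>
        rules.Where(r => metrics.TryGetValue(r.Metric, out var value) && value > r.Threshold)
             .Select(r => r.Remedy)
             .ToList();
}
```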

10. Observability and SLOs 🔭

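  • Tracing: end-to-end spans for App → Outbox.Insert → Dispatcher.Claim → Publish → Broker → Consumer.Inbox → Handler (W3C Trace Context, propagated via the traceparent header).
  • Metrics: dispatch/consume p95, retry rate, dead-letter rate, out-of-order drop rate.
  • Logs: store only a digest/hash of the payload; log messageId / tenant / aggregate / type / version / attempts / last_error.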
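A digest keeps log lines correlatable without writing business data: the same payload always gives the same digest. The minimal sketch below uses SHA-256 from the .NET base library (`SHA256.HashData`, available in .NET 5 and later). `PayloadDigest` is a hypothetical name.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class PayloadDigest
{
    // Lowercase hex SHA-256 of the UTF-8 payload: log this instead of the payload itself.
    public static string Sha256Hex(string payload) =>
        Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(payload))).ToLowerInvariant();
}
```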

SLO: set thresholds after load testing rather than hard-coding them. 🧪


11. Configuration and Extension Points 🧰

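  • Retry policy: exponential backoff + jitter. Tier the maximum attempt count; for example, payments and inventory get higher fidelity.
  • Sharded concurrency: shard by tenant_id or hash(id) % N. Multiple instances rely on DB row locks, or you can add leases / distributed locks.
  • Cleanup: archive or TTL-delete Outbox rows with status=1; keep Inbox rows for N days.
  • Publish reliability: enable Publisher Confirms on RabbitMQ. A confirm only means the broker accepted the message, so use mandatory publishing or pre-bound queues to detect unroutable messages. Azure Service Bus and NATS provide their own acknowledgment and deduplication mechanisms. ✅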
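For `hash(id) % N`, the hash must be identical on every instance. `string.GetHashCode()` is randomized per process in .NET Core, so this sketch uses 32-bit FNV-1a instead. That choice is an assumption; any stable hash works. `Sharding.ShardOf` is a hypothetical helper, and a Guid id can be passed as `id.ToString("N")`.

```csharp
using System;
using System.Text;

public static class Sharding
{
    // Stable across processes and machines, unlike string.GetHashCode().
    public static int ShardOf(string key, int shardCount)
    {
        if (shardCount <= 0) throw new ArgumentOutOfRangeException(nameof(shardCount));
        uint hash = 2166136261u;                   // FNV-1a 32-bit offset basis
        foreach (var b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash = unchecked(hash * 16777619u);    // FNV-1a 32-bit prime, wraps mod 2^32
        }
        return (int)(hash % (uint)shardCount);
    }
}
```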

12. Minimal Runnable Demo (Compose) ⚙️

docker-compose.yml (PostgreSQL + RabbitMQ)

version: "3.9"
services:
  pg:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: app
    ports: [ "5432:5432" ]
  rabbit:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"

appsettings.json (illustrative)

{
  "ConnectionStrings": {
    "Default": "Host=localhost;Port=5432;Database=app;Username=postgres;Password=postgres"
  },
  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "guest",
    "Password": "guest",
    "Exchange": "app.events"
  },
  "Outbox": {
    "BatchSize": 200,
    "IntervalMs": 200,
    "MaxAttempts": 8
  }
}
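The `Outbox` section above maps onto an options class, which the dispatcher reads as `_options.Value`. In an ASP.NET Core / ABP host it would normally be bound with `services.Configure<OutboxOptions>(configuration.GetSection("Outbox"))`. The hypothetical `FromAppSettings` below reads it with System.Text.Json only, so the sketch stands alone, and it adds a basic positivity check.

```csharp
using System;
using System.Text.Json;

// Hypothetical options class for the "Outbox" section; property names match the JSON keys.
public sealed class OutboxOptions
{
    public int BatchSize { get; set; }
    public int IntervalMs { get; set; }
    public int MaxAttempts { get; set; }

    // Reads appsettings-style JSON and rejects a missing section or non-positive values.
    public static OutboxOptions FromAppSettings(string json)
    {
        using var doc = JsonDocument.Parse(json);
        if (!doc.RootElement.TryGetProperty("Outbox", out var section))
            throw new InvalidOperationException("Missing \"Outbox\" section");

        var opts = section.Deserialize<OutboxOptions>()
                   ?? throw new InvalidOperationException("\"Outbox\" section is null");
        if (opts.BatchSize <= 0 || opts.IntervalMs <= 0 || opts.MaxAttempts <= 0)
            throw new InvalidOperationException("BatchSize, IntervalMs and MaxAttempts must be positive");
        return opts;
    }
}
```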

Startup: 1) docker compose up -d  2) run the ABP application (including migrations)  3) open http://localhost:15672 (guest/guest) for the RabbitMQ management UI 🎛️


13. Key Code Skeletons 🧪

13.1 DbContext (multi-database mapping)

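using Microsoft.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;

public class MyDbContext : AbpDbContext<MyDbContext>
{
    public MyDbContext(DbContextOptions<MyDbContext> options) : base(options) { }

    public DbSet<OutboxMessage> Outbox => Set<OutboxMessage>();
    public DbSet<InboxRecord>  Inbox  => Set<InboxRecord>();

    protected override void OnModelCreating(ModelBuilder b)
    {
        base.OnModelCreating(b);
        // Columns are assumed to be snake_case (e.g. via EFCore.NamingConventions'
        // UseSnakeCaseNamingConvention()), so the raw SQL and RETURNING o.* map onto properties.

        b.Entity<OutboxMessage>(cfg =>
        {
            cfg.ToTable("outbox_messages");
            cfg.HasKey(x => x.Id);
            cfg.Property(x => x.Status).HasConversion<short>(); // smallint column
            if (Database.IsNpgsql())
            {
                cfg.Property(x => x.Payload).HasColumnType("jsonb");
                cfg.Property(x => x.Headers).HasColumnType("jsonb");
            }
            else if (Database.IsSqlServer())
            {
                cfg.Property(x => x.Payload).HasColumnType("nvarchar(max)");
                cfg.Property(x => x.Headers).HasColumnType("nvarchar(max)");
            }
            cfg.HasIndex(x => new { x.Status, x.VisibleAt })
               .HasDatabaseName("idx_outbox_visibility");
        });

        b.Entity<InboxRecord>(cfg =>
        {
            cfg.ToTable("inbox");
            cfg.HasKey(x => new { x.MessageId, x.Consumer });
        });
    }
}

// Row type for the inbox table.
public sealed class InboxRecord
{
    public Guid MessageId { get; set; }
    public string Consumer { get; set; } = default!;
    public DateTimeOffset ProcessedAt { get; set; }
    public string? TenantId { get; set; }
}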

13.2 RabbitMQ Publisher (singleton connection + channel pool) 📨

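// Startup/Module registration (RabbitMQ.Client 6.x API: IConnection/IModel; 7.x replaces IModel with an async IChannel API)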
services.AddSingleton<IConnection>(sp =>
{
    var opt = sp.GetRequiredService<IOptions<RabbitOptions>>().Value;
    var factory = new ConnectionFactory { HostName = opt.HostName, UserName = opt.UserName, Password = opt.Password };
    return factory.CreateConnection();
});
services.AddSingleton<IRabbitChannelPool, RabbitChannelPool>();
services.AddSingleton<IEventPublisher, RabbitMqPublisher>();

public interface IRabbitChannelPool { PooledChannel Rent(); }
public sealed class PooledChannel : IDisposable { public IModel Model { get; init; } = default!; /* pool implementation omitted */ public void Dispose(){ /* return to the pool */ } }

public sealed class RabbitMqPublisher : IEventPublisher
{
    private readonly IRabbitChannelPool _pool;
    private readonly string _exchange;

    public RabbitMqPublisher(IRabbitChannelPool pool, IOptions<RabbitOptions> opt)
    {
        _pool = pool;
        _exchange = opt.Value.Exchange;
    }

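    public Task PublishAsync(string type, string payload,
        IReadOnlyDictionary<string,string> headers, string? routingKey, CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();
        using var ch = _pool.Rent(); // one channel per call: IModel must not be shared across threads
        // Both calls are idempotent; doing them once when the pool creates the channel
        // would save a round trip per publish.
        ch.Model.ExchangeDeclare(_exchange, ExchangeType.Topic, durable:true);
        ch.Model.ConfirmSelect();

        var props = ch.Model.CreateBasicProperties();
        props.Persistent = true;
        props.ContentType = "application/json";
        props.Type = type;
        if (headers.TryGetValue("message-id", out var messageId)) props.MessageId = messageId;
        props.Headers = headers.ToDictionary(kv => kv.Key, kv => (object)kv.Value);

        ch.Model.BasicPublish(_exchange, routingKey ?? type, props, Encoding.UTF8.GetBytes(payload));
        // Simplified: throws on nack or timeout. It also closes the channel, so the pool
        // should discard channels whose Model.IsOpen is false instead of reusing them.
        ch.Model.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
        return Task.CompletedTask;
    }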
}
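The channel pool implementation is omitted above. Below is a minimal generic sketch of the rent/return pattern, using a hypothetical `SimplePool<T>` built on `ConcurrentBag<T>`, which could back `IRabbitChannelPool`. Each `Rent()` gives one caller exclusive use of an item, and disposing the lease returns it. With RabbitMQ.Client 6.x the factory would open a channel with `connection.CreateModel()`. A production pool should also discard channels that are no longer open instead of returning them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class SimplePool<T> where T : class
{
    private readonly ConcurrentBag<T> _items = new();
    private readonly Func<T> _factory;
    private int _created;

    public SimplePool(Func<T> factory) => _factory = factory;

    // Number of items the factory has created so far.
    public int Created => Volatile.Read(ref _created);

    // Reuses an idle item if there is one, otherwise creates a new one.
    public Lease Rent()
    {
        if (!_items.TryTake(out var item))
        {
            item = _factory();
            Interlocked.Increment(ref _created);
        }
        return new Lease(this, item);
    }

    private void Return(T item) => _items.Add(item);

    // Dispose exactly once: a struct lease cannot detect a double return.
    public readonly struct Lease : IDisposable
    {
        private readonly SimplePool<T> _owner;
        public T Item { get; }

        internal Lease(SimplePool<T> owner, T item)
        {
            _owner = owner;
            Item = item;
        }

        public void Dispose() => _owner.Return(Item);
    }
}
```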

13.3 Consumer-Side Upsert (PostgreSQL / SQL Server) 🧷

PostgreSQL

await _db.Database.ExecuteSqlInterpolatedAsync($@"
    INSERT INTO order_projection(order_id, status, updated_at)
    VALUES ({eto.OrderId}, {"Paid"}, NOW())
    ON CONFLICT (order_id)
    DO UPDATE SET status = EXCLUDED.status, updated_at = NOW()", ct);

SQL Server

MERGE dbo.order_projection WITH (HOLDLOCK) AS T -- HOLDLOCK avoids a duplicate-key race between concurrent MERGEs
USING (SELECT @order_id AS order_id, @status AS status) AS S
ON (T.order_id = S.order_id)
WHEN MATCHED THEN UPDATE SET T.status = S.status, T.updated_at = SYSUTCDATETIME()
WHEN NOT MATCHED THEN INSERT(order_id, status, updated_at)
VALUES (S.order_id, S.status, SYSUTCDATETIME());

