ABP vNext 的 Outbox/Inbox 系统化落地(非 CAP / 非 Kafka):幂等写、去重、延迟到达与死信回补 ✨
📚 目录
一、问题与目标 🧭
不使用 DTC;以本地事务 + Outbox 实现“效果上的一次”。核心要解决:重复投递、延迟/乱序、幂等更新、不可恢复失败(死信)。💡
二、总体架构 🏗️
- 写路径:AppService → 聚合操作 → `SaveChanges()`,在同一事务内写入业务表与 Outbox 表;提交后由 Dispatcher 异步派发到 Broker。
- 读路径:Consumer 收到消息 → Inbox 按 `(messageId, consumer)` 先占位去重 → 幂等处理 → 提交。
- 可观测性:记录 commit→publish、publish→consume 延迟与重试/死信指标。🔍
三、数据模型与表结构 🗃️
兼容多租户、多版本与回补;补齐运营定位字段。✅
OutboxMessages(PostgreSQL 版)
-- Outbox table: one row per domain event, written in the same transaction as the business change.
CREATE TABLE outbox_messages (
    id                uuid        PRIMARY KEY,
    aggregate_id      uuid        NOT NULL,
    aggregate_type    text        NOT NULL,
    aggregate_version bigint      NOT NULL DEFAULT 0,  -- lets consumers merge out-of-order / concurrent updates
    type              text        NOT NULL,            -- event type; also the fallback routing key
    payload           jsonb       NOT NULL,
    headers           jsonb       NOT NULL,
    tenant_id         text,
    occurred_at       timestamptz NOT NULL,
    visible_at        timestamptz NOT NULL,            -- earliest dispatch time (delay / retry backoff)
    attempts          int         NOT NULL DEFAULT 0,
    status            smallint    NOT NULL DEFAULT 0,  -- 0:new 1:sent 3:dead 9:processing
    last_error        text,
    routing_key       text,
    partition_key     text
);

-- The dispatcher's claim query filters on status and orders by visible_at.
CREATE INDEX idx_outbox_visibility ON outbox_messages(status, visible_at);
-- Operational lookups by tenant and by event type.
CREATE INDEX idx_outbox_tenant     ON outbox_messages(tenant_id);
CREATE INDEX idx_outbox_type       ON outbox_messages(type);
SQL Server:把 `jsonb` 换成 `nvarchar(max)`,并可加 `ISJSON` 校验约束;`uuid` 换成 `uniqueidentifier`,`timestamptz` 换成 `datetimeoffset`;时间函数 `NOW()` 换成 `SYSUTCDATETIME()`。
Inbox(去重/幂等)
-- Idempotency ledger: one row per (message, consumer) pair that has been claimed/processed.
CREATE TABLE inbox (
    message_id   uuid        NOT NULL,
    consumer     text        NOT NULL,
    processed_at timestamptz NOT NULL,
    tenant_id    text,
    PRIMARY KEY (message_id, consumer)
);

-- Supports TTL/archival purges such as
--   DELETE FROM inbox WHERE processed_at < NOW() - INTERVAL '7 days';
-- without a full table scan (the primary key does not lead with processed_at).
CREATE INDEX idx_inbox_processed_at ON inbox(processed_at);
以 `(message_id, consumer)` 作为幂等键;支持 TTL/归档。🧹
四、ABP 集成点 🧩
使用自研事件收集接口,避免与 ABP 内置 Outbox 混用。聚合根实现 `IHasDomainEvents`,由 SaveChanges 拦截器在同一事务内写入 Outbox。
4.1 领域事件接口与基类
/// <summary>
/// A domain event raised by an aggregate and later persisted as an outbox row.
/// Carries the aggregate identity and version so that consumers can detect stale or
/// out-of-order deliveries.
/// </summary>
public interface IDomainEvent
{
    /// <summary>Identifier of the aggregate that raised the event.</summary>
    Guid AggregateId { get; }

    /// <summary>Logical type name of the raising aggregate.</summary>
    string AggregateType { get; }

    /// <summary>Version of the aggregate at the moment the event was raised.</summary>
    long AggregateVersion { get; }

    /// <summary>Tenant the event belongs to; copied into the outbox row.</summary>
    /// <remarks>
    /// NOTE(review): the outbox/inbox <c>tenant_id</c> columns are nullable, but this member is
    /// declared non-nullable — confirm how host-side (tenant-less) events are represented.
    /// </remarks>
    string TenantId { get; }
}
/// <summary>
/// Implemented by entities that buffer domain events until they are flushed to the outbox.
/// </summary>
public interface IHasDomainEvents
{
    /// <summary>Events raised since the buffer was last cleared.</summary>
    IReadOnlyCollection<IDomainEvent> DomainEvents { get; }

    /// <summary>Empties the buffer, so the same events are not written twice.</summary>
    void ClearDomainEvents();
}
/// <summary>
/// Base class for aggregates that buffer <see cref="IDomainEvent"/> instances in memory until
/// they are cleared via <see cref="ClearDomainEvents"/>.
/// </summary>
public abstract class AggregateRootBase : Entity<Guid>, IHasDomainEvents
{
    // Pending events, in the order they were raised.
    private readonly List<IDomainEvent> _domainEvents = new();

    /// <summary>Read-only view over the pending events.</summary>
    public IReadOnlyCollection<IDomainEvent> DomainEvents
    {
        get
        {
            return _domainEvents.AsReadOnly();
        }
    }

    /// <summary>Queues an event to be persisted together with this aggregate.</summary>
    protected void RaiseDomainEvent(IDomainEvent @event)
    {
        _domainEvents.Add(@event);
    }

    /// <summary>Discards every pending event.</summary>
    public void ClearDomainEvents()
    {
        _domainEvents.Clear();
    }
}
4.2 Outbox 实体与工厂
/// <summary>
/// Lifecycle state of an outbox row. The numeric values are what the
/// <c>outbox_messages.status</c> column stores and what the raw dispatcher SQL compares against,
/// so they must not be renumbered.
/// </summary>
public enum OutboxStatus
{
    /// <summary>Waiting to be claimed once <c>visible_at</c> has passed.</summary>
    New = 0,

    /// <summary>Published successfully.</summary>
    Sent = 1,

    /// <summary>Gave up: permanent failure or retry budget exhausted.</summary>
    Dead = 3,

    /// <summary>Claimed by a dispatcher.</summary>
    Processing = 9,
}
/// <summary>
/// One row of <c>outbox_messages</c>: a serialized domain event waiting to be (or already)
/// published by the dispatcher.
/// </summary>
public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public Guid AggregateId { get; init; }
    public string AggregateType { get; init; } = default!;
    public long AggregateVersion { get; init; }

    /// <summary>Event type name; used as the routing key when <see cref="RoutingKey"/> is null.</summary>
    public string Type { get; init; } = default!;

    public string TenantId { get; init; } = default!;

    /// <summary>JSON of the concrete event.</summary>
    public string Payload { get; init; } = default!;

    public string Headers { get; init; } = "{}";
    public DateTimeOffset OccurredAt { get; init; }

    /// <summary>Earliest time the dispatcher may claim this row (delay / retry backoff).</summary>
    public DateTimeOffset VisibleAt { get; set; }

    public int Attempts { get; set; }
    public string? LastError { get; set; }
    public string? RoutingKey { get; set; }
    public string? PartitionKey { get; set; }
    public OutboxStatus Status { get; set; } = OutboxStatus.New;

    /// <summary>Builds a new outbox row from a domain event.</summary>
    /// <param name="ev">The event to persist.</param>
    /// <param name="visibleAt">Earliest dispatch time; defaults to "now".</param>
    /// <exception cref="ArgumentNullException"><paramref name="ev"/> is null.</exception>
    public static OutboxMessage From(IDomainEvent ev, DateTimeOffset? visibleAt = null)
    {
        ArgumentNullException.ThrowIfNull(ev);

        // One clock read so OccurredAt and the default VisibleAt agree exactly.
        var now = DateTimeOffset.UtcNow;

        return new()
        {
            Id = Guid.NewGuid(),
            AggregateId = ev.AggregateId,
            AggregateType = ev.AggregateType,
            AggregateVersion = ev.AggregateVersion,
            Type = ev.GetType().Name,
            TenantId = ev.TenantId,
            // Serialize with the runtime type. The generic Serialize<TValue>(ev) would bind
            // TValue = IDomainEvent and emit only the four interface members, silently dropping
            // every event-specific property from the payload.
            Payload = JsonSerializer.Serialize(ev, ev.GetType()),
            OccurredAt = now,
            VisibleAt = visibleAt ?? now
        };
    }
}
4.3 SaveChanges 拦截器
/// <summary>
/// Copies pending domain events from tracked aggregates into the outbox just before EF Core
/// saves, so the outbox rows are written by the same SaveChanges call (and transaction) as the
/// aggregate changes.
/// </summary>
public sealed class OutboxSaveChangesInterceptor : SaveChangesInterceptor
{
    /// <summary>
    /// Synchronous path. Without this override, <c>SaveChanges()</c> would persist the
    /// aggregates but never write their events to the outbox.
    /// </summary>
    public override InterceptionResult<int> SavingChanges(
        DbContextEventData eventData,
        InterceptionResult<int> result)
    {
        CaptureDomainEvents(eventData);
        return base.SavingChanges(eventData, result);
    }

    /// <summary>Asynchronous path (<c>SaveChangesAsync</c>).</summary>
    public override ValueTask<InterceptionResult<int>> SavingChangesAsync(
        DbContextEventData eventData,
        InterceptionResult<int> result,
        CancellationToken ct = default)
    {
        CaptureDomainEvents(eventData);
        return base.SavingChangesAsync(eventData, result, ct);
    }

    // Moves every pending event of every tracked aggregate into the outbox set, then clears
    // the aggregate's buffer so a second SaveChanges does not write the same events again.
    private static void CaptureDomainEvents(DbContextEventData eventData)
    {
        var context = eventData.Context;
        if (context is null)
        {
            return;
        }

        var aggregates = context.ChangeTracker.Entries<IHasDomainEvents>()
            .Select(e => e.Entity)
            .Where(e => e.DomainEvents.Count > 0)
            .ToList();
        if (aggregates.Count == 0)
        {
            return;
        }

        // Only MyDbContext has an outbox set. Fail loudly rather than drop events when the
        // interceptor is attached to some other context.
        if (context is not MyDbContext ctx)
        {
            throw new InvalidOperationException(
                $"{nameof(OutboxSaveChangesInterceptor)} can only persist domain events through " +
                $"{nameof(MyDbContext)}, but it was attached to {context.GetType().Name}.");
        }

        var now = DateTimeOffset.UtcNow;
        foreach (var agg in aggregates)
        {
            foreach (var ev in agg.DomainEvents)
            {
                ctx.Outbox.Add(OutboxMessage.From(ev, now));
            }

            agg.ClearDomainEvents(); // prevent duplicate outbox rows
        }
    }
}
五、OutboxDispatcher ⚡
缩短锁持有时间,降低死锁;支持多实例并发。🧵
5.1 调度时序
5.2 传输抽象
/// <summary>Transport abstraction the outbox dispatcher publishes through.</summary>
public interface IEventPublisher
{
    /// <summary>Publishes one serialized event.</summary>
    /// <param name="type">Event type name.</param>
    /// <param name="payload">Serialized event body.</param>
    /// <param name="headers">
    /// Transport headers, e.g. message-id / aggregate-id / aggregate-type / aggregate-version /
    /// tenant-id / traceparent.
    /// </param>
    /// <param name="routingKey">Explicit routing key, or null to let the transport choose.</param>
    /// <param name="ct">Cancellation token.</param>
    Task PublishAsync(
        string type,
        string payload,
        IReadOnlyDictionary<string, string> headers,
        string? routingKey,
        CancellationToken ct);
}
5.3 PostgreSQL:CTE 领取 ✅
// Claim -> publish -> record outcome (PostgreSQL).
// Status 9 (processing) is a lease: claiming pushes visible_at forward by claimLease, and rows
// still in status 9 after the lease has expired are claimed again. Without the lease, a crash,
// a shutdown, or an exception after the claim commits would strand rows in status 9 forever.
// NOTE(review): claimLease must exceed the worst-case time to publish one batch. Otherwise another
// instance may re-publish rows that are still in flight (absorbed by Inbox dedup, but wasteful).
var claimLease = TimeSpan.FromMinutes(5);
await using (var tx = await _db.Database.BeginTransactionAsync(ct))
{
    // A single statement locks up to batchSize due rows (skipping rows other instances hold),
    // marks them processing, and returns them. AsNoTracking: the rows are only read here, and a
    // long-lived context would otherwise accumulate tracked entities batch after batch.
    var claimed = await _db.Outbox
        .FromSqlInterpolated($@"
            WITH cte AS (
                SELECT id
                FROM outbox_messages
                WHERE status IN (0, 9) AND visible_at <= NOW()
                ORDER BY visible_at
                LIMIT {batchSize}
                FOR UPDATE SKIP LOCKED
            )
            UPDATE outbox_messages o
            SET status = 9, visible_at = NOW() + {claimLease}
            FROM cte
            WHERE o.id = cte.id
            RETURNING o.*;")
        .AsNoTracking()
        .ToListAsync(ct);
    await tx.CommitAsync(ct);

    var maxAttempts = _options.Value.MaxAttempts;
    foreach (var m in claimed)
    {
        var nextAttempts = m.Attempts + 1;
        try
        {
            await _publisher.PublishAsync(m.Type, m.Payload, Headers(m), m.RoutingKey, ct);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Shutting down: unrecorded rows stay in status 9 and are re-claimed once the lease expires.
            throw;
        }
        catch (PermanentException ex)
        {
            // Not retryable: dead-letter immediately, counting this attempt like the other paths do.
            // Bookkeeping writes use CancellationToken.None so that an attempt that has already
            // happened is recorded even if shutdown has started.
            await _db.Database.ExecuteSqlInterpolatedAsync($@"
                UPDATE outbox_messages
                SET status = 3, attempts = {nextAttempts}, last_error = {ex.Message}
                WHERE id = {m.Id};", CancellationToken.None);
            continue;
        }
        catch (Exception ex)
        {
            // TransientException and any unclassified failure (connection loss, confirm timeout, ...):
            // retry with backoff until MaxAttempts, then dead-letter. Catching everything here keeps
            // one failing message from aborting the rest of the claimed batch.
            if (nextAttempts >= maxAttempts)
            {
                await _db.Database.ExecuteSqlInterpolatedAsync($@"
                    UPDATE outbox_messages
                    SET status = 3, attempts = {nextAttempts}, last_error = {ex.Message}
                    WHERE id = {m.Id};", CancellationToken.None);
            }
            else
            {
                var next = Backoff(nextAttempts);
                await _db.Database.ExecuteSqlInterpolatedAsync($@"
                    UPDATE outbox_messages
                    SET status = 0,
                        attempts = {nextAttempts},
                        visible_at = NOW() + {next},
                        last_error = {ex.Message}
                    WHERE id = {m.Id};", CancellationToken.None);
            }
            continue;
        }

        // Recorded outside the try: a database error here must not be treated as a publish
        // failure, which would schedule a duplicate publish of an already-delivered message.
        await _db.Database.ExecuteSqlInterpolatedAsync($@"
            UPDATE outbox_messages
            SET status = 1, attempts = {nextAttempts}, last_error = NULL
            WHERE id = {m.Id};", CancellationToken.None);
    }
}
5.4 SQL Server:领取(`READPAST`/`UPDLOCK`/`ROWLOCK`)
// Claim a batch (SQL Server). READPAST skips rows other dispatchers have locked; UPDLOCK/ROWLOCK
// keep the claim to row-level update locks.
// Status 9 is a lease: claiming pushes visible_at forward, and status-9 rows whose lease has
// expired are re-claimed, so a crash after the claim commits cannot strand rows forever.
// NOTE(review): the lease must exceed the worst-case time to publish one batch.
var claimLeaseSeconds = 300;
await using (var tx = await _db.Database.BeginTransactionAsync(ct))
{
    // FromSqlInterpolated turns each hole into a proper DbParameter instead of relying on
    // EF's internal "@p0" naming convention as the hand-written placeholder did.
    var claimed = await _db.Outbox
        .FromSqlInterpolated($@"
            ;WITH cte AS (
                SELECT TOP ({batchSize}) *
                FROM outbox_messages WITH (ROWLOCK, UPDLOCK, READPAST)
                WHERE status IN (0, 9) AND visible_at <= SYSUTCDATETIME()
                ORDER BY visible_at
            )
            UPDATE cte
            SET status = 9,
                visible_at = DATEADD(SECOND, {claimLeaseSeconds}, SYSUTCDATETIME())
            OUTPUT inserted.*;")
        .AsNoTracking()
        .ToListAsync(ct);
    await tx.CommitAsync(ct);
}
// Publishing and status write-back follow the PostgreSQL version (mind the time-function and type differences).
5.5 指数退避 + 抖动 ⏱️(兼容 Mermaid timeline)
// Exponential backoff with jitter: growthFactor^attempt seconds, capped at capSeconds, plus a
// uniformly random [0, maxJitterMs) milliseconds so that rows failing together do not all
// become visible again at the same instant.
// With the defaults the schedule is 3s, 9s, 27s, 81s, 243s, then 300s (5 min), each + <2.5s jitter.
// The optional parameters let critical flows (e.g. payments/inventory) use a different curve.
static TimeSpan Backoff(int attempt, double growthFactor = 3, double capSeconds = 300, int maxJitterMs = 2500)
{
    // Cap before converting, so that large attempt counts can never overflow TimeSpan.
    var baseSec = Math.Min(capSeconds, Math.Pow(growthFactor, attempt));
    // Random.Next throws for a negative upper bound; treat zero or negative as "no jitter".
    var jitterMs = maxJitterMs > 0 ? Random.Shared.Next(0, maxJitterMs) : 0;
    return TimeSpan.FromSeconds(baseSec) + TimeSpan.FromMilliseconds(jitterMs);
}
退避可视化
六、消费端:Inbox “先占位后执行业务” + 幂等包装器 🧱
解决并发竞态:先占位(`INSERT ... ON CONFLICT` 或 `MERGE`)拿到“处理所有权”,再执行业务。只要业务写入与占位处于同一数据库事务,就能杜绝重复副作用;若业务写到其他存储或调用外部系统,仍需依赖其自身的幂等键。🛡️
PostgreSQL
/// <summary>
/// Decorator that makes <see cref="IMessageHandler{T}"/> idempotent per (message id, consumer).
/// It inserts the inbox row first, which claims ownership of the message, then runs the inner
/// handler inside the same transaction.
/// </summary>
public sealed class InboxHandler<T> : IMessageHandler<T>
{
    private readonly YourDbContext _db;
    private readonly string _consumerName;
    private readonly IMessageHandler<T> _inner;

    /// <param name="inner">The business handler to run at most once per message and consumer.</param>
    /// <param name="db">Context whose connection/transaction hosts both the inbox row and the business writes.</param>
    /// <param name="consumerName">Value stored in <c>inbox.consumer</c>; part of the idempotency key.</param>
    public InboxHandler(IMessageHandler<T> inner, YourDbContext db, string consumerName)
    {
        ArgumentNullException.ThrowIfNull(inner);
        ArgumentNullException.ThrowIfNull(db);
        if (string.IsNullOrWhiteSpace(consumerName))
        {
            throw new ArgumentException("Consumer name must be non-empty.", nameof(consumerName));
        }

        _inner = inner;
        _db = db;
        _consumerName = consumerName;
    }

    public async Task HandleAsync(T msg, MessageContext ctx, CancellationToken ct)
    {
        // If the inner handler throws, disposing the uncommitted transaction rolls back both the
        // inbox row and the business writes, so a redelivery can try again.
        await using var tx = await _db.Database.BeginTransactionAsync(ct);

        // 1) Claim ownership. A concurrent delivery of the same message blocks on the
        //    primary-key conflict until this transaction ends, then sees 0 rows inserted.
        var inserted = await _db.Database.ExecuteSqlInterpolatedAsync($@"
            INSERT INTO inbox(message_id, consumer, processed_at, tenant_id)
            VALUES ({ctx.MessageId}, {_consumerName}, NOW(), {ctx.TenantId})
            ON CONFLICT (message_id, consumer) DO NOTHING;", ct);
        if (inserted == 0)
        {
            await tx.RollbackAsync(ct);
            return; // already handled (now or in the past) by this consumer
        }

        // 2) Run the business logic (upserts / business idempotency keys).
        //    NOTE(review): assumes the inner handler writes through this same _db; writes on
        //    another connection are not covered by this transaction.
        await _inner.HandleAsync(msg, ctx, ct);

        // 3) Optional: refresh processed_at / write audit data here.
        await tx.CommitAsync(ct);
    }
}
SQL Server(占位用 MERGE)
-- Claim (message_id, consumer) before running the handler.
-- Rows affected = 1: this consumer now owns the message; 0: it was already claimed/processed, so skip.
-- HOLDLOCK takes a serializable key-range lock. Without it, two concurrent deliveries can both
-- take the NOT MATCHED branch, and the loser fails with a primary-key violation instead of
-- cleanly affecting 0 rows.
MERGE dbo.inbox WITH (HOLDLOCK) AS T
USING (SELECT @message_id AS message_id, @consumer AS consumer) AS S
    ON (T.message_id = S.message_id AND T.consumer = S.consumer)
WHEN NOT MATCHED THEN
    INSERT (message_id, consumer, processed_at, tenant_id)
    VALUES (@message_id, @consumer, SYSUTCDATETIME(), @tenant_id);
-- If the row already exists nothing is written (0 rows affected); return without processing.
七、延迟到达与乱序 ⏳
- 聚合版本:事件携带 `AggregateVersion`;消费者若检测到旧版本,可“读-改-写合并”或忽略(按业务定义)。
- 时间窗:强时序场景构造 1–5 分钟“乱序缓冲”,窗外迟到进入补偿流程(重放/合并/告警)。🔁
八、死信与回补 🩹
- 生产者侧:超上限/永久失败 → `status=Dead(3)`,`last_error` 记录根因。
- 消费者侧:不可恢复异常 → DLQ(或 `dead_consume` 表)。
- 回补工具:支持按 `id`/时间窗/租户/聚合/类型筛选;回放依赖 Inbox 幂等,不会重复生效;产出成功/失败/耗时/覆盖率报表。✅
状态流转
九、对账脚本与健康检查 🧾
- 对账:按日统计“业务表变更计数” vs “消费成功事件计数”(聚合到租户/类型),输出差异。
- 健康检查/指标:`outbox_queue_lag_seconds`、`outbox_attempt_rate`、`outbox_dead_ratio`、`consumer_latency_p95`、`inbox_growth_per_day` 等。
指标关系
十、可观测性与 SLO 🔭
- Tracing:`App → Outbox.Insert → Dispatcher.Claim → Publish → Broker → Consumer.Inbox → Handler` 全链路埋点(W3C TraceContext)。
- Metrics:派发/消费 p95、重试率、死信率、乱序丢弃率。
- Logs:payload 建议仅存摘要/哈希;日志含 `messageId/tenant/aggregate/type/version/attempts/last_error`。
SLO:阈值需压测后定,不建议硬编码。🧪
十一、配置与扩展点 🧰
- 重试策略:指数退避 + 抖动;最大尝试数分级(如支付/库存更高保真)。
- 分片并发:按 `tenant_id` 或 `hash(id) % N` 分 shard;多实例靠 DB 行锁(或增租约/分布式锁)。
- 清理策略:Outbox `status=1` 归档/TTL;Inbox 保留 N 天。
- 发布可靠性:RabbitMQ 开启 Publisher Confirms,并以 `mandatory=true` 发布、处理 `basic.return`(不可路由的消息同样会被 confirm,仅靠 confirm 无法发现);ASB/NATS 使用各自的确认/幂等机制。✅
十二、最小可运行 Demo(Compose) ⚙️
docker-compose.yml(PostgreSQL + RabbitMQ)
# Minimal local stack: PostgreSQL for the business/outbox/inbox tables, RabbitMQ as the broker.
# The top-level `version` key is obsolete in the Compose Specification; Compose v2 ignores it
# with a warning, so it is omitted.
services:
  pg:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: app
    ports: [ "5432:5432" ]
  rabbit:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # management UI
appsettings.json(示意)
{
"ConnectionStrings": {
"Default": "Host=localhost;Port=5432;Database=app;Username=postgres;Password=postgres"
},
"RabbitMQ": {
"HostName": "localhost",
"UserName": "guest",
"Password": "guest",
"Exchange": "app.events"
},
"Outbox": {
"BatchSize": 200,
"IntervalMs": 200,
"MaxAttempts": 8
}
}
启动:1) `docker compose up -d` 2) 运行 ABP 应用(含迁移) 3) 打开 http://localhost:15672(guest/guest)🎛️
十三、关键代码骨架 🧪
13.1 DbContext(多数据库映射)
/// <summary>
/// Application DbContext exposing the outbox and inbox tables. The mapping mirrors the DDL in
/// section 3 (snake_case columns), because the dispatcher's raw SQL reads rows back via
/// <c>RETURNING o.*</c> / <c>OUTPUT inserted.*</c> and EF must find every column by name.
/// </summary>
public class MyDbContext : AbpDbContext<MyDbContext>
{
    // AbpDbContext has no parameterless constructor; the options-based one is required.
    public MyDbContext(DbContextOptions<MyDbContext> options)
        : base(options)
    {
    }

    public DbSet<OutboxMessage> Outbox => Set<OutboxMessage>();
    public DbSet<InboxRecord> Inbox => Set<InboxRecord>();

    protected override void OnModelCreating(ModelBuilder b)
    {
        base.OnModelCreating(b);

        b.Entity<OutboxMessage>(cfg =>
        {
            cfg.ToTable("outbox_messages");
            cfg.HasKey(x => x.Id);

            // Explicit snake_case names. By default EF would use PascalCase columns
            // ("AggregateId"), which the raw claim/update SQL does not produce.
            cfg.Property(x => x.Id).HasColumnName("id");
            cfg.Property(x => x.AggregateId).HasColumnName("aggregate_id");
            cfg.Property(x => x.AggregateType).HasColumnName("aggregate_type");
            cfg.Property(x => x.AggregateVersion).HasColumnName("aggregate_version");
            cfg.Property(x => x.Type).HasColumnName("type");
            cfg.Property(x => x.Payload).HasColumnName("payload");
            cfg.Property(x => x.Headers).HasColumnName("headers");
            // tenant_id is nullable in the DDL (host-side events).
            cfg.Property(x => x.TenantId).HasColumnName("tenant_id").IsRequired(false);
            cfg.Property(x => x.OccurredAt).HasColumnName("occurred_at");
            cfg.Property(x => x.VisibleAt).HasColumnName("visible_at");
            cfg.Property(x => x.Attempts).HasColumnName("attempts");
            // The DDL stores status as smallint; EF's default enum mapping would be int.
            cfg.Property(x => x.Status).HasColumnName("status").HasConversion<short>();
            cfg.Property(x => x.LastError).HasColumnName("last_error");
            cfg.Property(x => x.RoutingKey).HasColumnName("routing_key");
            cfg.Property(x => x.PartitionKey).HasColumnName("partition_key");

            if (Database.IsNpgsql())
            {
                cfg.Property(x => x.Payload).HasColumnType("jsonb");
                cfg.Property(x => x.Headers).HasColumnType("jsonb");
            }
            else if (Database.IsSqlServer())
            {
                cfg.Property(x => x.Payload).HasColumnType("nvarchar(max)");
                cfg.Property(x => x.Headers).HasColumnType("nvarchar(max)");
            }

            // Same indexes as the DDL; the first backs the dispatcher's claim query.
            cfg.HasIndex(x => new { x.Status, x.VisibleAt })
                .HasDatabaseName("idx_outbox_visibility");
            cfg.HasIndex(x => x.TenantId).HasDatabaseName("idx_outbox_tenant");
            cfg.HasIndex(x => x.Type).HasDatabaseName("idx_outbox_type");
        });

        b.Entity<InboxRecord>(cfg =>
        {
            cfg.ToTable("inbox");
            cfg.HasKey(x => new { x.MessageId, x.Consumer });
            cfg.Property(x => x.MessageId).HasColumnName("message_id");
            cfg.Property(x => x.Consumer).HasColumnName("consumer");
            // NOTE(review): also map processed_at / tenant_id once InboxRecord's property names are confirmed.
        });
    }
}
13.2 RabbitMQ Publisher(单例连接 + 通道池)📨
// Startup/Module: one process-wide AMQP connection; publishers rent per-call channels from a pool.
services.AddSingleton<IConnection>(provider =>
{
    RabbitOptions rabbit = provider.GetRequiredService<IOptions<RabbitOptions>>().Value;
    var connectionFactory = new ConnectionFactory
    {
        HostName = rabbit.HostName,
        UserName = rabbit.UserName,
        Password = rabbit.Password,
    };
    return connectionFactory.CreateConnection();
});
services.AddSingleton<IRabbitChannelPool, RabbitChannelPool>();
services.AddSingleton<IEventPublisher, RabbitMqPublisher>();
/// <summary>
/// Hands out AMQP channels so that concurrent publishers never share one <c>IModel</c>.
/// </summary>
public interface IRabbitChannelPool
{
    /// <summary>Rents a channel; dispose the returned wrapper to give it back.</summary>
    PooledChannel Rent();
}
/// <summary>
/// A channel rented from <see cref="IRabbitChannelPool"/>. <see cref="Dispose"/> is meant to
/// return it to the pool; the pool bookkeeping is omitted in this sketch.
/// </summary>
public sealed class PooledChannel : IDisposable
{
    /// <summary>The rented AMQP channel.</summary>
    public IModel Model { get; init; }

    /// <summary>Returns the channel to the pool (not implemented in this sketch).</summary>
    public void Dispose()
    {
    }
}
/// <summary>
/// <see cref="IEventPublisher"/> over RabbitMQ: publishes persistent messages to a topic
/// exchange and blocks until the broker confirms them.
/// </summary>
public sealed class RabbitMqPublisher : IEventPublisher
{
    private readonly IRabbitChannelPool _pool;
    private readonly string _exchange;

    public RabbitMqPublisher(IRabbitChannelPool pool, IOptions<RabbitOptions> opt)
    {
        _pool = pool;
        _exchange = opt.Value.Exchange;
    }

    /// <summary>
    /// Publishes one message and waits up to 5 seconds for the broker confirm.
    /// Routes with <paramref name="routingKey"/>, falling back to <paramref name="type"/>.
    /// </summary>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was already cancelled.</exception>
    public Task PublishAsync(string type, string payload,
        IReadOnlyDictionary<string, string> headers, string? routingKey, CancellationToken ct)
    {
        // The client calls below are synchronous and cannot observe ct, so honour it up front.
        ct.ThrowIfCancellationRequested();

        using var ch = _pool.Rent(); // one channel per call; IModel must not be shared across threads
        var model = ch.Model;

        // NOTE(review): ExchangeDeclare and ConfirmSelect are broker round-trips repeated on every
        // publish; doing them once when the pool creates a channel would save two RPCs per message.
        model.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true);
        model.ConfirmSelect();

        var props = model.CreateBasicProperties();
        props.Persistent = true;
        props.ContentType = "application/json";
        props.Type = type;
        // Also surface the id as the standard AMQP message-id property, not only as a header,
        // so consumers and broker tooling can deduplicate without knowing the header name.
        if (headers.TryGetValue("message-id", out var messageId))
        {
            props.MessageId = messageId;
        }
        props.Headers = headers.ToDictionary(kv => kv.Key, kv => (object)kv.Value);

        model.BasicPublish(_exchange, routingKey ?? type, props, Encoding.UTF8.GetBytes(payload));

        // Throws on nack or timeout.
        // NOTE(review): WaitForConfirmsOrDie may close the channel when it fails; the pool should
        // discard channels that are no longer open instead of handing them out again.
        model.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
        return Task.CompletedTask;
    }
}
13.3 消费端 Upsert(PostgreSQL / SQL Server)🧷
PostgreSQL
// Idempotent projection write: replaying the same event converges on the same row.
// NOTE(review): last writer wins — there is no version guard, so an older event that arrives
// late overwrites a newer status.
const string paidStatus = "Paid";
await _db.Database.ExecuteSqlInterpolatedAsync($@"
INSERT INTO order_projection(order_id, status, updated_at)
VALUES ({eto.OrderId}, {paidStatus}, NOW())
ON CONFLICT (order_id)
DO UPDATE SET status = EXCLUDED.status, updated_at = NOW();");
SQL Server
MERGE dbo.order_projection AS T
USING (SELECT @order_id AS order_id, @status AS status) AS S
ON (T.order_id = S.order_id)
WHEN MATCHED THEN UPDATE SET T.status = S.status, T.updated_at = SYSUTCDATETIME()
WHEN NOT MATCHED THEN INSERT(order_id, status, updated_at)
VALUES (S.order_id, S.status, SYSUTCDATETIME());