🚀 ABP VNext + Webhook:订阅与异步回调
📚 目录
🎯 一、背景切入:如何优雅地支持第三方回调?
在现代分布式系统中,Webhook 是实现系统解耦和异步通知的重要手段,广泛用于支付通知、审核结果返回、消息推送等场景。但在实践中,我们需要同时解决以下挑战:
- 🔐 安全防护:如何防止伪造请求?
- 🔄 幂等控制:如何避免重复处理同一事件?
- ⚙️ 失败重试:如何确保最终一致性,并避免无限重试?
- 💼 多厂商 & 多通道:如何优雅地支持不同支付/消息通道?
- 📊 可观测 & 可运维:如何快速诊断、监控并手动补偿?
🏗 二、系统架构设计
🔍 三、核心能力实现
3.1 🔐 签名验证(防伪造)
接口定义
/// <summary>
/// Contract for checking the signature that accompanies an incoming webhook request.
/// </summary>
public interface ISignatureVerifier
{
    /// <summary>Name of the HTTP header that carries the signature.</summary>
    string HeaderName { get; }

    /// <summary>Gets the signing secret from the secure configuration center.</summary>
    /// <param name="provider">Provider (channel) whose secret is requested.</param>
    /// <returns>The secret configured for <paramref name="provider"/>.</returns>
    string GetSecret(string provider);

    /// <summary>Checks whether <paramref name="signature"/> is valid for <paramref name="payload"/>.</summary>
    /// <param name="payload">Raw request body.</param>
    /// <param name="signature">Signature value supplied by the caller.</param>
    /// <returns><c>true</c> when the signature is accepted; otherwise <c>false</c>.</returns>
    bool Verify(string payload, string signature);
}
实现示例(Wxpay)
/// <summary>
/// Signature verifier for the "Wxpay" channel. The expected signature is the
/// lower-case hexadecimal HMAC-SHA256 of the raw payload, keyed with the
/// secret stored under <c>Webhook:Secret:Wxpay</c>.
/// </summary>
public class WxSignatureVerifier : ISignatureVerifier, ITransientDependency
{
    private readonly IDynamicParameterStore _paramStore;

    /// <summary>Name of the HTTP header that carries the signature.</summary>
    public string HeaderName { get; } = "X-Wxpay-Signature";

    public WxSignatureVerifier(IDynamicParameterStore paramStore)
        => _paramStore = paramStore;

    /// <summary>Reads the signing secret for <paramref name="provider"/> from the parameter store.</summary>
    /// <param name="provider">Provider name used to build the key <c>Webhook:Secret:{provider}</c>.</param>
    /// <returns>The configured secret.</returns>
    /// <exception cref="BusinessException">No secret is configured for the provider.</exception>
    public string GetSecret(string provider)
        // NOTE(review): blocks on an async call because the interface is synchronous;
        // consider an async variant of the interface if callers can await.
        => _paramStore.GetOrNullAsync($"Webhook:Secret:{provider}")
               .GetAwaiter().GetResult()
           ?? throw new BusinessException("未配置签名 Secret");

    /// <summary>Verifies <paramref name="signature"/> against the HMAC of <paramref name="payload"/>.</summary>
    /// <param name="payload">Raw request body.</param>
    /// <param name="signature">Signature supplied by the caller (lower-case hex expected).</param>
    /// <returns>
    /// <c>true</c> when the signature matches; <c>false</c> when it does not match
    /// or when either argument is missing.
    /// </returns>
    public bool Verify(string payload, string signature)
    {
        // A missing payload or signature can never be valid; reject it instead of
        // failing later with a null-reference / argument exception.
        if (payload == null || string.IsNullOrEmpty(signature))
        {
            return false;
        }

        var secret = GetSecret("Wxpay");
        var expected = ComputeHmac(payload, secret);
        return ConstantTimeEquals(expected, signature);
    }

    /// <summary>Computes the lower-case hex HMAC-SHA256 of <paramref name="data"/> keyed with <paramref name="key"/>.</summary>
    private static string ComputeHmac(string data, string key)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
        return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(data)))
            .ToLowerInvariant();
    }

    /// <summary>
    /// Compares two strings by their UTF-8 bytes using the framework's
    /// fixed-time comparison (which reports <c>false</c> for differing lengths).
    /// </summary>
    private static bool ConstantTimeEquals(string a, string b)
        => CryptographicOperations.FixedTimeEquals(
            Encoding.UTF8.GetBytes(a),
            Encoding.UTF8.GetBytes(b));
}
3.2 🔄 幂等控制(防重复处理)
接口与实现
/// <summary>
/// Guards webhook event handling so that the work for one event id is not repeated.
/// </summary>
public interface IIdempotencyService
{
    /// <summary>Runs <paramref name="handler"/> for the given event under the service's idempotency guard.</summary>
    /// <param name="eventId">Identifier of the webhook event.</param>
    /// <param name="handler">Business logic to run for the event.</param>
    /// <returns>A flag describing the outcome, as defined by the implementation.</returns>
    Task<bool> TryProcessAsync(string eventId, Func<Task> handler);

    /// <summary>Tells whether the event has already been recorded as processed.</summary>
    /// <param name="eventId">Identifier of the webhook event.</param>
    /// <returns><c>true</c> when the event is recorded as processed.</returns>
    Task<bool> IsProcessedAsync(string eventId);
}
/// <summary>
/// Idempotency guard backed by a distributed cache (processed markers) and a
/// distributed lock (serialises concurrent deliveries of the same event).
/// </summary>
public class IdempotencyService : IIdempotencyService, ITransientDependency
{
    private readonly IDistributedCache _cache;
    private readonly IDistributedLockProvider _lockProvider;

    public IdempotencyService(
        IDistributedCache cache,
        IDistributedLockProvider lockProvider)
    {
        _cache = cache;
        _lockProvider = lockProvider;
    }

    /// <summary>Tells whether a processed marker exists in the cache for <paramref name="eventId"/>.</summary>
    public async Task<bool> IsProcessedAsync(string eventId)
        => await _cache.GetStringAsync(Key(eventId)) != null;

    /// <summary>
    /// Runs <paramref name="handler"/> for <paramref name="eventId"/> unless the event
    /// was already processed or the per-event lock cannot be acquired within 5 seconds.
    /// </summary>
    /// <param name="eventId">Identifier of the webhook event.</param>
    /// <param name="handler">Business logic to run for the event.</param>
    /// <returns>
    /// <c>true</c> when <paramref name="handler"/> was executed by this call;
    /// <c>false</c> when it was skipped (lock not acquired, or event already processed).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public async Task<bool> TryProcessAsync(string eventId, Func<Task> handler)
    {
        if (handler == null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        var lockName = $"webhook:lock:{eventId}";
        var locker = _lockProvider.Create(lockName);
        using var handle = await locker.TryAcquireAsync(TimeSpan.FromSeconds(5));
        if (handle == null)
        {
            return false; // lock not acquired
        }

        if (await IsProcessedAsync(eventId))
        {
            // Already handled earlier: report "not executed" so the caller can
            // label this delivery as a duplicate (the controller maps false to "Duplicate").
            return false;
        }

        // Run the actual business logic. If it throws, no marker is written,
        // so a later delivery of the same event is processed again.
        await handler.Invoke();

        // Record the processed marker; it expires after two hours.
        await _cache.SetStringAsync(
            Key(eventId),
            "1",
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(2)
            });
        return true;
    }

    /// <summary>Cache key of the processed marker for an event id.</summary>
    private static string Key(string id) => $"webhook:processed:{id}";
}
🛠️ 3.3 多厂商处理策略
接口
/// <summary>
/// Strategy for handling the webhook payload of one payment provider.
/// </summary>
public interface IPaymentWebhookHandler : ITransientDependency
{
    /// <summary>Handles a webhook delivery.</summary>
    /// <param name="payload">Raw request body.</param>
    /// <param name="headers">Request headers passed along by the caller.</param>
    /// <returns>The handling result.</returns>
    Task<WebhookResult> HandleAsync(string payload, IDictionary<string, string> headers);

    /// <summary>Name of the provider this handler serves.</summary>
    string Provider { get; }
}
策略工厂
/// <summary>
/// Picks the <see cref="IPaymentWebhookHandler"/> registered for a provider name.
/// </summary>
public class WebhookHandlerFactory : ITransientDependency
{
    private readonly IEnumerable<IPaymentWebhookHandler> _handlers;

    public WebhookHandlerFactory(IEnumerable<IPaymentWebhookHandler> handlers)
    {
        _handlers = handlers;
    }

    /// <summary>Returns the first handler whose provider name equals <paramref name="provider"/>, ignoring case.</summary>
    /// <param name="provider">Provider name to look up.</param>
    /// <returns>The matching handler.</returns>
    /// <exception cref="BusinessException">No handler matches the provider.</exception>
    public IPaymentWebhookHandler Get(string provider)
    {
        foreach (var candidate in _handlers)
        {
            if (candidate.Provider.Equals(provider, StringComparison.OrdinalIgnoreCase))
            {
                return candidate;
            }
        }

        throw new BusinessException($"不支持的厂商:{provider}");
    }
}
🔁 四、关键流程图
4.1 请求处理流程
4.2 重试工作流程
🛠️ 3.5 接收控制器(统一入口)
/// <summary>
/// Single entry point for payment webhooks: verifies the signature, extracts the
/// event id, runs the provider handler under the idempotency guard, logs the
/// outcome and records a metric.
/// </summary>
[Route("api/webhooks/payments")]
public class WebhookController : AbpController
{
    private readonly ISignatureVerifier _verifier;
    private readonly WebhookHandlerFactory _factory;
    private readonly IIdempotencyService _idem;
    private readonly IRepository<WebhookLog, Guid> _logRepo;

    public WebhookController(
        ISignatureVerifier verifier,
        WebhookHandlerFactory factory,
        IIdempotencyService idem,
        IRepository<WebhookLog, Guid> logRepo)
    {
        _verifier = verifier;
        _factory = factory;
        _idem = idem;
        _logRepo = logRepo;
    }

    /// <summary>Receives a webhook delivery for <paramref name="provider"/>.</summary>
    /// <param name="provider">Provider name taken from the route.</param>
    /// <returns>
    /// 401 with code 1001 when the signature is missing or invalid; 400 with code 1002
    /// when the body is not a JSON object carrying a non-empty <c>eventId</c>;
    /// otherwise 200 with message "OK" or "Duplicate".
    /// </returns>
    [HttpPost("{provider}")]
    public async Task<IActionResult> HandleAsync(string provider)
    {
        // 1. Read the raw body (the signature is computed over the exact bytes sent).
        using var sr = new StreamReader(Request.Body);
        var payload = await sr.ReadToEndAsync();

        // 2. Signature check.
        var signature = Request.Headers[_verifier.HeaderName].FirstOrDefault();
        if (signature == null || !_verifier.Verify(payload, signature))
        {
            return Unauthorized(new { code = 1001, message = "Invalid signature" });
        }

        // 3. Extract the event id. Only JSON parsing failures are treated as a bad
        //    payload; unrelated exceptions are no longer swallowed.
        string eventId;
        try
        {
            eventId = JObject.Parse(payload)["eventId"]?.ToString();
        }
        catch (Newtonsoft.Json.JsonException)
        {
            eventId = null;
        }

        // A missing, JSON-null or blank id cannot serve as an idempotency key.
        if (string.IsNullOrWhiteSpace(eventId))
        {
            return BadRequest(new { code = 1002, message = "Invalid payload" });
        }

        // HTTP header names are case-insensitive; keep that property in the copy
        // handed to the provider handler.
        var headers = Request.Headers.ToDictionary(
            h => h.Key,
            h => h.Value.FirstOrDefault() ?? string.Empty,
            StringComparer.OrdinalIgnoreCase);

        // 4. Idempotency guard + business handling.
        var success = await _idem.TryProcessAsync(eventId, async () =>
        {
            var handler = _factory.Get(provider);
            var result = await handler.HandleAsync(payload, headers);

            // 5. Persist the delivery log.
            await _logRepo.InsertAsync(new WebhookLog
            {
                Provider = provider,
                Payload = payload,
                EventId = eventId,
                RetryCount = 0,
                Status = result.Success
                    ? WebhookStatus.Success
                    : WebhookStatus.Failed
            });
        });

        // 6. Metric.
        Metrics.WebhookProcessed
            .WithLabels(provider, success ? "ok" : "duplicate")
            .Inc();

        return Ok(new { code = 0, message = success ? "OK" : "Duplicate" });
    }
}
📈 五、DevOps & 监控
1. Prometheus 指标
/// <summary>
/// Application-level Prometheus metric definitions.
/// </summary>
public static class Metrics
{
    /// <summary>
    /// Counter <c>webhook_processed_total</c>, labelled by <c>provider</c> and <c>status</c>.
    /// </summary>
    public static readonly Counter WebhookProcessed =
        // Fully qualified on purpose: inside this class the simple name "Metrics"
        // binds to this class itself, which has no CreateCounter member.
        global::Prometheus.Metrics.CreateCounter(
            "webhook_processed_total",
            "Webhook 处理总数",
            new CounterConfiguration
            {
                LabelNames = new[] { "provider", "status" }
            });
}
// Startup wiring for metrics.
// NOTE(review): AddPrometheusMetrics is not defined in this snippet — presumably a
// project or library extension method; confirm it exists in the referenced packages.
services.AddPrometheusMetrics();
app.UseMetricServer(); // /metrics
app.UseHttpMetrics(); // HTTP request metrics
2. 健康检查
// Registers three health checks named "redis", "sql" and "webhook_receive";
// connStr is the SQL Server connection string supplied by the surrounding startup code.
services.AddHealthChecks()
.AddCheck<RedisHealthCheck>("redis")
.AddSqlServer(connStr, name: "sql")
.AddCheck<CustomWebhookHealthCheck>("webhook_receive");
// Maps the health-check middleware to the /health path.
app.UseHealthChecks("/health");
3. 容器部署(docker-compose.yml)
version: '3.8'
services:
  api:
    image: yourrepo/webhook-api:latest
    ports:
      - "5000:80"
    # Probes the application's /health endpoint from inside the container.
    # NOTE(review): requires curl to be present in the image — confirm.
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost/health"]
      interval: 30s
      retries: 3
  redis:
    image: redis:6
  db:
    image: mcr.microsoft.com/mssql/server:2019-latest
    environment:
      - ACCEPT_EULA=Y
      # NOTE(review): sample password committed in plain text — acceptable for a
      # local demo only; supply it via an env file or secret store elsewhere.
      - SA_PASSWORD=Your_password123
✅ 六、测试
6.1 单元测试
6.1.1 签名校验测试(xUnit)
/// <summary>
/// Unit tests for <see cref="WxSignatureVerifier"/>.
/// </summary>
public class SignatureVerifierTests
{
    private const string Secret = "test-secret";

    private readonly ISignatureVerifier _verifier;

    public SignatureVerifierTests()
    {
        // Fake parameter store that returns a fixed secret for the Wxpay key.
        var paramStore = A.Fake<IDynamicParameterStore>();
        A.CallTo(() => paramStore.GetOrNullAsync("Webhook:Secret:Wxpay"))
            .Returns(Task.FromResult<string>(Secret));
        _verifier = new WxSignatureVerifier(paramStore);
    }

    [Theory]
    [InlineData("payload")]
    [InlineData("{\"eventId\":\"evt-1\"}")]
    public void Verify_ValidSignature_ReturnsTrue(string payload)
    {
        // The expected signature is computed here rather than hard-coded, so the
        // test always agrees with the secret configured above.
        var signature = ComputeSignature(payload, Secret);

        var result = _verifier.Verify(payload, signature);

        Assert.True(result);
    }

    [Fact]
    public void Verify_InvalidSignature_ReturnsFalse()
    {
        var result = _verifier.Verify("payload", "bad-signature");

        Assert.False(result);
    }

    /// <summary>Lower-case hex HMAC-SHA256 of <paramref name="payload"/> keyed with <paramref name="secret"/>.</summary>
    private static string ComputeSignature(string payload, string secret)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
        return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(payload)))
            .ToLowerInvariant();
    }
}
6.1.2 幂等服务并发安全测试
/// <summary>
/// Concurrency test for <see cref="IdempotencyService"/>.
/// </summary>
public class IdempotencyServiceTests
{
    [Fact]
    public async Task TryProcessAsync_FirstConcurrency_OnlyOnceExecuted()
    {
        // Arrange: in-memory cache plus a lock provider (assumed to be implemented).
        var cacheOptions = new OptionsWrapper<MemoryDistributedCacheOptions>(
            new MemoryDistributedCacheOptions());
        var sut = new IdempotencyService(
            new MemoryDistributedCache(cacheOptions),
            new DefaultDistributedLockProvider());

        int invocations = 0;
        Func<Task> countingHandler = async () =>
        {
            await Task.Delay(50);
            Interlocked.Increment(ref invocations);
        };

        // Act: start 10 calls for the same event id, then wait for all of them.
        var pending = new List<Task<bool>>();
        for (var i = 0; i < 10; i++)
        {
            pending.Add(sut.TryProcessAsync("evt-1", countingHandler));
        }

        await Task.WhenAll(pending);

        // Assert: the handler must have run exactly once.
        Assert.Equal(1, invocations);
    }
}
6.1.3 重试 Worker 测试
/// <summary>
/// Tests for the retry worker's handling of logs whose handler keeps failing.
/// </summary>
public class WebhookRetryWorkerTests
{
    [Fact]
    public async Task DoWorkAsync_FailedLogs_ExponentialBackoffAndDeadLetter()
    {
        // Arrange: in-memory repository holding one failed log that has already been retried 5 times.
        var logs = new List<WebhookLog>
        {
            new WebhookLog { Id = Guid.NewGuid(), Provider = "Wxpay", Payload = "p", EventId = "1", RetryCount = 5, Status = WebhookStatus.Failed }
        };
        var repo = new InMemoryRepository<WebhookLog, Guid>(logs);

        // WebhookHandlerFactory.Get is not virtual, so a FakeItEasy fake cannot
        // intercept it. Use the real factory wired with a handler that always throws;
        // its Provider ("Wxpay") matches the log above.
        var factory = new WebhookHandlerFactory(new IPaymentWebhookHandler[] { new FailingHandler() });

        var worker = new WebhookRetryWorker(repo, factory);
        using var cts = new CancellationTokenSource();

        // Act
        await worker.DoWorkAsync(cts.Token);

        // Assert
        var updated = logs.Single();
        Assert.Equal(WebhookStatus.Dead, updated.Status);
        Assert.Equal(6, updated.RetryCount);
    }

    /// <summary>Handler for "Wxpay" that throws on every call.</summary>
    private class FailingHandler : IPaymentWebhookHandler
    {
        public string Provider => "Wxpay";

        public Task<WebhookResult> HandleAsync(string payload, IDictionary<string, string> headers)
            => throw new Exception("fail");
    }
}
6.2 集成测试(Testcontainers)
/// <summary>
/// End-to-end test that runs the API against Redis and SQL Server containers.
/// </summary>
public class WebhookIntegrationTests : IAsyncLifetime
{
    // Must match WxSignatureVerifier.HeaderName.
    private const string SignatureHeader = "X-Wxpay-Signature";

    private readonly TestcontainerDatabase _redisContainer;
    private readonly TestcontainerDatabase _sqlContainer;

    public WebhookIntegrationTests()
    {
        // NOTE(review): the builder needs a concrete container type; RedisTestcontainer /
        // MsSqlTestcontainer are the database modules of the DotNet.Testcontainers
        // versions that expose TestcontainersBuilder — confirm against the package version in use.
        _redisContainer = new TestcontainersBuilder<RedisTestcontainer>()
            .WithDatabase(new RedisTestcontainerConfiguration())
            .Build();
        _sqlContainer = new TestcontainersBuilder<MsSqlTestcontainer>()
            .WithDatabase(new MsSqlTestcontainerConfiguration
            {
                Password = "Your_password123"
            })
            .Build();
    }

    public async Task InitializeAsync()
    {
        await _redisContainer.StartAsync();
        await _sqlContainer.StartAsync();
        // An IConfiguration can be built dynamically here and the TestServer started.
    }

    public async Task DisposeAsync()
    {
        await _redisContainer.DisposeAsync();
        await _sqlContainer.DisposeAsync();
    }

    [Fact]
    public async Task FullWebhookFlow_ReturnsOk()
    {
        // Call the API through the TestServer.
        var client = TestWebApplicationFactory.CreateClient(new Dictionary<string, string>
        {
            ["ConnectionStrings:Redis"] = _redisContainer.ConnectionString,
            ["ConnectionStrings:Default"] = _sqlContainer.ConnectionString
        });
        var payload = "{\"eventId\":\"evt-100\",\"data\":{}}";
        var signature = ComputeTestSignature(payload, "test-secret");

        // Every request must carry the signature header; without it the
        // controller rejects the call with 401 before any processing happens.
        Task<HttpResponseMessage> PostSignedAsync()
        {
            var request = new HttpRequestMessage(HttpMethod.Post, "/api/webhooks/payments/Wxpay")
            {
                Content = new StringContent(payload, Encoding.UTF8, "application/json")
            };
            request.Headers.Add(SignatureHeader, signature);
            return client.SendAsync(request);
        }

        var response = await PostSignedAsync();
        Assert.Equal(HttpStatusCode.OK, response.StatusCode);

        // A second delivery of the same event should be reported as Duplicate.
        response = await PostSignedAsync();
        var json = await response.Content.ReadAsStringAsync();
        Assert.Contains("Duplicate", json);
    }

    /// <summary>Lower-case hex HMAC-SHA256 of <paramref name="payload"/> keyed with <paramref name="secret"/>.</summary>
    private static string ComputeTestSignature(string payload, string secret)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
        return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(payload)))
            .ToLowerInvariant();
    }
}
说明:
- 使用 DotNet.Testcontainers 启动 Redis 和 SQL Server;
- 通过 TestWebApplicationFactory 启动完整 ASP.NET Core 应用;
- 验证首次处理和幂等结果。