🚦 差分隐私在运营指标:ABP 的 DP 计数器与噪声预算
📚 目录
0. TL;DR 🚀
- 👤 隐私单元(Privacy Unit)选择 用户/设备级,写路径先按隐私单元聚合并剪裁贡献上界 K。
- ➕ 计数/求和 → Laplace(尺度
b = Δ/ε
);📈 比例/均值 → Gaussian(生产建议解析高斯校准 σ;示例以经典上界为保守缺省)。 - 🧩 后处理不降隐私:结果可做非负截断、舍入、分桶、平滑等业务约束。
- 📒 预算账本:按“租户×指标×窗口开始时刻(UTC)”累计 ε/δ;Key 与窗口严格对齐;Lua 原子消费;TTL=窗口剩余秒数;超额硬阻断。
- 🔒 发布幂等:周期任务在每个窗口结束时发布上一窗(UTC);用 Redis SET NX 建立
released
锁,保证只发布一次(并可 Upsert 落库)。 - 🧪 灰度与 A/B:按租户/功能开关;以 MAPE / P95 误差阈值放量;随时回滚。
📈 一图看懂(写入→发布→预算→加噪)
1. 背景与边界 🧭
- 🔐 与加密/RLS:加密/RLS 决定“谁能看真值”;差分隐私(DP)约束“公开后能泄露多少”。
- ✅ 适用:DAU/留存、功能调用次数、错误率、漏斗流量等群体指标;⛔ 不用于强一致计费或逐个体对账。
- ⚠️ 风险与对策:合谋/跨窗差分/滑动窗反推 → 隐私预算、最小样本门槛、发布限频、审计追溯。
2. DP 工程口径 📐
📏 敏感度 Δ:单个隐私单元在单窗口的最大贡献(经 K 剪裁后)。
🔊 Laplace:
b = Δ/ε
;❄️ Gaussian:σ = f(ε, δ, Δ)
(生产建议“解析高斯”或接入 OpenDP 校准;示例用经典上界)。🧮 组合/会计:
- 拉普拉斯:ε 线性加和(基础组合)。
- 高斯:用 RDP 按 α 网格累加,再对给定 δ 反推
ε*(δ) = min_α {RDP(α) + ln(1/δ)/(α-1)}
,展示“已用/剩余”。
🧯 后处理不伤隐私:非负截断、舍入、分桶、平滑等结果级操作不降低 DP 保证。
3. 目标架构(ABP 集成)🏗️
Abp.DpMetrics 模块(应用层)
IDpCounter.Increment(tenant, metric, unitId, amount=1)
:写路径,按隐私单元聚合→剪裁 K→入库。DpPublishWorker
:周期发布;按窗口结束发布上一窗(UTC);发布幂等锁→预算校验→加噪→后处理→审计落表。IPrivacyAccountant
(Redis/DB 实现):存(tenant, metric, windowStartUtc)
的 ε/δ 用量与上限;提供Lua 原子 TryConsume。IBudgetPolicyProvider
:预算上限(ε/δ cap)配置化,支持按租户/指标/窗口粒度下发。IReleaseLog
:存噪声参数、ε/δ、机制、真值哈希、seed 哈希、审批单。
组件关系图
4. 数据/配置模型 🗂️
5. 关键实现 🧩
代码为最小可运行模板,重点展示口径与关键边界处理;生产可替换为解析高斯校准、完善 RDP 会计、指标门槛配置中心化等。
5.1 接口统一(会计以窗口起点为主键)
public interface IDpCounter {
Task IncrementAsync(string tenantId, string metric, string privacyUnitId, int amount = 1);
}
public interface IPrivacyAccountant {
Task<bool> TryConsumeAsync(
string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window,
double epsilon, double? delta);
Task<(double usedEps, double? usedDelta, double capEps, double? capDelta)>
GetUsageAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window);
}
public interface IBudgetPolicyProvider {
(double capEps, double? capDel) GetCaps(string tenantId, string metric, TimeSpan window);
}
public enum DpMechanism { Laplace, Gaussian }
5.2 随机源与噪声(开区间采样 + 可缓存正态)
public interface IRandomSource { double NextUnit(); }
public sealed class CryptoRandom : IRandomSource {
private readonly System.Security.Cryptography.RandomNumberGenerator _rng
= System.Security.Cryptography.RandomNumberGenerator.Create();
public double NextUnit(){
Span<byte> b = stackalloc byte[8]; _rng.GetBytes(b);
ulong u = BitConverter.ToUInt64(b);
return (u >> 11) * (1.0 / (1UL << 53)); // [0,1)
}
}
public static class RandUtil {
public static double Open01(IRandomSource r) {
double u; do { u = r.NextUnit(); } while (u <= double.Epsilon || u >= 1.0 - double.Epsilon);
return u; // (0,1)
}
}
public sealed class GaussianSampler {
private readonly IRandomSource _r; private bool _has; private double _cache;
public GaussianSampler(IRandomSource r){ _r = r; }
public double Next(){
if (_has){ _has=false; return _cache; }
var u1 = RandUtil.Open01(_r); var u2 = RandUtil.Open01(_r);
var mag = Math.Sqrt(-2.0 * Math.Log(u1));
var z0 = mag * Math.Cos(2.0 * Math.PI * u2);
var z1 = mag * Math.Sin(2.0 * Math.PI * u2);
_cache = z1; _has = true; return z0; // 产出两个,缓存一个
}
}
public static class DpNoise {
public static double Laplace(double x, double b, IRandomSource r){
double u = RandUtil.Open01(r) - 0.5; // (-0.5,0.5)
double noise = -b * Math.Sign(u) * Math.Log(1 - 2 * Math.Abs(u));
return Math.Max(0, x + noise); // 后处理:非负
}
public static double ClassicGaussianSigma(double deltaF, double eps, double delta){
return Math.Sqrt(2 * Math.Log(1.25 / delta)) * deltaF / eps; // 经典上界
}
}
5.3 窗口对齐(上一窗,UTC)与预算键
static (DateTimeOffset Start, DateTimeOffset End) PrevWindowUtc(IClock clock, TimeSpan w){
var now = clock.Now.ToUniversalTime();
long sec = (long)w.TotalSeconds;
long end = (now.ToUnixTimeSeconds() / sec) * sec; // 当前窗结束(UTC)
long start = end - sec; // 上一窗 [start,end)
return (DateTimeOffset.FromUnixTimeSeconds(start), DateTimeOffset.FromUnixTimeSeconds(end));
}
static string BudgetKey(string tenant, string metric, TimeSpan w, DateTimeOffset windowStartUtc){
return $"dp:budget:{tenant}:{metric}:{(int)w.TotalSeconds}:{windowStartUtc.ToUnixTimeSeconds()}";
}
5.4 Redis 会计(Lua 原子消费 + TTL 对齐“窗口剩余秒数” + 配置化上限)
public sealed class RedisPrivacyAccountant : IPrivacyAccountant {
private readonly IConnectionMultiplexer _redis; private readonly IBudgetPolicyProvider _caps;
public RedisPrivacyAccountant(IConnectionMultiplexer redis, IBudgetPolicyProvider caps){ _redis = redis; _caps = caps; }
private const string LUA = @"
local key = KEYS[1]
local epsUse = tonumber(ARGV[1])
local delUse = (ARGV[2] ~= '' and tonumber(ARGV[2]) or nil)
local capEps = tonumber(ARGV[3])
local capDel = (ARGV[4] ~= '' and tonumber(ARGV[4]) or nil)
local ttlSec = tonumber(ARGV[5])
local used = redis.call('GET', key)
local usedEps, usedDel = 0, 0
if used then
local i = string.find(used, '|')
usedEps = tonumber(string.sub(used, 1, i-1))
usedDel = tonumber(string.sub(used, i+1))
end
if usedEps + epsUse > capEps then return 0 end
if capDel and delUse and (usedDel + delUse > capDel) then return 0 end
usedEps = usedEps + epsUse
usedDel = usedDel + (delUse or 0)
redis.call('SET', key, usedEps .. '|' .. usedDel)
if ttlSec > 0 then redis.call('EXPIRE', key, ttlSec) end
return 1
";
public async Task<bool> TryConsumeAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window, double epsilon, double? delta){
var db = _redis.GetDatabase();
var key = BudgetKey(tenantId, metric, window, windowStartUtc);
var (capEps, capDel) = _caps.GetCaps(tenantId, metric, window);
var nowUtc = DateTimeOffset.UtcNow;
var we = windowStartUtc + window;
int ttlSec = (int)Math.Max(1, (we - nowUtc).TotalSeconds); // TTL 对齐窗口结束
var result = (long)await db.ScriptEvaluateAsync(
LUA,
new RedisKey[]{ key },
new RedisValue[]{
epsilon,
delta.HasValue ? (RedisValue)delta.Value : RedisValue.EmptyString,
capEps,
capDel.HasValue ? (RedisValue)capDel.Value : RedisValue.EmptyString,
ttlSec
});
return result == 1L;
}
public async Task<(double, double?, double, double?)> GetUsageAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window){
var db = _redis.GetDatabase();
var key = BudgetKey(tenantId, metric, window, windowStartUtc);
var s = await db.StringGetAsync(key);
double usedEps = 0, usedDelRaw = 0;
if (!s.IsNullOrEmpty){
var parts = ((string)s!).Split('|');
usedEps = double.Parse(parts[0], System.Globalization.CultureInfo.InvariantCulture);
usedDelRaw= double.Parse(parts[1], System.Globalization.CultureInfo.InvariantCulture);
}
var (capEps, capDel) = _caps.GetCaps(tenantId, metric, window);
double? usedDel = capDel.HasValue ? usedDelRaw : (double?)null;
return (usedEps, usedDel, capEps, capDel);
}
var caps = _caps.GetCaps(tenantId, metric, window);
return (usedEps, double.IsNaN(usedDel)? null : usedDel, caps.capEps, caps.capDel);
}
}
5.5 发布 Worker(上一窗发布 + 幂等锁 + 小样本抑制 + 预算校验 + 加噪)
public sealed class DpPublishWorker : AsyncPeriodicBackgroundWorkerBase, ITransientDependency {
private readonly IClock _clock; private readonly IDpRepository _repo;
private readonly IPrivacyAccountant _acct; private readonly IRandomSource _rng = new CryptoRandom();
private readonly GaussianSampler _gau; private readonly IConnectionMultiplexer _redis;
public DpPublishWorker(AbpTimer timer, IClock clock, IDpRepository repo, IPrivacyAccountant acct, IConnectionMultiplexer redis)
: base(timer){ _clock = clock; _repo = repo; _acct = acct; _gau = new GaussianSampler(_rng); _redis = redis; Timer.Period = 60_000; }
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext context){
var ct = context.CancellationToken;
foreach (var def in await _repo.ListMetricDefinitionsAsync()){
ct.ThrowIfCancellationRequested();
var (ws, we) = PrevWindowUtc(_clock, def.Window); // 发布上一窗(UTC)
// 幂等发布锁:只允许一个实例发布该窗
var db = _redis.GetDatabase();
int ttlSec = (int)Math.Max(1, (we - _clock.Now.ToUniversalTime()).TotalSeconds);
var relKey = $"dp:released:{def.TenantId}:{def.Name}:{(int)def.Window.TotalSeconds}:{ws.ToUnixTimeSeconds()}";
bool acquired = await db.StringSetAsync(relKey, "1", TimeSpan.FromSeconds(ttlSec), when: When.NotExists);
if (!acquired) continue;
var (trueCount, distinctUnits) = await _repo.AggregateAsync(def, ws, we);
if (distinctUnits < def.MinDistinctUnits){ await _repo.LogSuppressedAsync(def, ws, we); continue; }
var ok = await _acct.TryConsumeAsync(def.TenantId, def.Name, ws, def.Window, def.Epsilon, def.Delta);
if (!ok){ await _repo.LogBudgetExceededAsync(def, ws, we); continue; }
double noisy;
if (def.Mechanism == DpMechanism.Laplace){
var b = def.SensitivityDelta / def.Epsilon;
noisy = DpNoise.Laplace(trueCount, b, _rng);
} else {
if (!def.Delta.HasValue) { await _repo.LogErrorAsync(def, ws, we, "Gaussian requires delta"); continue; }
var sigma = DpNoise.ClassicGaussianSigma(def.SensitivityDelta, def.Epsilon, def.Delta.Value);
noisy = Math.Max(0, trueCount + _gau.Next() * sigma);
}
await _repo.PublishAsync(def, ws, we, noisy, Hash64Stable(trueCount), def.Epsilon, def.Delta, def.Mechanism.ToString());
}
}
5.6 比例/均值指标(分子/分母分别加噪 + 小样本抑制)
// 伪代码:同窗同预算,或将 ε 在分子/分母间按重要性拆分
public async Task<double?> ReleaseRatioAsync(MetricDefinition numDef, MetricDefinition denDef, DateTimeOffset ws, DateTimeOffset we){
var (num, nu) = await _repo.AggregateAsync(numDef, ws, we);
var (den, du) = await _repo.AggregateAsync(denDef, ws, we);
var minU = Math.Min(nu, du);
if (minU < Math.Min(numDef.MinDistinctUnits, denDef.MinDistinctUnits)) return null; // 小样本抑制
// 分子与分母分别预算校验与加噪(略,等同计数)
var noisyNum = /* Laplace/Gaussian */ 0.0;
var noisyDen = /* Laplace/Gaussian */ 1.0;
if (noisyDen <= 0) return null; // 防爆炸
var ratio = noisyNum / noisyDen;
return Math.Clamp(ratio, 0, 1);
}
6. 灰度与回滚 🎛️
- 🔀 特性开关:按租户启用“DP化指标”。数值型特性可存“灰度比例”,由调用方按此值自定义分流;或在网关层做百分比分流。
- 🧪 A/B 验证:监控 MAPE / P95;达标→放量;不达标→回滚到“内部仅真值”。
7. 可视化与审计 📊
- 📉 面板:ε/δ 消耗曲线、剩余额度、发布热力图、小样本抑制/超额次数、失败原因分布(超 ε/超 δ)。
- 🧾 审计:租户、指标、窗口(UTC)、真值哈希、噪声参数、ε/δ、机制、seedHash、审批单号。
- 🆘 运维:超额硬阻断、速率限制、异常告警(深夜暴增、跨租户)。
⏱️ 窗口与 TTL 对齐示意
8. 评测脚本 🧪
# eval_dp_counter.py(Laplace 误差曲线)
import numpy as np
def laplace_noise(b, size=1, rng=None):
rng = rng or np.random.default_rng(2025)
u = rng.random(size=size)
u = np.clip(u, np.finfo(float).eps, 1-np.finfo(float).eps) - 0.5 # (-0.5,0.5)
return -b * np.sign(u) * np.log(1 - 2 * np.abs(u))
def eval_mape_p95(true_counts, epsilon, delta_f=1.0, trials=2000):
b = delta_f / epsilon
rows = []
for n in true_counts:
preds = np.clip(n + laplace_noise(b, trials), 0, None)
mape = np.mean(np.abs(preds - n) / max(1, n))
p95 = np.percentile(np.abs(preds - n), 95)
rows.append((n, mape, p95))
return rows
if __name__ == '__main__':
Ns = [50, 100, 300, 1000, 3000, 10000]
for eps in [0.3, 0.5, 1.0, 2.0]:
print(eps, eval_mape_p95(Ns, eps))
经验近似:Laplace 的
E(|噪声|)≈b
,P95(|噪声|)≈2.996·b
,可用于看板阈值与业务影响初判。
9. 上线步骤 ✅
- 盘点指标 → 划分“公开/内审”;
- 选隐私单元与剪裁 K;
- 为公开集合设预算(每窗 ε/δ 上限、最小样本、发布频率),由 IBudgetPolicyProvider 配置化下发;
- 接入
Abp.DpMetrics
,启用 Redis & Background Worker; - 启用发布幂等锁 与 Lua 原子预算消费;
- 灰度:设置租户特性与分流比例;
- A/B:看板跟踪 MAPE/P95 与预算曲线,达标放量;
- 演练:预算见底/拒绝服务/回滚 SOP;
- 合规:导出审计日志与预算账本快照(UTC)。
10. 仓库结构建议 🧱
/src/Abp.DpMetrics/ // 模块源码(接口/机制/会计/Worker)
/src/Abp.DpMetrics.Tests/ // xUnit:Laplace 误差、预算原子消费并发、窗口边界、幂等锁
/tools/eval/ // 评测脚本
/tools/dashboard/ // ε 账本 & 误差曲线仪表盘模板
/docker/docker-compose.yml // Redis(可选)
docker-compose(可选)
version: "3.9"
services:
redis:
image: redis:7-alpine
ports: ["6379:6379"]
command: ["redis-server","--appendonly","yes"]
appsettings.json(片段)
{
"Redis": { "Configuration": "localhost:6379" },
"DistributedCache": { "KeyPrefix": "demo:dp:" }
}
11. 常见坑 🧨
- “解析高斯”误称:若未接入解析校准,应标注为“经典高斯上界”。
- 预算键未对齐窗:用 TTL 滑动计窗会与日报/小时看板错位;务必以
windowStartUtc
为主键,TTL=窗口剩余秒数。 - 上一窗发布 & 幂等:避免按
floor(now,w)
导致跨点漏发/重复发;用released
锁或 DB Upsert。 - 预算原子性:并发发布须用 Lua 原子或分布式锁;推荐 Lua。
- 随机源与开区间:避免
log(0)
边界;Box–Muller 产出双样本需缓存。 - 比例/均值抑制:小样本或分母过小应抑制结果;必要时加无偏修正。
- 审计哈希稳定:固定序列化精度/编码,跨平台一致。
- UTC 一致性:窗口、键、审计时间统一用 UTC,避免时区与夏令时影响。