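ABP vNext + HBase: Building an Ultra-High-Throughput Distributed Column-Oriented Database 🚀🔥
- A distributed column-oriented store on ABP vNext, built for hundreds of millions of writes and millisecond-level queries
- Multiple authentication modes: HTTP Basic, Bearer, and Kerberos + SPNEGO 🔐
- An expressive query DSL, plus IAsyncEnumerable scanning and Scanner lifecycle management 🔄
- Row-key hotspot avoidance, column-family compression, TTL, and version tuning 🔑
- One-command Docker Compose deployment 🐳, plus Kubernetes Secrets and health-probe examples
- OpenTelemetry distributed tracing 🔍, Prometheus metric naming conventions, and a Grafana dashboard 📊
- Unit tests (MockHttp) ✅, integration tests (Testcontainers) 🧪, and an optional Phoenix SQL layer
- Batch writes, rate limiting with backpressure, multi-tenant isolation, and TLS certificates 🌐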
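System Architecture & Target Scenarios 🧩
Use Cases
- IoT device time-series reporting
- Large-scale behavior-log collection
- Wide-table user-profile writes
Authentication & Connection Injection 🔐
Configuration Model & Validation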
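using System.ComponentModel.DataAnnotations;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.Options;

public class HBaseSettings
{
    // If the gateway URL has a path (e.g. behind Knox), end it with '/' so relative request paths resolve beneath it
    [Required, Url] public string BaseUrl { get; set; } = null!;
    [Required] public string Table { get; set; } = null!;
    public string? Username { get; set; }
    public string? Password { get; set; }
    public string? Token { get; set; }
}
// Program.cs / Startup.cs
services.AddOptions<HBaseSettings>()
    .BindConfiguration("HBaseSettings")
    .ValidateDataAnnotations()
    // Require either a token or a complete username/password pair
    .Validate(o => !string.IsNullOrWhiteSpace(o.Token)
                || (!string.IsNullOrWhiteSpace(o.Username) && !string.IsNullOrWhiteSpace(o.Password)),
        "Either Username/Password or Token must be configured")
    .ValidateOnStart(); // fail at startup rather than on the first request
services.AddHttpClient<IHBaseClient, HBaseRestClient>("HBase", (sp, client) =>
{
    var opts = sp.GetRequiredService<IOptions<HBaseSettings>>().Value;
    client.BaseAddress = new Uri(opts.BaseUrl);
    if (!string.IsNullOrEmpty(opts.Token))
    {
        // A Bearer token takes precedence over Basic credentials
        client.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Bearer", opts.Token);
    }
    else
    {
        // Basic auth: Base64("username:password")
        var cred = Convert.ToBase64String(
            Encoding.UTF8.GetBytes($"{opts.Username}:{opts.Password}"));
        client.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Basic", cred);
    }
})
.AddPolicyHandler(PollyPolicies.DefaultRetry);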
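The header selection above can be pulled out into a small, testable helper. The console sketch below uses a hypothetical `BuildAuthorization` function, which is not part of the article's code. It applies the same precedence: a Bearer token wins, and otherwise a username/password pair becomes a Basic header.

```csharp
using System;
using System.Net.Http.Headers;
using System.Text;

// Hypothetical helper (not from the article): same precedence as the registration above.
static AuthenticationHeaderValue BuildAuthorization(string? token, string? username, string? password)
{
    // A Bearer token, when present, takes precedence
    if (!string.IsNullOrEmpty(token))
        return new AuthenticationHeaderValue("Bearer", token);

    if (string.IsNullOrEmpty(username) || string.IsNullOrEmpty(password))
        throw new InvalidOperationException("Either Token or Username/Password must be configured");

    // Basic scheme: Base64 of "username:password" (RFC 7617). This is encoding, not encryption,
    // so the header must only travel over TLS.
    var credentials = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
    return new AuthenticationHeaderValue("Basic", credentials);
}

Console.WriteLine(BuildAuthorization(null, "user", "pass")); // prints "Basic dXNlcjpwYXNz"
```

Throwing on an incomplete pair mirrors what options validation should catch at startup, so a half-configured client never reaches HBase.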
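Kerberos + SPNEGO Example 🛡️
// UseDefaultCredentials sends the process identity's Kerberos ticket via SPNEGO; on Linux this
// needs a valid ticket cache (for example, obtained with kinit from a keytab).
// Create the handler inside the factory delegate: IHttpClientFactory disposes expired handlers,
// so handing back one shared instance can cause ObjectDisposedException once it is recycled.
services.AddHttpClient<IHBaseClient, HBaseRestClient>()
    .ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler { UseDefaultCredentials = true })
    .AddPolicyHandler(PollyPolicies.DefaultRetry);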
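TLS Client Certificate Example 🔒
services.AddHttpClient<IHBaseClient, HBaseRestClient>()
    .ConfigurePrimaryHttpMessageHandler(() =>
    {
        // A fresh handler per factory call, since IHttpClientFactory disposes expired handlers
        var handler = new HttpClientHandler();
        handler.ClientCertificates.Add(
            new X509Certificate2("path/to/client.pfx", "certPassword")); // load the password from a secret store
        return handler;
    })
    .AddPolicyHandler(PollyPolicies.DefaultRetry);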
DTO, Entity & Utility Class Definitions 📦
public class DeviceLogInput
{
    [Required] public string DeviceId { get; set; } = null!;
    // [Required] cannot catch a missing non-nullable DateTime: an absent value binds as DateTime.MinValue.
    // Clients should send UTC timestamps so row keys stay consistent.
    [Required] public DateTime Timestamp { get; set; }
    public int Level { get; set; }
    public string JsonData { get; set; } = null!;
}
public class DeviceLogEntity
{
    public string RowKey { get; set; } = null!;
    public string DeviceId { get; set; } = null!;
    public DateTime Timestamp { get; set; }
    public int Level { get; set; }
    public string JsonData { get; set; } = null!;
}
public class HBaseRow
{
    public string RowKey { get; set; } = null!;
    // "family:qualifier" -> value
    public Dictionary<string, string> Columns { get; set; } = new();
}
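public static class RowKeyHelper
{
    // Newer timestamps give smaller values, so the latest rows sort first (pass UTC timestamps)
    public static string ReverseTimestamp(DateTime dt) =>
        (DateTime.MaxValue.Ticks - dt.Ticks).ToString("D19");
    // string.GetHashCode() is randomized per process in .NET Core (and Math.Abs(int.MinValue) throws),
    // so a stable FNV-1a hash (needs using System.Text) keeps each device in the same 1 of 16 slots
    public static int Slot(string deviceId)
    {
        uint hash = 2166136261u;
        foreach (byte b in Encoding.UTF8.GetBytes(deviceId))
            hash = unchecked((hash ^ b) * 16777619u);
        return (int)(hash % 16);
    }
    public static string MakeRowKey(string deviceId, DateTime ts) =>
        $"{Slot(deviceId):D2}-{ReverseTimestamp(ts)}-{deviceId}";
}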
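Whatever the transport, the Stargate REST API returns rows as a CellSet. In a CellSet, row keys, `family:qualifier` column names, and values are all Base64-encoded, so they have to be decoded before they fit `HBaseRow`. The minimal sketch below uses a hypothetical `ParseCellSet` helper built on `System.Text.Json`. It returns tuples shaped like `HBaseRow`, and the sample payload is made up.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

static string FromB64(string s) => Encoding.UTF8.GetString(Convert.FromBase64String(s));

// Hypothetical helper: decodes a REST CellSet such as
// {"Row":[{"key":"...","Cell":[{"column":"...","timestamp":...,"$":"..."}]}]}
// into (RowKey, Columns) pairs shaped like the article's HBaseRow.
static List<(string RowKey, Dictionary<string, string> Columns)> ParseCellSet(string json)
{
    var result = new List<(string RowKey, Dictionary<string, string> Columns)>();
    using var doc = JsonDocument.Parse(json);
    foreach (var row in doc.RootElement.GetProperty("Row").EnumerateArray())
    {
        var columns = new Dictionary<string, string>();
        foreach (var cell in row.GetProperty("Cell").EnumerateArray())
        {
            // Column names ("family:qualifier") and cell values are Base64-encoded
            string column = FromB64(cell.GetProperty("column").GetString()!);
            columns[column] = FromB64(cell.GetProperty("$").GetString()!);
        }
        result.Add((FromB64(row.GetProperty("key").GetString()!), columns));
    }
    return result;
}

// "cm93MQ==" = "row1", "Y2Y6YQ==" = "cf:a", "dmFsdWUx" = "value1"
const string sample = @"{""Row"":[{""key"":""cm93MQ=="",""Cell"":[{""column"":""Y2Y6YQ=="",""timestamp"":1,""$"":""dmFsdWUx""}]}]}";
var parsed = ParseCellSet(sample);
Console.WriteLine($"{parsed[0].RowKey} -> {parsed[0].Columns["cf:a"]}"); // prints "row1 -> value1"
```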
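Repository Wrapper & DSL Queries ⚙️
public interface IFilter { }
// Comparison operators, mirroring HBase's CompareOperator
public enum CompareOp { Less, LessOrEqual, Equal, NotEqual, GreaterOrEqual, Greater }
public class ColumnValueFilter : IFilter
{
    public ColumnValueFilter(string family, string qualifier, CompareOp op, string value)
        => (Family, Qualifier, Op, Value) = (family, qualifier, op, value);
    public string Family { get; }
    public string Qualifier { get; }
    public CompareOp Op { get; }
    public string Value { get; }
}
public class HBaseQuery
{
    public string? RowKeyPrefix { get; set; }
    public List<IFilter> Filters { get; } = new();
    public DateTime? FromTime { get; set; }
    public DateTime? ToTime { get; set; }
    public int? Limit { get; set; }
}
// Fluent builder: chain With*/AddFilter calls, then Build()
public class HBaseQueryBuilder
{
    private readonly HBaseQuery _q = new();
    public HBaseQueryBuilder WithRowPrefix(string p) { _q.RowKeyPrefix = p; return this; }
    public HBaseQueryBuilder AddFilter(IFilter f) { _q.Filters.Add(f); return this; }
    public HBaseQueryBuilder WithTimeRange(DateTime from, DateTime to) { _q.FromTime = from; _q.ToTime = to; return this; }
    public HBaseQueryBuilder WithLimit(int l) { _q.Limit = l; return this; }
    public HBaseQuery Build() => _q;
}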
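To execute, a DSL query eventually has to become a REST scanner definition. The sketch below uses a hypothetical `BuildScannerJson` function that turns a row prefix and column-equality checks into a scanner body with a `FilterList`. The JSON field names (`startRow`, `batch`, `filter`, `PrefixFilter`, `SingleColumnValueFilter`, `BinaryComparator`, `MUST_PASS_ALL`) are assumed to follow HBase's REST scanner and filter models. Check them against the REST documentation for your HBase version.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

static string B64(string s) => Convert.ToBase64String(Encoding.UTF8.GetBytes(s));

// Hypothetical translation of (row prefix + column equality checks) into a scanner body.
// Field names are assumptions modeled on HBase's REST ScannerModel / filter JSON.
static string BuildScannerJson(string rowPrefix,
    IEnumerable<(string Family, string Qualifier, string Value)> equalityFilters, int batch)
{
    var filters = new List<object>
    {
        // PrefixFilter rejects rows outside the prefix; startRow below jumps straight to its first row
        new Dictionary<string, object> { ["type"] = "PrefixFilter", ["value"] = B64(rowPrefix) }
    };
    foreach (var (family, qualifier, value) in equalityFilters)
    {
        filters.Add(new Dictionary<string, object>
        {
            ["type"] = "SingleColumnValueFilter",
            ["op"] = "EQUAL",
            ["family"] = B64(family),
            ["qualifier"] = B64(qualifier),
            ["latestVersion"] = true,
            ["comparator"] = new Dictionary<string, object>
            {
                ["type"] = "BinaryComparator",
                ["value"] = B64(value)
            }
        });
    }
    var filterList = new Dictionary<string, object>
    {
        ["type"] = "FilterList",
        ["op"] = "MUST_PASS_ALL", // logical AND of all filters
        ["filters"] = filters
    };
    var scanner = new Dictionary<string, object>
    {
        ["startRow"] = B64(rowPrefix),
        ["batch"] = batch,
        // The scanner model carries the filter as a JSON string, not as a nested object
        ["filter"] = JsonSerializer.Serialize(filterList)
    };
    return JsonSerializer.Serialize(scanner);
}

Console.WriteLine(BuildScannerJson("07-", new[] { ("cf_base", "level", "3") }, batch: 100));
```

`BinaryComparator` compares raw bytes, so the comparison value must be encoded exactly as it was written (here the string "3").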
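Scanner Lifecycle Management & IAsyncEnumerable 🔄
using System.Runtime.CompilerServices; // [EnumeratorCancellation]
public class HBaseRestClient : IHBaseClient, IAsyncDisposable
{
    // … earlier members omitted …
    public async IAsyncEnumerable<HBaseRow> ScanAsync(
        HBaseQuery q, [EnumeratorCancellation] CancellationToken ct = default)
    {
        var scannerId = await CreateScannerAsync(q);
        try
        {
            while (true)
            {
                ct.ThrowIfCancellationRequested();
                var chunk = await ReadScannerAsync(scannerId);
                if (!chunk.Any()) yield break; // the REST scanner answers 204 No Content once exhausted
                foreach (var row in chunk) yield return row;
            }
        }
        finally
        {
            // Runs on completion, on exceptions, and when the caller stops early
            // (await foreach disposes the enumerator), so the server-side scanner is always released
            await DeleteScannerAsync(scannerId);
        }
    }
    // Create/Delete/ReadScannerAsync implementations omitted
}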
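The `finally` block is what keeps server-side scanners from leaking. `await foreach` disposes the enumerator whenever the loop exits, including on `break`, and disposal runs any pending `finally` blocks. The simulation below involves no HBase: `Task.Yield` stands in for the REST round trips, and the scanner ids are made up. It shows cleanup both when the caller stops early and when it reads to the end.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var deletedScanners = new List<string>();

// Simulated scanner: each Task.Yield stands in for a REST round trip
// (create / read next chunk / delete in the article's client).
async IAsyncEnumerable<string> ScanAsync(string scannerId, int pages, int rowsPerPage)
{
    await Task.Yield(); // "create scanner"
    try
    {
        for (int page = 0; page < pages; page++)
        {
            await Task.Yield(); // "read next chunk"
            for (int i = 0; i < rowsPerPage; i++)
                yield return $"{scannerId}/row-{page}-{i}";
        }
    }
    finally
    {
        await Task.Yield(); // "delete scanner"
        deletedScanners.Add(scannerId);
    }
}

// Stopping early: break disposes the enumerator, which runs the finally block.
int taken = 0;
await foreach (var row in ScanAsync("s1", pages: 10, rowsPerPage: 3))
{
    if (++taken == 4) break;
}

// Reading to the end cleans up as well.
int total = 0;
await foreach (var row in ScanAsync("s2", pages: 2, rowsPerPage: 3)) total++;

Console.WriteLine($"taken={taken}, total={total}, deleted={string.Join(",", deletedScanners)}");
// prints "taken=4, total=6, deleted=s1,s2"
```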
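Row Key & Column Family Design 🔑
# HBase shell: Snappy compression, version counts, and TTL (in seconds; 86400 = 1 day)
create 'device_logs',
  { NAME => 'cf_base', VERSIONS => 3 },
  { NAME => 'cf_ext', VERSIONS => 1, COMPRESSION => 'SNAPPY', TTL => 86400 }

| Column family | Fields | Characteristics |
|---|---|---|
| cf_base | id, ts, level | Frequently accessed fields; keeps 3 versions |
| cf_ext | json_data, attributes | Sparse fields; Snappy-compressed, 1-day TTL |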
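The reversed timestamp also dictates how time-range reads are built. The minimal sketch below makes two assumptions: a hypothetical stable FNV-1a salt stands in for `GetHashCode()`, and a hypothetical `TimeWindow` helper computes the scan bounds. Because newer rows sort first, the scan starts at the reversed `to` time and stops just past the reversed `from` time. Other devices that hash to the same slot also fall inside this range, so a real query would add a row filter on the device id.

```csharp
using System;
using System.Text;

// Hypothetical stable salt: FNV-1a over the UTF-8 device id, 16 slots. string.GetHashCode()
// is randomized per process in .NET Core, so it cannot be used to rebuild keys later.
static int Slot(string deviceId)
{
    uint hash = 2166136261u;
    foreach (byte b in Encoding.UTF8.GetBytes(deviceId))
        hash = unchecked((hash ^ b) * 16777619u);
    return (int)(hash % 16);
}

static string ReverseTimestamp(DateTime dt) =>
    (DateTime.MaxValue.Ticks - dt.Ticks).ToString("D19");

static string MakeRowKey(string deviceId, DateTime ts) =>
    $"{Slot(deviceId):D2}-{ReverseTimestamp(ts)}-{deviceId}";

// Scan bounds for one device over [fromUtc, toUtc]. Newer rows have smaller reversed ticks,
// so the scan starts at "to" and stops after "from". HBase's stop row is exclusive, and '~'
// sorts after '-', so every key sharing the "from" prefix is still included.
static (string StartRow, string StopRow) TimeWindow(string deviceId, DateTime fromUtc, DateTime toUtc)
{
    string prefix = $"{Slot(deviceId):D2}-";
    return (prefix + ReverseTimestamp(toUtc), prefix + ReverseTimestamp(fromUtc) + "~");
}

var (start, stop) = TimeWindow("dev-1",
    new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
    new DateTime(2024, 1, 2, 0, 0, 0, DateTimeKind.Utc));
Console.WriteLine($"start={start} stop={stop}");
```

Comparisons here are ordinal, matching HBase's byte-wise row ordering for ASCII keys.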
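Multi-Tenant Isolation 🏷️
// Tenant id first: each tenant's rows stay contiguous, and salting spreads writes within a tenant
RowKey = $"{tenantId}-{slot:D2}-{ReverseTimestamp(ts)}-{deviceId}";
Namespace Management 📂
# One namespace per tenant; tables are addressed as namespace:table
create_namespace 'tenant_123'
alter 'tenant_123:device_logs', { … }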
Log Write API Example 📝
[HttpPost("upload")]
public async Task<IActionResult> UploadLog([FromBody] DeviceLogInput input)
{
    if (!ModelState.IsValid)
        return BadRequest(ModelState);
    string rowKey = RowKeyHelper.MakeRowKey(input.DeviceId, input.Timestamp);
    var entity = new DeviceLogEntity
    {
        RowKey = rowKey,
        DeviceId = input.DeviceId,
        Timestamp = input.Timestamp,
        Level = input.Level,
        JsonData = input.JsonData
    };
    try
    {
        await _repo.InsertAsync(rowKey, entity);
        return Ok(new { rowKey });
    }
    catch (HBaseException ex)
    {
        _logger.LogError(ex, "HBase write failed: {RowKey}", rowKey);
        return StatusCode(500, "Failed to write data");
    }
}
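Batch Writes & Backpressure 🚀
public async Task BulkInsertAsync(IReadOnlyCollection<DeviceLogEntity> batch)
{
    // HBase REST Base64-encodes row keys, "family:qualifier" column names, and values
    static string B64(string s) => Convert.ToBase64String(Encoding.UTF8.GetBytes(s));
    static Dictionary<string, string> MakeCell(string c, string v) => new() { ["column"] = B64(c), ["$"] = B64(v) };
    var cellSet = new
    {
        Row = batch.Select(e => new
        {
            key = B64(e.RowKey),
            Cell = new[]
            {
                MakeCell("cf_base:id", e.DeviceId),
                MakeCell("cf_base:ts", e.Timestamp.ToString("O")),
                MakeCell("cf_base:level", e.Level.ToString()),
                MakeCell("cf_ext:json_data", e.JsonData)
            }
        })
    };
    // Multi-row put to /{table}/{placeholder row}; default options keep "Row"/"Cell" casing (Web defaults camel-case)
    (await _client.PutAsJsonAsync($"{_table}/fakerow", cellSet, new JsonSerializerOptions())).EnsureSuccessStatusCode();
}
// Bounded queue: with 1000 items pending, WriteAsync waits, pushing backpressure onto callers
private readonly Channel<DeviceLogEntity> _channel = Channel.CreateBounded<DeviceLogEntity>(
    new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait });
public Task EnqueueAsync(DeviceLogEntity log, CancellationToken ct = default) =>
    _channel.Writer.WriteAsync(log, ct).AsTask();
// Background consumer, started once (e.g. from a hosted service): up to 500 rows per request
private async Task ConsumeAsync(CancellationToken ct)
{
    var buffer = new List<DeviceLogEntity>(500);
    while (await _channel.Reader.WaitToReadAsync(ct))
    {
        while (buffer.Count < 500 && _channel.Reader.TryRead(out var item)) buffer.Add(item);
        await BulkInsertAsync(buffer);
        buffer.Clear();
    }
}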
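The effect of `BoundedChannelFullMode.Wait`, and of draining several queued items per request, can be observed in isolation. The sketch below shrinks the capacity to 4 and the batch size to 3 (both illustrative values). Instead of calling HBase, it records each batch in a list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 4 (instead of 1000) so the backpressure is visible in a few lines.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(4)
{
    FullMode = BoundedChannelFullMode.Wait // writers wait rather than drop items
});

// Fill the queue: TryWrite returns false once capacity is reached.
int accepted = 0;
while (channel.Writer.TryWrite(accepted)) accepted++;

// A further WriteAsync stays pending until the consumer frees a slot.
ValueTask pendingWrite = channel.Writer.WriteAsync(accepted);
bool blockedWhileFull = !pendingWrite.IsCompleted;

var batches = new List<int[]>();

// Consumer: wait for data, then drain up to maxBatch queued items into one batch.
async Task ConsumeAsync(int maxBatch)
{
    var buffer = new List<int>(maxBatch);
    while (await channel.Reader.WaitToReadAsync())
    {
        while (buffer.Count < maxBatch && channel.Reader.TryRead(out var item))
            buffer.Add(item);
        if (buffer.Count == 0) continue;
        batches.Add(buffer.ToArray()); // stand-in for one BulkInsertAsync(buffer) call
        buffer.Clear();
    }
}

Task consumer = ConsumeAsync(maxBatch: 3);
await pendingWrite;        // completes once the consumer has made room
channel.Writer.Complete(); // no more items; the consumer exits after draining
await consumer;

Console.WriteLine($"accepted={accepted}, blocked={blockedWhileFull}, " +
    $"batches={batches.Count}, items={batches.Sum(b => b.Length)}");
```

Waiting instead of dropping suits log ingestion, where losing entries is worse than slowing producers. `BoundedChannelFullMode.DropOldest` would be the alternative when freshness matters more than completeness.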
Deployment & Operations 🐳📈
Docker Compose
version: '3.8'
services:
  zookeeper:
    image: zookeeper:3.7
    container_name: zk
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk:2888:3888
    ports:
      - "2181:2181"
    volumes:
      - zk-data:/data
  hbase:
    image: harisekhon/hbase:2.4.9
    container_name: hbase
    depends_on:
      - zookeeper
    environment:
      HBASE_MANAGES_ZK: "false"
      HBASE_ZOOKEEPER_QUORUM: zk
      HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT: 2181
    ports:
      - "16010:16010" # HBase Master UI
      - "8080:8080"   # HBase REST (Stargate); 9090 is the Thrift port
    volumes:
      - hbase-data:/data/hbase
      - hbase-wal:/data/wal
  knox:
    image: apache/knox:1.6.0
    container_name: knox
    depends_on:
      - hbase
    ports:
      - "8443:8443" # Knox Gateway HTTPS
    volumes:
      - ./knox/conf:/etc/knox/conf:ro
volumes:
  zk-data:    # Persists ZooKeeper data (/data in the container): election state and configuration
  hbase-data: # Persists HBase RegionServer data (/data/hbase in the container)
  hbase-wal:  # Persists the HBase write-ahead log (WAL) (/data/wal in the container)
Kubernetes & Secrets 🔒
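apiVersion: v1
kind: Secret
metadata: { name: hbase-secret }
stringData:
  # .NET maps env vars to configuration as Section__Property, matching BindConfiguration("HBaseSettings")
  HBaseSettings__BaseUrl: "https://hbase-gateway.company.com"
  HBaseSettings__Table: "device_logs"
  HBaseSettings__Username: "api_user"
  HBaseSettings__Password: "secure!"
---
apiVersion: apps/v1
kind: Deployment
metadata: { name: abp-hbase }
spec:
  replicas: 3
  selector:
    matchLabels: { app: abp-hbase }
  template:
    metadata:
      labels: { app: abp-hbase }
    spec:
      containers:
        - name: web
          image: yourregistry/abp-hbase:latest
          envFrom: [{ secretRef: { name: hbase-secret } }]
          livenessProbe:
            httpGet: { path: /health/live, port: 80 }
          readinessProbe:
            httpGet: { path: /health/ready, port: 80 }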
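Monitoring & Alerting 📊🚨
OpenTelemetry & Prometheus Metrics
// Jaeger ingests OTLP natively, so the OTLP exporter replaces the deprecated Jaeger exporter.
// Prometheus export belongs to the metrics pipeline, not to tracing.
services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddAspNetCoreInstrumentation()
        .AddHttpClientInstrumentation()
        .AddSource("HBaseClient")
        .AddOtlpExporter())
    .WithMetrics(metrics => metrics
        .AddAspNetCoreInstrumentation()
        .AddMeter("HBaseClient")
        .AddPrometheusExporter());
// After building the app: expose /metrics for Prometheus to scrape
app.MapPrometheusScrapingEndpoint();
- Example metrics:
  - hbase_requests_total
  - hbase_request_duration_seconds
  - hbase_request_failures_total
  - hbase_scanner_active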
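On the application side, these names come from `System.Diagnostics.Metrics` instruments. The minimal sketch below assumes the meter is called `HBaseClient`, matching the tracing source name above. The instrument names and the `operation` tag are hypothetical choices. Under the OpenTelemetry Prometheus exporter's conventions (dots become underscores, the unit is appended, counters gain `_total`), they should surface as the `hbase_*` names used in this section. The `MeterListener` is there only to show what gets recorded.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;

// Hypothetical instrument names chosen to map onto the hbase_* Prometheus names.
using var meter = new Meter("HBaseClient");
var requests = meter.CreateCounter<long>("hbase.requests");
var failures = meter.CreateCounter<long>("hbase.request.failures");
var duration = meter.CreateHistogram<double>("hbase.request.duration", unit: "s");
var activeScanners = meter.CreateUpDownCounter<long>("hbase.scanner.active");

// Wraps one HBase REST call: count it, time it, and count it as a failure if it throws.
async Task<T> MeasureAsync<T>(string operation, Func<Task<T>> call)
{
    var tag = new KeyValuePair<string, object?>("operation", operation);
    requests.Add(1, tag);
    var stopwatch = Stopwatch.StartNew();
    try
    {
        return await call();
    }
    catch
    {
        failures.Add(1, tag);
        throw;
    }
    finally
    {
        duration.Record(stopwatch.Elapsed.TotalSeconds, tag);
    }
}

// In-process listener, used here only to show what gets recorded.
long seenRequests = 0, seenFailures = 0;
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "HBaseClient") l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
{
    if (instrument.Name == "hbase.requests") seenRequests += value;
    else if (instrument.Name == "hbase.request.failures") seenFailures += value;
});
listener.Start();

activeScanners.Add(1);  // scanner opened
await MeasureAsync("get", () => Task.FromResult(42));
try
{
    await MeasureAsync<int>("put", () => throw new InvalidOperationException("simulated HBase error"));
}
catch (InvalidOperationException) { }
activeScanners.Add(-1); // scanner closed

Console.WriteLine($"requests={seenRequests}, failures={seenFailures}"); // prints "requests=2, failures=1"
```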
Grafana Dashboard Example
{
  "dashboard": {
    "title": "📊 HBase Service Metrics",
    "panels": [
      {
        "type": "timeseries",
        "title": "Request latency (p95)",
        "targets": [{ "expr": "histogram_quantile(0.95, sum(rate(hbase_request_duration_seconds_bucket[5m])) by (le))" }]
      },
      {
        "type": "stat",
        "title": "Failure rate",
        "targets": [{ "expr": "sum(increase(hbase_request_failures_total[5m])) / sum(increase(hbase_requests_total[5m]))" }]
      }
    ]
  }
}
Prometheus Alert Rule Example
groups:
  - name: hbase-alerts
    rules:
      - alert: HBaseHighErrorRate
        # sum() aggregates across replicas so the ratio does not depend on matching label sets
        expr: sum(increase(hbase_request_failures_total[5m])) / sum(increase(hbase_requests_total[5m])) > 0.05
        for: 2m
        labels: { severity: critical }
        annotations: { summary: "🚨 HBase error rate above 5%" }
Testing & CI Integration 🧪
Unit Tests (MockHttp)
var mock = new MockHttpMessageHandler();
mock.When("*/table/device_logs/rowkey*")
    .Respond("application/json", "{ /* fake row */ }");
var client = mock.ToHttpClient();
client.BaseAddress = new Uri("http://localhost/"); // relative request URIs need a base address
var repo = new HBaseRestClient(client, /* options */);
// Assert GetAsync / InsertAsync behavior
Integration Tests & GitHub Actions
name: CI Pipeline
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup .NET
        uses: actions/setup-dotnet@v2
        with:
          dotnet-version: '7.0.x'
      - name: Start HBase Container
        run: docker run -d --name hbase -p 8080:8080 harisekhon/hbase
      - name: Wait for HBase REST
        run: timeout 180 bash -c 'until curl -sf http://localhost:8080/version; do sleep 5; done'
      - name: Run Tests
        # No separate build step runs first, so --no-build would fail here
        run: dotnet test
      - name: Teardown
        if: always()
        run: docker stop hbase && docker rm hbase
Phoenix SQL Layer (Optional) 🐘
// Phoenix only queries tables it created or mapped, e.g. with CREATE VIEW over an existing HBase table
using var conn = new PhoenixConnection("Server=phoenix;...");
await conn.OpenAsync();
using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT * FROM device_logs WHERE ts > ?";
cmd.Parameters.Add(new PhoenixParameter("ts", someTimestamp));
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
    Console.WriteLine(reader["json_data"]);