ABP vNext + HBase:打造超高吞吐分布式列式数据库

发布于:2025-06-10 ⋅ 阅读:(17) ⋅ 点赞:(0)

ABP vNext + HBase:打造超高吞吐分布式列式数据库 🚀🔥

  • 基于 ABP vNext 构建亿级写入、毫秒级查询的列式分布式存储
  • 支持 HTTP Basic/Bearer/Kerberos+SPNEGO 多种认证方式 🔐
  • 丰富的表达式 DSL、IAsyncEnumerable 扫描 & Scanner 生命周期管理 🔄
  • 行键防热点、列簇压缩、TTL、版本控制优化 🔑
  • 一键 Docker Compose 部署 🐳 + Kubernetes Secrets & 健康探针示例
  • OpenTelemetry 链路追踪 🔍、Prometheus 指标命名规范 & Grafana 仪表盘 📊
  • 单元测试(MockHttp)✅、集成测试(Testcontainers)🧪 & Phoenix SQL 层(可选)
  • 支持批量写入、限流背压、多租户隔离、TLS 证书 🌐


系统架构与场景定位 🧩

应用场景

  • 物联网设备时序上报
  • 行为日志大规模采集
  • 宽表用户画像写入
架构分层(由原流程图还原,数据流向自上而下):

Client:ABP vNext Web API → Application Service
    ↓
HBase REST Proxy/Knox
    ↓
Server:HBase RegionServer → HDFS 存储(RegionServer 由 ZooKeeper 协调)

安全认证与连接注入 🔐

配置模型与验证

/// <summary>
/// Connection settings for the HBase REST endpoint. Either <see cref="Token"/> or the
/// <see cref="Username"/>/<see cref="Password"/> pair is used to authenticate.
/// </summary>
public class HBaseSettings
{
    /// <summary>Base URL of the HBase REST endpoint. Must be present and a well-formed URL.</summary>
    [Required]
    [Url]
    public string BaseUrl { get; set; } = null!;

    /// <summary>Name of the HBase table to operate on. Must be present.</summary>
    [Required]
    public string Table { get; set; } = null!;

    /// <summary>Optional user name for HTTP Basic authentication.</summary>
    public string? Username { get; set; }

    /// <summary>Optional password for HTTP Basic authentication.</summary>
    public string? Password { get; set; }

    /// <summary>Optional bearer token.</summary>
    public string? Token { get; set; }
}
// Program.cs / Startup.cs
// Bind HBaseSettings from the "HBaseSettings" configuration section and validate it.
services.AddOptions<HBaseSettings>()
    .BindConfiguration("HBaseSettings")
    .ValidateDataAnnotations()
    // Accept either a bearer token or a COMPLETE Username + Password pair;
    // a user name without a password would otherwise produce a broken Basic header.
    .Validate(o => !string.IsNullOrWhiteSpace(o.Token)
                 || (!string.IsNullOrWhiteSpace(o.Username)
                     && !string.IsNullOrEmpty(o.Password)),
              "必须配置 Username/Password 或 Token")
    // Fail at startup instead of on the first request that resolves the options.
    .ValidateOnStart();

// Typed client: the base address and the Authorization header are derived from the settings.
services.AddHttpClient<IHBaseClient, HBaseRestClient>("HBase", (sp, client) =>
{
    var opts = sp.GetRequiredService<IOptions<HBaseSettings>>().Value;
    client.BaseAddress = new Uri(opts.BaseUrl);

    // Same blank-check as the validation rule above, so a whitespace-only token
    // falls through to Basic authentication instead of sending an empty bearer token.
    if (!string.IsNullOrWhiteSpace(opts.Token))
    {
        client.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Bearer", opts.Token);
    }
    else
    {
        // HTTP Basic: base64("username:password").
        var cred = Convert.ToBase64String(
            Encoding.UTF8.GetBytes($"{opts.Username}:{opts.Password}"));
        client.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Basic", cred);
    }
})
.AddPolicyHandler(PollyPolicies.DefaultRetry);
Kerberos+SPNEGO 示例 🛡️
// Authenticate with the default credentials of the current process identity.
// The handler is created INSIDE the factory delegate: IHttpClientFactory disposes
// primary handlers when it rotates them and calls the delegate again, so one
// captured instance must not be handed out more than once.
services.AddHttpClient<IHBaseClient, HBaseRestClient>()
        .ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler
        {
            UseDefaultCredentials = true
        })
        .AddPolicyHandler(PollyPolicies.DefaultRetry);
TLS 证书配置示例 🔒
// Load the client certificate once; the same certificate object is attached to every handler.
// NOTE(review): the path and password are placeholders — read the real password from
// configuration or a secret store instead of committing it to source.
var clientCertificate = new X509Certificate2("path/to/client.pfx", "certPassword");

services.AddHttpClient<IHBaseClient, HBaseRestClient>()
        // A fresh handler per factory call: IHttpClientFactory disposes primary handlers
        // on rotation, so a single shared handler instance would be reused after disposal.
        .ConfigurePrimaryHttpMessageHandler(() =>
        {
            var handler = new HttpClientHandler();
            handler.ClientCertificates.Add(clientCertificate);
            return handler;
        })
        .AddPolicyHandler(PollyPolicies.DefaultRetry);

DTO、Entity 与工具类定义 📦

/// <summary>Request payload for uploading a single device log entry.</summary>
public class DeviceLogInput
{
    /// <summary>Identifier of the reporting device. Must be present.</summary>
    [Required]
    public string DeviceId { get; set; } = null!;

    /// <summary>Time of the log entry.</summary>
    // NOTE(review): [Required] cannot fail on a non-nullable DateTime; an omitted value
    // binds to default(DateTime). Confirm whether that is intended.
    [Required]
    public DateTime Timestamp { get; set; }

    /// <summary>Numeric log level.</summary>
    public int Level { get; set; }

    /// <summary>Log payload as a JSON string.</summary>
    public string JsonData { get; set; } = null!;
}

/// <summary>A device log entry together with the row key it is stored under.</summary>
public class DeviceLogEntity
{
    /// <summary>Row key of the entry.</summary>
    public string RowKey { get; set; } = null!;

    /// <summary>Identifier of the reporting device.</summary>
    public string DeviceId { get; set; } = null!;

    /// <summary>Time of the log entry.</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>Numeric log level.</summary>
    public int Level { get; set; }

    /// <summary>Log payload as a JSON string.</summary>
    public string JsonData { get; set; } = null!;
}

/// <summary>A single row: its row key plus a column-name to value map.</summary>
public class HBaseRow
{
    /// <summary>Row key of this row.</summary>
    public string RowKey { get; set; } = null!;

    /// <summary>Column values keyed by column name; starts out empty, never null by default.</summary>
    public Dictionary<string, string> Columns { get; set; } = new Dictionary<string, string>();
}

/// <summary>
/// Builds salted, time-descending row keys of the form
/// <c>{slot:D2}-{reverseTimestamp}-{deviceId}</c>.
/// </summary>
public static class RowKeyHelper
{
    /// <summary>Number of salt buckets the device ids are spread over.</summary>
    private const int SlotCount = 16;

    /// <summary>
    /// Returns <c>DateTime.MaxValue.Ticks - dt.Ticks</c> as a zero-padded 19-digit string,
    /// so that later timestamps sort lexicographically before earlier ones.
    /// </summary>
    /// <param name="dt">Timestamp to reverse. Only <c>Ticks</c> is used; <c>Kind</c> is ignored.</param>
    public static string ReverseTimestamp(DateTime dt) =>
        (DateTime.MaxValue.Ticks - dt.Ticks)
            .ToString("D19", System.Globalization.CultureInfo.InvariantCulture);

    /// <summary>Builds the row key for a device log entry.</summary>
    /// <param name="deviceId">Device identifier; also determines the salt slot.</param>
    /// <param name="ts">Timestamp of the entry.</param>
    /// <returns>The row key <c>{slot:D2}-{reverseTimestamp}-{deviceId}</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="deviceId"/> is null.</exception>
    public static string MakeRowKey(string deviceId, DateTime ts)
    {
        if (deviceId is null)
        {
            throw new ArgumentNullException(nameof(deviceId));
        }

        int slot = ComputeSlot(deviceId);
        return $"{slot:D2}-{ReverseTimestamp(ts)}-{deviceId}";
    }

    /// <summary>
    /// Maps a device id to a slot in <c>[0, SlotCount)</c> with a process-independent hash.
    /// </summary>
    /// <remarks>
    /// <c>string.GetHashCode()</c> is randomized per process on .NET Core, so it would give
    /// the same device a different slot on every instance or restart; in addition
    /// <c>Math.Abs(int.MinValue)</c> throws. FNV-1a over the UTF-8 bytes avoids both.
    /// </remarks>
    private static int ComputeSlot(string deviceId)
    {
        const uint OffsetBasis = 2166136261;
        const uint Prime = 16777619;

        uint hash = OffsetBasis;
        foreach (byte b in System.Text.Encoding.UTF8.GetBytes(deviceId))
        {
            hash = unchecked((hash ^ b) * Prime);
        }

        // Fold the high half into the low half: the low bits of plain FNV-1a depend
        // only on the low bits of the input bytes, which would skew a small modulus.
        hash ^= hash >> 16;
        return (int)(hash % SlotCount);
    }
}

Repository 封装与 DSL 查询 ⚙️

/// <summary>
/// Marker interface (declares no members) for the filter objects collected in
/// <c>HBaseQuery.Filters</c>.
/// </summary>
public interface IFilter { }

/// <summary>
/// Filter made of a column coordinate (<see cref="Family"/> + <see cref="Qualifier"/>),
/// a comparison operator and a value to compare against.
/// </summary>
public class ColumnValueFilter : IFilter
{
    /// <summary>Creates a filter; all properties are immutable afterwards.</summary>
    /// <param name="family">Column family name.</param>
    /// <param name="qualifier">Column qualifier name.</param>
    /// <param name="op">Comparison operator.</param>
    /// <param name="value">Value to compare against.</param>
    /// <exception cref="ArgumentNullException">A string argument is null.</exception>
    public ColumnValueFilter(string family, string qualifier, CompareOp op, string value)
    {
        Family = family ?? throw new ArgumentNullException(nameof(family));
        Qualifier = qualifier ?? throw new ArgumentNullException(nameof(qualifier));
        Op = op;
        Value = value ?? throw new ArgumentNullException(nameof(value));
    }

    /// <summary>Column family name.</summary>
    public string Family { get; }

    /// <summary>Column qualifier name.</summary>
    public string Qualifier { get; }

    /// <summary>Comparison operator.</summary>
    public CompareOp Op { get; }

    /// <summary>Value to compare against.</summary>
    public string Value { get; }
}

/// <summary>Describes a scan: optional row-key prefix, filters, time window and row limit.</summary>
public class HBaseQuery
{
    /// <summary>Optional row-key prefix; null when unset.</summary>
    public string? RowKeyPrefix { get; set; }

    /// <summary>Filters attached to the query; the list itself is never replaced.</summary>
    public List<IFilter> Filters { get; } = new List<IFilter>();

    /// <summary>Optional start of the time window; null when unset.</summary>
    public DateTime? FromTime { get; set; }

    /// <summary>Optional end of the time window; null when unset.</summary>
    public DateTime? ToTime { get; set; }

    /// <summary>Optional row limit; null when unset.</summary>
    public int? Limit { get; set; }
}

/// <summary>Fluent builder for <see cref="HBaseQuery"/>.</summary>
public class HBaseQueryBuilder
{
    private readonly HBaseQuery _q = new();

    /// <summary>Sets the row-key prefix.</summary>
    public HBaseQueryBuilder WithRowPrefix(string p)
    {
        _q.RowKeyPrefix = p;
        return this;
    }

    /// <summary>Appends a filter.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="f"/> is null.</exception>
    public HBaseQueryBuilder AddFilter(IFilter f)
    {
        _q.Filters.Add(f ?? throw new ArgumentNullException(nameof(f)));
        return this;
    }

    /// <summary>Sets the row limit.</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="l"/> is not positive.</exception>
    public HBaseQueryBuilder WithLimit(int l)
    {
        if (l <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(l), l, "Limit must be positive.");
        }

        _q.Limit = l;
        return this;
    }

    /// <summary>
    /// Sets the time window. Either bound may be null to leave that side open.
    /// </summary>
    /// <exception cref="ArgumentException">Both bounds are set and <paramref name="from"/> is after <paramref name="to"/>.</exception>
    public HBaseQueryBuilder WithTimeRange(DateTime? from, DateTime? to)
    {
        if (from.HasValue && to.HasValue && from.Value > to.Value)
        {
            throw new ArgumentException("from must not be later than to.", nameof(from));
        }

        _q.FromTime = from;
        _q.ToTime = to;
        return this;
    }

    /// <summary>
    /// Returns a snapshot of the configured query. A copy is returned so that further
    /// calls on this builder do not change queries that were already built.
    /// </summary>
    public HBaseQuery Build()
    {
        var snapshot = new HBaseQuery
        {
            RowKeyPrefix = _q.RowKeyPrefix,
            FromTime = _q.FromTime,
            ToTime = _q.ToTime,
            Limit = _q.Limit
        };
        snapshot.Filters.AddRange(_q.Filters);
        return snapshot;
    }
}

Scanner 生命周期管理 & IAsyncEnumerable 🔄

Scanner 调用时序(应用 → HBaseRestClient → HBase REST):
  1. 应用调用 ScanAsync(query);
  2. HBaseRestClient 发送 POST /table/{table}/scanner,HBase REST 返回 scannerId;
  3. 循环分页读取:GET /scanner/{scannerId} 返回 RowsChunk,HBaseRestClient 将 RowsChunk yield 给应用;
  4. 读取结束后发送 DELETE /scanner/{scannerId} 释放 Scanner。
/// <summary>REST-based <see cref="IHBaseClient"/>; this excerpt shows the scan path only.</summary>
public class HBaseRestClient : IHBaseClient, IAsyncDisposable
{
    // … earlier members omitted …

    /// <summary>
    /// Streams the rows of a scan: creates a scanner, reads it chunk by chunk until an
    /// empty chunk comes back, and always deletes the scanner afterwards.
    /// </summary>
    /// <param name="q">Query the scanner is created from.</param>
    /// <returns>The rows, yielded one at a time in the order the chunks return them.</returns>
    public async IAsyncEnumerable<HBaseRow> ScanAsync(HBaseQuery q)
    {
        var scannerId = await CreateScannerAsync(q);
        try
        {
            // Keep fetching until the scanner returns an empty chunk.
            for (var rows = await ReadScannerAsync(scannerId);
                 rows.Any();
                 rows = await ReadScannerAsync(scannerId))
            {
                foreach (var row in rows)
                {
                    yield return row;
                }
            }
        }
        finally
        {
            // Also runs when reading fails or the consumer stops enumerating early.
            await DeleteScannerAsync(scannerId);
        }
    }

    // Implementations of Create/Delete/ReadScannerAsync etc. omitted
}

行键 & 列簇设计优化 🔑

# HBase Shell: create the table with per-family version count, Snappy compression and TTL.
#   cf_base keeps up to 3 versions per cell.
#   cf_ext  keeps 1 version, is Snappy-compressed, and expires cells after
#           TTL=86400 (HBase TTL is expressed in seconds, i.e. one day).
create 'device_logs',
  { NAME=>'cf_base', VERSIONS=>3 },
  { NAME=>'cf_ext',  VERSIONS=>1, COMPRESSION=>'SNAPPY', TTL=>86400 }
| 列簇 | 字段 | 特性 |
| --- | --- | --- |
| cf_base | id, ts, level | 高频访问字段 |
| cf_ext | json_data, attributes | 稀疏字段、压缩候选 |

多租户隔离 🏷️

// Multi-tenant row key: the tenant id is prepended to the salted key
// (slot, reversed timestamp, device id).
RowKey = $"{tenantId}-{slot:D2}-{ReverseTimestamp(ts)}-{deviceId}"

Namespace 管理 📂

# Create a dedicated namespace for the tenant, then create the tenant's table inside it.
# (`alter` only works on a table that already exists and needs an actual change to apply,
# so the table is created here instead of being altered with an empty spec.)
create_namespace 'tenant_123'
create 'tenant_123:device_logs',
  { NAME=>'cf_base', VERSIONS=>3 },
  { NAME=>'cf_ext',  VERSIONS=>1, COMPRESSION=>'SNAPPY', TTL=>86400 }

日志写入 API 示例 📝

/// <summary>
/// Stores one device log entry and returns the row key it was written under.
/// </summary>
/// <param name="input">Log entry taken from the request body.</param>
/// <returns>
/// 400 with the model state when validation failed, 500 when the insert throws
/// <c>HBaseException</c>, otherwise 200 with the generated row key.
/// </returns>
[HttpPost("upload")]
public async Task<IActionResult> UploadLog([FromBody] DeviceLogInput input)
{
    // Reject payloads that failed model validation.
    if (!ModelState.IsValid)
    {
        return BadRequest(ModelState);
    }

    var rowKey = RowKeyHelper.MakeRowKey(input.DeviceId, input.Timestamp);

    var entity = new DeviceLogEntity
    {
        RowKey = rowKey,
        DeviceId = input.DeviceId,
        Timestamp = input.Timestamp,
        Level = input.Level,
        JsonData = input.JsonData
    };

    try
    {
        await _repo.InsertAsync(rowKey, entity);
    }
    catch (HBaseException ex)
    {
        // Log the failing row key, but return only a generic message to the caller.
        _logger.LogError(ex, "HBase 写入失败:{RowKey}", rowKey);
        return StatusCode(500, "数据写入失败");
    }

    return Ok(new { rowKey });
}

批量写入与背压控制 🚀

/// <summary>
/// Writes a batch of log entries with a single POST to <c>table/{_table}/mutations</c>.
/// </summary>
/// <param name="batch">Entries to write; an empty batch sends no request.</param>
/// <exception cref="ArgumentNullException"><paramref name="batch"/> is null.</exception>
/// <exception cref="System.Net.Http.HttpRequestException">The endpoint returned a non-success status code.</exception>
public async Task BulkInsertAsync(IEnumerable<DeviceLogEntity> batch)
{
    if (batch is null)
    {
        throw new ArgumentNullException(nameof(batch));
    }

    // Row keys, column names and values are sent base64-encoded.
    static string B64(string text) => Convert.ToBase64String(Encoding.UTF8.GetBytes(text));

    // Materialize once: the source is enumerated a single time and emptiness can be checked.
    var mutations = batch.Select(e => new
    {
        Put = new
        {
            Row = B64(e.RowKey),
            // NOTE(review): column names follow the cf_base / cf_ext layout of the
            // device_logs table; confirm the cell field names against the endpoint's schema.
            Cells = new[]
            {
                new { Column = B64("cf_base:id"), Value = B64(e.DeviceId) },
                new
                {
                    Column = B64("cf_base:ts"),
                    Value = B64(e.Timestamp.ToString("O", System.Globalization.CultureInfo.InvariantCulture))
                },
                new
                {
                    Column = B64("cf_base:level"),
                    Value = B64(e.Level.ToString(System.Globalization.CultureInfo.InvariantCulture))
                },
                new { Column = B64("cf_ext:json_data"), Value = B64(e.JsonData) }
            }
        }
    }).ToList();

    if (mutations.Count == 0)
    {
        return;
    }

    using var response = await _client.PostAsJsonAsync($"table/{_table}/mutations", new { mutations });

    // A rejected write must not look like success to the caller.
    response.EnsureSuccessStatusCode();
}

// Bounded queue: once 1000 items are pending, writers wait, which gives back-pressure.
var channel = Channel.CreateBounded<DeviceLogEntity>(1000);

// Upper bound for the number of entries sent in one bulk request.
const int MaxBatchSize = 100;

// Background consumer: drains whatever is currently queued (up to MaxBatchSize)
// and writes it with ONE bulk call instead of one request per item.
_ = Task.Run(async () =>
{
    var buffer = new List<DeviceLogEntity>(MaxBatchSize);

    while (await channel.Reader.WaitToReadAsync())
    {
        while (buffer.Count < MaxBatchSize && channel.Reader.TryRead(out var item))
        {
            buffer.Add(item);
        }

        if (buffer.Count == 0)
        {
            continue;
        }

        try
        {
            await BulkInsertAsync(buffer.ToArray());
        }
        catch (Exception ex)
        {
            // Keep the consumer alive: if it died, the channel would fill up and every
            // producer would wait forever. The failed batch is dropped here.
            // NOTE(review): replace with the application's logger / retry policy.
            System.Diagnostics.Trace.TraceError(
                "Bulk insert of {0} entries failed: {1}", buffer.Count, ex);
        }
        finally
        {
            buffer.Clear();
        }
    }
});

/// <summary>Queues a log entry; the returned task completes once the channel has accepted it.</summary>
public Task EnqueueAsync(DeviceLogEntity log) =>
    channel.Writer.WriteAsync(log).AsTask();

部署与运维 🐳📈

Docker Compose

version: '3.8'

services:
  zookeeper:
    image: zookeeper:3.7
    container_name: zk
    environment:
      ZOO_MY_ID: 1
      # ZooKeeper 3.5+ server syntax: the client port follows after ';'.
      ZOO_SERVERS: server.1=zk:2888:3888;2181
    ports:
      - "2181:2181"
    volumes:
      - zk-data:/data

  hbase:
    image: harisekhon/hbase:2.4.9
    container_name: hbase
    depends_on:
      - zookeeper
    environment:
      HBASE_MANAGES_ZK: "false"
      HBASE_ZOOKEEPER_QUORUM: zk
      HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT: 2181
    ports:
      - "16010:16010"   # HBase Master UI
      - "8080:8080"     # Stargate REST (HBase REST default port)
      - "9090:9090"     # Thrift
    volumes:
      - hbase-data:/data/hbase
      - hbase-wal:/data/wal

  knox:
    image: apache/knox:1.6.0
    container_name: knox
    depends_on:
      - hbase
    ports:
      - "8443:8443"     # Knox Gateway HTTPS
    volumes:
      - ./knox/conf:/etc/knox/conf:ro

volumes:
  zk-data:    # ZooKeeper data, mounted at /data in the zookeeper container
  hbase-data: # HBase data, mounted at /data/hbase in the hbase container
  hbase-wal:  # HBase write-ahead log (WAL), mounted at /data/wal in the hbase container

Kubernetes & Secrets 🔒

apiVersion: v1
kind: Secret
metadata: { name: hbase-secret }
stringData:
  # Environment variable names must mirror the "HBaseSettings" configuration section;
  # "__" is the section separator understood by the .NET environment-variable provider.
  HBaseSettings__BaseUrl: "https://hbase-gateway.company.com"
  HBaseSettings__Table: "device_logs"
  HBaseSettings__Username: "api_user"
  HBaseSettings__Password: "secure!"
---
apiVersion: apps/v1
kind: Deployment
metadata: { name: abp-hbase }
spec:
  replicas: 3
  # apps/v1 requires a selector that matches the pod template labels.
  selector:
    matchLabels: { app: abp-hbase }
  template:
    metadata:
      labels: { app: abp-hbase }
    spec:
      containers:
      - name: web
        image: yourregistry/abp-hbase:latest
        envFrom: [{ secretRef: { name: hbase-secret }}]
        livenessProbe:
          httpGet: { path: /health/live, port: 80 }
        readinessProbe:
          httpGet: { path: /health/ready, port: 80 }

监控与告警 📊🚨

OpenTelemetry & Prometheus 指标

// Tracing and metrics are separate pipelines in OpenTelemetry .NET: spans are exported
// by the tracer provider, while the Prometheus exporter belongs to the meter provider.
services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddAspNetCoreInstrumentation()
        .AddHttpClientInstrumentation()
        .AddSource("HBaseClient")
        .AddJaegerExporter())
    .WithMetrics(metrics => metrics
        .AddHttpClientInstrumentation()
        // NOTE(review): assumes the hbase_* instruments are created on a Meter named
        // "HBaseClient" — confirm the meter name used by the client.
        .AddMeter("HBaseClient")
        .AddPrometheusExporter());

// The scrape endpoint (/metrics by default) is exposed on the request pipeline with
// app.UseOpenTelemetryPrometheusScrapingEndpoint().
  • 指标示例:
    • hbase_requests_total(请求总数,下文失败率的分母)
    • hbase_request_duration_seconds
    • hbase_request_failures_total
    • hbase_scanner_active

Grafana 仪表盘示例

{
  "dashboard": {
    "title": "📊 HBase 服务指标",
    "panels": [
      {
        "type": "graph",
        "title": "请求时延 (95th)",
        "targets": [{ "expr": "histogram_quantile(0.95, sum(rate(hbase_request_duration_seconds_bucket[5m])) by (le))" }]
      },
      {
        "type": "stat",
        "title": "失败率",
        "targets": [{ "expr": "sum(rate(hbase_request_failures_total[5m])) / sum(rate(hbase_requests_total[5m]))" }]
      }
    ]
  }
}

Prometheus AlertRule 示例

groups:
- name: hbase-alerts
  rules:
  - alert: HBaseHighErrorRate
    # Aggregate both sides before dividing: without sum() the two metrics only match
    # when their label sets are identical, and the ratio is computed per series.
    expr: sum(rate(hbase_request_failures_total[5m])) / sum(rate(hbase_requests_total[5m])) > 0.05
    for: 2m
    labels: { severity: critical }
    annotations: { summary: "🚨 HBase 错误率超过 5%" }

测试与 CI 集成 🧪

单元测试(MockHttp)

var mock = new MockHttpMessageHandler();
// Placeholder body: "{}" is valid JSON; replace it with a realistic row payload.
mock.When("*/table/device_logs/rowkey*")
    .Respond("application/json", "{}");
var client = mock.ToHttpClient();
// In production the DI registration sets BaseAddress from HBaseSettings.BaseUrl;
// a hand-built client needs one too, otherwise relative request URIs are rejected.
client.BaseAddress = new Uri("http://localhost/");
var repo = new HBaseRestClient(client, /* options */);
// Assert the behavior of GetAsync / InsertAsync

集成测试 & GitHub Actions

name: CI Pipeline
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup .NET
        uses: actions/setup-dotnet@v4
        with:
          dotnet-version: '7.0.x'
      - name: Start HBase Container
        # 8080 is the HBase REST port; 9090 is Thrift.
        run: docker run -d --name hbase -p 8080:8080 -p 9090:9090 harisekhon/hbase
      - name: Build
        run: dotnet build
      - name: Run Tests
        # --no-build relies on the Build step above.
        run: dotnet test --no-build
      - name: Teardown
        # Clean up even when an earlier step failed.
        if: always()
        run: docker rm -f hbase

Phoenix SQL 层(可选)🐘

// Run a parameterized query through the Phoenix SQL layer and print the json_data
// column of every returned row. Reader, command and connection are disposed in that order.
using (var conn = new PhoenixConnection("Server=phoenix;..."))
{
    await conn.OpenAsync();

    using (var cmd = conn.CreateCommand())
    {
        cmd.CommandText = "SELECT * FROM device_logs WHERE ts > ?";

        // Bind the timestamp as a parameter rather than concatenating it into the SQL text.
        var tsParameter = new PhoenixParameter("ts", someTimestamp);
        cmd.Parameters.Add(tsParameter);

        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                Console.WriteLine(reader["json_data"]);
            }
        }
    }
}