ABP vNext and HDFS Data Lake Storage Integration 🚀



🧠 Background and Goals

With the surge of multi-source enterprise data (images, logs, documents), building a file platform with massive storage capacity, a unified management view, and data-lake analytics has become a necessity.

This article builds a visual, controllable, and extensible modern data-lake file platform on top of ABP vNext BlobStoring and an HDFS HA cluster.


🛠️ Dependencies and Installation

# ABP blob storage framework
dotnet add package Volo.Abp.BlobStoring
dotnet add package Volo.Abp.BlobStoring.UI

# WebHDFS client
dotnet add package WebHdfs.NET

# Retry and circuit breaker
dotnet add package Polly

# Shell invocation
dotnet add package CliWrap

# Optional: application monitoring
dotnet add package Microsoft.ApplicationInsights.AspNetCore --version 2.21.0
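
After installing the packages, wire the blob-storing module into your ABP module. A minimal sketch, assuming a host module named DataLakeHostModule (the module name is illustrative, not from this post):

using Volo.Abp;
using Volo.Abp.BlobStoring;
using Volo.Abp.Modularity;

// Hypothetical host module; depending on AbpBlobStoringModule
// brings in IBlobContainer and the provider infrastructure.
[DependsOn(typeof(AbpBlobStoringModule))]
public class DataLakeHostModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        // HdfsOptions registration and container configuration
        // (shown in the sections below) go here.
    }
}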

🧱 System Architecture Design

[Architecture diagram: ABP UI/Swagger → IBlobContainer → BlobContainerFactory → HdfsBlobProvider (Singleton) → WebHdfsClient → WebHDFS API 🌐 → HDFS cluster: ZooKeeper coordinating NameNode nn1/nn2, with DataNodes behind them]
  • IBlobContainer: ABP's unified interface for accessing a storage container
  • HdfsBlobProvider: derives from BlobProviderBase; adds retries, logging, and monitoring
  • HDFS HA: active/standby failover via ZooKeeper; supports HTTPS/TLS 🔐 and Kerberos authentication 🛡️

⚙️ Core Implementation

1️⃣ Configure appsettings.json

{
  "Hdfs": {
    "NameNodeUri": "https://mycluster:50070",   // WebHDFS endpoint 🌐 (Hadoop 2.x: 50070 HTTP / 50470 HTTPS; Hadoop 3.x: 9870 / 9871)
    "User": "hdfs",
    "UseKerberos": true,                        // enable Kerberos 🔒
    "KeytabPath": "/etc/security/keytabs/hdfs.headless.keytab"
  }
}

// HdfsOptions definition
public class HdfsOptions
{
    public string NameNodeUri { get; set; } = default!;
    public string User { get; set; } = default!;
    public bool UseKerberos { get; set; }
    public string KeytabPath { get; set; } = default!;
}

// Register the configuration (in the module's ConfigureServices)
context.Services.Configure<HdfsOptions>(
    context.Configuration.GetSection("Hdfs"));
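
Optionally, the binding can be validated at startup so a bad NameNodeUri fails fast rather than at first use. A minimal sketch using the standard options-validation API (an addition, not part of the original setup):

// Assumption: this replaces the plain Configure<HdfsOptions> call above.
context.Services.AddOptions<HdfsOptions>()
    .Bind(context.Configuration.GetSection("Hdfs"))
    .Validate(o => Uri.IsWellFormedUriString(o.NameNodeUri, UriKind.Absolute),
        "Hdfs:NameNodeUri must be an absolute URI")
    .Validate(o => !o.UseKerberos || !string.IsNullOrEmpty(o.KeytabPath),
        "Hdfs:KeytabPath is required when UseKerberos is true")
    .ValidateOnStart();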

2️⃣ Custom HdfsBlobProvider

using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Volo.Abp.BlobStoring;
using Volo.Abp.DependencyInjection;
using WebHdfs.Core;

public class HdfsBlobProvider : BlobProviderBase, ISingletonDependency
{
    private readonly WebHdfsClient _client;
    private readonly ILogger<HdfsBlobProvider> _logger;
    private readonly TelemetryClient _telemetry;
    private readonly AsyncPolicy _retryPolicy;

    public HdfsBlobProvider(
        IOptions<HdfsOptions> options,
        ILogger<HdfsBlobProvider> logger,
        TelemetryClient telemetry)
        : this(options, logger, telemetry, client: null)
    {
    }

    // Test hook: lets unit tests inject a mocked client (see the test section below).
    internal HdfsBlobProvider(
        IOptions<HdfsOptions> options,
        ILogger<HdfsBlobProvider> logger,
        TelemetryClient telemetry,
        WebHdfsClient? client)
    {
        var opts = options.Value;

        // Kerberos initialization (the keytab is already mounted in the container).
        // Wait for kinit to finish so a ticket exists before the first request.
        if (opts.UseKerberos)
        {
            Process.Start("kinit", $"-kt {opts.KeytabPath} {opts.User}")?.WaitForExit();
        }

        _client = client ?? new WebHdfsClient(new Uri(opts.NameNodeUri), opts.User);
        _logger = logger;
        _telemetry = telemetry;

        // 3 retries with exponential backoff 🔄
        _retryPolicy = Policy
            .Handle<IOException>()
            .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(Math.Pow(2, i)));
    }

    public override async Task SaveAsync(BlobProviderSaveArgs args)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            // 4 MB buffer; note BufferedStream disposes the underlying stream too.
            using var buffered = new BufferedStream(args.Stream, 4 * 1024 * 1024);
            await _retryPolicy.ExecuteAsync(() =>
                _client.CreateFileAsync(args.BlobName, buffered, overwrite: true));

            _telemetry.TrackMetric("HDFS_Save_Duration", sw.ElapsedMilliseconds);
            _logger.LogInformation("✔️ Saved blob {Name}", args.BlobName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ Failed to save blob to HDFS: {Name}", args.BlobName);
            throw;
        }
    }

    public override async Task<Stream?> GetOrNullAsync(BlobProviderGetArgs args)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            var stream = await _retryPolicy.ExecuteAsync(() =>
                _client.OpenReadAsync(args.BlobName));

            _telemetry.TrackMetric("HDFS_Get_Duration", sw.ElapsedMilliseconds);
            _logger.LogInformation("📥 Fetched blob {Name}", args.BlobName);
            return stream;
        }
        catch (FileNotFoundException)
        {
            _logger.LogWarning("⚠️ Blob not found: {Name}", args.BlobName);
            return null;
        }
    }

    public override async Task<bool> ExistsAsync(BlobProviderExistsArgs args)
    {
        // BlobProviderBase requires this override; a read probe keeps the
        // implementation independent of the client's file-status API.
        var stream = await GetOrNullAsync(new BlobProviderGetArgs(
            args.ContainerName, args.Configuration, args.BlobName, args.CancellationToken));
        if (stream == null)
        {
            return false;
        }

        await stream.DisposeAsync();
        return true;
    }

    public override async Task<bool> DeleteAsync(BlobProviderDeleteArgs args)
    {
        try
        {
            var result = await _retryPolicy.ExecuteAsync(() =>
                _client.DeleteAsync(args.BlobName));
            _logger.LogInformation("🗑️ Deleted blob {Name}: {Result}", args.BlobName, result);
            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ Failed to delete HDFS blob: {Name}", args.BlobName);
            return false;
        }
    }
}

3️⃣ Register HdfsBlobProvider

Configure<AbpBlobStoringOptions>(options =>
{
    options.Containers.ConfigureDefault(container =>
    {
        container.ProviderType = typeof(HdfsBlobProvider);
    });
});
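
Beyond the default mapping, a specific container can be routed to HDFS while others keep their existing providers. A minimal sketch with a hypothetical DataLakeContainer class (the container name and class are illustrative):

using Volo.Abp.BlobStoring;

// Hypothetical typed container for data-lake files.
[BlobContainerName("data-lake")]
public class DataLakeContainer
{
}

// In the module's ConfigureServices:
Configure<AbpBlobStoringOptions>(options =>
{
    options.Containers.Configure<DataLakeContainer>(container =>
    {
        container.ProviderType = typeof(HdfsBlobProvider);
    });
});

Injecting IBlobContainer<DataLakeContainer> then targets this container specifically.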

4️⃣ Application Service Example

using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp;
using Volo.Abp.Application.Services;
using Volo.Abp.BlobStoring;

public class FileAppService : ApplicationService
{
    private readonly IBlobContainer _blobContainer;

    public FileAppService(IBlobContainer blobContainer)
    {
        _blobContainer = blobContainer;
    }

    public async Task UploadAsync(string name, Stream content)
    {
        await _blobContainer.SaveAsync(name, content);
        Logger.LogInformation("✅ Uploaded {Name}", name);
    }

    public async Task<Stream> DownloadAsync(string name)
    {
        var stream = await _blobContainer.GetOrNullAsync(name)
                     ?? throw new UserFriendlyException("File not found");
        Logger.LogInformation("📤 Downloaded {Name}", name);
        return stream;
    }
}

🔐 HDFS HA & Security Configuration

<configuration>
  <property><name>fs.defaultFS</name><value>hdfs://mycluster</value></property>
  <property><name>dfs.nameservices</name><value>mycluster</value></property>
  <property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
  <property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>node1:8020</value></property>
  <property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>node2:8020</value></property>
  <property><name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>
  <property><name>dfs.replication</name><value>3</value></property>
  <!-- Kerberos -->
  <property><name>hadoop.security.authentication</name><value>kerberos</value></property>
</configuration>
  • Deployment requirements: 3× JournalNode + 3× ZooKeeper + a Kerberos KDC 🛡️
  • Keytab mounting: mount into the container at /etc/security/keytabs and chmod 400 the file
  • HTTPS/TLS: configure HttpClientHandler.ServerCertificateCustomValidationCallback to validate (or, for dev only, bypass) the certificate 🔐 (see the sketch below)
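
A minimal sketch of the certificate callback, assuming the WebHDFS client can be constructed around a custom HttpClient (whether WebHdfsClient accepts one depends on the library version, so treat this as illustrative):

using System.Net.Http;
using System.Net.Security;

var handler = new HttpClientHandler
{
    // Accept only certificates that pass normal validation; returning
    // true unconditionally should be reserved for development clusters.
    ServerCertificateCustomValidationCallback = (request, cert, chain, errors) =>
        errors == SslPolicyErrors.None
};
var httpClient = new HttpClient(handler);
// Pass httpClient to the WebHDFS client if the library supports it (assumption).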

💾 Chunked Upload and Merge

Upload/merge sequence (front end → FileController → HdfsBlobProvider):

1. For each part, the front end sends POST /upload-part (file, guid, index) and the controller calls SaveAsync("/temp/guid/part-index").
2. Once all parts are uploaded, the front end sends POST /merge-parts (guid, fileName).
3. The controller calls ListStatus("/temp/guid") and CreateFileAsync("/final/fileName"), then loops over all parts with OpenReadAsync(part) + AppendFileAsync(final) to merge them, and responds 200 OK.

Using HDFS Append keeps the merged binary intact and avoids the limits of text-based shell commands. 🔗 A sketch of the merge endpoint follows.
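
A minimal sketch of the merge endpoint, assuming the WebHdfsClient surface named in the flow above (ListStatusAsync, CreateFileAsync, OpenReadAsync, AppendFileAsync, and the part.Path property are assumptions; exact method names and signatures depend on the client library):

using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using WebHdfs.Core;

[Route("api/files")]
public class FileController : ControllerBase
{
    private readonly WebHdfsClient _client;

    public FileController(WebHdfsClient client) => _client = client;

    [HttpPost("merge-parts")]
    public async Task<IActionResult> MergePartsAsync(string guid, string fileName)
    {
        // Create an empty target file, then append each part in index order.
        var finalPath = $"/final/{fileName}";
        await _client.CreateFileAsync(finalPath, Stream.Null, overwrite: true);

        // Assumed listing call; order parts by their index before appending.
        var parts = await _client.ListStatusAsync($"/temp/{guid}");
        foreach (var part in parts)
        {
            using var partStream = await _client.OpenReadAsync(part.Path);
            await _client.AppendFileAsync(finalPath, partStream);
        }

        return Ok();
    }
}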


🐶 Unit Test Example

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Volo.Abp.BlobStoring;
using WebHdfs.Core;
using Xunit;

public class HdfsBlobProvider_Tests
{
    [Fact]
    public async Task SaveAsync_RetriesOnIOException()
    {
        // Moq can only intercept virtual members; in practice, wrap the
        // client behind an interface if WebHdfsClient's methods are not virtual.
        var clientMock = new Mock<WebHdfsClient>(MockBehavior.Strict,
            new Uri("http://x"), "u");

        clientMock.SetupSequence(c => c.CreateFileAsync(
            It.IsAny<string>(), It.IsAny<Stream>(), true))
          .ThrowsAsync(new IOException())
          .ThrowsAsync(new IOException())
          .Returns(Task.CompletedTask);

        var options = Options.Create(new HdfsOptions { NameNodeUri = "http://x", User = "u" });
        var telemetry = new TelemetryClient(new TelemetryConfiguration());

        // Inject the mock through the internal test-hook constructor
        // (requires [InternalsVisibleTo] for the test assembly).
        var provider = new HdfsBlobProvider(options,
            NullLogger<HdfsBlobProvider>.Instance,
            telemetry,
            clientMock.Object);

        await provider.SaveAsync(new BlobProviderSaveArgs(
            "default", new BlobContainerConfiguration(), "test", new MemoryStream()));

        clientMock.Verify(c => c.CreateFileAsync("test",
            It.IsAny<Stream>(), true), Times.Exactly(3));
    }
}

📦 Quick Deployment with Docker Compose

version: '3'
services:
  zk:
    image: zookeeper:3.6
    ports: ["2181:2181"]
  journalnode:
    image: bde2020/hadoop-journalnode:2.0.0-hadoop3.2.1-java8
    depends_on: [zk]
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    depends_on: [zk, journalnode]
    environment:
      - CLUSTER_NAME=mycluster
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    depends_on: [namenode]

🔍 Monitoring and Operations

[Monitoring flow: blob operations → TrackMetric → Application Insights; blob operations → LogInformation → log store → Grafana/Power BI]
  • Prometheus/App Insights instrumentation: record operation latency via TelemetryClient or an ABP counter
  • Log correlation: enrich entries with context such as CorrelationId, BlobName, and the NameNode address
  • Health checks: configure ABP HealthChecks to probe the HDFS endpoint ✅ (see the sketch below)
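
A minimal health-check sketch using the standard ASP.NET Core health-check API; the check class and name are illustrative, and the probe uses WebHDFS's real LISTSTATUS operation on the root path:

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Options;

// Hypothetical health check that probes the WebHDFS REST endpoint.
public class HdfsHealthCheck : IHealthCheck
{
    private static readonly HttpClient Http = new HttpClient();
    private readonly HdfsOptions _options;

    public HdfsHealthCheck(IOptions<HdfsOptions> options) => _options = options.Value;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        try
        {
            // LISTSTATUS on the root path is a cheap liveness probe.
            var uri = new Uri(new Uri(_options.NameNodeUri), "/webhdfs/v1/?op=LISTSTATUS");
            var response = await Http.GetAsync(uri, cancellationToken);
            return response.IsSuccessStatusCode
                ? HealthCheckResult.Healthy("HDFS reachable")
                : HealthCheckResult.Unhealthy($"HDFS returned {(int)response.StatusCode}");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("HDFS unreachable", ex);
        }
    }
}

// Registration in the module:
// context.Services.AddHealthChecks().AddCheck<HdfsHealthCheck>("hdfs");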

