
ABP vNext and HDFS Data Lake Storage Integration 🚀


📚 Table of Contents

  • ABP vNext and HDFS Data Lake Storage Integration 🚀
    • 🧠 Background and Goals
    • 🛠️ Dependencies and Installation
    • 🧱 System Architecture Design
    • ⚙️ Core Implementation
      • 1️⃣ Configure `appsettings.json`
      • 2️⃣ Custom `HdfsBlobProvider`
      • 3️⃣ Register `HdfsBlobProvider`
      • 4️⃣ Application Service Example
    • 🔐 HDFS HA & Security Configuration
    • 💾 Chunked Upload and Merge
    • 🐶 Unit Test Example
    • 📦 Quick Deployment with Docker Compose
    • 🔍 Monitoring and Operations


🧠 Background and Goals

With the rapid growth of multi-source enterprise data (images, logs, documents), building a file platform that offers massive storage capacity, a unified management view, and data lake analytics has become a necessity.

Building on ABP vNext BlobStoring and an HDFS HA cluster, this article shows how to create a visual, controllable, and extensible modern data lake file platform.


🛠️ Dependencies and Installation

# ABP Blob storage framework
dotnet add package Volo.Abp.BlobStoring
dotnet add package Volo.Abp.BlobStoring.UI

# WebHDFS client
dotnet add package WebHdfs.NET

# Retry and circuit breaker
dotnet add package Polly

# Shell invocation
dotnet add package CliWrap

# Optional: application monitoring
dotnet add package Microsoft.ApplicationInsights.AspNetCore --version 2.21.0

🧱 System Architecture Design

[Architecture diagram: ABP UI/Swagger → IBlobContainer → BlobContainerFactory → HdfsBlobProvider (Singleton) → WebHdfsClient → WebHDFS API 🌐 → HDFS cluster (ZooKeeper, NameNode nn1, NameNode nn2, DataNodes).]

  • IBlobContainer: ABP's unified interface for accessing a storage container
  • HdfsBlobProvider: inherits BlobProviderBase and adds retry, logging, and monitoring
  • HDFS HA: active/standby failover via ZooKeeper; supports HTTPS/TLS 🔐 and Kerberos authentication 🛡️

⚙️ Core Implementation

1️⃣ Configure appsettings.json

{"Hdfs": {"NameNodeUri": "https://mycluster:50070",   // 支持 HTTP/HTTPS 🌐"User": "hdfs","UseKerberos": true,                        // 是否开启 Kerberos 🔒"KeytabPath": "/etc/security/keytabs/hdfs.headless.keytab"}
}
// HdfsOptions 定义
public class HdfsOptions
{public string NameNodeUri { get; set; } = default!;public string User { get; set; } = default!;public bool UseKerberos { get; set; }public string KeytabPath { get; set; } = default!;
}// 注册配置
context.Services.Configure<HdfsOptions>(context.Configuration.GetSection("Hdfs"));

2️⃣ Custom HdfsBlobProvider

using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Volo.Abp.BlobStoring;
using Volo.Abp.DependencyInjection;
using WebHdfs.Core;

public class HdfsBlobProvider : BlobProviderBase, ISingletonDependency
{
    private readonly WebHdfsClient _client;
    private readonly ILogger<HdfsBlobProvider> _logger;
    private readonly TelemetryClient _telemetry;
    private readonly AsyncPolicy _retryPolicy;

    public HdfsBlobProvider(
        IOptions<HdfsOptions> options,
        ILogger<HdfsBlobProvider> logger,
        TelemetryClient telemetry,
        WebHdfsClient? client = null) // injectable for testing; created from options otherwise
    {
        var opts = options.Value;

        // Kerberos initialization (the keytab is mounted into the container)
        if (opts.UseKerberos)
        {
            Process.Start("kinit", $"-kt {opts.KeytabPath} {opts.User}");
        }

        _client = client ?? new WebHdfsClient(new Uri(opts.NameNodeUri), opts.User);
        _logger = logger;
        _telemetry = telemetry;

        // 3 retries with exponential backoff 🔄
        _retryPolicy = Policy
            .Handle<IOException>()
            .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(Math.Pow(2, i)));
    }

    public override async Task SaveAsync(BlobProviderSaveArgs args)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            using var buffered = new BufferedStream(args.Stream, 4 * 1024 * 1024);
            await _retryPolicy.ExecuteAsync(() =>
                _client.CreateFileAsync(args.BlobName, buffered, overwrite: true));

            _telemetry.TrackMetric("HDFS_Save_Duration", sw.ElapsedMilliseconds);
            _logger.LogInformation("✔️ Blob {Name} saved successfully", args.BlobName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ Failed to save blob to HDFS: {Name}", args.BlobName);
            throw;
        }
    }

    public override async Task<Stream?> GetOrNullAsync(BlobProviderGetArgs args)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            var stream = await _retryPolicy.ExecuteAsync(() =>
                _client.OpenReadAsync(args.BlobName));

            _telemetry.TrackMetric("HDFS_Get_Duration", sw.ElapsedMilliseconds);
            _logger.LogInformation("📥 Blob {Name} retrieved successfully", args.BlobName);
            return stream;
        }
        catch (FileNotFoundException)
        {
            _logger.LogWarning("⚠️ Blob not found: {Name}", args.BlobName);
            return null;
        }
    }

    public override async Task<bool> DeleteAsync(BlobProviderDeleteArgs args)
    {
        try
        {
            var result = await _retryPolicy.ExecuteAsync(() =>
                _client.DeleteAsync(args.BlobName));

            _logger.LogInformation("🗑️ Blob {Name} deletion result: {Status}", args.BlobName, result);
            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ Failed to delete HDFS blob: {Name}", args.BlobName);
            return false;
        }
    }
}

3️⃣ Register HdfsBlobProvider

Configure<AbpBlobStoringOptions>(options =>
{
    options.Containers.ConfigureDefault(container =>
    {
        container.ProviderType = typeof(HdfsBlobProvider);
    });
});
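
If only certain containers should live on HDFS, ABP also supports per-container configuration instead of replacing the default provider. A minimal sketch, where `BackupContainer` is a hypothetical container class introduced here for illustration:

using Volo.Abp.BlobStoring;

// Hypothetical container marker; the attribute fixes the container name.
[BlobContainerName("backups")]
public class BackupContainer
{
}

// In the module's ConfigureServices: only this container uses HDFS,
// all other containers keep their existing provider.
Configure<AbpBlobStoringOptions>(options =>
{
    options.Containers.Configure<BackupContainer>(container =>
    {
        container.ProviderType = typeof(HdfsBlobProvider);
    });
});

Such a container is then injected as `IBlobContainer<BackupContainer>` rather than the plain `IBlobContainer`.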

4️⃣ Application Service Example

using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp;
using Volo.Abp.Application.Services;
using Volo.Abp.BlobStoring;

public class FileAppService : ApplicationService
{
    private readonly IBlobContainer _blobContainer;

    public FileAppService(IBlobContainer blobContainer)
    {
        _blobContainer = blobContainer;
    }

    public async Task UploadAsync(string name, Stream content)
    {
        await _blobContainer.SaveAsync(name, content);
        Logger.LogInformation("✅ Upload of {Name} completed", name);
    }

    public async Task<Stream> DownloadAsync(string name)
    {
        var stream = await _blobContainer.GetOrNullAsync(name)
            ?? throw new UserFriendlyException("File not found");
        Logger.LogInformation("📤 Download of {Name} completed", name);
        return stream;
    }
}

🔐 HDFS HA & Security Configuration

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://mycluster</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>node1:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>node2:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <!-- Kerberos -->
  <property>
    <name>hadoop.security.authentication</name>
    <value>kerberos</value>
  </property>
</configuration>
  • Deployment requirements: 3× JournalNode + 3× ZooKeeper + a Kerberos KDC 🛡️
  • Keytab mounting: mount the keytab into the container at /etc/security/keytabs and set chmod 400
  • HTTPS/TLS: configure HttpClientHandler.ServerCertificateCustomValidationCallback to skip or validate the certificate 🔐 (see the sketch after this list)
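
For the HTTPS/TLS point above, here is a minimal sketch of a pinned-certificate validation callback; the "EXPECTED_THUMBPRINT" value and the way the handler is handed to the WebHDFS client are assumptions, not part of the original setup:

using System.Net.Http;
using System.Net.Security;

// Trust certificates that pass normal validation, or whose thumbprint matches
// a pinned value ("EXPECTED_THUMBPRINT" is a placeholder for your cluster's cert).
var handler = new HttpClientHandler
{
    ServerCertificateCustomValidationCallback = (request, cert, chain, errors) =>
        errors == SslPolicyErrors.None ||
        cert?.GetCertHashString() == "EXPECTED_THUMBPRINT"
};

var httpClient = new HttpClient(handler);
// Whether WebHdfsClient can consume this HttpClient depends on the client library's
// constructors; if not, register the handler via IHttpClientFactory instead.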

💾 Chunked Upload and Merge

[Sequence diagram: the frontend POSTs /upload-part (file, guid, index) to FileController, which calls SaveAsync("/temp/guid/part-index") for each part. The frontend then POSTs /merge-parts (guid, fileName); HdfsBlobProvider lists "/temp/guid" via ListStatus, creates "/final/fileName" with CreateFileAsync, and loops over all parts with OpenReadAsync + AppendFileAsync before returning 200 OK.]

Using HDFS Append keeps the binary content intact and avoids the limitations of text-oriented shell commands. 🔗 A sketch of the merge step follows.
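
A minimal sketch of the merge step under the flow above, assuming a service that holds the same `WebHdfsClient` used by `HdfsBlobProvider`; the `ListStatusAsync`, `AppendFileAsync`, and `PathSuffix` names mirror the diagram but are assumptions about the WebHDFS client API, so adapt them to the calls your client actually exposes:

using System.IO;
using System.Linq;
using System.Threading.Tasks;

public class FilePartMerger
{
    private readonly WebHdfs.Core.WebHdfsClient _client;

    public FilePartMerger(WebHdfs.Core.WebHdfsClient client) => _client = client;

    // Merges "/temp/{guid}/part-*" into "/final/{fileName}" using HDFS append,
    // so the binary content is never round-tripped through text commands.
    public async Task MergePartsAsync(string guid, string fileName)
    {
        var tempDir = $"/temp/{guid}";
        var finalPath = $"/final/{fileName}";

        // Create an empty target file, then append each part in index order.
        await _client.CreateFileAsync(finalPath, Stream.Null, overwrite: true);

        var parts = (await _client.ListStatusAsync(tempDir))           // assumed API
            .OrderBy(p => int.Parse(p.PathSuffix.Split('-').Last()));  // assumed FileStatus shape

        foreach (var part in parts)
        {
            using var partStream = await _client.OpenReadAsync($"{tempDir}/{part.PathSuffix}");
            await _client.AppendFileAsync(finalPath, partStream);      // assumed API
        }

        // Clean up the temporary parts once the merge succeeds.
        await _client.DeleteAsync(tempDir);
    }
}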


🐶 Unit Test Example

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Volo.Abp.BlobStoring;
using Volo.Abp.Testing;
using WebHdfs.Core;
using Xunit;

public class HdfsBlobProvider_Tests : AbpIntegratedTestBase
{
    [Fact]
    public async Task SaveAsync_RetriesOnIOException()
    {
        // Fail twice with IOException, then succeed, to exercise the retry policy.
        var clientMock = new Mock<WebHdfsClient>(
            MockBehavior.Strict,
            new Uri("http://x"), "u");
        clientMock
            .SetupSequence(c => c.CreateFileAsync(It.IsAny<string>(), It.IsAny<Stream>(), true))
            .ThrowsAsync(new IOException())
            .ThrowsAsync(new IOException())
            .Returns(Task.CompletedTask);

        var options = Options.Create(new HdfsOptions { NameNodeUri = "http://x", User = "u" });
        var telemetry = new TelemetryClient();

        // Pass the mocked client so the provider does not create its own instance.
        var provider = new HdfsBlobProvider(
            options,
            NullLogger<HdfsBlobProvider>.Instance,
            telemetry,
            clientMock.Object);

        await provider.SaveAsync(new BlobProviderSaveArgs("test", new MemoryStream()));

        clientMock.Verify(
            c => c.CreateFileAsync("test", It.IsAny<Stream>(), true),
            Times.Exactly(3));
    }
}

📦 Quick Deployment with Docker Compose

version: '3'
services:
  zk:
    image: zookeeper:3.6
    ports: ["2181:2181"]
  journalnode:
    image: bde2020/hadoop-journalnode:2.0.0-hadoop3.2.1-java8
    depends_on: [zk]
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    depends_on: [zk, journalnode]
    environment:
      - CLUSTER_NAME=mycluster
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    depends_on: [namenode]

🔍 Monitoring and Operations

[Monitoring flow: Blob operations emit TrackMetric to Application Insights and structured log entries to the log store; both feed Grafana/Power BI dashboards.]

  • Prometheus/Application Insights instrumentation: record operation latency with TelemetryClient or ABP's ICounter
  • Log correlation: include context such as CorrelationId, BlobName, and the NameNode address
  • Health checks: configure ABP HealthChecks to probe the HDFS endpoint ✅ (see the sketch after this list)
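
As referenced in the list above, here is a minimal sketch of an HDFS health check built on the standard ASP.NET Core `IHealthCheck` abstraction; it probes the WebHDFS REST endpoint with a LISTSTATUS call, and the registration name "hdfs" plus the injected `HttpClient` wiring are assumptions:

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Options;

// Healthy if a WebHDFS LISTSTATUS call against the NameNode succeeds.
public class HdfsHealthCheck : IHealthCheck
{
    private readonly HttpClient _httpClient;
    private readonly HdfsOptions _options;

    // HttpClient is assumed to be registered, e.g. via services.AddHttpClient().
    public HdfsHealthCheck(HttpClient httpClient, IOptions<HdfsOptions> options)
    {
        _httpClient = httpClient;
        _options = options.Value;
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        try
        {
            var uri = $"{_options.NameNodeUri}/webhdfs/v1/?op=LISTSTATUS&user.name={_options.User}";
            var response = await _httpClient.GetAsync(uri, cancellationToken);

            return response.IsSuccessStatusCode
                ? HealthCheckResult.Healthy("HDFS NameNode reachable")
                : HealthCheckResult.Unhealthy($"WebHDFS returned {(int)response.StatusCode}");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("HDFS NameNode unreachable", ex);
        }
    }
}

// Registration, e.g. in the ABP module's ConfigureServices:
// context.Services.AddHealthChecks().AddCheck<HdfsHealthCheck>("hdfs");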
