当前位置: 首页 > news >正文

多模联邦查询网关:ABP + Trino/Presto 聚合跨源数据

多模联邦查询网关:ABP + Trino/Presto 聚合跨源数据 🚀


📚 目录

  • 多模联邦查询网关:ABP + Trino/Presto 聚合跨源数据 🚀
    • 0. TL;DR 🧾
    • 1. 场景与边界 🎯
    • 2. 总体架构 🧩
    • 3. 权限与最小可见(引擎层强制)🔐
      • 3.1 File-based Access Control
      • 3.2 OPA Access Control
    • 4. 路由与优化策略(DF + CBO)⚡
    • 5. ABP 网关实现(.NET,流式 & 可取消) 🧑‍💻
      • 5.1 客户端协议与重试要点
      • 5.2 流式返回(`IAsyncEnumerable<object[]>`)示例
      • 5.3 会话默认值托管(可选)
    • 6. Catalog 配置样例(S3/MinIO、JDBC、Kafka) 🗂️
      • 6.1 `etc/catalog/hive.properties`
      • 6.2 `etc/catalog/postgres.properties`
      • 6.3 `etc/catalog/kafka.properties`
    • 7. 高可用与容错:FTE + Exchange(必读) 🧯
    • 8. 资源隔离:Resource Groups(文件或 DB 后端) 🧱
    • 9. 部署与“最小可跑”(Compose 片段) 🐳
    • 10. 基准与用例 🧪
    • 11. 成本与时延对比 📈
    • 12. 运维与关停 🛠️
    • 13. 常见坑位 Checklist ✅


0. TL;DR 🧾

  • 引擎:Trino 做联邦查询(多 Catalog 单点聚合);网关:ABP 做多租户身份穿透、列掩码/行过滤策略注入、配额与审计、路由(Trino 或源端直连只读)。
  • 客户端协议:只用 POST /v1/statement 提交;解析响应体 nextUri 循环 GET 拉取;需要取消则 DELETE nextUri;请求头使用标准 X-Trino-*
  • 性能主轴动态过滤(默认开启) + CBO(Join 重排/分发自动选择,广播上限可控)。
  • 治理主轴Resource Groups 分队列限流;FTE+Exchange 抗节点故障;Worker 优雅下线 保障维护期稳定。

1. 场景与边界 🎯

  • 目标:统一查询入口,跨 PostgreSQL(维/主数据)、对象存储(ORC/Parquet on S3/MinIO,经 Hive/Iceberg 编目)、Kafka(事件流)与若干 OLAP 明细/汇总。
  • 非目标:不讨论“单库一体化”(如纯 ClickHouse/DuckDB);本文聚焦联邦网关治理

2. 总体架构 🧩

  • ABP 网关(Abp.FederatedQueryGateway

    • ICurrentTenant/ICurrentUser 获取上下文 → 映射到 X-Trino-UserX-Trino-SourceX-Trino-Client-TagsX-Trino-Session(如 join_reordering_strategy / join_distribution_type);必要时加 X-Trino-Routing-Group 对接 Trino Gateway。
    • 安全:把租户/角色映射为 File-based/OPA列掩码行过滤策略(引擎强制);应用层再做白名单/禁用昂贵模式。
    • 审计/配额:事件监听(HTTP/MySQL/Kafka/OpenLineage)做“查询层”审计,ABP 聚合看板与熔断。
  • Trino 联邦层

    • Coordinator + Workers,多 Catalog(Hive/Iceberg、Postgres、Kafka、JDBC…);Resource Groupssource/clientTags 限流与排队。
REST /api/fq/query
/v1/statement + X-Trino-*
Catalogs
策略注入
鉴权 / 掩码 / 行过滤
Client/BI
ABP FederatedQueryGateway
Trino Gateway(可选)
Trino Coordinator
Trino Workers
Hive / S3 / Iceberg
PostgreSQL
Kafka
JDBC / Other
Access Control
File-based / OPA

3. 权限与最小可见(引擎层强制)🔐

3.1 File-based Access Control

  • etc/access-control.propertiesaccess-control.name=file + JSON 规则(系统信息/对象级/列级规则),支持列掩码与行过滤;系统信息规则里可放行优雅下线所需写权限。

3.2 OPA Access Control

  • access-control.name=opa + opa.policy.* 端点;OPA 返回列掩码表达式与行过滤谓词并由 Trino 强制应用。适合把 ABP 角色/租户映射为 Rego 决策。

选择建议:小团队优先 File-based(维护简单);需要统一策略/审计/合规时再上 OPA/Ranger


4. 路由与优化策略(DF + CBO)⚡

  • 动态过滤 DF(默认开启)

    • 全局关闭:enable-dynamic-filtering=false;会话关闭:enable_dynamic_filtering=false

    • 等待收集(按连接器配置):

      • JDBC:dynamic-filtering.wait-timeout
      • Hive:hive.dynamic-filtering.wait-timeout
      • Iceberg:iceberg.dynamic-filtering.wait-timeout
    • 作用:在“维表(小)↔ 明细(大)”Join 中,将维表键值作为运行时谓词,下推到扫描/分区裁剪,显著减少 IO 与网络。

  • CBO & Join 分发/重排

    • 配置(config.properties

      • optimizer.join-reordering-strategy=AUTOMATIC(会话:join_reordering_strategy
      • join-distribution-type=AUTOMATIC(会话:join_distribution_type
      • 广播上限:join-max-broadcast-table-size(会话:join_max_broadcast_table_size,默认 100MB)
Yes
No
Start Query
Stats available?
Choose Join Order (CBO)
Small table?
BROADCAST small table
PARTITIONED join
DF applies at runtime
Push DF predicates to scans
Reduced IO & Network

5. ABP 网关实现(.NET,流式 & 可取消) 🧑‍💻

5.1 客户端协议与重试要点

  • 使用 POST /v1/statement 提交 SQL;解析响应 JSON 的 nextUri 循环 GET;需要取消就 DELETE nextUri
  • 429/503/504 做指数退避并尊重 Retry-AfterHttpCompletionOption.ResponseHeadersRead 降低大结果集内存占用;CancellationToken 全链路透传。

5.2 流式返回(IAsyncEnumerable<object[]>)示例

相比“把所有行装进 List 再返回”,边拉取边 yield 不仅省内存,还能更快给到首批结果。取消时会尝试 DELETE nextUri 释放服务端资源。

public class QueryAppService : ApplicationService
{private readonly IHttpClientFactory _http;private readonly ICurrentTenant _tenant;public QueryAppService(IHttpClientFactory http, ICurrentTenant tenant){ _http = http; _tenant = tenant; }[Authorize]public async IAsyncEnumerable<object[]> ExecuteStreamAsync(ExecuteQueryInput input,[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default){var cli = _http.CreateClient("trino");using var req = new HttpRequestMessage(HttpMethod.Post, "/v1/statement"){ Content = new StringContent(input.Sql, Encoding.UTF8, "text/plain") };// 身份穿透 + 策略标签(Resource Groups / 路由组)req.Headers.TryAddWithoutValidation("X-Trino-User",   CurrentUser.UserName ?? "abp");req.Headers.TryAddWithoutValidation("X-Trino-Source", "abp-fq-gateway");req.Headers.TryAddWithoutValidation("X-Trino-Client-Tags", $"tenant:{_tenant?.Id},app:abp");if (!string.IsNullOrEmpty(input.RoutingGroup))req.Headers.TryAddWithoutValidation("X-Trino-Routing-Group", input.RoutingGroup);// 会话属性(配置名与会话名区分:连字符 vs 下划线)req.Headers.TryAddWithoutValidation("X-Trino-Session","join_reordering_strategy=AUTOMATIC, join_distribution_type=AUTOMATIC");string? next = null;try{// 1st POSTusing (var resp = await cli.SendAsync(req, HttpCompletionOption.ResponseHeadersRead, ct)){resp.EnsureSuccessStatusCode();var json = await resp.Content.ReadAsStringAsync(ct);var qr = JsonSerializer.Deserialize<QueryResults>(json, new() { PropertyNameCaseInsensitive = true })!;next = qr.nextUri;if (qr.data != null)foreach (var r in qr.data) yield return r.ToArray();}// Follow nextUriint attempt = 0;while (!ct.IsCancellationRequested && !string.IsNullOrEmpty(next)){using var resp = await cli.GetAsync(next, HttpCompletionOption.ResponseHeadersRead, ct);if ((int)resp.StatusCode is 429 or 503 or 504){var retry = resp.Headers.RetryAfter?.Delta?? TimeSpan.FromSeconds(Math.Min(60, Math.Pow(2, attempt++) + Random.Shared.NextDouble()));await Task.Delay(retry, ct);continue;}resp.EnsureSuccessStatusCode();var body = await resp.Content.ReadAsStringAsync(ct);var qr = JsonSerializer.Deserialize<QueryResults>(body, new() { PropertyNameCaseInsensitive = true })!;if (qr.data != null)foreach (var r in qr.data) yield return r.ToArray();next = qr.nextUri;attempt = 0; // reset on success}}finally{// 取消时尽量通知服务端释放资源if (ct.IsCancellationRequested && !string.IsNullOrEmpty(next)){try { await cli.DeleteAsync(next, ct); } catch { /* 忽略 */ }}}}private sealed record QueryResults(string? id, TrinoColumn[]? columns, object[][]? data, string? nextUri, TrinoError? error);private sealed record TrinoColumn(string name, string type);private sealed record TrinoError(string message);
}

可选:再提供一个 StreamToNdjsonAsync(HttpResponse response, ...),边拉取边写入 NDJSON/CSV,适合超大结果导出。

5.3 会话默认值托管(可选)

  • Session Property Manager 基于条件(source=abp-fq-gatewayclientTags=tenant:*)下发默认会话,减少前端可变性与误配。

6. Catalog 配置样例(S3/MinIO、JDBC、Kafka) 🗂️

对象存储:启用原生 S3(fs.native-s3.enabled=true)与 s3.* 前缀;Trino 设计支持 S3 兼容系统,但只对 AWS S3 与 MinIO 做过兼容测试,其他厂商需自测。

6.1 etc/catalog/hive.properties

connector.name=hive
hive.metastore.uri=thrift://hms:9083fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.aws-access-key=minio
s3.aws-secret-key=minio123
s3.path-style-access=true

6.2 etc/catalog/postgres.properties

connector.name=postgresql
connection-url=jdbc:postgresql://postgres:5432/appdb
connection-user=app
connection-password=app123

6.3 etc/catalog/kafka.properties

connector.name=kafka
kafka.nodes=kafka:9092# 文件式表描述(推荐):目录中的 *.json 会被自动发现并映射为表
kafka.table-description-supplier=FILE
kafka.table-description-dir=/etc/kafka/descriptions# 如需白名单方式可显式指定表名(可选)
# kafka.table-names=tpch.customer,tpch.orders

7. 高可用与容错:FTE + Exchange(必读) 🧯

  • 开启 FTE:retry-policy=QUERY | TASK若设为 TASK,必须配置 Exchange Manager(S3/HDFS/ABFS/GCS/兼容端)。
  • S3 Filesystem Exchange(示例):
exchange-manager.name=filesystem
exchange.base-directories=s3://trino-exchange
exchange.s3.endpoint=http://minio:9000
exchange.s3.region=us-east-1
exchange.s3.aws-access-key=minio
exchange.s3.aws-secret-key=minio123

8. 资源隔离:Resource Groups(文件或 DB 后端) 🧱

  • 启用(config.properties)

    resource-groups.configuration-manager=file
    resource-groups.config-file=etc/resource-groups.json
    
  • 最小 JSON(按 source/clientTags 做队列与权重):

{"rootGroups": [{ "name": "global", "softMemoryLimit": "80%", "hardConcurrencyLimit": 100, "schedulingPolicy": "weighted_fair","subGroups": [{ "name": "adhoc", "softMemoryLimit": "30%", "hardConcurrencyLimit": 20, "schedulingWeight": 1 },{ "name": "etl",   "softMemoryLimit": "50%", "hardConcurrencyLimit": 50, "schedulingWeight": 3 },{ "name": "rt",    "softMemoryLimit": "20%", "hardConcurrencyLimit": 30, "schedulingWeight": 2 }]}],"selectors": [{ "source": "abp-fq-gateway", "clientTags": ["tenant:.*"], "group": "global.adhoc" },{ "source": "airflow",        "clientTags": ["batch"],     "group": "global.etl"   },{ "clientTags": ["realtime"],                          "group": "global.rt"    }]
}
Selectors
source=abp-fq-gateway
clientTags=tenant:*
source=airflow
clientTags=batch
clientTags=realtime
Group: global.adhoc
Group: global.etl
Group: global.rt
Queue & Limits
Queue & Limits
Queue & Limits

多集群分流可用 Trino Gateway,在请求头携带 X-Trino-Routing-Group 指定路由组(未指定按默认组,如 adhoc)。


9. 部署与“最小可跑”(Compose 片段) 🐳

services:trino:image: trinodb/trino:latestports: ["8080:8080"]volumes:- ./trino/etc:/etc/trino- ./kafka/descriptions:/etc/kafka/descriptionsdepends_on: [hms, minio, postgres]hms:image: apache/hive:4.0.0command: ["bash","-c","/opt/hive/bin/hive --service metastore"]ports: ["9083:9083"]minio:image: minio/miniocommand: server /data --console-address ":9001"environment:- MINIO_ROOT_USER=minio- MINIO_ROOT_PASSWORD=minio123ports: ["9000:9000","9001:9001"]postgres:image: postgres:15environment:- POSTGRES_PASSWORD=app123- POSTGRES_USER=app- POSTGRES_DB=appdbports: ["5432:5432"]

10. 基准与用例 🧪

  • Join(维↔明细):Postgres × Hive(Iceberg) TopN/计数/去重窗口;观察 DF 命中与 Join 分发。
  • REST 压测(nextUri 协议):首包后循环 GET,带 X-Trino-Client-Tags 与默认会话;统计 statementStats
ABP GatewayTrino CoordinatorPOST /v1/statement\nX-Trino-User/Source/Session\n(SQL)200 OK\n{ "nextUri": ".../1" }GET nextUri200 OK\n{ "data": [...], "nextUri": ".../2" }loop[拉取分页结果]DELETE nextUri204 No Contentopt[用户取消]ABP GatewayTrino Coordinator

11. 成本与时延对比 📈

  • A:Trino 联邦(DF 默认开 + CBO 自动分发/重排,必要时 enable_large_dynamic_filters)。
  • B:源侧拆分查询 + ABP 聚合(网络/扫描对照)。
  • C:明细沉入专用 OLAP(对照)。
  • 输出 雷达图/表格:选择度 × 并发 × 维表大小 → p50/p95/p99、扫描字节/行、remote reads。

12. 运维与关停 🛠️

  • OpenMetrics:Prometheus 抓 /metrics(如 trino_execution_*trino_memory_*)。

  • OpenTelemetrytracing.enabled=true + 导出端点,串起查询链路。

  • 优雅下线(仅 Worker)

    curl -v -X PUT -d '"SHUTTING_DOWN"' -H "Content-type: application/json" http://worker:8081/v1/info/state
    

    提示:若启用 File-based System 信息规则,需放行该写操作。


13. 常见坑位 Checklist ✅

  1. 不要轮询 /v1/query;严格用 /v1/statement + nextUri;对 429/503/504 退避并尊重 Retry-After;取消时 DELETE nextUri
  2. DF 默认开启;按连接器设置等待超时键位(JDBC/Hive/Iceberg)。
  3. CBO 属性名:配置(optimizer.join-reordering-strategyjoin-distribution-typejoin-max-broadcast-table-size) vs 会话(join_reordering_strategyjoin_distribution_typejoin_max_broadcast_table_size)。
  4. FTEretry-policy=TASK必须配置 Exchange;S3/MinIO 走 filesystem exchange。
  5. Resource Groups:启用键为 resource-groups.configuration-managerresource-groups.config-file,selectors 用 source/clientTags
  6. S3 原生fs.native-s3.enabled=true + s3.*;仅 AWS S3 / MinIO 做过兼容测试(其他需自测)。
  7. Kafka 文件式描述:启用 kafka.table-description-supplier=FILE 后,table-description-dir 下的 *.json 会自动发现为表(可不填 kafka.table-names)。
  8. Gateway 分流:多集群可带 X-Trino-Routing-Group;未显式指定按 Gateway 默认组(常为 adhoc)。

文章转载自:

http://YjeIDc29.dbdmr.cn
http://YEjWmbTF.dbdmr.cn
http://MFtgKCrk.dbdmr.cn
http://DJ6BnZS6.dbdmr.cn
http://ORExsF9W.dbdmr.cn
http://LGQbCGXL.dbdmr.cn
http://cfamgLg6.dbdmr.cn
http://ehhfzStp.dbdmr.cn
http://jEMBrnYA.dbdmr.cn
http://SSYkbqRK.dbdmr.cn
http://X52N7pT1.dbdmr.cn
http://PGgDYUd2.dbdmr.cn
http://tcWUCoVm.dbdmr.cn
http://UaIpLgNC.dbdmr.cn
http://TEwwnFkl.dbdmr.cn
http://omBegeJz.dbdmr.cn
http://UV8LXMxN.dbdmr.cn
http://o03gLVgu.dbdmr.cn
http://G5SD7m5c.dbdmr.cn
http://92lub5S9.dbdmr.cn
http://YzJkSilB.dbdmr.cn
http://PdXD5PVL.dbdmr.cn
http://fSuBtrIb.dbdmr.cn
http://UDkyBO3I.dbdmr.cn
http://RR24n13h.dbdmr.cn
http://gyZ3gVYQ.dbdmr.cn
http://e9s7C2wd.dbdmr.cn
http://ARdebLS5.dbdmr.cn
http://yW1bmApn.dbdmr.cn
http://4Yvcpn8T.dbdmr.cn
http://www.dtcms.com/a/368278.html

相关文章:

  • 基于单片机智能家居环境检测系统/室内环境检测设计
  • 23种设计模式-模板方法模式
  • 容器学习day05_k8s(二)
  • ES04-批量写入
  • 大数据毕业设计推荐:基于Spark的零售时尚精品店销售数据分析系统【Hadoop+python+spark】
  • 企业数字安全双保险:终端安全与数据防泄漏如何构筑全方位防护体系
  • 信息系统安全保护措施文件方案
  • 【C++】 list 容器模拟实现解析
  • 鹿客发布旗舰新品AI智能锁V6 Max,打造AI家庭安全领域新标杆
  • 【GEOS-Chem 输入数据】使用 AWS CLI 访问 GEOS-Chem 数据
  • 23种设计模式——原型模式 (Prototype Pattern)详解
  • 《Cocos Creator的2D、3D渲染使用记录》
  • Conda 使用py环境隔离
  • 数据结构:栈和队列力扣算法题
  • 深度学习之第八课迁移学习(残差网络ResNet)
  • 数据一致性、AI样本可追溯性与数据治理
  • 基于MATLAB的CNN大气散射传播率计算与图像去雾实现
  • 【Redis】初识 Redis 与基础数据结构
  • 分布式常见面试题整理
  • “卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
  • 数字时代的 “安全刚需”:为什么销售管理企业都在做手机号码脱敏
  • 乐观并发: TCP 与编程实践
  • 两条平面直线之间通过三次多项式曲线进行过渡的方法介绍
  • if __name__=‘__main__‘的用处
  • MySQL知识回顾总结----数据类型
  • WeaveFox AI智能开发平台介绍
  • Oracle:select top 5
  • sub3G、sub6G和LB、MB、HB、MHB、LMHB、UHB之间的区别和联系
  • Tenda AC20路由器缓冲区溢出漏洞分析
  • 52核心52线程,Intel下一代CPU憋了个大的