多模联邦查询网关:ABP + Trino/Presto 聚合跨源数据
多模联邦查询网关:ABP + Trino/Presto 聚合跨源数据 🚀
📚 目录
- 多模联邦查询网关:ABP + Trino/Presto 聚合跨源数据 🚀
- 0. TL;DR 🧾
- 1. 场景与边界 🎯
- 2. 总体架构 🧩
- 3. 权限与最小可见(引擎层强制)🔐
- 3.1 File-based Access Control
- 3.2 OPA Access Control
- 4. 路由与优化策略(DF + CBO)⚡
- 5. ABP 网关实现(.NET,流式 & 可取消) 🧑💻
- 5.1 客户端协议与重试要点
- 5.2 流式返回(`IAsyncEnumerable<object[]>`)示例
- 5.3 会话默认值托管(可选)
- 6. Catalog 配置样例(S3/MinIO、JDBC、Kafka) 🗂️
- 6.1 `etc/catalog/hive.properties`
- 6.2 `etc/catalog/postgres.properties`
- 6.3 `etc/catalog/kafka.properties`
- 7. 高可用与容错:FTE + Exchange(必读) 🧯
- 8. 资源隔离:Resource Groups(文件或 DB 后端) 🧱
- 9. 部署与“最小可跑”(Compose 片段) 🐳
- 10. 基准与用例 🧪
- 11. 成本与时延对比 📈
- 12. 运维与关停 🛠️
- 13. 常见坑位 Checklist ✅
0. TL;DR 🧾
- 引擎:Trino 做联邦查询(多 Catalog 单点聚合);网关:ABP 做多租户身份穿透、列掩码/行过滤策略注入、配额与审计、路由(Trino 或源端直连只读)。
- 客户端协议:只用
POST /v1/statement
提交;解析响应体nextUri
循环 GET 拉取;需要取消则 DELETE nextUri;请求头使用标准X-Trino-*
。 - 性能主轴:动态过滤(默认开启) + CBO(Join 重排/分发自动选择,广播上限可控)。
- 治理主轴:Resource Groups 分队列限流;FTE+Exchange 抗节点故障;Worker 优雅下线 保障维护期稳定。
1. 场景与边界 🎯
- 目标:统一查询入口,跨 PostgreSQL(维/主数据)、对象存储(ORC/Parquet on S3/MinIO,经 Hive/Iceberg 编目)、Kafka(事件流)与若干 OLAP 明细/汇总。
- 非目标:不讨论“单库一体化”(如纯 ClickHouse/DuckDB);本文聚焦联邦与网关治理。
2. 总体架构 🧩
-
ABP 网关(
Abp.FederatedQueryGateway
)- 从
ICurrentTenant/ICurrentUser
获取上下文 → 映射到X-Trino-User
、X-Trino-Source
、X-Trino-Client-Tags
、X-Trino-Session
(如join_reordering_strategy
/join_distribution_type
);必要时加X-Trino-Routing-Group
对接 Trino Gateway。 - 安全:把租户/角色映射为 File-based/OPA 的列掩码与行过滤策略(引擎强制);应用层再做白名单/禁用昂贵模式。
- 审计/配额:事件监听(HTTP/MySQL/Kafka/OpenLineage)做“查询层”审计,ABP 聚合看板与熔断。
- 从
-
Trino 联邦层
- Coordinator + Workers,多 Catalog(Hive/Iceberg、Postgres、Kafka、JDBC…);Resource Groups 以
source
/clientTags
限流与排队。
- Coordinator + Workers,多 Catalog(Hive/Iceberg、Postgres、Kafka、JDBC…);Resource Groups 以
3. 权限与最小可见(引擎层强制)🔐
3.1 File-based Access Control
etc/access-control.properties
:access-control.name=file
+ JSON 规则(系统信息/对象级/列级规则),支持列掩码与行过滤;系统信息规则里可放行优雅下线所需写权限。
3.2 OPA Access Control
access-control.name=opa
+opa.policy.*
端点;OPA 返回列掩码表达式与行过滤谓词并由 Trino 强制应用。适合把 ABP 角色/租户映射为 Rego 决策。
选择建议:小团队优先 File-based(维护简单);需要统一策略/审计/合规时再上 OPA/Ranger。
4. 路由与优化策略(DF + CBO)⚡
-
动态过滤 DF(默认开启)
-
全局关闭:
enable-dynamic-filtering=false
;会话关闭:enable_dynamic_filtering=false
。 -
等待收集(按连接器配置):
- JDBC:
dynamic-filtering.wait-timeout
- Hive:
hive.dynamic-filtering.wait-timeout
- Iceberg:
iceberg.dynamic-filtering.wait-timeout
- JDBC:
-
作用:在“维表(小)↔ 明细(大)”Join 中,将维表键值作为运行时谓词,下推到扫描/分区裁剪,显著减少 IO 与网络。
-
-
CBO & Join 分发/重排
-
配置(
config.properties
):optimizer.join-reordering-strategy=AUTOMATIC
(会话:join_reordering_strategy
)join-distribution-type=AUTOMATIC
(会话:join_distribution_type
)- 广播上限:
join-max-broadcast-table-size
(会话:join_max_broadcast_table_size
,默认 100MB)
-
5. ABP 网关实现(.NET,流式 & 可取消) 🧑💻
5.1 客户端协议与重试要点
- 只使用
POST /v1/statement
提交 SQL;解析响应 JSON 的nextUri
循环 GET;需要取消就 DELETEnextUri
。 - 对
429/503/504
做指数退避并尊重Retry-After
;HttpCompletionOption.ResponseHeadersRead
降低大结果集内存占用;CancellationToken
全链路透传。
5.2 流式返回(IAsyncEnumerable<object[]>
)示例
相比“把所有行装进 List 再返回”,边拉取边
yield
不仅省内存,还能更快给到首批结果。取消时会尝试 DELETEnextUri
释放服务端资源。
public class QueryAppService : ApplicationService
{private readonly IHttpClientFactory _http;private readonly ICurrentTenant _tenant;public QueryAppService(IHttpClientFactory http, ICurrentTenant tenant){ _http = http; _tenant = tenant; }[Authorize]public async IAsyncEnumerable<object[]> ExecuteStreamAsync(ExecuteQueryInput input,[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default){var cli = _http.CreateClient("trino");using var req = new HttpRequestMessage(HttpMethod.Post, "/v1/statement"){ Content = new StringContent(input.Sql, Encoding.UTF8, "text/plain") };// 身份穿透 + 策略标签(Resource Groups / 路由组)req.Headers.TryAddWithoutValidation("X-Trino-User", CurrentUser.UserName ?? "abp");req.Headers.TryAddWithoutValidation("X-Trino-Source", "abp-fq-gateway");req.Headers.TryAddWithoutValidation("X-Trino-Client-Tags", $"tenant:{_tenant?.Id},app:abp");if (!string.IsNullOrEmpty(input.RoutingGroup))req.Headers.TryAddWithoutValidation("X-Trino-Routing-Group", input.RoutingGroup);// 会话属性(配置名与会话名区分:连字符 vs 下划线)req.Headers.TryAddWithoutValidation("X-Trino-Session","join_reordering_strategy=AUTOMATIC, join_distribution_type=AUTOMATIC");string? next = null;try{// 1st POSTusing (var resp = await cli.SendAsync(req, HttpCompletionOption.ResponseHeadersRead, ct)){resp.EnsureSuccessStatusCode();var json = await resp.Content.ReadAsStringAsync(ct);var qr = JsonSerializer.Deserialize<QueryResults>(json, new() { PropertyNameCaseInsensitive = true })!;next = qr.nextUri;if (qr.data != null)foreach (var r in qr.data) yield return r.ToArray();}// Follow nextUriint attempt = 0;while (!ct.IsCancellationRequested && !string.IsNullOrEmpty(next)){using var resp = await cli.GetAsync(next, HttpCompletionOption.ResponseHeadersRead, ct);if ((int)resp.StatusCode is 429 or 503 or 504){var retry = resp.Headers.RetryAfter?.Delta?? TimeSpan.FromSeconds(Math.Min(60, Math.Pow(2, attempt++) + Random.Shared.NextDouble()));await Task.Delay(retry, ct);continue;}resp.EnsureSuccessStatusCode();var body = await resp.Content.ReadAsStringAsync(ct);var qr = JsonSerializer.Deserialize<QueryResults>(body, new() { PropertyNameCaseInsensitive = true })!;if (qr.data != null)foreach (var r in qr.data) yield return r.ToArray();next = qr.nextUri;attempt = 0; // reset on success}}finally{// 取消时尽量通知服务端释放资源if (ct.IsCancellationRequested && !string.IsNullOrEmpty(next)){try { await cli.DeleteAsync(next, ct); } catch { /* 忽略 */ }}}}private sealed record QueryResults(string? id, TrinoColumn[]? columns, object[][]? data, string? nextUri, TrinoError? error);private sealed record TrinoColumn(string name, string type);private sealed record TrinoError(string message);
}
可选:再提供一个
StreamToNdjsonAsync(HttpResponse response, ...)
,边拉取边写入 NDJSON/CSV,适合超大结果导出。
5.3 会话默认值托管(可选)
- 用 Session Property Manager 基于条件(
source=abp-fq-gateway
、clientTags=tenant:*
)下发默认会话,减少前端可变性与误配。
6. Catalog 配置样例(S3/MinIO、JDBC、Kafka) 🗂️
对象存储:启用原生 S3(
fs.native-s3.enabled=true
)与s3.*
前缀;Trino 设计支持 S3 兼容系统,但只对 AWS S3 与 MinIO 做过兼容测试,其他厂商需自测。
6.1 etc/catalog/hive.properties
connector.name=hive
hive.metastore.uri=thrift://hms:9083fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.aws-access-key=minio
s3.aws-secret-key=minio123
s3.path-style-access=true
6.2 etc/catalog/postgres.properties
connector.name=postgresql
connection-url=jdbc:postgresql://postgres:5432/appdb
connection-user=app
connection-password=app123
6.3 etc/catalog/kafka.properties
connector.name=kafka
kafka.nodes=kafka:9092# 文件式表描述(推荐):目录中的 *.json 会被自动发现并映射为表
kafka.table-description-supplier=FILE
kafka.table-description-dir=/etc/kafka/descriptions# 如需白名单方式可显式指定表名(可选)
# kafka.table-names=tpch.customer,tpch.orders
7. 高可用与容错:FTE + Exchange(必读) 🧯
- 开启 FTE:
retry-policy=QUERY | TASK
。若设为TASK
,必须配置 Exchange Manager(S3/HDFS/ABFS/GCS/兼容端)。 - S3 Filesystem Exchange(示例):
exchange-manager.name=filesystem
exchange.base-directories=s3://trino-exchange
exchange.s3.endpoint=http://minio:9000
exchange.s3.region=us-east-1
exchange.s3.aws-access-key=minio
exchange.s3.aws-secret-key=minio123
8. 资源隔离:Resource Groups(文件或 DB 后端) 🧱
-
启用(config.properties):
resource-groups.configuration-manager=file resource-groups.config-file=etc/resource-groups.json
-
最小 JSON(按
source
/clientTags
做队列与权重):
{"rootGroups": [{ "name": "global", "softMemoryLimit": "80%", "hardConcurrencyLimit": 100, "schedulingPolicy": "weighted_fair","subGroups": [{ "name": "adhoc", "softMemoryLimit": "30%", "hardConcurrencyLimit": 20, "schedulingWeight": 1 },{ "name": "etl", "softMemoryLimit": "50%", "hardConcurrencyLimit": 50, "schedulingWeight": 3 },{ "name": "rt", "softMemoryLimit": "20%", "hardConcurrencyLimit": 30, "schedulingWeight": 2 }]}],"selectors": [{ "source": "abp-fq-gateway", "clientTags": ["tenant:.*"], "group": "global.adhoc" },{ "source": "airflow", "clientTags": ["batch"], "group": "global.etl" },{ "clientTags": ["realtime"], "group": "global.rt" }]
}
多集群分流可用 Trino Gateway,在请求头携带
X-Trino-Routing-Group
指定路由组(未指定按默认组,如adhoc
)。
9. 部署与“最小可跑”(Compose 片段) 🐳
services:trino:image: trinodb/trino:latestports: ["8080:8080"]volumes:- ./trino/etc:/etc/trino- ./kafka/descriptions:/etc/kafka/descriptionsdepends_on: [hms, minio, postgres]hms:image: apache/hive:4.0.0command: ["bash","-c","/opt/hive/bin/hive --service metastore"]ports: ["9083:9083"]minio:image: minio/miniocommand: server /data --console-address ":9001"environment:- MINIO_ROOT_USER=minio- MINIO_ROOT_PASSWORD=minio123ports: ["9000:9000","9001:9001"]postgres:image: postgres:15environment:- POSTGRES_PASSWORD=app123- POSTGRES_USER=app- POSTGRES_DB=appdbports: ["5432:5432"]
10. 基准与用例 🧪
- Join(维↔明细):Postgres × Hive(Iceberg) TopN/计数/去重窗口;观察 DF 命中与 Join 分发。
- REST 压测(
nextUri
协议):首包后循环 GET,带X-Trino-Client-Tags
与默认会话;统计statementStats
。
11. 成本与时延对比 📈
- A:Trino 联邦(DF 默认开 + CBO 自动分发/重排,必要时
enable_large_dynamic_filters
)。 - B:源侧拆分查询 + ABP 聚合(网络/扫描对照)。
- C:明细沉入专用 OLAP(对照)。
- 输出 雷达图/表格:选择度 × 并发 × 维表大小 →
p50/p95/p99
、扫描字节/行、remote reads。
12. 运维与关停 🛠️
-
OpenMetrics:Prometheus 抓
/metrics
(如trino_execution_*
、trino_memory_*
)。 -
OpenTelemetry:
tracing.enabled=true
+ 导出端点,串起查询链路。 -
优雅下线(仅 Worker):
curl -v -X PUT -d '"SHUTTING_DOWN"' -H "Content-type: application/json" http://worker:8081/v1/info/state
提示:若启用 File-based System 信息规则,需放行该写操作。
13. 常见坑位 Checklist ✅
- 不要轮询
/v1/query
;严格用/v1/statement
+nextUri
;对 429/503/504 退避并尊重Retry-After
;取消时 DELETEnextUri
。 - DF 默认开启;按连接器设置等待超时键位(JDBC/Hive/Iceberg)。
- CBO 属性名:配置(
optimizer.join-reordering-strategy
、join-distribution-type
、join-max-broadcast-table-size
) vs 会话(join_reordering_strategy
、join_distribution_type
、join_max_broadcast_table_size
)。 - FTE:
retry-policy=TASK
⇒ 必须配置 Exchange;S3/MinIO 走 filesystem exchange。 - Resource Groups:启用键为
resource-groups.configuration-manager
与resource-groups.config-file
,selectors 用source
/clientTags
。 - S3 原生:
fs.native-s3.enabled=true
+s3.*
;仅 AWS S3 / MinIO 做过兼容测试(其他需自测)。 - Kafka 文件式描述:启用
kafka.table-description-supplier=FILE
后,table-description-dir
下的*.json
会自动发现为表(可不填kafka.table-names
)。 - Gateway 分流:多集群可带
X-Trino-Routing-Group
;未显式指定按 Gateway 默认组(常为adhoc
)。