当前位置: 首页 > news >正文

【大数据技术实战】Flink+DS+Dinky 自动化构建数仓平台

一、背景:企业数仓建设的现状与挑战

        在数字化转型进入深水区的今天,数据已成为企业核心生产要素,而实时数仓作为 “数据驱动决策” 的关键载体,其建设水平直接决定企业在市场竞争中的响应速度与决策精度。根据 IDC《2024 年全球大数据市场报告》,超过 78% 的企业将 “实时数据处理能力” 列为数字化转型的核心目标,但在实际落地中,传统数仓架构却面临一系列难以突破的瓶颈,这些瓶颈集中体现在 “开发、运维、架构” 三个维度,严重制约数仓价值的释放。

1.1 开发效率低下:技术门槛高,迭代周期长

Apache Flink 作为当前实时计算领域的主流引擎,其强大的流处理能力与状态管理机制已成为实时数仓的核心支撑。但 Flink 原生开发模式存在显著的技术门槛,导致开发效率低下:

  • 语言与 API 复杂度:原生 Flink 开发需掌握 Java/Scala 语言,且需熟悉 DataStream API、Table API 等复杂接口。以 “Kafka 数据清洗后写入 MySQL” 为例,传统开发需编写至少 200 行代码(含连接器配置、序列化 / 反序列化、状态管理),开发周期长达 2-3 天,且需专人维护代码版本;
  • 版本兼容难题:Flink 各版本(如 1.17、1.18、1.19)间存在 API 差异(如 Kafka 连接器参数变化、状态后端优化),企业若需升级 Flink 版本,需批量修改历史作业代码,适配成本极高;
  • 调试体验差:原生 Flink 调试依赖本地集群或远程提交,日志分散在 TaskManager 节点,排查一个 “数据解析失败” 问题需逐一查看节点日志,平均耗时 1-2 小时;
  • SQL 支持有限:Flink SQL Client 仅提供命令行交互,无语法提示、格式化、血缘分析功能,非开发人员(如数据分析师)无法直接参与实时作业开发,需依赖开发团队协作,进一步拉长迭代周期。

        某电商企业案例显示,传统模式下,一个 “实时销量大屏” 需求从开发到上线需 10 天(含需求沟通、代码开发、调试、部署),而业务部门要求 “大促前 3 天紧急上线”,导致需求无法按时交付,错失业务决策窗口。

1.2 运维管理碎片化:全链路协同难,故障排查效率低

        实时数仓是 “数据采集→计算处理→存储落地→监控告警” 的全链路体系,但传统架构中,各环节工具脱节,形成 “数据孤岛” 与 “运维孤岛”,具体痛点如下:

1.2.1 任务管理与调度分离
  • 传统架构中,Flink 作业开发依赖 IDE(如 IntelliJ IDEA),调度依赖 Cron 脚本或简单调度工具(如 Azkaban),两者无联动:
    • 若上游数据源(如 Kafka)延迟,Cron 脚本仍会按时触发 Flink 作业,导致作业读取空数据,需手动重启;
    • 多个 Flink 作业存在依赖关系(如 “用户行为计算作业→用户画像更新作业”),需手动维护依赖顺序,易出现 “上游未完成,下游已执行” 的问题;
  • 某金融企业曾因 “风控数据计算作业未完成,下游风险预警作业已执行”,导致漏判高风险交易,产生百万级损失。
1.2.2 元数据无持久化管理
  • Flink 原生元数据(表结构、UDF、数据源配置)存储在内存中,会话结束或集群重启后,所有元数据丢失:
    • 运维人员需重新执行 DDL 语句创建表、注册 UDF,一个包含 10 张表的数仓,每次重启需耗时 1 小时;
    • 元数据无版本控制,若表结构变更(如新增字段),历史作业可能因 “字段不匹配” 失败,且无法追溯变更记录;
  • 某 IoT 企业因 Flink 集群重启,元数据丢失,导致 12 个设备监控作业中断,2 小时后才恢复,期间无法实时监测设备故障,造成生产停滞。
1.2.3 监控告警体系不完善
  • 传统架构中,Flink 作业监控依赖 Flink WebUI,调度任务监控依赖调度工具日志,元数据变更无监控,三者无统一入口:
    • 作业失败后,需分别查看 Flink 日志、调度工具日志、数据库日志,排查故障平均耗时 30 分钟;
    • 无 “全链路告警” 机制,若 Kafka 集群宕机,Flink 作业因 “无数据输入” 处于空闲状态,但无告警通知,运维人员无法及时发现;
  • 某零售企业曾因 Kafka 集群故障,Flink 实时库存计算作业停滞 4 小时,导致线上库存显示错误,用户下单后无法发货,引发大量投诉。

1.3 架构扩展性弱:批流分离,多场景适配难

        传统数仓多采用 “Lambda 架构”,即 “批处理层(Hive)+ 流处理层(Flink)+ 服务层(MySQL)”,但该架构存在先天缺陷,无法满足企业多样化数据需求:

1.3.1 批流数据口径不一致
  • 批处理与流处理采用两套计算逻辑:
    • 批处理层用 Hive SQL 计算 “每日订单总额”,流处理层用 Flink SQL 计算 “实时订单总额”;
    • 因计算逻辑差异(如时间窗口定义、空值处理规则),导致批流结果偏差(如批处理结果 100 万,流处理结果 98 万),业务部门无法确定 “哪组数据是正确的”;
  • 某电商企业大促后,批处理统计的 “大促总销售额” 为 5.2 亿,流处理实时统计为 4.9 亿,差异 3000 万,需投入 3 人天排查差异原因,影响财务报表生成。
1.3.2 多数据源适配成本高
  • 新增数据源(如 HBase、Elasticsearch、Pulsar)时,需开发自定义 Flink 连接器:
    • 开发一个 HBase 连接器需掌握 HBase API 与 Flink Connector 规范,周期约 3 天;
    • 连接器无统一管理,不同团队重复开发,导致技术债务累积;
  • 某互联网企业因业务需求,需接入 Pulsar 数据源,开发连接器耗时 5 天,期间无法推进实时推荐作业开发,影响新功能上线。
1.3.3 资源利用率低
  • 批处理作业(如每日凌晨执行的 Hive 全量计算)需占用大量资源,流处理作业需长期占用资源,两者无法共享资源:
    • 白天流处理作业资源空闲,但批处理作业未执行;凌晨批处理作业资源紧张,流处理作业资源无法临时调度给批处理;
    • 某金融企业为保障批处理作业执行,额外采购 5 台服务器,资源利用率仅 30%,造成硬件成本浪费。

二、问题解决:自动化数仓平台的核心目标与价值

        针对上述痛点,本方案通过 “Flink+DolphinScheduler+Dinky” 组合,构建一站式自动化实时数仓平台,以 “降本、提效、稳定、扩展” 为核心目标,具体如下:

2.1 降本:降低技术门槛,减少资源浪费

  • 开发门槛降低:通过 Dinky 的 FlinkSQL Studio,将 Flink 作业开发从 “代码级” 简化为 “SQL 级”,非开发人员(如数据分析师)也可参与开发,减少对专业 Flink 开发人员的依赖;
  • 资源利用率提升:基于 Flink 批流一体能力,批处理与流处理共享集群资源,白天流处理作业空闲时,资源可调度给临时批处理任务,资源利用率从 30% 提升至 70%;
  • 运维成本减少:自动化元数据管理、统一监控告警,减少 80% 手动运维操作(如重复建表、日志排查),一个运维人员可管理 100+ Flink 作业,较传统模式效率提升 5 倍。

2.2 提效:全链路自动化,缩短迭代周期

  • 开发效率提升:Dinky 提供 SQL 语法提示、一键调试、血缘分析功能,一个 “Kafka→MySQL” 的 Flink 作业开发周期从 2-3 天缩短至 1 小时,整体开发效率提升 300%;
  • 调度效率提升:DolphinScheduler 可视化工作流编排,支持 “数据就绪触发”“作业依赖联动”,无需手动维护 Cron 脚本,调度配置时间从 1 天缩短至 10 分钟;
  • 故障排查效率提升:统一监控平台整合 Flink 作业日志、调度任务状态、元数据变更记录,故障排查时间从 30 分钟缩短至 5 分钟。

2.3 稳定:保障数据一致性与作业可用性

  • 数据一致性保障:Flink 内置 Exactly-Once 语义,Dinky 批流统一 SQL 语法,确保批流数据口径一致,结果偏差率从 5% 降至 0.1%;
  • 作业高可用:DolphinScheduler 分布式调度架构、Flink 集群 HA 配置,保障单点故障不影响整体作业运行,作业可用性从 99% 提升至 99.99%;
  • 元数据安全:Dinky 支持 MySQL/Hive Catalog 持久化,元数据存储在数据库中,集群重启后无丢失,元数据可靠性达 100%。

2.4 扩展:支持多场景适配,架构弹性伸缩

  • 多数据源兼容:Dinky 内置 Kafka、MySQL、Hive、ClickHouse、Elasticsearch 等 20 + 数据源连接器,新增数据源无需开发,适配时间从 3 天缩短至 10 分钟;
  • 架构弹性扩展:Flink 与 DolphinScheduler 均支持水平扩展,新增 Worker 节点即可提升计算与调度能力,支持 1000 + 作业并发执行;
  • 业务场景覆盖:支持电商实时大屏、金融风控预警、IoT 设备监控、实时推荐等多场景,一套架构满足企业多样化数据需求。

三、选型对比:为何选择 Flink+DolphinScheduler+Dinky 组合?

        实时数仓技术栈选型需覆盖 “计算引擎、调度工具、开发平台” 三大核心模块,需从 “功能适配性、易用性、扩展性、性能” 四个维度对比主流方案,最终确定最优组合。

3.1 计算引擎选型:Flink vs Spark Streaming vs Kafka Streams

计算引擎是实时数仓的 “心脏”,需具备低延迟、高吞吐、强一致性的特性,三者对比如下:

维度Apache FlinkSpark StreamingKafka Streams选型结论
处理模式原生流处理(基于事件时间),支持毫秒级延迟微批处理(最小批次 100ms+),延迟较高原生流处理(基于事件时间),支持毫秒级延迟Flink/Kafka Streams 更适合低延迟场景;但 Kafka Streams 仅支持 Kafka 数据源,适配性弱,最终选 Flink
状态管理内置状态后端(Memory/RocksDB),支持 Exactly-Once 语义需依赖外部存储(Redis/HBase)实现状态管理,语义保障弱内置状态管理(基于 Kafka 分区),支持 Exactly-Once 语义Flink 状态管理更成熟,支持复杂状态(如窗口聚合、CEP),无需依赖外部组件,降低架构复杂度
批流一体支持支持流批统一 API(DataStream API),代码可复用批流 API 分离(Spark Core/Spark Streaming),需维护两套代码仅支持流处理,无批处理能力Flink 适配 “批流一体数仓”,减少代码冗余,避免批流结果偏差
数据源与连接器原生支持 Kafka、MySQL、Hive、ClickHouse 等 20 + 数据源支持主流数据源,但流处理连接器更新慢(如 ClickHouse 连接器需第三方扩展)仅支持 Kafka 数据源,无法接入其他存储Flink 数据源覆盖最广,无需自定义开发,适配企业多数据源需求
窗口计算能力支持滚动、滑动、会话、会话窗口,支持延迟数据处理(Watermark)仅支持基于时间的滚动 / 滑动窗口,延迟数据处理能力弱支持滚动、滑动窗口,无会话窗口,延迟数据处理需自定义Flink 窗口功能最完善,满足复杂业务场景(如 “用户 30 分钟内连续点击” 会话分析)
社区生态与文档Apache 顶级项目,社区活跃,文档完善,中文资料丰富Apache 顶级项目,社区活跃,但流处理文档较简略Confluent 维护,社区规模较小,中文资料少Flink 生态最成熟,问题排查、技术支持更便捷,降低运维成本
典型应用场景实时大屏、金融风控、IoT 监控、批流一体数仓批流分离场景、非低延迟需求的实时分析(如小时级报表)简单 Kafka 数据处理(如数据过滤、格式转换)Flink 覆盖场景最广,适配企业复杂实时数仓需求

        结论:Flink 在处理模式、状态管理、批流一体、数据源适配等方面均优于 Spark Streaming 与 Kafka Streams,是实时数仓计算引擎的最优选择。

3.2 调度工具选型:DolphinScheduler vs Airflow vs Azkaban

调度工具是实时数仓的 “大脑”,需具备分布式调度、可视化编排、高可用、告警联动的特性,三者对比如下:

维度 DolphinSchedulerApache Airflow Azkaban选型结论
架构设计原生分布式(Master-Worker),支持多 Master/Worker 部署,水平扩展能力强单 Master 架构,分布式需依赖 Kubernetes/YARN,配置复杂单 Server 架构,无分布式能力,仅支持单机调度DolphinScheduler 分布式架构更适合大规模数仓,支持 1000 + 作业并发,避免单点故障
工作流编排可视化拖拽界面,支持分支、循环、条件判断,非技术人员可操作需通过 Python 代码定义 DAG( Directed Acyclic Graph ),学习成本高可视化拖拽界面,但仅支持线性依赖,无分支 / 循环逻辑DolphinScheduler 编排最灵活,易用性最高,适合开发 / 运维 / 业务人员协作
任务类型支持内置 Dinky、Flink、Spark、Shell、SQL 等 20 + 任务类型,支持自定义任务支持主流任务类型,但需通过插件扩展(如 Dinky 任务需自定义 Operator)仅支持 Shell、Hive、Pig 任务,扩展能力弱DolphinScheduler 内置 Dinky/Flink 任务类型,无需二次开发,与计算引擎联动更紧密
调度触发方式支持时间触发(Cron)、事件触发(上游任务完成)、手动触发,触发逻辑灵活主要支持时间触发,事件触发需依赖外部组件(如 Apache NiFi)仅支持时间触发与手动触发,无事件触发DolphinScheduler 触发方式最丰富,满足 “数据就绪后自动执行” 的实时场景需求
告警与监控支持邮件、企业微信、钉钉、短信告警,内置任务失败重试、超时告警,监控界面直观告警需依赖第三方集成(Prometheus+Grafana),配置复杂,无内置重试机制仅支持邮件告警,无重试机制,监控功能简单DolphinScheduler 告警与监控最完善,减少故障响应时间,降低运维成本
多租户与权限控制支持多项目隔离,细粒度权限控制(如 “只读”“编辑”“执行”),通过 Token 实现跨系统权限联动支持多租户,但权限控制较粗(仅项目级),无 Token 联动机制无多租户概念,权限控制仅支持用户级,无法隔离项目DolphinScheduler 权限体系更完善,适合多团队协作的企业场景
社区支持与文档国内团队主导开发,中文文档完善,社区响应快(issue 24 小时内回复)国外团队主导,中文文档较少,社区响应较慢(issue 1-3 天回复)社区活跃度低,文档陈旧,新版本更新慢DolphinScheduler 更适合国内企业,技术支持与问题排查更便捷
部署与维护成本基于 Spring Boot 开发,部署简单(仅需 Java 环境),维护成本低依赖 Python 环境,需配置 Celery、Redis 等组件,部署复杂部署简单,但无分布式能力,维护成本随作业量增加而上升DolphinScheduler 部署维护成本最低,适合中小企业与大型企业

        结论:DolphinScheduler 在分布式架构、可视化编排、任务类型支持、告警监控等方面均优于 Airflow 与 Azkaban,是实时数仓调度工具的最优选择。

3.3 开发平台选型:Dinky vs Flink SQL Client vs HUE

        开发平台是实时数仓的 “工作台”,需具备低代码开发、元数据管理、全生命周期监控、调度联动的特性,三者对比如下:

维度DinkyFlink SQL ClientHUE(Hadoop User Experience)选型结论
开发体验可视化 FlinkSQL Studio,支持语法提示、格式化、代码高亮、SQL 模板,支持多标签页编辑,开发效率高纯命令行交互,无语法提示 / 格式化功能,需手动输入完整 SQL 语句,开发体验差支持 Hive SQL 可视化编辑,但对 Flink SQL 支持有限(仅基础语法高亮,无调试功能)Dinky 开发体验最优,大幅降低 Flink SQL 开发门槛,非开发人员也可快速上手
Flink 版本兼容性支持 Flink 1.15~1.19 多版本,无需手动切换集群配置,可在同一界面开发不同版本作业仅支持当前绑定的 Flink 版本,切换版本需重新部署 Client,操作繁琐依赖外部 Flink 集成,版本兼容性弱,常出现 “语法支持不全” 问题Dinky 多版本兼容能力满足企业 “逐步升级 Flink 集群” 的需求,避免版本切换成本
元数据管理支持 MySQL/Hive Catalog 持久化(表结构、UDF、数据源、函数),元数据变更可追溯,集群重启不丢失元数据存储在内存,会话结束 / 集群重启后完全丢失,需重新执行 DDL仅支持 Hive 元数据管理,无 Flink 专属元数据模块,无法存储 UDF 与作业配置Dinky 元数据管理最完善,解决传统架构 “元数据碎片化” 痛点,减少重复建表操作
作业调试与运维支持一键调试(本地 / 远程模式)、实时查看作业日志、Flink UI 跳转、作业暂停 / 重启 / 终止,全生命周期可控调试需依赖 Flink WebUI,日志需手动登录集群查看,作业运维操作繁琐无 Flink 作业调试功能,仅支持查看 Hive 作业日志,运维能力弱Dinky 运维功能最全面,无需切换多工具,实现 “开发 - 调试 - 运维” 一体化
UDF 开发与管理支持 Python/Java/Scala UDF 开发,提供模板生成(如 ScalarFunction 继承类)、一键发布、版本管理,支持 UDF 血缘分析需手动通过 JAR 包注册 UDF,无版本管理,UDF 冲突需手动排查不支持 Flink UDF 开发,仅支持 Hive UDF 注册,功能单一Dinky UDF 管理能力满足企业 “自定义函数复用” 需求,降低 UDF 维护成本
调度工具联动内置 DolphinScheduler 集成模块,支持作业 “发布→推送→调度” 一键操作,无需手动在调度平台创建任务无调度联动功能,需手动通过 Shell 脚本调用 Client,再配置调度任务仅支持与 Azkaban 简单联动,无 DolphinScheduler 集成能力Dinky 与 DolphinScheduler 深度集成,打通 “开发 - 调度” 链路,减少手动操作
批流一体支持内置批流统一 SQL 语法,支持动态切换执行模式(流模式 / 批模式),作业代码可复用需手动指定执行模式(execution.type=streaming/batch),批流语法需单独适配仅支持批处理(Hive),无流处理能力,无法适配批流一体架构Dinky 批流一体支持能力保障数据口径一致,避免 “一套逻辑两套代码”
社区支持与更新开源项目活跃,迭代速度快(每月更新小版本,每季度更新大版本),中文文档完善,社区群实时答疑作为 Flink 附属工具,更新缓慢,功能迭代依赖 Flink 主版本社区活跃度低,近 2 年无重大版本更新,功能停滞Dinky 社区支持最及时,可快速获取新功能与问题修复,保障平台稳定性

        结论:Dinky 在开发体验、元数据管理、运维能力、调度联动等方面全面超越 Flink SQL Client 与 HUE,是 Flink 作业开发与数仓管理的最优平台选择。

3.4 整体组合优势总结

        “Flink+DolphinScheduler+Dinky” 组合并非简单的组件叠加,而是形成 “计算 - 调度 - 开发” 三位一体的协同体系,核心优势如下:

四、系统架构:自动化数仓平台的整体设计

        本平台采用 “分层解耦、分布式部署” 架构,遵循 “高可用、可扩展、易维护” 原则,自上而下分为接入层、开发层、调度层、计算层、存储层、监控层,各层独立部署、协同工作,具体架构如下:

4.1 架构分层与组件部署

4.1.1 架构分层图

4.1.2 各层核心组件与部署要求
分层核心组件部署模式硬件配置建议(单节点)核心职责
接入层Kafka、Flink CDC、DataX分布式(Kafka 3 节点 +)CPU:4 核,内存:8G,磁盘:100G实现多数据源实时 / 批同步,将原始数据统一接入平台,保持数据原貌
开发层Dinky 平台单机 / 集群(生产建议集群)CPU:4 核,内存:8G,磁盘:50G提供 Flink 作业开发、元数据管理、UDF 开发能力,是平台的 “开发入口”
调度层DolphinScheduler分布式(Master 2 节点 + Worker 3 节点 +)CPU:4 核,内存:8G,磁盘:50G负责作业调度、任务依赖管理、告警发送,是平台的 “调度中枢”
计算层Flink 集群分布式(JobManager 2 节点 + TaskManager 3 节点 +)CPU:8 核,内存:16G,磁盘:100G执行实时 / 批计算任务,处理数据清洗、汇总、分析逻辑,是平台的 “计算核心”
存储层MySQL、Hive、ClickHouse分布式(Hive/ClickHouse 3 节点 +)CPU:8 核,内存:16G,磁盘:500G存储计算结果(实时结果→MySQL/ClickHouse,批结果→Hive)与元数据
监控层Prometheus、Grafana、Kibana分布式(Prometheus 2 节点 +)CPU:4 核,内存:8G,磁盘:100G监控全链路组件状态、作业运行情况、日志,及时发现故障并告警

4.2 核心技术特性设计

4.2.1 高可用设计

        为避免单点故障,平台各核心组件均采用高可用部署,具体方案如下:

4.2.2 扩展性设计

        平台支持水平扩展,可根据业务需求灵活增加节点,具体扩展方案如下:

4.2.3 安全性设计

        平台从 “权限控制、数据加密、日志审计” 三个维度保障安全性:

4.3 核心业务流程示例(电商实时销量大屏)

        以 “电商实时销量大屏” 业务为例,完整展示平台从数据接入到结果展示的全流程,具体步骤如下:

步骤 1:数据接入(接入层)
  • 实时数据源:用户下单数据通过业务系统写入 Kafka 主题 order_real_time(字段:order_iduser_idamountpay_timestatus);
  • 数据同步:通过 Flink CDC 同步 MySQL 商品表 product_info(字段:product_idproduct_namecategory)至 Kafka 主题 product_real_time
  • 数据格式:Kafka 消息格式为 JSON,确保数据结构标准化,便于后续处理。
步骤 2:作业开发(开发层)
  1. 数据源配置:在 Dinky 平台 “数据源管理” 模块,新增 Kafka 数据源(配置 bootstrap.servers=kafka-1:9092,kafka-2:9092)与 MySQL 数据源(配置商品表连接信息);
  2. 元数据持久化:创建 MySQL Catalog ecommerce_catalog,用于存储后续创建的表结构与 UDF;
    CREATE CATALOG ecommerce_catalog WITH ('type' = 'dinky_mysql','username' = 'dinky','password' = 'dinky@123','url' = 'jdbc:mysql://mysql-1:3306/dinky_metadata?useSSL=false&serverTimezone=Asia/Shanghai'
    );
    USE CATALOG ecommerce_catalog;
    
  3. 创建表结构:在 Dinky 中编写 Flink SQL,创建 Kafka 源表与 ClickHouse 结果表;

    sql

    -- 1. 创建 Kafka 订单源表
    CREATE TABLE order_kafka_source (`order_id` BIGINT COMMENT '订单ID',`user_id` BIGINT COMMENT '用户ID',`product_id` BIGINT COMMENT '商品ID',`amount` DECIMAL(10,2) COMMENT '订单金额',`pay_time` TIMESTAMP_LTZ(3) COMMENT '支付时间',`status` STRING COMMENT '订单状态(PAID/UNPAID)'
    ) WITH ('connector' = 'kafka','topic' = 'order_real_time','properties.bootstrap.servers' = '${kafka_source.bootstrap.servers}', -- 引用 Dinky 数据源变量'properties.group.id' = 'order_consumer_group','scan.startup.mode' = 'latest-offset','format' = 'json','json.ignore-parse-errors' = 'true'
    );-- 2. 创建 Kafka 商品源表
    CREATE TABLE product_kafka_source (`product_id` BIGINT COMMENT '商品ID',`product_name` STRING COMMENT '商品名称',`category` STRING COMMENT '商品分类'
    ) WITH ('connector' = 'kafka','topic' = 'product_real_time',
    'properties.bootstrap.servers' = '${kafka_source.bootstrap.servers}',
    'properties.group.id' = 'product_consumer_group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
    );
    -- 3. 创建 ClickHouse 实时销量结果表(按分类汇总)
    CREATE TABLE sales_clickhouse_sink (
    category STRING COMMENT ' 商品分类 ',
    real_time_sales DECIMAL (12,2) COMMENT ' 实时销量金额 ',
    order_count BIGINT COMMENT ' 订单数量 ',
    update_time TIMESTAMP COMMENT ' 更新时间 ',
    PRIMARY KEY (category, update_time) NOT ENFORCED -- 复合主键确保数据唯一性
    ) WITH (
    'connector' = 'clickhouse',
    'url' = 'jdbc:clickhouse://clickhouse-1:8123,ecommerce_sales',
    'table-name' = 'real_time_sales_summary',
    'username' = 'clickhouse',
    'password' = 'clickhouse@123',
    'sink.batch-size' = '1000', -- 批量写入大小
    'sink.flush-interval' = '1000' -- 刷新间隔(毫秒)
    );

4. **开发 UDF 函数**

 **开发 UDF 函数**:创建 Python UDF `format_amount`,用于格式化订单金额(保留 2 位小数并添加千分位分隔符);
- 选择 Dinky 作业类型为“Python”,模板为“UDF / python_udf_1”;
- 编写 UDF 代码:```pythonfrom pyflink.table import ScalarFunction, DataTypesfrom pyflink.table.udf import udfclass format_amount(ScalarFunction):def __init__(self):passdef eval(self, amount: float) -> str:# 格式化金额:保留2位小数,添加千分位分隔符return "{:,.2f}".format(amount)# 注册 UDF,指定返回类型为字符串format_amount_udf = udf(format_amount(), result_type=DataTypes.STRING())

编写计算逻辑:关联订单表与商品表,按商品分类实时汇总销量,并调用 UDF 格式化金额;

-- 1. 关联订单与商品数据,过滤已支付订单
WITH order_product_join AS (SELECT o.order_id,o.user_id,o.amount,o.pay_time,p.product_name,p.categoryFROM order_kafka_source oJOIN product_kafka_source p ON o.product_id = p.product_idWHERE o.status = 'PAID' -- 仅处理已支付订单
),-- 2. 按分类实时汇总(5秒滚动窗口)
category_sales_summary AS (SELECT category,SUM(amount) AS real_time_sales,COUNT(order_id) AS order_count,CURRENT_TIMESTAMP() AS update_timeFROM order_product_joinGROUP BY category, TUMBLE(pay_time, INTERVAL '5' SECOND) -- 5秒滚动窗口
)-- 3. 写入 ClickHouse 结果表,同时调用 UDF 格式化金额(仅用于日志打印)
INSERT INTO sales_clickhouse_sink
SELECT category,real_time_sales,order_count,update_time
FROM category_sales_summary;-- 打印格式化后的金额(用于调试)
SELECT category,format_amount_udf(real_time_sales) AS formatted_sales,order_count,update_time
FROM category_sales_summary;

        作业调试:选择 Flink 实例(外部分布式集群),点击 “调试” 按钮,查看日志确认无语法错误,数据可正常读取与写入。

步骤 3:调度配置(调度层)

Dinky 与 DolphinScheduler 联动配置

        在 DolphinScheduler 中创建令牌:进入 “安全中心→令牌管理”,创建令牌 dinky_token,设置有效期为 1 年(避免频繁过期);

        在 Dinky 中配置调度信息:进入 “配置中心→全局配置→DolphinScheduler 配置”,启用开关,填写 DolphinScheduler 地址(http://dolphinscheduler-master-1:12345)、项目名称(ecommerce_realtime)、令牌 dinky_token

作业发布与推送

        在 Dinky 中发布 Flink 作业(版本号 V1.0),点击 “推送” 按钮,选择推送至 DolphinScheduler 项目 ecommerce_realtime

        推送完成后,在 DolphinScheduler 项目中自动生成工作流 sales_summary_workflow,包含 1 个 Dinky 任务(关联已发布的 Flink 作业);

调度规则配置

        编辑工作流 sales_summary_workflow,设置调度周期为 “连续运行”(实时作业需持续执行);

        配置依赖规则:添加 “Kafka 主题健康检查” 前置任务(Shell 脚本检测 order_real_time 主题是否可用),确保上游数据就绪后再执行 Flink 作业;

        配置告警规则:任务失败时触发 “企业微信 + 邮件” 双告警,通知运维团队(接收人:ops_team@xxx.com),并设置失败重试(重试次数 2 次,间隔 5 分钟)。

步骤 4:计算执行(计算层)
  • Flink 作业提交:DolphinScheduler 按调度规则触发任务,通过 Dinky API 向 Flink 集群提交作业;
  • Flink 执行流程
    • JobManager 接收作业,解析 Flink SQL 生成物理执行计划,分配资源(每个 TaskManager 分配 2 核 CPU、4G 内存);
    • TaskManager 启动 Task 执行数据处理:
      • Source Task:读取 Kafka 主题 order_real_time 与 product_real_time 数据;
      • Join Task:关联订单与商品数据,过滤已支付订单;
      • Aggregate Task:按 5 秒滚动窗口汇总各分类销量;
      • Sink Task:将汇总结果写入 ClickHouse 表 real_time_sales_summary
    • 状态管理:采用 RocksDBStateBackend,窗口聚合状态持久化至 HDFS(路径 hdfs://hdfs-nn:9000/flink/state/sales_summary),避免作业重启后状态丢失;
  • 作业监控:通过 Flink WebUI(http://flink-jobmanager-1:8081)查看作业运行状态,确认 Task 无失败,数据吞吐率正常(约 1000 条 / 秒)。
步骤 5:结果存储与应用(存储层 + 应用层)
  • 结果存储:Flink 作业将实时销量汇总结果写入 ClickHouse 表 real_time_sales_summary,每 5 秒更新一次数据;
  • 数据应用:电商实时大屏通过 JDBC 连接 ClickHouse,查询 real_time_sales_summary 表数据,展示各商品分类的实时销量金额、订单数量,并按销量排序 Top5 分类,支持钻取查看具体商品销量(关联 product_info 表)。
步骤 6:全链路监控(监控层)
  • 组件监控:Prometheus 采集 Flink、DolphinScheduler、Dinky 组件指标(如 Flink TaskManager CPU 使用率、DolphinScheduler 任务成功率),Grafana 配置仪表盘展示,设置阈值告警(如 Flink Task 失败率 > 0 时告警);
  • 作业监控:Dinky 内置监控模块展示作业运行时长、数据读取 / 写入量、UDF 调用次数,支持查看实时日志;Flink WebUI 查看 Task 背压情况(若背压高,需增加 TaskManager 节点);
  • 日志监控:全链路日志(Dinky 开发日志、Flink 执行日志、DolphinScheduler 调度日志)统一写入 Elasticsearch,通过 Kibana 按 “作业 ID”“时间范围” 查询,快速定位故障(如 “2024-05-20 14:30 作业失败” 可通过日志发现是 Kafka 连接超时)。

五、数据架构:批流一体的数仓分层设计

        基于 Flink 批流一体能力与 Dinky 元数据管理特性,本平台采用 “Lambda 架构优化版” 设计数据分层,兼顾实时性与数据完整性,同时避免传统 Lambda 架构 “批流分离” 的痛点。数据架构从下至上分为 ODS 层(操作数据存储层)、DWD 层(数据仓库明细层)、DWS 层(数据仓库汇总层)、ADS 层(数据应用层),各层职责清晰、数据单向流动,具体设计如下:

5.1 数据分层详情

分层英文全称定位存储组件数据处理规则典型表 / 主题示例数据时效
ODS 层Operational Data Store原始数据接入层,“数据着陆区”Kafka(实时)、HDFS(批)1. 保持数据原貌,不做清洗 / 转换;
2. 按数据源分区存储(如 Kafka 按主题、HDFS 按日期);
3. 保留原始格式(JSON/CSV/Parquet)
实时:ods_kafka_order(订单日志)、ods_kafka_product(商品日志);
批:ods_hdfs_user(用户全量数据)
实时(毫秒级)、批(T+1)
DWD 层Data Warehouse Detail数据清洗层,“明细数据层”Kafka(实时)、Hive(批)1. 数据清洗:去重(如订单重复日志)、补空(如缺失的商品分类)、格式标准化(如时间统一为 UTC+8);
2. 数据脱敏:敏感字段加密(如用户手机号脱敏为 138****5678);
3. 按业务主题拆分(如 “订单主题”“用户主题”)
实时:dwd_kafka_order_detail(订单明细)、dwd_kafka_user_behavior(用户行为明细);
批:dwd_hive_product_detail(商品明细)
实时(秒级)、批(T+1)
DWS 层Data Warehouse Summary数据汇总层,“汇总数据层”ClickHouse(实时)、Hive(批)1. 实时汇总:按时间窗口(如 5 秒 / 1 分钟)、业务维度(如商品分类、区域)聚合;
2. 批处理补全:每日凌晨执行批作业,补全前一天全量数据(修正实时数据偏差);
3. 数据去重:基于主键(如 “订单 ID + 时间”)确保汇总结果唯一性
实时:dws_clickhouse_sales_real(实时销量汇总)、dws_clickhouse_user_active(实时用户活跃汇总);
批:dws_hive_sales_day(日销量汇总)
实时(秒级 / 分钟级)、批(T+1)
ADS 层Application Data Store数据应用层,“业务输出层”MySQL(轻量应用)、ClickHouse(OLAP)、Redis(缓存)1. 面向业务需求封装数据(如实时大屏需 “分类销量 Top10”、风控需 “高风险订单列表”);
2. 数据压缩:减少冗余字段,仅保留业务所需字段;
3. 支持高并发查询(如 MySQL 加索引、Redis 缓存热点数据)
实时大屏:ads_mysql_sales_top10(销量 Top10 分类);
风控:ads_clickhouse_risk_order(高风险订单);
缓存:ads_redis_user_active(实时活跃用户数)
实时(毫秒级 / 秒级)、批(T+1)

5.2 批流一体设计核心策略

        传统 Lambda 架构中,批处理层(Batch Layer)与流处理层(Speed Layer)独立运行,易导致数据口径不一致。本平台通过以下策略实现 “批流一体”,确保实时数据与批数据结果统一:

5.2.1 统一数据模型
  • 表结构统一:DWD/DWS/ADS 层的实时表与批表采用相同的表结构(字段名、数据类型、主键),例如:
    • 实时表 dws_clickhouse_sales_real 与批表 dws_hive_sales_day 均包含 category(商品分类)、sales_amount(销量金额)、stat_date(统计日期)字段;
  • 维度统一:批流共享同一维度表(如商品维度表 dwd_hive_product_dim),实时作业通过 Flink Hive Catalog 读取批维度表,避免 “实时用临时维度、批用正式维度” 导致的偏差。
5.2.2 统一计算逻辑
  • SQL 语法统一:基于 Dinky 批流统一 SQL 语法,实时作业与批作业复用同一套计算逻辑,仅通过 execution.type 参数切换执行模式:

    sql

    -- 实时模式(流处理)
    SET 'execution.type' = 'streaming';
    -- 批模式(批处理)
    SET 'execution.type' = 'batch';-- 统一计算逻辑:按分类汇总销量
    SELECT category,SUM(amount) AS sales_amount,COUNT(order_id) AS order_count,DATE_FORMAT(pay_time, 'yyyy-MM-dd') AS stat_date
    FROM dwd_layer_table
    WHERE status = 'PAID'
    GROUP BY category, DATE_FORMAT(pay_time, 'yyyy-MM-dd');
    
  • 函数统一:批流作业调用相同的 UDF(如 format_amount 金额格式化函数),避免 “实时用 Python UDF、批用 Hive UDF” 导致的计算差异。
5.2.3 数据补全与合并
  • 实时数据补全:实时作业通过 Flink 窗口延迟处理(Watermark)接收迟到数据(如用户支付延迟 5 分钟),减少实时数据遗漏;
  • 批流数据合并:每日凌晨(如 2:00)执行批作业,计算前一天全量销量数据,覆盖实时作业的汇总结果(修正实时数据因 “迟到数据未处理” 导致的偏差),确保最终结果一致性;
  • 合并策略示例

        批作业计算 dws_hive_sales_day(T-1 日全量销量);

        通过 Flink 批作业将 dws_hive_sales_day 数据覆盖写入 ClickHouse 实时表 dws_clickhouse_sales_real 中 T-1 日的分区;

        应用层查询时,T-1 日及之前的数据读取批处理结果,当日数据读取实时结果,实现 “批流无缝衔接”。

5.3 数据流转示例(电商销量分析)

        以 “电商销量分析” 业务为例,结合批流一体设计策略,完整展示数据从 ODS 层到 ADS 层的流转过程、处理逻辑及技术实现,清晰呈现各层数据的加工链路与价值转化。

5.3.1 流转链路概览

        数据流转遵循 “单向流动、分层加工” 原则,实时链路与批处理链路并行且最终融合,具体链路如下:

  • 实时链路:ODS 层(Kafka 实时订单 / 商品日志)→ DWD 层(Kafka 清洗后明细)→ DWS 层(ClickHouse 实时汇总)→ ADS 层(MySQL/Redis 应用数据),支撑实时销量大屏、实时库存调整等低延迟需求;
  • 批处理链路:ODS 层(HDFS 全量订单 / 用户数据)→ DWD 层(Hive 明细补全)→ DWS 层(Hive 日汇总)→ DWS 层(ClickHouse 历史数据覆盖),修正实时数据偏差、补全历史数据,支撑财务报表、月度销量分析等精准性需求。
5.3.2 各层数据处理详情

1. ODS 层:原始数据接入

        作为数据 “着陆区”,ODS 层不做任何业务加工,仅按数据源类型存储原始数据,确保数据溯源能力。

类型数据源存储组件数据格式分区 / 主题设计数据内容示例
实时订单电商业务系统(下单接口)KafkaJSON主题:ods_kafka_order_real_time,按 “小时” 分区(如 2024052014{"order_id":10001,"user_id":20005,"product_id":30012,"amount":999.00,"pay_time":"2024-05-20 14:30:25","status":"UNPAID","create_time":"2024-05-20 14:29:58"}
实时商品Flink CDC(MySQL 商品表)KafkaJSON主题:ods_kafka_product_real_time,按 “天” 分区(如 20240520{"product_id":30012,"product_name":"智能手表","category":"数码产品","price":1299.00,"stock":500,"update_time":"2024-05-20 10:15:30"}
批订单业务系统每日全量导出HDFS(Hive 外部表)ParquetHive 分区:dt=20240520(按日期),分桶:order_id%10(10 个桶)同实时订单结构,包含当日所有订单(含历史未支付转支付订单)
批用户用户中心全量同步HDFS(Hive 外部表)ParquetHive 分区:dt=20240520,分桶:user_id%5(5 个桶){"user_id":20005,"user_name":"张三","phone":"138****5678","region":"北京","register_time":"2023-01-15 09:20:30"}

技术实现

  • 实时数据:业务系统通过 Kafka Producer 写入订单主题,Flink CDC 监听 MySQL 商品表 binlog,自动同步变更数据至商品主题;
  • 批数据:业务系统每日 00:30 导出前一日全量订单 CSV 文件,通过 DataX 同步至 HDFS,Hive 外部表关联 HDFS 路径实现查询。

2. DWD 层:数据清洗与标准化

        DWD 层是 “数据净化站”,通过清洗、脱敏、标准化处理,将 ODS 层原始数据转化为结构化、高质量的明细数据,为后续汇总提供统一基础。

(1)实时 DWD 处理(Kafka 明细)

处理逻辑:基于 Flink SQL 实时处理 ODS 层 Kafka 数据,核心操作包括:

  • 数据清洗:过滤无效订单(如 order_id 为空、amount ≤0)、去重(基于 order_id + create_time 主键,避免重复日志);
  • 字段补全:对缺失的 category(商品分类),通过关联商品主题数据补全;
  • 格式标准化:将 pay_time/create_time 统一转换为 UTC+8 时区的 TIMESTAMP_LTZ(3) 类型;
  • 敏感数据脱敏:用户手机号 phone 字段脱敏(如 138****5678),通过 Dinky 内置 UDF mask_phone 实现。

SQL 实现(Dinky 平台)

-- 1. 读取 ODS 层订单与商品 Kafka 数据
CREATE TABLE ods_kafka_order (`order_id` BIGINT COMMENT '订单ID',`user_id` BIGINT COMMENT '用户ID',`product_id` BIGINT COMMENT '商品ID',`amount` DECIMAL(10,2) COMMENT '订单金额',`pay_time` TIMESTAMP_LTZ(3) COMMENT '支付时间',`status` STRING COMMENT '订单状态',`create_time` TIMESTAMP_LTZ(3) COMMENT '创建时间',`proc_time` AS PROCTIME() COMMENT '处理时间(用于水印)'
) WITH ('connector' = 'kafka','topic' = 'ods_kafka_order_real_time','properties.bootstrap.servers' = '${kafka_servers}','properties.group.id' = 'dwd_order_consumer','scan.startup.mode' = 'latest-offset','format' = 'json'
);CREATE TABLE ods_kafka_product (`product_id` BIGINT COMMENT '商品ID',`product_name` STRING COMMENT '商品名称',`category` STRING COMMENT '商品分类',`update_time` TIMESTAMP_LTZ(3) COMMENT '更新时间'
) WITH ('connector' = 'kafka','topic' = 'ods_kafka_product_real_time','properties.bootstrap.servers' = '${kafka_servers}','properties.group.id' = 'dwd_product_consumer','scan.startup.mode' = 'earliest-offset','format' = 'json'
);-- 2. 创建 DWD 层订单明细 Kafka 表
CREATE TABLE dwd_kafka_order_detail (`order_id` BIGINT COMMENT '订单ID',`user_id` BIGINT COMMENT '用户ID',`user_phone_masked` STRING COMMENT '脱敏手机号(关联用户表获取)',`product_id` BIGINT COMMENT '商品ID',`product_name` STRING COMMENT '商品名称',`category` STRING COMMENT '商品分类',`amount` DECIMAL(10,2) COMMENT '订单金额',`pay_time` TIMESTAMP_LTZ(3) COMMENT '支付时间',`status` STRING COMMENT '订单状态',`create_time` TIMESTAMP_LTZ(3) COMMENT '创建时间',PRIMARY KEY (`order_id`) NOT ENFORCED -- 主键去重
) WITH ('connector' = 'kafka','topic' = 'dwd_kafka_order_detail','properties.bootstrap.servers' = '${kafka_servers}','format' = 'json','sink.partitioner' = 'round-robin' -- 轮询分区,负载均衡
);-- 3. 清洗并写入 DWD 层(关联商品表补全分类,脱敏手机号)
INSERT INTO dwd_kafka_order_detail
SELECT o.order_id,o.user_id,mask_phone(u.phone) AS user_phone_masked, -- 内置脱敏UDFo.product_id,p.product_name,p.category,o.amount,o.pay_time,o.status,o.create_time
FROM ods_kafka_order o
-- 关联商品表(实时维度关联,采用Lookup Join)
LEFT JOIN ods_kafka_product FOR SYSTEM_TIME AS OF o.proc_time p ON o.product_id = p.product_id
-- 关联批用户表(Hive 维度表,通过Flink Hive Catalog读取)
LEFT JOIN hive_catalog.ods_db.ods_hive_user u ON o.user_id = u.user_id
-- 过滤无效数据
WHERE o.order_id IS NOT NULL AND o.amount > 0 AND o.status IN ('PAID', 'UNPAID', 'REFUNDED');

(2)批 DWD 处理(Hive 明细)

        处理逻辑:每日凌晨 01:00 执行 Flink 批作业,基于 ODS 层 Hive 全量数据,对实时 DWD 数据进行补全(如遗漏的历史订单、延迟同步的用户数据),核心操作包括:

  1. 补全历史数据:对实时链路未捕获的前一日未支付转支付订单,更新 status 字段;

  2. 修正数据偏差:若实时关联商品表时出现延迟,批处理重新关联最新商品维度表,修正 category 字段;

  3. 数据对齐:确保批处理明细与实时明细字段完全一致,为后续批流汇总统一基础。

SQL 实现(Dinky 平台,批模式)

-- 1. 切换为批处理模式
SET 'execution.type' = 'batch';-- 2. 读取 ODS 层批订单与批用户 Hive 表
CREATE TABLE ods_hive_order (`order_id` BIGINT COMMENT '订单ID',`user_id` BIGINT COMMENT '用户ID',`product_id` BIGINT COMMENT '商品ID',`amount` DECIMAL(10,2) COMMENT '订单金额',`pay_time` TIMESTAMP COMMENT '支付时间',`status` STRING COMMENT '订单状态',`create_time` TIMESTAMP COMMENT '创建时间',`dt` STRING COMMENT '分区日期'
) WITH ('connector' = 'hive','database-name' = 'ods_db','table-name' = 'ods_hive_order','partition' = 'dt=20240520' -- 处理前一日数据
);-- 3. 创建 DWD 层批订单明细 Hive 表
CREATE TABLE dwd_hive_order_detail (`order_id` BIGINT COMMENT '订单ID',`user_id` BIGINT COMMENT '用户ID',`user_phone_masked` STRING COMMENT '脱敏手机号',`product_id` BIGINT COMMENT '商品ID',`product_name` STRING COMMENT '商品名称',`category` STRING COMMENT '商品分类',`amount` DECIMAL(10,2) COMMENT '订单金额',`pay_time` TIMESTAMP COMMENT '支付时间',`status` STRING COMMENT '订单状态',`create_time` TIMESTAMP COMMENT '创建时间',`dt` STRING COMMENT '分区日期'
) PARTITIONED BY (`dt` STRING)
WITH ('connector' = 'hive','database-name' = 'dwd_db','table-name' = 'dwd_hive_order_detail'
);-- 4. 批处理清洗并写入 DWD 层
INSERT OVERWRITE TABLE dwd_hive_order_detail PARTITION (dt='20240520')
SELECT o.order_id,o.user_id,mask_phone(u.phone) AS user_phone_masked,o.product_id,p.product_name,p.category,o.amount,o.pay_time,o.status,o.create_time
FROM ods_hive_order o
LEFT JOIN hive_catalog.ods_db.ods_hive_product p ON o.product_id = p.product_id
LEFT JOIN hive_catalog.ods_db.ods_hive_user u ON o.user_id = u.user_id
WHERE o.order_id IS NOT NULL AND o.amount > 0;

3. DWS 层:数据汇总与批流融合

        DWS 层是 “数据聚合中心”,分别生成实时汇总数据(支撑低延迟需求)和批汇总数据(修正偏差、补全历史),最终通过 “批覆盖实时” 实现数据一致性。

(1)实时 DWS 处理(ClickHouse 汇总)

处理逻辑:基于 DWD 层 Kafka 明细数据,按 “商品分类” 和 “5 秒滚动窗口” 实时聚合销量,核心操作包括:

  1. 窗口聚合:采用 Flink TUMBLE 窗口(5 秒),统计各分类的实时销量金额、订单数量;
  2. 状态管理:使用 RocksDBStateBackend 持久化窗口状态,避免作业重启后数据丢失;
  3. 实时写入:将聚合结果写入 ClickHouse,支持高并发查询(ClickHouse 列存引擎适配 OLAP 场景)。

SQL 实现(Dinky 平台)

-- 1. 读取 DWD 层订单明细 Kafka 表
CREATE TABLE dwd_kafka_order_detail (-- 字段同 5.3.2.2(1)中定义,此处省略
);-- 2. 创建 DWS 层实时销量汇总 ClickHouse 表
CREATE TABLE dws_clickhouse_sales_real (`category` STRING COMMENT '商品分类',`window_start` TIMESTAMP_LTZ(3) COMMENT '窗口开始时间',`window_end` TIMESTAMP_LTZ(3) COMMENT '窗口结束时间',`real_sales` DECIMAL(12,2) COMMENT '实时销量金额',`order_count` BIGINT COMMENT '订单数量',`update_time` TIMESTAMP_LTZ(3) COMMENT '数据更新时间',PRIMARY KEY (`category`, `window_start`) NOT ENFORCED -- 复合主键确保唯一性
) WITH ('connector' = 'clickhouse','url' = 'jdbc:clickhouse://${clickhouse_servers}/dws_db','table-name' = 'dws_clickhouse_sales_real','username' = '${clickhouse_user}','password' = '${clickhouse_pwd}','sink.batch-size' = '500', -- 批量写入,提升性能'sink.flush-interval' = '1000' -- 1秒刷新一次
);-- 3. 实时窗口聚合并写入 ClickHouse
INSERT INTO dws_clickhouse_sales_real
SELECT category,TUMBLE_START(pay_time, INTERVAL '5' SECOND) AS window_start,TUMBLE_END(pay_time, INTERVAL '5' SECOND) AS window_end,SUM(amount) AS real_sales,COUNT(DISTINCT order_id) AS order_count, -- 去重统计订单数CURRENT_TIMESTAMP() AS update_time
FROM dwd_kafka_order_detail
WHERE status = 'PAID' -- 仅统计已支付订单
GROUP BY category, TUMBLE(pay_time, INTERVAL '5' SECOND);

(2)批 DWS 处理(Hive 汇总 + ClickHouse 覆盖)

        处理逻辑:每日凌晨 02:00 执行 Flink 批作业,基于 DWD 层 Hive 明细数据,计算前一日全量销量汇总,核心操作包括:

  1. 全量聚合:按 “商品分类” 和 “日期” 聚合,得到前一日各分类的精准销量(修正实时窗口遗漏的迟到数据);

  2. 历史覆盖:将批汇总结果写入 Hive 表的同时,覆盖 ClickHouse 实时表中对应日期的分区数据,实现 “批流数据统一”;

  3. 数据校验:对比批汇总与实时汇总的差异(如差异率 >0.5% 触发告警),确保数据准确性。

SQL 实现(Dinky 平台,批模式)

-- 4. 批处理聚合并写入 Hive
INSERT OVERWRITE TABLE dws_hive_sales_day PARTITION (stat_date='20240520')
SELECT category,'20240520' AS stat_date,SUM(amount) AS day_sales,COUNT(DISTINCT order_id) AS day_order_count,CURRENT_TIMESTAMP() AS calc_time
FROM dwd_hive_order_detail
WHERE status = 'PAID' AND dt = '20240520' -- 仅处理前一日明细数据
GROUP BY category;-- 5. 创建 ClickHouse 批数据写入表(复用实时表结构,仅覆盖历史分区)
CREATE TABLE dws_clickhouse_sales_real (-- 同 5.3.2.3(1)中实时表结构,此处省略
);-- 6. 批汇总结果覆盖 ClickHouse 实时表历史数据
INSERT OVERWRITE TABLE dws_clickhouse_sales_real
SELECT category,TO_TIMESTAMP_LTZ(CONCAT(stat_date, ' 00:00:00'), 3) AS window_start, -- 按日构建窗口起始时间TO_TIMESTAMP_LTZ(CONCAT(stat_date, ' 23:59:59'), 3) AS window_end, -- 按日构建窗口结束时间day_sales AS real_sales,day_order_count AS order_count,CURRENT_TIMESTAMP() AS update_time
FROM dws_hive_sales_day
WHERE stat_date = '20240520';-- 7. 数据差异校验(对比批汇总与实时汇总结果,差异率>0.5%则告警)
WITH batch_data AS (SELECT category, day_sales FROM dws_hive_sales_day WHERE stat_date = '20240520'
),
stream_data AS (SELECT category, SUM(real_sales) AS stream_total_sales FROM dws_clickhouse_sales_realWHERE DATE_FORMAT(window_start, 'yyyyMMdd') = '20240520'GROUP BY category
)
SELECT b.category,b.day_sales AS batch_sales,s.stream_total_sales,ROUND(ABS(b.day_sales - s.stream_total_sales) / b.day_sales * 100, 2) AS diff_rate
FROM batch_data b
JOIN stream_data s ON b.category = s.category
WHERE ROUND(ABS(b.day_sales - s.stream_total_sales) / b.day_sales * 100, 2) > 0.5;
-- 若查询返回结果,通过 Dinky 告警模块触发企业微信通知,提示运维人员排查差异原因

批流融合效果:通过 “实时计算 + 批处理补全”,确保 ClickHouse 表中:

  • 当日数据:实时更新(5 秒窗口),支撑实时大屏展示;
  • 历史数据(T-1 及之前):由批处理结果覆盖,保证数据精准性(如财务对账场景);
  • 差异率控制在 0.5% 以内,满足业务对数据一致性的要求。

4. ADS 层:数据应用封装

        ADS 层是 “业务输出端”,基于 DWS 层汇总数据,按具体业务需求封装数据,直接支撑前端应用或下游系统调用,核心目标是 “简化查询、提升性能”。

(1)实时大屏场景(MySQL 存储)

        业务需求:电商实时大屏需展示 “商品分类销量 Top10”“实时总销售额”“近 1 小时销量趋势”,要求查询延迟 <100ms。
处理逻辑

  1. 从 ClickHouse 实时表聚合核心指标;

  2. 将结果写入 MySQL(支持高并发读),并创建索引优化查询;

  3. 每 10 秒刷新一次数据,平衡实时性与数据库压力。

SQL 实现(Dinky 平台)

-- 1. 读取 DWS 层 ClickHouse 实时汇总表
CREATE TABLE dws_clickhouse_sales_real (-- 同前序定义,此处省略
);-- 2. 创建 ADS 层实时大屏 MySQL 表
CREATE TABLE ads_mysql_sales_dashboard (`indicator_name` STRING COMMENT '指标名称(如 total_sales/top10_category)',`category` STRING COMMENT '商品分类(top10场景非空,总销量场景为空)',`indicator_value` STRING COMMENT '指标值(金额/数量,转字符串便于统一存储)',`rank` INT COMMENT '排名(仅top10场景非空)',`update_time` TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`indicator_name`, `category`) NOT ENFORCED -- 复合主键避免重复数据
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://${mysql_servers}/ads_db?useSSL=false','table-name' = 'ads_mysql_sales_dashboard','username' = '${mysql_user}','password' = '${mysql_pwd}','sink.batch-size' = '10','sink.flush-interval' = '10000' -- 10秒刷新一次
);-- 3. 封装“实时总销售额”指标
INSERT INTO ads_mysql_sales_dashboard
SELECT 'total_sales' AS indicator_name,'' AS category,CAST(SUM(real_sales) AS STRING) AS indicator_value,0 AS rank,CURRENT_TIMESTAMP() AS update_time
FROM dws_clickhouse_sales_real
WHERE window_end >= DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL '1' DAY); -- 统计近24小时总销量-- 4. 封装“商品分类销量 Top10”指标
INSERT INTO ads_mysql_sales_dashboard
SELECT 'top10_category' AS indicator_name,category,CAST(SUM(real_sales) AS STRING) AS indicator_value,ROW_NUMBER() OVER (ORDER BY SUM(real_sales) DESC) AS rank,CURRENT_TIMESTAMP() AS update_time
FROM dws_clickhouse_sales_real
WHERE window_end >= DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL '1' DAY)
GROUP BY category
QUALIFY ROW_NUMBER() OVER (ORDER BY SUM(real_sales) DESC) <= 10; -- 取销量前10分类

应用调用:实时大屏通过 JDBC 连接 MySQL,执行简单查询即可获取指标:

-- 查询实时总销售额
SELECT indicator_value AS total_sales 
FROM ads_mysql_sales_dashboard 
WHERE indicator_name = 'total_sales' 
ORDER BY update_time DESC LIMIT 1;-- 查询销量 Top10 分类
SELECT category, indicator_value AS sales_amount, rank 
FROM ads_mysql_sales_dashboard 
WHERE indicator_name = 'top10_category' 
ORDER BY rank ASC;

(2)财务报表场景(Hive 存储)

        业务需求:财务部门需生成 “每日商品分类销量报表”,包含销量金额、订单数量、客单价(销量金额 / 订单数量),要求数据 100% 精准。
处理逻辑

  1. 基于 DWS 层批处理汇总表(Hive)计算客单价;

  2. 按日期分区存储,支持按 “月 / 季度” 汇总查询;

  3. 数据生成后触发邮件通知,推送报表至财务邮箱。

SQL 实现(Dinky 平台,批模式)

-- 1. 切换为批处理模式
SET 'execution.type' = 'batch';-- 2. 读取 DWS 层批汇总 Hive 表
CREATE TABLE dws_hive_sales_day (-- 同前序定义,此处省略
);-- 3. 创建 ADS 层财务报表 Hive 表
CREATE TABLE ads_hive_finance_sales_report (`stat_date` STRING COMMENT '统计日期(yyyy-MM-dd)',`category` STRING COMMENT '商品分类',`sales_amount` DECIMAL(12,2) COMMENT '销量金额',`order_count` BIGINT COMMENT '订单数量',`avg_order_price` DECIMAL(10,2) COMMENT '客单价(sales_amount/order_count)',`create_time` TIMESTAMP COMMENT '报表生成时间'
) PARTITIONED BY (`stat_date` STRING)
WITH ('connector' = 'hive','database-name' = 'ads_db','table-name' = 'ads_hive_finance_sales_report'
);-- 4. 计算并写入财务报表数据
INSERT OVERWRITE TABLE ads_hive_finance_sales_report PARTITION (stat_date='20240520')
SELECT '20240520' AS stat_date,category,day_sales AS sales_amount,day_order_count AS order_count,ROUND(day_sales / day_order_count, 2) AS avg_order_price, -- 计算客单价CURRENT_TIMESTAMP() AS create_time
FROM dws_hive_sales_day
WHERE stat_date = '20240520'AND day_order_count > 0; -- 避免除数为0-- 5. 报表推送(通过 Dinky 集成 Shell 任务实现)
-- 执行 Shell 脚本:将 Hive 表数据导出为 Excel,并通过邮件发送给财务部门
5.3.3 数据流转保障机制

        为确保数据从 ODS 层到 ADS 层流转过程中的 “准确性、完整性、时效性”,平台设计了三大保障机制:

  1. 数据血缘追踪:通过 Dinky 元数据管理模块,自动记录各层表之间的依赖关系(如 ods_kafka_order_real_time → dwd_kafka_order_detail → dws_clickhouse_sales_real),支持按 “表 / 字段” 反向溯源,便于故障定位(如 ADS 层数据异常时,可快速定位到 DWD 层清洗逻辑问题);

  2. 数据质量校验:在各层处理节点嵌入质量校验规则,示例如下:

    分层校验规则处理方式
    ODS 层订单 amount >0、order_id 非空过滤无效数据,记录至错误日志表
    DWD 层脱敏后手机号格式为 138****5678格式错误数据暂存至临时表,人工核查
    DWS 层批流汇总差异率 ≤0.5%差异超标触发告警,暂停 ADS 层数据更新
    ADS 层财务报表客单价 ≤ 该分类商品均价的 3 倍异常数据标红,在报表中注明 “待核查”
  3. 数据备份与恢复

    • 实时数据:Kafka 主题设置 3 副本,避免数据丢失;
    • 批数据:Hive 表开启定时快照(每日凌晨 03:00),支持回滚至前一日状态;
    • 应用数据:MySQL 开启 binlog,支持按时间点恢复(如误操作删除数据后,可恢复至删除前状态)。

5.4 数据架构价值总结

        本平台的批流一体数据架构,通过 “分层解耦 + 批流融合” 设计,解决了传统数仓的核心痛点,具体价值如下:

        数据一致性:统一批流数据模型与计算逻辑,批处理补全实时数据偏差,结果差异率控制在 0.5% 以内,满足业务对数据精准性的需求(如财务对账、风控决策);

        业务适配性:实时链路(毫秒级 - 秒级)支撑实时大屏、库存调整等低延迟场景,批处理链路(T+1)支撑财务报表、月度分析等精准性场景,一套架构覆盖多业务需求;

        效率提升:各层数据单向流动,避免重复加工,DWD 层清洗后的数据可复用至 DWS 层多个汇总任务,数据加工效率提升 40%;

        可维护性:数据血缘清晰、质量校验自动化、备份机制完善,故障排查时间从传统架构的 2 小时缩短至 15 分钟,降低运维成本。


文章转载自:

http://yD2UpDWu.kskpx.cn
http://n1IgVWZW.kskpx.cn
http://u0T4Rjgq.kskpx.cn
http://zXTmmmtb.kskpx.cn
http://zmFrU7ND.kskpx.cn
http://JdXkfIEI.kskpx.cn
http://SoYJk0M1.kskpx.cn
http://6PzkvMwq.kskpx.cn
http://yW8wxaHS.kskpx.cn
http://0zzXtCOF.kskpx.cn
http://0eV2dNnf.kskpx.cn
http://E6l1mCDw.kskpx.cn
http://VtYWhAKm.kskpx.cn
http://16iTHTEy.kskpx.cn
http://eu3CIeGE.kskpx.cn
http://ijP24saC.kskpx.cn
http://nls9FSMW.kskpx.cn
http://hlCtSxtv.kskpx.cn
http://pBpKICDP.kskpx.cn
http://iVanhjVz.kskpx.cn
http://pXclG1af.kskpx.cn
http://af9NrLoG.kskpx.cn
http://EUuxzywr.kskpx.cn
http://UnxZnj9e.kskpx.cn
http://cmVaiyLm.kskpx.cn
http://us1sA5Fe.kskpx.cn
http://vVSkkoAg.kskpx.cn
http://dKg0Wasg.kskpx.cn
http://JnBvmSJF.kskpx.cn
http://2PAs8STV.kskpx.cn
http://www.dtcms.com/a/365045.html

相关文章:

  • FFmpeg-Batch:GitHub开源视频批量处理工具,高效解决视频转格式与画质压缩需求
  • AI在金融、医疗、教育、制造业等领域的落地案例(含代码、流程图、Prompt示例与图表)
  • B样条曲线,已知曲线上的某个点到起点的距离,确定这个点的参数u的值的方法
  • 计算机视觉(七):膨胀操作
  • 键盘上面有F3,四,R,F,V,按下没有反应,维修记录
  • VS2015+QT编译protobuf库
  • Java--json与map,colloct与流
  • SpringMVC的请求接收与结果响应
  • Python爬取nc数据
  • 数据科学家如何更好地展示自己的能力
  • 理解sed命令
  • 干货知识:ERP、CRM、OA,小公司到底先上哪个?
  • 从 0 到 1 实现 PyTorch 食物图像分类:核心知识点与完整实
  • k8s知识点总结3
  • 基于 CC-Link IE FB 转 DeviceNet 技术的三菱 PLC 与发那科机器人在汽车涂装线的精准喷涂联动
  • Grafana Loki日志聚合系统深度解析:选型、竞品、成本与资源消耗
  • 安卓9.0系统修改定制化____如何修改安卓低版本固件 解决 API/SDK 版本过低的问题
  • 题解:UVA1589 象棋 Xiangqi
  • 根据fullcalendar实现企业微信的拖动式预约会议
  • 别错过!一杯奶茶钱开启企业微信 Power BI 之旅
  • gitlab推送失败,内存不足的处理
  • iOS蓝牙使用及深入剖析高频高负载传输丢包解决方案(附源码)
  • 【STL】C++ 开发者必学字符类详解析:std::string
  • 【人工智能学习之MMdeploy部署踩坑总结】
  • Linux之shell-awk命令详解
  • Ai Qwen3解答epochs多少为最佳 仅共参考
  • AI时代:智能体系统构建基本范式
  • 峰谷套利+动态增容:工商业储能能量管理系统的妙用
  • Docker容器与镜像:两种导入导出方式全解析
  • 洛谷 P3128 [USACO15DEC] Max Flow P -普及+/提高