大数据治理域——离线数据开发
摘要
文本主要介绍了离线数据开发相关内容,包括业务与流程、阿里MaxCompute系统设计以及阿里调度系统设计。离线数据开发是大数据开发核心组成部分,用于处理批量数据,支持企业多种需求,其流程涵盖需求调研、数据源接入等环节。阿里MaxCompute系统架构与特点被阐述,调度系统背景及核心设计模型也进行了说明。
1. 离线数据开发业务与流程
离线数据开发是大数据开发中的一个核心组成部分,主要用于处理批量数据(如日志、交易、行为数据等),支持企业的报表统计、数据分析、风控建模、数据仓库建设等需求。下面从业务视角和流程视角分别进行说明:
1.1. 业务视角:离线数据开发的典型业务场景
1.1.1. 数据仓库建设(DW)
- 构建维度模型(ODS → DWD → DWS → ADS)
- 支撑 BI、数据分析、运营报表、管理驾驶舱
1.1.2. 报表分析
- 日报、周报、月报
- 各类运营、风控、财务、营销等部门的指标需求
1.1.3. 风控/营销模型训练
- 特征抽取、标签加工、样本生成等
- 为模型提供批量训练数据
1.1.4. 数据集市构建
- 面向特定主题域,如用户画像、交易分析、风险评分等
1.1.5. 数据质量监控
- 样本监控、规则校验、异常检测
1.2. 流程视角:离线数据开发流程
离线数据开发一般遵循如下流程:
1.2.1. 需求调研与分析
- 明确数据开发目标(如构建某个指标、支持某报表)
- 梳理数据来源、计算逻辑、口径定义
1.2.2. 数据源接入
- 数据可能来自:
- 业务数据库(MySQL、Oracle 等)
- 日志系统(如 Kafka、Flume)
- 外部数据源(API、第三方文件等)
- 通过数据同步工具(如 DataX、Sqoop、Flink CDC)导入到 HDFS、Hive、Iceberg、Hudi 等存储系统
1.2.3. 数仓建模
- ODS(原始数据层):按天分区保存,保留业务系统原貌
- DWD(明细数据层):数据清洗、去重、格式标准化,按主题域分类
- DWS(宽表汇总层):聚合维度构建宽表
- ADS(应用层):直接面向业务的指标表或分析结果表
1.2.4. 数据开发实现
使用工具和语言:
- Hive SQL、Spark SQL、Flink Batch、Kettle、DolphinScheduler 等
开发方式:
- 编写 SQL 脚本或 Spark 程序
- 使用图形化数据开发平台(如阿里 DataWorks、百度 Fuxi、腾讯 WeData)
1.2.5. 调度与运维
- 调度平台(如 Airflow、DolphinScheduler、Azkaban)定时运行任务
- 设置依赖关系、优先级、重试机制
- 运维监控日志、任务失败报警
1.2.6. 数据校验与质量保障
- 建立数据校验规则(如行数、字段非空、业务逻辑校验)
- 数据质量平台监控指标波动、异常检测
1.2.7. 数据服务/输出
- 支持 BI 工具(如 Tableau、FineBI)
- 提供接口/API供前端或业务系统调用
- 供机器学习、风控模型调用
1.3. 开发工具/平台
工具类型 | 示例 |
开发语言 | SQL、Spark、Java、Python |
数据平台 | Hive、Hudi、Iceberg、ClickHouse |
任务调度 | Airflow、DolphinScheduler |
数据同步 | DataX、Sqoop、Flink CDC |
数据治理 | AWS Glue、阿里 DataWorks |
1.4. 离线数据开发实际案例(金融场景举例)
以“用户贷款风险评估模型”为例:需求:为建模团队提供用户近90天的交易统计特征。
流程:
- 从 MySQL 采集用户交易记录至 Hive ODS 层
- 清洗、标准化,进入 DWD 层(用户日交易明细)
- 统计用户维度的交易次数、金额、渠道分布,进入 DWS 层
- 提供训练样本给模型训练平台,或输出标签至 ADS 层供后续分析
- 数据每天定时跑批,保证更新及时性。
2. 阿里MaxComputer系统设计
阿里离线数据仓库的存储和计算都是在阿里云大数据计算服务MaxCompute上完成的。
大数据计算服务MaxCompute是由阿里云自主研发的海量数据处理平台,主要服务于海量数据的存储和计算,提供完善的数据导入方案,以及多种经典的分布式计算模型,提供海量数据仓库的解决方案,能够更快速地解决用户的海量数据计算问题,有效降低企业成本,并保障数据安全。
MaxCompute采用抽象的作业处理框架,将不同场景的各种计算任务统一在同一个平台之上,共享安全、存储、数据管理和资源调度,为来自不同用户需求的各种数据处理任务提供统一的编程接口和界面。它提供数据上传/下载通道、SQL、MapReduce、机器学习算法、图编程模型和流式计算模型多种计算分析服务,并且提供完善的安全解决方案。
2.1. MaxComputer系统架构
阿里 MaxCompute(前身为 ODPS,Open Data Processing Service)是阿里云推出的大数据计算平台,主要面向海量离线数据的存储与分析处理,适用于企业数仓建设、离线报表分析、AI 特征抽取等场景。下面从系统设计的多个维度深入分析 MaxCompute 的架构设计思想、核心组件与特点。
MaxCompute由四部分组成,分别是客户端(MaxCompute Client)、接入层(MaxCompute Front End)、逻辑层(MaxCompute Server)及存储与计算层(Apsara Core)。
- MaxCompute客户端有以下几种形式。
- Web:以RESTful API的方式提供离线数据处理服务。
- SDK:对RESTful API的封装,目前有Java等版本的实现。
- CLT(Command Line Tool):运行在Windows/Linux下的客户端工具,通过CLT可以提交命令完成Project管理、DDL、DML等操作。
- IDE:上层可视化ETL/BI工具,即阿里内部名称是在云端(D2),用户可以基于在云端完成数据同步、任务调度、报表生成等常见操作。
- 接入层提供HTTP服务、Cache、负载均衡,实现用户认证和服务层面的访问控制。
- 逻辑层又称作控制层,是MaxCompute的核心部分,实现用户空间和对象的管理、命令的解析与执行逻辑、数据对象的访问控制与授权等功能。在逻辑层有Worker、Scheduler和Executor三个角色:
- Worker处理所有的RESTful请求,包括用户空间(Project)管理操作、资源(Resource)管理操作、作业管理等,对于SQLDML、MR等需要启动MapReduce的作业,会生成MaxComputeInstance(类似于Hive中的Job),提交给Scheduler进一步处理。)
- Scheduler负责MaxComputeInstance的调度和拆解,并向计算层的计算集群询问资源占用情况以进行流控。
- Executor负责MaxComputeInstance的执行,向计算层的计算集群提交真正的计算任务。
- 计算层就是飞天内核(ApsaraCore),运行在和控制层相互独立的计算集群上,它包括Pangu(分布式文件系统)、Fuxi(资源调度系统)、Nuwa/ZK(Namespace服务)、Shennong(监控模块)等。MaxCompute中的元数据存储在阿里云计算的另一个开放服务OTS(OpenTableService,开放结构化数据服务)中,元数据内容主要包括用户空间元数据、Table/PartitionSchema、ACL、Job元数据、安全体系等。
2.2. MaxCompute系统特点
当前的调度系统支持阿里巴巴大数据系统日常应用的各种场景,其主要具有如下特点和应用场景。
- 调度配置,常见的调度配置方式是对具体任务手工配置依赖的上游任务,此方式基本可以满足调度系统的正常运行。这种方式存在两个问题:一是配置上较麻烦,需要知道上游依赖表的产出任务;二是上游任务修改不再产出依赖表或本身任务不再依赖某上游任务时,对调度依赖不做修改,导致依赖配置错误。阿里巴巴的调度配置方式采用的是输人输出配置和自动识别相结合的方式。任务提交时,SQL解析引擎自动识别此任务的输人表和输出表,输人表自动关联产出此表的任务,输出表亦然。通过此种方式,解决了上述问题,可以自动调整任务依赖,避免依赖配置错误。
- 定时调度,可以根据实际需要,设定任务的运行时间,共有5种时间类型:分钟、小时、日、周、月,具体可精确到秒。比如日任务可选择每天的几点几分运行,周任务可选择每周几的几点几分运行,月任务也可选择每月第几天的几点几分运行。对于周任务和月任务,通常选择定时调度的方式。
- 周期调度,可按照小时、日等时间周期运行任务,与定时调度的区别是无须指定具体的开始运行时间。比如离线数据处理的大多数日任务就是这种类型,任务根据依赖关系,按照调度树的顺序从上依次向下运行,每个周期的具体运行时间随着每天资源和上游依赖任务运行的总体情况会有所不同。
- 手动运行,当生产环境需要做一些数据修复或其他一次性的临时数据操作时,可以选择手动运行的任务类型,在开发环境(IDE)中写好脚本后发布到生产环境,再通过手动触发运行。
- 补数据,在完成数据开发的发布以后,有些表需要进行数据初始化,比如有些日增量表要补齐最近三年的历史数据,这时就需要用到补数据任务了。可以设定需要补的时间区间,并圈定需要运行的任务节点,从而生成一个补数据的工作流,同时还能选择并行的运行方式以节约时间。
- 基线管理,基于充分利用计算资源,保证重点业务数据优先产出,合理安排各类优先级任务的运行,调度系统引人了按优先级分类管理的方法。优先级分类从1~9,数字越大代表优先级越高,系统会先保障高优先级任务的运行资源。对于同一类优先级的任务,放到同一条基线中,这样可以实现按优先级不同进行分层的统一管理,并可对基线的运行时间进行预测估计,以监控是否能在规定的时间内完成。
3. 阿里调度系统设计
3.1. 调度系统背景
现代信息化条件下的战争,从太空的卫星到空中的各类作战飞机,从地面的导弹到坦克火炮,从水面的大小舰艇到水下的潜艇,还有诸如网络、电磁环境等多种方式、多种维度的作战空间,各种武器装备、人员、作战环境纷繁复杂,如何能够准确、合理地调配这些资源,组织有序、高效的攻防体系赢得胜利,最关键的是需要有一个强大的指挥系统。
在云计算大数据时代,调度系统无疑是整个大数据体系的指挥中枢。如图4.7所示,调度系统中的各类任务互相依赖,形成一个典型的有向无环图。在传统的数据仓库系统中,很多是依靠Crontab定时任务功能进行任务调度处理的。这种方式有很多弊端:
- 各任务之间的依赖基于执行时间实现,容易造成前面的任务未结束或失败而后面的任务已经运行;
- 任务难以并发执行,增加了整体的处理时间;
- 无法设置任务优先级;
- 任务的管理维护很不方便,无法进行执行效果分析等。
3.2. 调度系统的核心设计模型
整个调度系统共有两个核心模块:调度引擎(PhoenixEngine)和执行引擎(Alisa)。简单来说,调度引擎的作用是根据任务节点属性以及依赖关系进行实例化,生成各类参数的实值,并生成调度树;执行引擎的作用是根据调度引擎生成的具体任务实例和配置信息,分配CPU、内存、运行节点等资源,在任务对应的执行环境中运行节点代码。在介绍调度引擎设计之前,我们先来了解两个模型:任务状态机模型和工作流状态机模型。
任务状态机模型是针对数据任务节点在整个运行生命周期的状态定义,总共有6种状态,状态之间的转换逻辑如图4.9所示。
3.2.1. 任务状态机模式
3.2.2. 工作流状态机模型
工作流状态机模型是针对数据任务节点在调度树中生成的工作流运行的不同状态定义,共有5种状态。
3.2.3. 调度引擎工作原理
调度引擎(Phoenix Engine)基于以上两个状态机模型原理,以事件驱动的方式运行,为数据任务节点生成实例,并在调度树中生成具体执行的工作流。任务节点实例在工作流状态机、任务状态机和事件处理器之间转换,其中调度引擎只涉及任务状态机的未运行和等待运行两种状态,其他5种状态存在于执行引擎中。
调度引擎工作原理示意图如图所示。
- Async Dispatcher:异步处理任务调度。
- Sync Dispatcher:.同步处理任务调度。
- Task事件处理器:任务事件处理器,与任务状态机交互。
- DAG事件处理器:工作流事件处理器,与工作流状态机交互。
- 一个DAG事件处理器包含若干个Task事件处理器。
3.2.4. 执行器工作原理
- 任务管理接口:供用户系统向Alisa中提交、查询和操作离线任务,并获得异步通知。
- 系统管理接口:供系统管理员进行后台管理,包括为集群增加新的机器、划分资源组、查看集群资源和负载、追踪任务状态等。
- Driver:Alisa的调度器,Driver中实现了任务管理接口和系统管理接口;负责任务的调度策略、集群容灾和伸缩、任务失效备援、负载均衡实现。Drive的任务调度策略是可插拔替换的,以满足不同的使用场景。Driver使用Resource manager管理整个集群的负载。(我们可以把Driver理解为Hadoop的JobTracker。.)
- Task pool:Driver也将已经提交的全部任务都放入到Task pool中管理,包括等待资源、数据质量检测、运行中、运行成功和失败的所有任务。直到任务运行完成(不论成功或者失败),并且用户确实获取到了关于这个状态的通知,Driver才会将任务从Task pool中移除。Driver和Node通过Task pool提供的事件机制进行可靠的通信。整个系统全部状态(除了与运行无关的部分管理信息外)都保存在Task pool中,这样系统的其他部分很容易实现高可用性和伸缩性。而Task pool本身采用Zookeeper实现,这样它本身也是具备高可用能力的。
- Resource manager:这个组件专注于集群整体资源的管理。
- Task container:类似于Web Server,为Task提供运行的容器(类似的,Web Server为Action提供运行的容器)。容器负责处理Task的公共逻辑,如文件下载,任务级Session、流程级Session的维护等。同时Task container负责收集机器的实际负载并上报给Resource manager。
- Session manager:这个组件实现了对Task session的管理。
- Node:Node代表Alisa集群中的一个节点。节点负责提供任务运行所需的物理资源。Node是逻辑概念,一台物理机器上可以部署一个或者多个Node(Node类似于Hadoop的TaskTracker)。
博文参考
《阿里巴巴大数据系统建设》