数据血缘探秘:用SQL串联不同数据源的脉络
RedShift
-- RedShift 智能家居类表-- 表A: smart_device_logs_raw (原始智能设备日志数据)
CREATE TABLE smart_device_logs_raw (log_id BIGINT IDENTITY(1,1),device_id VARCHAR(50) NOT NULL,log_timestamp TIMESTAMP NOT NULL,event_type VARCHAR(100),event_value VARCHAR(255)
);
COMMENT ON TABLE smart_device_logs_raw IS '原始智能设备日志数据表,记录智能设备产生的各类操作和状态日志。';
COMMENT ON COLUMN smart_device_logs_raw.log_id IS '日志ID';
COMMENT ON COLUMN smart_device_logs_raw.device_id IS '设备ID';
COMMENT ON COLUMN smart_device_logs_raw.log_timestamp IS '日志时间戳';
COMMENT ON COLUMN smart_device_logs_raw.event_type IS '事件类型(如:开关、温度变化、亮度调节)';
COMMENT ON COLUMN smart_device_logs_raw.event_value IS '事件值(如:开、25.5、50%)';-- 插入示例数据到 smart_device_logs_raw
INSERT INTO smart_device_logs_raw (device_id, log_timestamp, event_type, event_value) VALUES
('LIGHT001', '2023-09-01 08:00:00', '开关', '开'),
('THERMO001', '2023-09-01 08:05:00', '温度变化', '25.0'),
('LIGHT001', '2023-09-01 08:10:00', '亮度调节', '80%'),
('DOOR001', '2023-09-01 08:15:00', '开关', '关'),
('THERMO001', '2023-09-01 08:20:00', '温度变化', '24.5');-- 表B: smart_device_daily_summary (智能设备每日汇总数据)
CREATE TABLE smart_device_daily_summary (summary_id BIGINT IDENTITY(1,1),device_id VARCHAR(50) NOT NULL,summary_date DATE NOT NULL,total_events INT DEFAULT 0,last_event_type VARCHAR(100)
);
COMMENT ON TABLE smart_device_daily_summary IS '智能设备每日汇总数据表,统计智能设备每日的事件概况。';
COMMENT ON COLUMN smart_device_daily_summary.summary_id IS '汇总ID';
COMMENT ON COLUMN smart_device_daily_summary.device_id IS '设备ID';
COMMENT ON COLUMN smart_device_daily_summary.summary_date IS '汇总日期';
COMMENT ON COLUMN smart_device_daily_summary.total_events IS '当日事件总数';
COMMENT ON COLUMN smart_device_daily_summary.last_event_type IS '当日最后事件类型';-- 从 smart_device_logs_raw 插入数据到 smart_device_daily_summary (血缘关系)
INSERT INTO smart_device_daily_summary (device_id, summary_date, total_events, last_event_type)
SELECTdevice_id,log_timestamp::date AS summary_date,COUNT(*) AS total_events,LAST_VALUE(event_type) OVER (PARTITION BY device_id, log_timestamp::date ORDER BY log_timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_event_type
FROM smart_device_logs_raw
GROUP BY device_id, log_timestamp::date;
Mysql
-- MySQL 学生类表-- 表A: student_enrollment_raw (原始学生注册数据)
CREATE TABLE student_enrollment_raw (enrollment_id INT PRIMARY KEY AUTO_INCREMENT COMMENT '注册ID',student_name VARCHAR(100) NOT NULL COMMENT '学生姓名',student_id VARCHAR(50) UNIQUE NOT NULL COMMENT '学生学号',course_name VARCHAR(100) NOT NULL COMMENT '课程名称',enrollment_date DATE COMMENT '注册日期',source_system VARCHAR(50) COMMENT '数据来源系统'
) COMMENT '原始学生注册数据表,记录学生在不同系统中的课程注册信息。';-- 插入示例数据到 student_enrollment_raw
INSERT INTO student_enrollment_raw (student_name, student_id, course_name, enrollment_date, source_system) VALUES
('张三', 'S001', '高等数学', '2023-09-01', '教务系统A'),
('李四', 'S002', '大学物理', '2023-09-01', '教务系统B'),
('王五', 'S003', 'C语言程序设计', '2023-09-05', '教务系统A'),
('赵六', 'S004', '数据结构', '2023-09-10', '教务系统C'),
('钱七', 'S005', '操作系统', '2023-09-15', '教务系统B');-- 表B: student_master_data (学生主数据)
CREATE TABLE student_master_data (student_pk INT PRIMARY KEY AUTO_INCREMENT COMMENT '学生主键',student_id VARCHAR(50) UNIQUE NOT NULL COMMENT '学生学号',student_name VARCHAR(100) NOT NULL COMMENT '学生姓名',first_enrollment_date DATE COMMENT '首次注册日期',last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间'
) COMMENT '学生主数据表,整合并标准化学生基本信息。';-- 从 student_enrollment_raw 插入数据到 student_master_data (血缘关系)
INSERT INTO student_master_data (student_id, student_name, first_enrollment_date)
SELECTstudent_id,student_name,MIN(enrollment_date) AS first_enrollment_date
FROM student_enrollment_raw
GROUP BY student_id, student_name
ON DUPLICATE KEY UPDATEstudent_name = VALUES(student_name),first_enrollment_date = LEAST(student_master_data.first_enrollment_date, VALUES(first_enrollment_date));
PostgreSQL
-- PostgreSQL 船舶类表-- 表A: ship_position_raw (原始船舶位置数据)
CREATE TABLE ship_position_raw (position_id SERIAL PRIMARY KEY,ship_imo VARCHAR(20) NOT NULL,latitude NUMERIC(10, 7) NOT NULL,longitude NUMERIC(10, 7) NOT NULL,timestamp TIMESTAMP NOT NULL,speed NUMERIC(5, 2)
);
COMMENT ON TABLE ship_position_raw IS '原始船舶位置数据表,记录船舶的实时或历史位置信息。';
COMMENT ON COLUMN ship_position_raw.position_id IS '位置ID';
COMMENT ON COLUMN ship_position_raw.ship_imo IS '船舶IMO编号';
COMMENT ON COLUMN ship_position_raw.latitude IS '纬度';
COMMENT ON COLUMN ship_position_raw.longitude IS '经度';
COMMENT ON COLUMN ship_position_raw.timestamp IS '位置报告时间';
COMMENT ON COLUMN ship_position_raw.speed IS '航速(节)';-- 插入示例数据到 ship_position_raw
INSERT INTO ship_position_raw (ship_imo, latitude, longitude, timestamp, speed) VALUES
('IMO9000001', 34.052235, -118.243683, '2023-08-01 10:00:00', 15.5),
('IMO9000002', 33.700000, -118.200000, '2023-08-01 10:05:00', 12.0),
('IMO9000001', 34.100000, -118.300000, '2023-08-01 10:15:00', 16.0),
('IMO9000003', 25.761681, -80.191788, '2023-08-01 11:00:00', 10.2),
('IMO9000002', 33.800000, -118.100000, '2023-08-01 10:20:00', 13.0);-- 表B: ship_daily_summary (船舶每日汇总数据)
CREATE TABLE ship_daily_summary (summary_id SERIAL PRIMARY KEY,ship_imo VARCHAR(20) NOT NULL,report_date DATE NOT NULL,max_speed NUMERIC(5, 2),avg_speed NUMERIC(5, 2),distance_traveled NUMERIC(10, 2)
);
COMMENT ON TABLE ship_daily_summary IS '船舶每日汇总数据表,统计船舶每日的航行概况。';
COMMENT ON COLUMN ship_daily_summary.summary_id IS '汇总ID';
COMMENT ON COLUMN ship_daily_summary.ship_imo IS '船舶IMO编号';
COMMENT ON COLUMN ship_daily_summary.report_date IS '报告日期';
COMMENT ON COLUMN ship_daily_summary.max_speed IS '当日最大航速';
COMMENT ON COLUMN ship_daily_summary.avg_speed IS '当日平均航速';
COMMENT ON COLUMN ship_daily_summary.distance_traveled IS '当日航行距离(海里)';-- 从 ship_position_raw 插入数据到 ship_daily_summary (血缘关系)
INSERT INTO ship_daily_summary (ship_imo, report_date, max_speed, avg_speed, distance_traveled)
SELECTship_imo,DATE(timestamp) AS report_date,MAX(speed) AS max_speed,AVG(speed) AS avg_speed,-- 假设一个简单的距离计算,实际需要更复杂的地理坐标计算SUM(speed * (EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER (PARTITION BY ship_imo ORDER BY timestamp))) / 3600.0)) AS distance_traveled
FROM ship_position_raw
GROUP BY ship_imo, DATE(timestamp)
ON CONFLICT (ship_imo, report_date) DO UPDATE SETmax_speed = EXCLUDED.max_speed,avg_speed = EXCLUDED.avg_speed,distance_traveled = EXCLUDED.distance_traveled;
TDSQL
-- TDSQL 学生类表 (假设其SQL语法与MySQL兼容)-- 表A: student_enrollment_raw (原始学生注册数据)
CREATE TABLE student_enrollment_raw (enrollment_id INT PRIMARY KEY AUTO_INCREMENT COMMENT "注册ID",student_name VARCHAR(100) NOT NULL COMMENT "学生姓名",student_id VARCHAR(50) UNIQUE NOT NULL COMMENT "学生学号",course_name VARCHAR(100) NOT NULL COMMENT "课程名称",enrollment_date DATE COMMENT "注册日期",source_system VARCHAR(50) COMMENT "数据来源系统"
) COMMENT "原始学生注册数据表,记录学生在不同系统中的课程注册信息。";-- 插入示例数据到 student_enrollment_raw
INSERT INTO student_enrollment_raw (student_name, student_id, course_name, enrollment_date, source_system) VALUES
("张三", "S001", "高等数学", "2023-09-01", "教务系统A"),
("李四", "S002", "大学物理", "2023-09-01", "教务系统B"),
("王五", "S003", "C语言程序设计", "2023-09-05", "教务系统A"),
("赵六", "S004", "数据结构", "2023-09-10", "教务系统C"),
("钱七", "S005", "操作系统", "2023-09-15", "教务系统B");-- 表B: student_master_data (学生主数据)
CREATE TABLE student_master_data (student_pk INT PRIMARY KEY AUTO_INCREMENT COMMENT "学生主键",student_id VARCHAR(50) UNIQUE NOT NULL COMMENT "学生学号",student_name VARCHAR(100) NOT NULL COMMENT "学生姓名",first_enrollment_date DATE COMMENT "首次注册日期",last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT "最后更新时间"
) COMMENT "学生主数据表,整合并标准化学生基本信息。";-- 从 student_enrollment_raw 插入数据到 student_master_data (血缘关系)
INSERT INTO student_master_data (student_id, student_name, first_enrollment_date)
SELECTstudent_id,student_name,MIN(enrollment_date) AS first_enrollment_date
FROM student_enrollment_raw
GROUP BY student_id, student_name
ON DUPLICATE KEY UPDATEstudent_name = VALUES(student_name),first_enrollment_date = LEAST(student_master_data.first_enrollment_date, VALUES(first_enrollment_date));
Warebase
-- Warebase 学生类表 (假设其SQL语法与PostgreSQL类似)-- 表A: student_enrollment_raw (原始学生注册数据)
CREATE TABLE student_enrollment_raw (enrollment_id SERIAL PRIMARY KEY,student_name VARCHAR(100) NOT NULL,student_id VARCHAR(50) UNIQUE NOT NULL,course_name VARCHAR(100) NOT NULL,enrollment_date DATE,source_system VARCHAR(50)
);
COMMENT ON TABLE student_enrollment_raw IS '原始学生注册数据表,记录学生在不同系统中的课程注册信息。';
COMMENT ON COLUMN student_enrollment_raw.enrollment_id IS '注册ID';
COMMENT ON COLUMN student_enrollment_raw.student_name IS '学生姓名';
COMMENT ON COLUMN student_enrollment_raw.student_id IS '学生学号';
COMMENT ON COLUMN student_enrollment_raw.course_name IS '课程名称';
COMMENT ON COLUMN student_enrollment_raw.enrollment_date IS '注册日期';
COMMENT ON COLUMN student_enrollment_raw.source_system IS '数据来源系统';-- 插入示例数据到 student_enrollment_raw
INSERT INTO student_enrollment_raw (student_name, student_id, course_name, enrollment_date, source_system) VALUES
('张三', 'S001', '高等数学', '2023-09-01', '教务系统A'),
('李四', 'S002', '大学物理', '2023-09-01', '教务系统B'),
('王五', 'S003', 'C语言程序设计', '2023-09-05', '教务系统A'),
('赵六', 'S004', '数据结构', '2023-09-10', '教务系统C'),
('钱七', 'S005', '操作系统', '2023-09-15', '教务系统B');-- 表B: student_master_data (学生主数据)
CREATE TABLE student_master_data (student_pk SERIAL PRIMARY KEY,student_id VARCHAR(50) UNIQUE NOT NULL,student_name VARCHAR(100) NOT NULL,first_enrollment_date DATE,last_update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
COMMENT ON TABLE student_master_data IS '学生主数据表,整合并标准化学生基本信息。';
COMMENT ON COLUMN student_master_data.student_pk IS '学生主键';
COMMENT ON COLUMN student_master_data.student_id IS '学生学号';
COMMENT ON COLUMN student_master_data.student_name IS '学生姓名';
COMMENT ON COLUMN student_master_data.first_enrollment_date IS '首次注册日期';
COMMENT ON COLUMN student_master_data.last_update_time IS '最后更新时间';-- 从 student_enrollment_raw 插入数据到 student_master_data (血缘关系)
INSERT INTO student_master_data (student_id, student_name, first_enrollment_date)
SELECTstudent_id,student_name,MIN(enrollment_date) AS first_enrollment_date
FROM student_enrollment_raw
GROUP BY student_id, student_name
ON CONFLICT (student_id) DO UPDATE SETstudent_name = EXCLUDED.student_name,first_enrollment_date = LEAST(student_master_data.first_enrollment_date, EXCLUDED.first_enrollment_date);
Hive
-- Hive 价格类表-- 表A: price_source_data (原始价格数据)
CREATE TABLE price_source_data (id INT COMMENT '主键ID',product_id STRING COMMENT '商品ID',price DECIMAL(10, 2) COMMENT '原始价格',currency STRING COMMENT '货币类型',capture_time TIMESTAMP COMMENT '数据捕获时间'
)
COMMENT '原始价格数据表,包含从不同渠道捕获的商品价格信息。'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;-- 插入示例数据到 price_source_data
-- Hive 不支持直接的 INSERT INTO ... VALUES 语法插入多行,通常通过 LOAD DATA 或 INSERT INTO ... SELECT FROM 外部表
-- 这里提供模拟数据,实际使用时需要通过外部文件加载或ETL工具导入
-- id,product_id,price,currency,capture_time
-- 1,PROD001,120.50,CNY,2023-09-01 10:00:00
-- 2,PROD002,200.00,USD,2023-09-01 10:05:00
-- 3,PROD003,75.20,CNY,2023-09-01 10:10:00
-- 4,PROD004,30.00,EUR,2023-09-01 10:15:00
-- 5,PROD005,150.80,CNY,2023-09-01 10:20:00-- 表B: price_processed_data (处理后的价格数据)
CREATE TABLE price_processed_data (id INT COMMENT '主键ID',product_id STRING COMMENT '商品ID',standard_price DECIMAL(10, 2) COMMENT '标准化价格(统一为CNY)',process_time TIMESTAMP COMMENT '数据处理时间'
)
COMMENT '处理后的价格数据表,将原始价格数据进行标准化处理,统一货币单位。'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;-- 从 price_source_data 插入数据到 price_processed_data (血缘关系)
INSERT INTO TABLE price_processed_data
SELECTid,product_id,CASEWHEN currency = 'USD' THEN price * 7.0 -- 假设汇率 1 USD = 7 CNYWHEN currency = 'EUR' THEN price * 8.0 -- 假设汇率 1 EUR = 8 CNYELSE priceEND AS standard_price,CURRENT_TIMESTAMP
FROM price_source_data;
HotDB
-- HotDB 智能家居类表 (假设其SQL语法与MySQL兼容)-- 表A: smart_device_logs_raw (原始智能设备日志数据)
CREATE TABLE smart_device_logs_raw (log_id INT PRIMARY KEY AUTO_INCREMENT COMMENT '日志ID',device_id VARCHAR(50) NOT NULL COMMENT '设备ID',log_timestamp DATETIME NOT NULL COMMENT '日志时间戳',event_type VARCHAR(100) COMMENT '事件类型(如:开关、温度变化、亮度调节)',event_value VARCHAR(255) COMMENT '事件值(如:开、25.5、50%)'
) COMMENT '原始智能设备日志数据表,记录智能设备产生的各类操作和状态日志。';-- 插入示例数据到 smart_device_logs_raw
INSERT INTO smart_device_logs_raw (device_id, log_timestamp, event_type, event_value) VALUES
('LIGHT001', '2023-09-01 08:00:00', '开关', '开'),
('THERMO001', '2023-09-01 08:05:00', '温度变化', '25.0'),
('LIGHT001', '2023-09-01 08:10:00', '亮度调节', '80%'),
('DOOR001', '2023-09-01 08:15:00', '开关', '关'),
('THERMO001', '2023-09-01 08:20:00', '温度变化', '24.5');-- 表B: smart_device_daily_summary (智能设备每日汇总数据)
CREATE TABLE smart_device_daily_summary (summary_id INT PRIMARY KEY AUTO_INCREMENT COMMENT '汇总ID',device_id VARCHAR(50) NOT NULL COMMENT '设备ID',summary_date DATE NOT NULL COMMENT '汇总日期',total_events INT DEFAULT 0 COMMENT '当日事件总数',last_event_type VARCHAR(100) COMMENT '当日最后事件类型'
) COMMENT '智能设备每日汇总数据表,统计智能设备每日的事件概况。';-- 从 smart_device_logs_raw 插入数据到 smart_device_daily_summary (血缘关系)
INSERT INTO smart_device_daily_summary (device_id, summary_date, total_events, last_event_type)
SELECTdevice_id,DATE(log_timestamp) AS summary_date,COUNT(*) AS total_events,SUBSTRING_INDEX(GROUP_CONCAT(event_type ORDER BY log_timestamp DESC), ',', 1) AS last_event_type
FROM smart_device_logs_raw
GROUP BY device_id, DATE(log_timestamp)
ON DUPLICATE KEY UPDATEtotal_events = VALUES(total_events),last_event_type = VALUES(last_event_type);
KingbaseES
-- KingbaseES 商品类表 (假设其SQL语法与PostgreSQL兼容)-- 表A: product_raw_data (原始商品数据)
CREATE TABLE product_raw_data (product_id VARCHAR(50) PRIMARY KEY,product_name VARCHAR(255) NOT NULL,category VARCHAR(100),brand VARCHAR(100),description TEXT
);
COMMENT ON TABLE product_raw_data IS '原始商品数据表,包含从不同渠道获取的商品基本信息。';
COMMENT ON COLUMN product_raw_data.product_id IS '商品唯一标识';
COMMENT ON COLUMN product_raw_data.product_name IS '商品名称';
COMMENT ON COLUMN product_raw_data.category IS '商品类别';
COMMENT ON COLUMN product_raw_data.brand IS '商品品牌';
COMMENT ON COLUMN product_raw_data.description IS '商品描述';-- 插入示例数据到 product_raw_data
INSERT INTO product_raw_data (product_id, product_name, category, brand, description) VALUES
('P001', '智能手机', '电子产品', '华为', '高性能智能手机,拍照功能强大。'),
('P002', '笔记本电脑', '电子产品', '联想', '轻薄便携笔记本,适合办公和学习。'),
('P003', '无线耳机', '电子产品', '小米', '音质出众,佩戴舒适。'),
('P004', '智能手表', '穿戴设备', '苹果', '健康监测,运动追踪。'),
('P005', '平板电脑', '电子产品', '三星', '大屏幕,影音娱乐体验佳。');-- 表B: product_standardized_data (标准化商品数据)
CREATE TABLE product_standardized_data (standard_product_id VARCHAR(50) PRIMARY KEY,product_name VARCHAR(255) NOT NULL,standard_category VARCHAR(100),standard_brand VARCHAR(100),last_update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
COMMENT ON TABLE product_standardized_data IS '标准化商品数据表,对原始商品数据进行清洗和标准化处理。';
COMMENT ON COLUMN product_standardized_data.standard_product_id IS '标准化商品ID';
COMMENT ON COLUMN product_standardized_data.product_name IS '商品名称';
COMMENT ON COLUMN product_standardized_data.standard_category IS '标准化商品类别';
COMMENT ON COLUMN product_standardized_data.standard_brand IS '标准化商品品牌';
COMMENT ON COLUMN product_standardized_data.last_update_time IS '最后更新时间';-- 从 product_raw_data 插入数据到 product_standardized_data (血缘关系)
INSERT INTO product_standardized_data (standard_product_id, product_name, standard_category, standard_brand)
SELECTproduct_id,product_name,CASEWHEN category = '电子产品' THEN '数码家电'WHEN category = '穿戴设备' THEN '智能穿戴'ELSE '其他'END AS standard_category,UPPER(brand) AS standard_brand
FROM product_raw_data;
KYUUBI
-- KYUUBI 医疗类表 (通过Hive SQL或Spark SQL)-- 表A: patient_visits_raw (原始患者就诊数据)
CREATE TABLE patient_visits_raw (visit_id INT COMMENT '就诊ID',patient_id STRING COMMENT '患者ID',visit_date DATE COMMENT '就诊日期',department STRING COMMENT '就诊科室',diagnosis STRING COMMENT '初步诊断',doctor_name STRING COMMENT '主治医生姓名'
)
COMMENT '原始患者就诊数据表,记录患者每次就诊的基本信息。'
STORED AS PARQUET;-- 插入示例数据到 patient_visits_raw
-- KYUUBI作为Spark SQL的Server,数据插入通常通过Spark DataFrame API或外部工具
-- 这里提供模拟数据,实际使用时需要通过外部文件加载或ETL工具导入
-- 1,P001,2023-01-15,内科,感冒,李医生
-- 2,P002,2023-01-20,外科,阑尾炎,王医生
-- 3,P001,2023-02-01,内科,支气管炎,李医生
-- 4,P003,2023-02-10,儿科,发烧,张医生
-- 5,P002,2023-03-05,外科,术后复查,王医生-- 表B: patient_summary_data (患者汇总数据)
CREATE TABLE patient_summary_data (patient_pk INT COMMENT '患者主键',patient_id STRING COMMENT '患者ID',first_visit_date DATE COMMENT '首次就诊日期',last_visit_date DATE COMMENT '最近就诊日期',total_visits INT COMMENT '总就诊次数'
)
COMMENT '患者汇总数据表,统计患者的就诊历史概况。'
STORED AS PARQUET;-- 从 patient_visits_raw 插入数据到 patient_summary_data (血缘关系)
INSERT INTO patient_summary_data
SELECTROW_NUMBER() OVER (ORDER BY patient_id) AS patient_pk,patient_id,MIN(visit_date) AS first_visit_date,MAX(visit_date) AS last_visit_date,COUNT(*) AS total_visits
FROM patient_visits_raw
GROUP BY patient_id;
Phoenix
-- Apache Phoenix 船舶类表 (基于HBase的SQL层)-- 表A: ship_position_raw (原始船舶位置数据)
CREATE TABLE ship_position_raw (position_id BIGINT NOT NULL PRIMARY KEY,ship_imo VARCHAR(20) NOT NULL,latitude DECIMAL(10, 7) NOT NULL,longitude DECIMAL(10, 7) NOT NULL,"timestamp" TIMESTAMP NOT NULL,speed DECIMAL(5, 2)
);
-- COMMENT ON TABLE ship_position_raw IS '原始船舶位置数据表,记录船舶的实时或历史位置信息。'; -- Phoenix不支持COMMENT-- 插入示例数据到 ship_position_raw
UPSERT INTO ship_position_raw (position_id, ship_imo, latitude, longitude, "timestamp", speed) VALUES
(1, 'IMO9000001', 34.052235, -118.243683, TO_DATE('2023-08-01 10:00:00', 'yyyy-MM-dd HH:mm:ss'), 15.5),
(2, 'IMO9000002', 33.700000, -118.200000, TO_DATE('2023-08-01 10:05:00', 'yyyy-MM-dd HH:mm:ss'), 12.0),
(3, 'IMO9000001', 34.100000, -118.300000, TO_DATE('2023-08-01 10:15:00', 'yyyy-MM-dd HH:mm:ss'), 16.0),
(4, 'IMO9000003', 25.761681, -80.191788, TO_DATE('2023-08-01 11:00:00', 'yyyy-MM-dd HH:mm:ss'), 10.2),
(5, 'IMO9000002', 33.800000, -118.100000, TO_DATE('2023-08-01 10:20:00', 'yyyy-MM-dd HH:mm:ss'), 13.0);-- 表B: ship_daily_summary (船舶每日汇总数据)
CREATE TABLE ship_daily_summary (summary_id BIGINT NOT NULL PRIMARY KEY,ship_imo VARCHAR(20) NOT NULL,report_date DATE NOT NULL,max_speed DECIMAL(5, 2),avg_speed DECIMAL(5, 2),distance_traveled DECIMAL(10, 2)
);
-- COMMENT ON TABLE ship_daily_summary IS '船舶每日汇总数据表,统计船舶每日的航行概况。'; -- Phoenix不支持COMMENT-- 从 ship_position_raw 插入数据到 ship_daily_summary (血缘关系)
UPSERT INTO ship_daily_summary (summary_id, ship_imo, report_date, max_speed, avg_speed, distance_traveled)
SELECTROW_NUMBER() OVER (ORDER BY ship_imo, TRUNC("timestamp", 'DAY')) AS summary_id,ship_imo,TRUNC("timestamp", 'DAY') AS report_date,MAX(speed) AS max_speed,AVG(speed) AS avg_speed,-- 假设一个简单的距离计算,实际需要更复杂的地理坐标计算SUM(speed * (("timestamp" - LAG("timestamp") OVER (PARTITION BY ship_imo ORDER BY "timestamp")) / 3600000.0)) AS distance_traveled -- 毫秒转小时
FROM ship_position_raw
GROUP BY ship_imo, TRUNC("timestamp", 'DAY');
在现代数据架构中,数据来源日益多样化,从关系型数据库到大数据平台,从实时流数据到静态数据仓库,数据的流动和转换变得复杂而多变。数据血缘(Data Lineage)是理解这些数据流动的关键,它帮助我们追踪数据从源头到最终使用的每一步,确保数据的准确性和可追溯性。
通过本文提供的10个不同数据源的血缘SQL示例,我们展示了如何在多种数据环境中建立和维护数据血缘关系。
希望这些示例能够为你的数据血缘探索之旅提供有价值的参考,帮助你在复杂的数据环境中保持清晰的视野和高效的管理能力。