数据库与大数据技术栈
目录
- 传统关系型数据库基础
- MySQL数据库详解
- Oracle数据库详解
- 大数据技术概述
- Hadoop生态系统
- Hive数据仓库
- Kafka消息队列
- 实战项目与最佳实践
- 学习路径与资源推荐
第一部分:传统关系型数据库基础
1.1 数据库基本概念
什么是数据库?
数据库(Database)是按照数据结构来组织、存储和管理数据的仓库。它是一个长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。
数据库管理系统(DBMS)
- 定义:用于管理数据库的软件系统
- 功能:数据定义、数据操作、数据安全性、数据完整性、并发控制、数据恢复
关系型数据库核心概念
- 表(Table):存储数据的基本单位
- 行(Row):表中的一条记录
- 列(Column):表中的一个字段
- 主键(Primary Key):唯一标识一条记录
- 外键(Foreign Key):建立表之间的关联
- 索引(Index):提高查询效率的数据结构
1.2 SQL语言基础
SQL语言分类
-- DDL(数据定义语言)
CREATE TABLE users (id INT PRIMARY KEY AUTO_INCREMENT,username VARCHAR(50) NOT NULL,email VARCHAR(100) UNIQUE,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- DML(数据操作语言)
INSERT INTO users (username, email) VALUES ('张三', 'zhangsan@email.com');
UPDATE users SET email = 'new@email.com' WHERE id = 1;
DELETE FROM users WHERE id = 1;-- DQL(数据查询语言)
SELECT * FROM users WHERE username LIKE '张%';-- DCL(数据控制语言)
GRANT SELECT, INSERT ON database.* TO 'user'@'localhost';
REVOKE INSERT ON database.* FROM 'user'@'localhost';
1.3 数据库设计原则
三大范式
- 第一范式(1NF):每个字段都是不可分割的原子值
- 第二范式(2NF):满足1NF,非主键字段完全依赖主键
- 第三范式(3NF):满足2NF,非主键字段之间不存在传递依赖
ER图设计
实体-关系图是数据库设计的重要工具,包括:
- 实体(Entity):矩形表示
- 属性(Attribute):椭圆表示
- 关系(Relationship):菱形表示
第二部分:MySQL数据库详解
2.1 MySQL简介与安装
MySQL特点
- 开源免费
- 性能优秀
- 跨平台支持
- 社区活跃
- 支持多种存储引擎
安装配置
# Ubuntu/Debian系统
sudo apt-get update
sudo apt-get install mysql-server# CentOS/RHEL系统
sudo yum install mysql-server# 初始化安全配置
sudo mysql_secure_installation# 登录MySQL
mysql -u root -p
2.2 MySQL存储引擎
InnoDB引擎(默认)
-- 特点:支持事务、行级锁、外键约束
CREATE TABLE orders (order_id INT PRIMARY KEY,customer_id INT,total_amount DECIMAL(10,2),FOREIGN KEY (customer_id) REFERENCES customers(id)
) ENGINE=InnoDB;
MyISAM引擎
-- 特点:不支持事务、表级锁、查询速度快
CREATE TABLE logs (id INT PRIMARY KEY AUTO_INCREMENT,message TEXT,created_at TIMESTAMP
) ENGINE=MyISAM;
2.3 MySQL高级特性
事务处理
START TRANSACTION;UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
UPDATE accounts SET balance = balance + 100 WHERE account_id = 2;-- 如果成功则提交
COMMIT;
-- 如果失败则回滚
ROLLBACK;
索引优化
-- 创建单列索引
CREATE INDEX idx_username ON users(username);-- 创建复合索引
CREATE INDEX idx_name_email ON users(username, email);-- 查看索引使用情况
EXPLAIN SELECT * FROM users WHERE username = 'zhangsan';-- 全文索引(用于文本搜索)
CREATE FULLTEXT INDEX idx_content ON articles(title, content);
存储过程与函数
-- 创建存储过程
DELIMITER $$
CREATE PROCEDURE GetUserOrders(IN userId INT)
BEGINSELECT o.*, u.username FROM orders o JOIN users u ON o.user_id = u.id WHERE u.id = userId;
END$$
DELIMITER ;-- 调用存储过程
CALL GetUserOrders(1);-- 创建函数
DELIMITER $$
CREATE FUNCTION CalculateAge(birthDate DATE)
RETURNS INT
DETERMINISTIC
BEGINRETURN YEAR(CURDATE()) - YEAR(birthDate);
END$$
DELIMITER ;
触发器
-- 创建触发器:自动记录数据变更
CREATE TRIGGER log_user_updates
AFTER UPDATE ON users
FOR EACH ROW
BEGININSERT INTO audit_log (table_name, action, user_id, timestamp)VALUES ('users', 'UPDATE', NEW.id, NOW());
END;
2.4 MySQL性能优化
查询优化技巧
-- 1. 使用LIMIT限制结果集
SELECT * FROM large_table LIMIT 100;-- 2. 避免SELECT *
SELECT id, username, email FROM users;-- 3. 使用JOIN代替子查询
-- 差:
SELECT * FROM orders WHERE user_id IN (SELECT id FROM users WHERE status = 'active');
-- 好:
SELECT o.* FROM orders o JOIN users u ON o.user_id = u.id WHERE u.status = 'active';-- 4. 使用索引覆盖
SELECT username FROM users WHERE username = 'zhangsan'; -- username字段已建立索引
配置优化
# my.cnf配置文件优化
[mysqld]
# 缓冲池大小(通常设置为内存的70-80%)
innodb_buffer_pool_size = 4G# 日志文件大小
innodb_log_file_size = 256M# 最大连接数
max_connections = 500# 查询缓存
query_cache_size = 128M
query_cache_type = 1
第三部分:Oracle数据库详解
3.1 Oracle数据库架构
Oracle体系结构
┌─────────────────────────────────────┐
│ Oracle Instance │
├─────────────────────────────────────┤
│ SGA (System Global Area) │
│ ├─ Shared Pool │
│ ├─ Database Buffer Cache │
│ ├─ Redo Log Buffer │
│ └─ Large Pool / Java Pool │
├─────────────────────────────────────┤
│ Background Processes │
│ ├─ PMON (Process Monitor) │
│ ├─ SMON (System Monitor) │
│ ├─ DBWn (Database Writer) │
│ ├─ LGWR (Log Writer) │
│ └─ CKPT (Checkpoint) │
└─────────────────────────────────────┘
3.2 Oracle基础操作
表空间管理
-- 创建表空间
CREATE TABLESPACE app_data
DATAFILE '/oracle/oradata/app_data01.dbf' SIZE 100M
AUTOEXTEND ON NEXT 10M MAXSIZE UNLIMITED;-- 创建用户并分配表空间
CREATE USER appuser IDENTIFIED BY password123
DEFAULT TABLESPACE app_data
TEMPORARY TABLESPACE temp
QUOTA UNLIMITED ON app_data;-- 授权
GRANT CREATE SESSION, CREATE TABLE, CREATE VIEW TO appuser;
Oracle特有数据类型
-- VARCHAR2:可变长度字符串
CREATE TABLE oracle_example (name VARCHAR2(100),description CLOB, -- 大文本photo BLOB, -- 二进制大对象price NUMBER(10,2), -- 数字类型created_date DATE, -- 日期updated_time TIMESTAMP -- 时间戳
);
3.3 Oracle高级特性
分区表
-- 范围分区
CREATE TABLE sales (sale_id NUMBER,sale_date DATE,amount NUMBER(10,2)
)
PARTITION BY RANGE (sale_date) (PARTITION p_2023 VALUES LESS THAN (DATE '2024-01-01'),PARTITION p_2024 VALUES LESS THAN (DATE '2025-01-01'),PARTITION p_2025 VALUES LESS THAN (MAXVALUE)
);-- 列表分区
CREATE TABLE customers (customer_id NUMBER,region VARCHAR2(20),name VARCHAR2(100)
)
PARTITION BY LIST (region) (PARTITION p_north VALUES ('北京', '天津', '河北'),PARTITION p_south VALUES ('广东', '广西', '海南'),PARTITION p_other VALUES (DEFAULT)
);
PL/SQL编程
-- PL/SQL块结构
DECLAREv_employee_name VARCHAR2(100);v_salary NUMBER;
BEGIN-- 查询员工信息SELECT name, salary INTO v_employee_name, v_salaryFROM employeesWHERE employee_id = 100;-- 条件判断IF v_salary > 10000 THENDBMS_OUTPUT.PUT_LINE(v_employee_name || ' 是高收入员工');ELSEDBMS_OUTPUT.PUT_LINE(v_employee_name || ' 是普通员工');END IF;EXCEPTIONWHEN NO_DATA_FOUND THENDBMS_OUTPUT.PUT_LINE('未找到员工');WHEN OTHERS THENDBMS_OUTPUT.PUT_LINE('发生错误: ' || SQLERRM);
END;
/
Oracle包
-- 创建包规范
CREATE OR REPLACE PACKAGE employee_pkg ISPROCEDURE hire_employee(p_name VARCHAR2, p_salary NUMBER);FUNCTION get_employee_count RETURN NUMBER;
END employee_pkg;
/-- 创建包体
CREATE OR REPLACE PACKAGE BODY employee_pkg ISPROCEDURE hire_employee(p_name VARCHAR2, p_salary NUMBER) ISBEGININSERT INTO employees (name, salary, hire_date)VALUES (p_name, p_salary, SYSDATE);COMMIT;END hire_employee;FUNCTION get_employee_count RETURN NUMBER ISv_count NUMBER;BEGINSELECT COUNT(*) INTO v_count FROM employees;RETURN v_count;END get_employee_count;
END employee_pkg;
/
第四部分:大数据技术概述
4.1 大数据的4V特征
- Volume(容量):数据量巨大,TB、PB级别
- Velocity(速度):数据产生和处理速度快
- Variety(多样性):结构化、半结构化、非结构化数据
- Value(价值):数据价值密度低,但总体价值高
4.2 传统数据处理vs大数据处理
特征 | 传统数据处理 | 大数据处理 |
---|---|---|
数据量 | GB-TB级 | TB-PB级 |
处理方式 | 垂直扩展 | 水平扩展 |
存储方式 | 单机存储 | 分布式存储 |
计算模式 | 单机计算 | 分布式计算 |
数据类型 | 结构化 | 多种类型 |
4.3 大数据技术栈全景图
┌────────────────────────────────────────────────┐
│ 应用层 │
│ BI工具 | 数据可视化 | 机器学习 | 数据挖掘 │
├────────────────────────────────────────────────┤
│ 分析层 │
│ Spark | Flink | Storm | MapReduce │
├────────────────────────────────────────────────┤
│ 处理层 │
│ Hive | Pig | Impala | Presto | SparkSQL │
├────────────────────────────────────────────────┤
│ 传输层 │
│ Kafka | Flume | Sqoop | DataX │
├────────────────────────────────────────────────┤
│ 存储层 │
│ HDFS | HBase | Cassandra | MongoDB │
├────────────────────────────────────────────────┤
│ 资源管理层 │
│ YARN | Mesos | Kubernetes │
└────────────────────────────────────────────────┘
第五部分:Hadoop生态系统
5.1 Hadoop核心组件
HDFS(分布式文件系统)
# HDFS基本命令
# 创建目录
hdfs dfs -mkdir /user/data# 上传文件
hdfs dfs -put local_file.txt /user/data/# 下载文件
hdfs dfs -get /user/data/file.txt local_file.txt# 查看文件内容
hdfs dfs -cat /user/data/file.txt# 删除文件
hdfs dfs -rm /user/data/file.txt
HDFS架构原理
NameNode(主节点)├─ 管理文件系统命名空间├─ 维护文件系统树└─ 记录文件块位置信息DataNode(数据节点)├─ 存储实际数据块├─ 执行读写操作└─ 定期向NameNode汇报文件存储流程:
1. 文件切分成块(默认128MB)
2. 每个块复制多份(默认3份)
3. 分布存储在不同DataNode
5.2 MapReduce编程模型
MapReduce工作原理
// Mapper类:处理输入数据
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] words = line.split(" ");for (String w : words) {word.set(w);context.write(word, one);}}
}// Reducer类:汇总处理结果
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}
}// 主程序
public class WordCount {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(WordCountMapper.class);job.setCombinerClass(WordCountReducer.class);job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
5.3 YARN资源管理
YARN架构
ResourceManager(资源管理器)├─ 调度器(Scheduler)└─ 应用管理器(ApplicationManager)NodeManager(节点管理器)├─ 监控节点资源└─ 管理容器生命周期ApplicationMaster(应用主控)├─ 协调应用执行└─ 申请和释放资源
YARN配置示例
<!-- yarn-site.xml -->
<configuration><property><name>yarn.resourcemanager.hostname</name><value>master</value></property><property><name>yarn.nodemanager.resource.memory-mb</name><value>8192</value></property><property><name>yarn.nodemanager.resource.cpu-vcores</name><value>4</value></property>
</configuration>
第六部分:Hive数据仓库
6.1 Hive简介与架构
Hive特点
- 基于Hadoop的数据仓库工具
- 提供SQL-like查询语言(HiveQL)
- 将结构化数据映射为数据库表
- 底层存储在HDFS,计算使用MapReduce/Spark
Hive架构
用户接口层├─ CLI(命令行)├─ JDBC/ODBC└─ Web UI元数据存储(Metastore)└─ MySQL/Derby(存储表结构信息)执行引擎├─ MapReduce├─ Spark└─ Tez存储层└─ HDFS
6.2 Hive基础操作
建表与数据操作
-- 创建内部表
CREATE TABLE IF NOT EXISTS employees (emp_id INT,emp_name STRING,department STRING,salary DOUBLE,hire_date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;-- 创建外部表(数据在HDFS上)
CREATE EXTERNAL TABLE IF NOT EXISTS logs (ip STRING,request_date STRING,method STRING,url STRING,status INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/data/logs';-- 创建分区表
CREATE TABLE sales_data (product_id INT,quantity INT,price DOUBLE
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;-- 加载数据
LOAD DATA LOCAL INPATH '/local/path/data.csv'
INTO TABLE employees;-- 插入分区数据
INSERT INTO TABLE sales_data PARTITION (year=2024, month=1)
SELECT product_id, quantity, price FROM temp_sales
WHERE sale_date BETWEEN '2024-01-01' AND '2024-01-31';
6.3 Hive高级特性
分桶表
-- 创建分桶表
CREATE TABLE bucketed_users (user_id INT,username STRING,email STRING
)
CLUSTERED BY (user_id) INTO 4 BUCKETS
STORED AS ORC;-- 开启分桶
SET hive.enforce.bucketing = true;-- 插入分桶数据
INSERT INTO TABLE bucketed_users
SELECT * FROM users;
窗口函数
-- 排名函数
SELECT department,emp_name,salary,ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank_num,RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank,DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dense_rank
FROM employees;-- 聚合窗口函数
SELECT emp_name,salary,department,AVG(salary) OVER (PARTITION BY department) as dept_avg_salary,SUM(salary) OVER (PARTITION BY department ORDER BY hire_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as cumulative_salary
FROM employees;
UDF(用户自定义函数)
// Java UDF示例
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;public class UpperCaseUDF extends UDF {public Text evaluate(Text input) {if (input == null) return null;return new Text(input.toString().toUpperCase());}
}
-- 注册UDF
ADD JAR /path/to/udf.jar;
CREATE TEMPORARY FUNCTION uppercase AS 'com.example.UpperCaseUDF';-- 使用UDF
SELECT uppercase(emp_name) FROM employees;
6.4 Hive优化技巧
查询优化
-- 1. 使用分区裁剪
SELECT * FROM sales_data
WHERE year = 2024 AND month = 1;-- 2. 使用列裁剪
SELECT emp_id, emp_name FROM employees; -- 只选择需要的列-- 3. Map端Join(小表Join大表)
SET hive.auto.convert.join = true;
SET hive.mapjoin.smalltable.filesize = 25000000;-- 4. 数据压缩
SET hive.exec.compress.output = true;
SET mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;-- 5. 并行执行
SET hive.exec.parallel = true;
SET hive.exec.parallel.thread.number = 8;
第七部分:Kafka消息队列
7.1 Kafka核心概念
基本概念
- Producer:消息生产者
- Consumer:消息消费者
- Broker:Kafka服务器节点
- Topic:消息主题
- Partition:主题分区
- Offset:消息偏移量
- Consumer Group:消费者组
Kafka架构
Producer → Topic(Partitions) → Consumer↓ZooKeeper(元数据管理)
7.2 Kafka安装与配置
安装步骤
# 1. 下载Kafka
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0# 2. 启动ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties# 3. 启动Kafka
bin/kafka-server-start.sh config/server.properties# 4. 创建Topic
bin/kafka-topics.sh --create --topic test-topic \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 1# 5. 查看Topic列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 6. 查看Topic详情
bin/kafka-topics.sh --describe --topic test-topic \--bootstrap-server localhost:9092
7.3 Kafka编程实战
Java生产者示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// 配置属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all"); // 等待所有副本确认props.put("retries", 3); // 重试次数// 创建生产者Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 100; i++) {String key = "key-" + i;String value = "message-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);// 异步发送带回调producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {exception.printStackTrace();} else {System.out.printf("Sent message to partition %d with offset %d%n",metadata.partition(), metadata.offset());}}});}producer.close();}
}
Java消费者示例
import org.apache.kafka.clients.consumer.*;
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;public class KafkaConsumerExample {public static void main(String[] args) {// 配置属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-consumer-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 从最早的消息开始消费// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Arrays.asList("test-topic"));// 持续消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s, partition = %d%n",record.offset(), record.key(), record.value(), record.partition());}}}
}
7.4 Kafka高级特性
Kafka Streams
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");// 流处理:词频统计
source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))).groupBy((key, word) -> word).count().toStream().to("output-topic");KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Kafka Connect
// 配置文件:file-source-connector.json
{"name": "file-source-connector","config": {"connector.class": "FileStreamSource","tasks.max": "1","file": "/tmp/input.txt","topic": "file-topic","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter"}
}
# 启动Kafka Connect
bin/connect-standalone.sh config/connect-standalone.properties \config/file-source-connector.properties
7.5 Kafka性能优化
生产者优化
// 批量发送
props.put("batch.size", 16384);
props.put("linger.ms", 10);// 压缩
props.put("compression.type", "snappy");// 缓冲区大小
props.put("buffer.memory", 33554432);
消费者优化
// 批量拉取
props.put("fetch.min.bytes", 1024);
props.put("fetch.max.wait.ms", 500);// 单次拉取最大数据量
props.put("max.poll.records", 500);// Session超时设置
props.put("session.timeout.ms", 30000);
第八部分:实战项目与最佳实践
8.1 项目一:构建实时数据处理管道
项目架构
数据源 → Flume → Kafka → Spark Streaming → HBase → 数据可视化采集 缓冲 实时处理 存储 展示
实现步骤
1. Flume采集配置
# flume-kafka.conf
agent.sources = spooldir-source
agent.channels = memory-channel
agent.sinks = kafka-sink# Source配置
agent.sources.spooldir-source.type = spooldir
agent.sources.spooldir-source.spoolDir = /var/log/flume-spooling
agent.sources.spooldir-source.channels = memory-channel# Channel配置
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000
agent.channels.memory-channel.transactionCapacity = 1000# Sink配置
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafka-sink.kafka.topic = log-topic
agent.sinks.kafka-sink.channel = memory-channel
2. Spark Streaming处理
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._val sparkConf = new SparkConf().setAppName("RealTimeAnalysis")
val ssc = new StreamingContext(sparkConf, Seconds(5))val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "spark-streaming-group","auto.offset.reset" -> "latest"
)val topics = Array("log-topic")
val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams)
)// 实时处理逻辑
stream.map(record => record.value()).flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreachRDD { rdd =>// 存储到HBaserdd.foreachPartition { partition =>val hbaseConnection = HBaseConnection.create()partition.foreach { case (word, count) =>hbaseConnection.put("word_count", word, "count", count.toString)}hbaseConnection.close()}}ssc.start()
ssc.awaitTermination()
8.2 项目二:离线数据仓库建设
数据仓库分层架构
ODS层(操作数据存储)↓ 数据清洗
DWD层(明细数据层)↓ 轻度汇总
DWS层(服务数据层)↓ 主题汇总
ADS层(应用数据层)
实现示例
-- ODS层:原始数据
CREATE EXTERNAL TABLE ods_user_behavior (user_id STRING,item_id STRING,category_id STRING,behavior_type STRING,timestamp BIGINT
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/warehouse/ods/user_behavior';-- DWD层:清洗后的明细数据
CREATE TABLE dwd_user_behavior_detail (user_id STRING,item_id STRING,category_id STRING,behavior_type STRING,behavior_time TIMESTAMP,date_id STRING,hour_id INT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;-- 数据清洗与转换
INSERT OVERWRITE TABLE dwd_user_behavior_detail PARTITION (dt='2024-01-01')
SELECT user_id,item_id,category_id,behavior_type,from_unixtime(timestamp) as behavior_time,from_unixtime(timestamp, 'yyyy-MM-dd') as date_id,hour(from_unixtime(timestamp)) as hour_id
FROM ods_user_behavior
WHERE dt = '2024-01-01'AND user_id IS NOT NULL;-- DWS层:按主题汇总
CREATE TABLE dws_user_behavior_daily (user_id STRING,pv_count BIGINT,buy_count BIGINT,cart_count BIGINT,fav_count BIGINT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;-- 日汇总统计
INSERT OVERWRITE TABLE dws_user_behavior_daily PARTITION (dt='2024-01-01')
SELECT user_id,SUM(CASE WHEN behavior_type = 'pv' THEN 1 ELSE 0 END) as pv_count,SUM(CASE WHEN behavior_type = 'buy' THEN 1 ELSE 0 END) as buy_count,SUM(CASE WHEN behavior_type = 'cart' THEN 1 ELSE 0 END) as cart_count,SUM(CASE WHEN behavior_type = 'fav' THEN 1 ELSE 0 END) as fav_count
FROM dwd_user_behavior_detail
WHERE dt = '2024-01-01'
GROUP BY user_id;-- ADS层:应用层指标
CREATE TABLE ads_user_behavior_stats (stat_date STRING,total_users BIGINT,total_pv BIGINT,total_orders BIGINT,conversion_rate DOUBLE
)
STORED AS PARQUET;-- 计算转化率等核心指标
INSERT OVERWRITE TABLE ads_user_behavior_stats
SELECT '2024-01-01' as stat_date,COUNT(DISTINCT user_id) as total_users,SUM(pv_count) as total_pv,SUM(buy_count) as total_orders,SUM(buy_count) / SUM(pv_count) as conversion_rate
FROM dws_user_behavior_daily
WHERE dt = '2024-01-01';
8.3 性能调优最佳实践
Hadoop集群调优
<!-- hdfs-site.xml -->
<configuration><!-- 增加副本数提高可靠性 --><property><name>dfs.replication</name><value>3</value></property><!-- 增大块大小减少NameNode压力 --><property><name>dfs.blocksize</name><value>268435456</value> <!-- 256MB --></property><!-- 开启短路读取 --><property><name>dfs.client.read.shortcircuit</name><value>true</value></property>
</configuration>
Hive调优参数
-- 开启动态分区
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;-- 开启Map端聚合
SET hive.map.aggr = true;-- 控制Reducer数量
SET mapreduce.job.reduces = 10;-- 开启向量化执行
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;-- 使用CBO优化器
SET hive.cbo.enable = true;
SET hive.compute.query.using.stats = true;
SET hive.stats.fetch.column.stats = true;
Kafka集群调优
# server.properties
# 增加网络线程数
num.network.threads=8# 增加IO线程数
num.io.threads=16# 日志段大小
log.segment.bytes=1073741824 # 1GB# 日志保留时间
log.retention.hours=168 # 7天# 增加socket缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
第九部分:学习路径与资源推荐
9.1 学习路径规划
初级阶段(1-3个月)
-
数据库基础
- SQL语言基础
- 关系型数据库概念
- MySQL基础操作
-
Linux基础
- 基本命令操作
- Shell脚本编写
- 系统管理基础
-
编程基础
- Java/Python基础语法
- 面向对象编程
- 基础数据结构
中级阶段(3-6个月)
-
Hadoop生态系统
- HDFS原理与实践
- MapReduce编程
- YARN资源管理
-
数据仓库技术
- Hive深入学习
- 数据建模理论
- ETL流程设计
-
实时处理技术
- Kafka原理与应用
- Spark基础
- 流处理概念
高级阶段(6-12个月)
-
性能优化
- SQL优化技巧
- 大数据作业调优
- 系统架构优化
-
项目实战
- 完整项目开发
- 故障排查处理
- 生产环境部署
-
新技术探索
- Flink实时计算
- 数据湖技术
- 云原生大数据
9.2 推荐学习资源
在线课程平台
- Coursera: 大数据专业课程
- Udacity: 数据工程纳米学位
- 极客时间: 专栏课程
- 慕课网: 实战项目课程
经典书籍推荐
数据库类
- 《MySQL必知必会》- 入门经典
- 《高性能MySQL》- 进阶必读
- 《Oracle Database 12c DBA指南》- Oracle权威指南
- 《数据库系统概念》- 理论基础
大数据类
- 《Hadoop权威指南》- Hadoop圣经
- 《Spark快速大数据分析》- Spark入门
- 《Kafka权威指南》- Kafka深度解析
- 《数据密集型应用系统设计》- 架构思维
实践平台
- 阿里云大数据平台: MaxCompute, E-MapReduce
- AWS大数据服务: EMR, Redshift, Kinesis
- 本地虚拟机集群: VMware + CentOS搭建
社区与论坛
- Stack Overflow: 技术问题解答
- GitHub: 开源项目学习
- Apache官网: 官方文档
- InfoQ: 技术文章与案例
9.3 认证考试
数据库认证
- MySQL: Oracle Certified MySQL Database Administrator
- Oracle: Oracle Database SQL Certified Associate
- MongoDB: MongoDB Certified Developer
大数据认证
- Cloudera: CCA Spark and Hadoop Developer
- Databricks: Apache Spark Developer Certification
- AWS: AWS Certified Big Data - Specialty
9.4 职业发展路径
初级数据工程师(0-2年)↓
中级数据工程师(2-5年)↓
高级数据工程师(5-8年)↓
数据架构师 / 技术专家(8年+)
技能要求演进
级别 | 核心技能 | 软技能 |
---|---|---|
初级 | SQL、基础编程、Linux | 学习能力、执行力 |
中级 | 大数据框架、性能优化 | 问题解决、团队协作 |
高级 | 架构设计、技术选型 | 项目管理、技术影响力 |
架构师 | 系统设计、技术战略 | 领导力、业务理解 |
9.5 持续学习建议
-
关注技术趋势
- 实时计算的演进
- 数据湖与数据仓库融合
- 云原生数据平台
- AI与大数据结合
-
参与开源项目
- 贡献代码
- 提交Issue
- 编写文档
- 分享经验
-
建立技术博客
- 记录学习历程
- 分享项目经验
- 总结最佳实践
- 建立个人品牌
-
参加技术会议
- Hadoop Summit
- Spark Summit
- Kafka Summit
- 国内技术大会
总结
本教程从传统关系型数据库(MySQL、Oracle)到现代大数据技术栈(Hadoop、Hive、Kafka)进行了全面介绍,涵盖了:
- 基础知识:数据库原理、SQL语言、数据建模
- 核心技术:分布式存储、批处理、流处理
- 实战项目:数据管道、数据仓库建设
- 优化技巧:性能调优、最佳实践
- 学习路径:从入门到精通的完整规划
学习大数据技术是一个循序渐进的过程,需要:
- 扎实的基础知识
- 大量的实践经验
- 持续的学习更新
- 解决问题的能力