当前位置: 首页 > news >正文

数据库与大数据技术栈

目录

  1. 传统关系型数据库基础
  2. MySQL数据库详解
  3. Oracle数据库详解
  4. 大数据技术概述
  5. Hadoop生态系统
  6. Hive数据仓库
  7. Kafka消息队列
  8. 实战项目与最佳实践
  9. 学习路径与资源推荐

第一部分:传统关系型数据库基础

1.1 数据库基本概念

什么是数据库?

数据库(Database)是按照数据结构来组织、存储和管理数据的仓库。它是一个长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。

数据库管理系统(DBMS)
  • 定义:用于管理数据库的软件系统
  • 功能:数据定义、数据操作、数据安全性、数据完整性、并发控制、数据恢复
关系型数据库核心概念
  1. 表(Table):存储数据的基本单位
  2. 行(Row):表中的一条记录
  3. 列(Column):表中的一个字段
  4. 主键(Primary Key):唯一标识一条记录
  5. 外键(Foreign Key):建立表之间的关联
  6. 索引(Index):提高查询效率的数据结构

1.2 SQL语言基础

SQL语言分类
-- DDL(数据定义语言)
CREATE TABLE users (id INT PRIMARY KEY AUTO_INCREMENT,username VARCHAR(50) NOT NULL,email VARCHAR(100) UNIQUE,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- DML(数据操作语言)
INSERT INTO users (username, email) VALUES ('张三', 'zhangsan@email.com');
UPDATE users SET email = 'new@email.com' WHERE id = 1;
DELETE FROM users WHERE id = 1;-- DQL(数据查询语言)
SELECT * FROM users WHERE username LIKE '张%';-- DCL(数据控制语言)
GRANT SELECT, INSERT ON database.* TO 'user'@'localhost';
REVOKE INSERT ON database.* FROM 'user'@'localhost';

1.3 数据库设计原则

三大范式
  1. 第一范式(1NF):每个字段都是不可分割的原子值
  2. 第二范式(2NF):满足1NF,非主键字段完全依赖主键
  3. 第三范式(3NF):满足2NF,非主键字段之间不存在传递依赖
ER图设计

实体-关系图是数据库设计的重要工具,包括:

  • 实体(Entity):矩形表示
  • 属性(Attribute):椭圆表示
  • 关系(Relationship):菱形表示

第二部分:MySQL数据库详解

2.1 MySQL简介与安装

MySQL特点
  • 开源免费
  • 性能优秀
  • 跨平台支持
  • 社区活跃
  • 支持多种存储引擎
安装配置
# Ubuntu/Debian系统
sudo apt-get update
sudo apt-get install mysql-server# CentOS/RHEL系统
sudo yum install mysql-server# 初始化安全配置
sudo mysql_secure_installation# 登录MySQL
mysql -u root -p

2.2 MySQL存储引擎

InnoDB引擎(默认)
-- 特点:支持事务、行级锁、外键约束
CREATE TABLE orders (order_id INT PRIMARY KEY,customer_id INT,total_amount DECIMAL(10,2),FOREIGN KEY (customer_id) REFERENCES customers(id)
) ENGINE=InnoDB;
MyISAM引擎
-- 特点:不支持事务、表级锁、查询速度快
CREATE TABLE logs (id INT PRIMARY KEY AUTO_INCREMENT,message TEXT,created_at TIMESTAMP
) ENGINE=MyISAM;

2.3 MySQL高级特性

事务处理
START TRANSACTION;UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
UPDATE accounts SET balance = balance + 100 WHERE account_id = 2;-- 如果成功则提交
COMMIT;
-- 如果失败则回滚
ROLLBACK;
索引优化
-- 创建单列索引
CREATE INDEX idx_username ON users(username);-- 创建复合索引
CREATE INDEX idx_name_email ON users(username, email);-- 查看索引使用情况
EXPLAIN SELECT * FROM users WHERE username = 'zhangsan';-- 全文索引(用于文本搜索)
CREATE FULLTEXT INDEX idx_content ON articles(title, content);
存储过程与函数
-- 创建存储过程
DELIMITER $$
CREATE PROCEDURE GetUserOrders(IN userId INT)
BEGINSELECT o.*, u.username FROM orders o JOIN users u ON o.user_id = u.id WHERE u.id = userId;
END$$
DELIMITER ;-- 调用存储过程
CALL GetUserOrders(1);-- 创建函数
DELIMITER $$
CREATE FUNCTION CalculateAge(birthDate DATE) 
RETURNS INT
DETERMINISTIC
BEGINRETURN YEAR(CURDATE()) - YEAR(birthDate);
END$$
DELIMITER ;
触发器
-- 创建触发器:自动记录数据变更
CREATE TRIGGER log_user_updates
AFTER UPDATE ON users
FOR EACH ROW
BEGININSERT INTO audit_log (table_name, action, user_id, timestamp)VALUES ('users', 'UPDATE', NEW.id, NOW());
END;

2.4 MySQL性能优化

查询优化技巧
-- 1. 使用LIMIT限制结果集
SELECT * FROM large_table LIMIT 100;-- 2. 避免SELECT *
SELECT id, username, email FROM users;-- 3. 使用JOIN代替子查询
-- 差:
SELECT * FROM orders WHERE user_id IN (SELECT id FROM users WHERE status = 'active');
-- 好:
SELECT o.* FROM orders o JOIN users u ON o.user_id = u.id WHERE u.status = 'active';-- 4. 使用索引覆盖
SELECT username FROM users WHERE username = 'zhangsan'; -- username字段已建立索引
配置优化
# my.cnf配置文件优化
[mysqld]
# 缓冲池大小(通常设置为内存的70-80%)
innodb_buffer_pool_size = 4G# 日志文件大小
innodb_log_file_size = 256M# 最大连接数
max_connections = 500# 查询缓存
query_cache_size = 128M
query_cache_type = 1

第三部分:Oracle数据库详解

3.1 Oracle数据库架构

Oracle体系结构
┌─────────────────────────────────────┐
│         Oracle Instance              │
├─────────────────────────────────────┤
│  SGA (System Global Area)            │
│  ├─ Shared Pool                     │
│  ├─ Database Buffer Cache           │
│  ├─ Redo Log Buffer                 │
│  └─ Large Pool / Java Pool          │
├─────────────────────────────────────┤
│  Background Processes                │
│  ├─ PMON (Process Monitor)          │
│  ├─ SMON (System Monitor)           │
│  ├─ DBWn (Database Writer)          │
│  ├─ LGWR (Log Writer)               │
│  └─ CKPT (Checkpoint)               │
└─────────────────────────────────────┘

3.2 Oracle基础操作

表空间管理
-- 创建表空间
CREATE TABLESPACE app_data
DATAFILE '/oracle/oradata/app_data01.dbf' SIZE 100M
AUTOEXTEND ON NEXT 10M MAXSIZE UNLIMITED;-- 创建用户并分配表空间
CREATE USER appuser IDENTIFIED BY password123
DEFAULT TABLESPACE app_data
TEMPORARY TABLESPACE temp
QUOTA UNLIMITED ON app_data;-- 授权
GRANT CREATE SESSION, CREATE TABLE, CREATE VIEW TO appuser;
Oracle特有数据类型
-- VARCHAR2:可变长度字符串
CREATE TABLE oracle_example (name VARCHAR2(100),description CLOB,           -- 大文本photo BLOB,                 -- 二进制大对象price NUMBER(10,2),         -- 数字类型created_date DATE,          -- 日期updated_time TIMESTAMP      -- 时间戳
);

3.3 Oracle高级特性

分区表
-- 范围分区
CREATE TABLE sales (sale_id NUMBER,sale_date DATE,amount NUMBER(10,2)
)
PARTITION BY RANGE (sale_date) (PARTITION p_2023 VALUES LESS THAN (DATE '2024-01-01'),PARTITION p_2024 VALUES LESS THAN (DATE '2025-01-01'),PARTITION p_2025 VALUES LESS THAN (MAXVALUE)
);-- 列表分区
CREATE TABLE customers (customer_id NUMBER,region VARCHAR2(20),name VARCHAR2(100)
)
PARTITION BY LIST (region) (PARTITION p_north VALUES ('北京', '天津', '河北'),PARTITION p_south VALUES ('广东', '广西', '海南'),PARTITION p_other VALUES (DEFAULT)
);
PL/SQL编程
-- PL/SQL块结构
DECLAREv_employee_name VARCHAR2(100);v_salary NUMBER;
BEGIN-- 查询员工信息SELECT name, salary INTO v_employee_name, v_salaryFROM employeesWHERE employee_id = 100;-- 条件判断IF v_salary > 10000 THENDBMS_OUTPUT.PUT_LINE(v_employee_name || ' 是高收入员工');ELSEDBMS_OUTPUT.PUT_LINE(v_employee_name || ' 是普通员工');END IF;EXCEPTIONWHEN NO_DATA_FOUND THENDBMS_OUTPUT.PUT_LINE('未找到员工');WHEN OTHERS THENDBMS_OUTPUT.PUT_LINE('发生错误: ' || SQLERRM);
END;
/
Oracle包
-- 创建包规范
CREATE OR REPLACE PACKAGE employee_pkg ISPROCEDURE hire_employee(p_name VARCHAR2, p_salary NUMBER);FUNCTION get_employee_count RETURN NUMBER;
END employee_pkg;
/-- 创建包体
CREATE OR REPLACE PACKAGE BODY employee_pkg ISPROCEDURE hire_employee(p_name VARCHAR2, p_salary NUMBER) ISBEGININSERT INTO employees (name, salary, hire_date)VALUES (p_name, p_salary, SYSDATE);COMMIT;END hire_employee;FUNCTION get_employee_count RETURN NUMBER ISv_count NUMBER;BEGINSELECT COUNT(*) INTO v_count FROM employees;RETURN v_count;END get_employee_count;
END employee_pkg;
/

第四部分:大数据技术概述

4.1 大数据的4V特征

  1. Volume(容量):数据量巨大,TB、PB级别
  2. Velocity(速度):数据产生和处理速度快
  3. Variety(多样性):结构化、半结构化、非结构化数据
  4. Value(价值):数据价值密度低,但总体价值高

4.2 传统数据处理vs大数据处理

特征传统数据处理大数据处理
数据量GB-TB级TB-PB级
处理方式垂直扩展水平扩展
存储方式单机存储分布式存储
计算模式单机计算分布式计算
数据类型结构化多种类型

4.3 大数据技术栈全景图

┌────────────────────────────────────────────────┐
│              应用层                             │
│   BI工具 | 数据可视化 | 机器学习 | 数据挖掘      │
├────────────────────────────────────────────────┤
│              分析层                             │
│   Spark | Flink | Storm | MapReduce            │
├────────────────────────────────────────────────┤
│              处理层                             │
│   Hive | Pig | Impala | Presto | SparkSQL      │
├────────────────────────────────────────────────┤
│              传输层                             │
│   Kafka | Flume | Sqoop | DataX                │
├────────────────────────────────────────────────┤
│              存储层                             │
│   HDFS | HBase | Cassandra | MongoDB           │
├────────────────────────────────────────────────┤
│              资源管理层                          │
│   YARN | Mesos | Kubernetes                    │
└────────────────────────────────────────────────┘

第五部分:Hadoop生态系统

5.1 Hadoop核心组件

HDFS(分布式文件系统)
# HDFS基本命令
# 创建目录
hdfs dfs -mkdir /user/data# 上传文件
hdfs dfs -put local_file.txt /user/data/# 下载文件
hdfs dfs -get /user/data/file.txt local_file.txt# 查看文件内容
hdfs dfs -cat /user/data/file.txt# 删除文件
hdfs dfs -rm /user/data/file.txt
HDFS架构原理
NameNode(主节点)├─ 管理文件系统命名空间├─ 维护文件系统树└─ 记录文件块位置信息DataNode(数据节点)├─ 存储实际数据块├─ 执行读写操作└─ 定期向NameNode汇报文件存储流程:
1. 文件切分成块(默认128MB)
2. 每个块复制多份(默认3份)
3. 分布存储在不同DataNode

5.2 MapReduce编程模型

MapReduce工作原理
// Mapper类:处理输入数据
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] words = line.split(" ");for (String w : words) {word.set(w);context.write(word, one);}}
}// Reducer类:汇总处理结果
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}
}// 主程序
public class WordCount {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(WordCountMapper.class);job.setCombinerClass(WordCountReducer.class);job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

5.3 YARN资源管理

YARN架构
ResourceManager(资源管理器)├─ 调度器(Scheduler)└─ 应用管理器(ApplicationManager)NodeManager(节点管理器)├─ 监控节点资源└─ 管理容器生命周期ApplicationMaster(应用主控)├─ 协调应用执行└─ 申请和释放资源
YARN配置示例
<!-- yarn-site.xml -->
<configuration><property><name>yarn.resourcemanager.hostname</name><value>master</value></property><property><name>yarn.nodemanager.resource.memory-mb</name><value>8192</value></property><property><name>yarn.nodemanager.resource.cpu-vcores</name><value>4</value></property>
</configuration>

第六部分:Hive数据仓库

6.1 Hive简介与架构

Hive特点
  • 基于Hadoop的数据仓库工具
  • 提供SQL-like查询语言(HiveQL)
  • 将结构化数据映射为数据库表
  • 底层存储在HDFS,计算使用MapReduce/Spark
Hive架构
用户接口层├─ CLI(命令行)├─ JDBC/ODBC└─ Web UI元数据存储(Metastore)└─ MySQL/Derby(存储表结构信息)执行引擎├─ MapReduce├─ Spark└─ Tez存储层└─ HDFS

6.2 Hive基础操作

建表与数据操作
-- 创建内部表
CREATE TABLE IF NOT EXISTS employees (emp_id INT,emp_name STRING,department STRING,salary DOUBLE,hire_date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;-- 创建外部表(数据在HDFS上)
CREATE EXTERNAL TABLE IF NOT EXISTS logs (ip STRING,request_date STRING,method STRING,url STRING,status INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/data/logs';-- 创建分区表
CREATE TABLE sales_data (product_id INT,quantity INT,price DOUBLE
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;-- 加载数据
LOAD DATA LOCAL INPATH '/local/path/data.csv' 
INTO TABLE employees;-- 插入分区数据
INSERT INTO TABLE sales_data PARTITION (year=2024, month=1)
SELECT product_id, quantity, price FROM temp_sales
WHERE sale_date BETWEEN '2024-01-01' AND '2024-01-31';

6.3 Hive高级特性

分桶表
-- 创建分桶表
CREATE TABLE bucketed_users (user_id INT,username STRING,email STRING
)
CLUSTERED BY (user_id) INTO 4 BUCKETS
STORED AS ORC;-- 开启分桶
SET hive.enforce.bucketing = true;-- 插入分桶数据
INSERT INTO TABLE bucketed_users
SELECT * FROM users;
窗口函数
-- 排名函数
SELECT department,emp_name,salary,ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank_num,RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank,DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dense_rank
FROM employees;-- 聚合窗口函数
SELECT emp_name,salary,department,AVG(salary) OVER (PARTITION BY department) as dept_avg_salary,SUM(salary) OVER (PARTITION BY department ORDER BY hire_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as cumulative_salary
FROM employees;
UDF(用户自定义函数)
// Java UDF示例
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;public class UpperCaseUDF extends UDF {public Text evaluate(Text input) {if (input == null) return null;return new Text(input.toString().toUpperCase());}
}
-- 注册UDF
ADD JAR /path/to/udf.jar;
CREATE TEMPORARY FUNCTION uppercase AS 'com.example.UpperCaseUDF';-- 使用UDF
SELECT uppercase(emp_name) FROM employees;

6.4 Hive优化技巧

查询优化
-- 1. 使用分区裁剪
SELECT * FROM sales_data 
WHERE year = 2024 AND month = 1;-- 2. 使用列裁剪
SELECT emp_id, emp_name FROM employees;  -- 只选择需要的列-- 3. Map端Join(小表Join大表)
SET hive.auto.convert.join = true;
SET hive.mapjoin.smalltable.filesize = 25000000;-- 4. 数据压缩
SET hive.exec.compress.output = true;
SET mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;-- 5. 并行执行
SET hive.exec.parallel = true;
SET hive.exec.parallel.thread.number = 8;

第七部分:Kafka消息队列

7.1 Kafka核心概念

基本概念
  • Producer:消息生产者
  • Consumer:消息消费者
  • Broker:Kafka服务器节点
  • Topic:消息主题
  • Partition:主题分区
  • Offset:消息偏移量
  • Consumer Group:消费者组
Kafka架构
Producer → Topic(Partitions) → Consumer↓ZooKeeper(元数据管理)

7.2 Kafka安装与配置

安装步骤
# 1. 下载Kafka
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0# 2. 启动ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties# 3. 启动Kafka
bin/kafka-server-start.sh config/server.properties# 4. 创建Topic
bin/kafka-topics.sh --create --topic test-topic \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 1# 5. 查看Topic列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 6. 查看Topic详情
bin/kafka-topics.sh --describe --topic test-topic \--bootstrap-server localhost:9092

7.3 Kafka编程实战

Java生产者示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// 配置属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");  // 等待所有副本确认props.put("retries", 3);    // 重试次数// 创建生产者Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 100; i++) {String key = "key-" + i;String value = "message-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);// 异步发送带回调producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {exception.printStackTrace();} else {System.out.printf("Sent message to partition %d with offset %d%n",metadata.partition(), metadata.offset());}}});}producer.close();}
}
Java消费者示例
import org.apache.kafka.clients.consumer.*;
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;public class KafkaConsumerExample {public static void main(String[] args) {// 配置属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-consumer-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest");  // 从最早的消息开始消费// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Arrays.asList("test-topic"));// 持续消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s, partition = %d%n",record.offset(), record.key(), record.value(), record.partition());}}}
}

7.4 Kafka高级特性

Kafka Streams
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");// 流处理:词频统计
source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))).groupBy((key, word) -> word).count().toStream().to("output-topic");KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Kafka Connect
// 配置文件:file-source-connector.json
{"name": "file-source-connector","config": {"connector.class": "FileStreamSource","tasks.max": "1","file": "/tmp/input.txt","topic": "file-topic","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter"}
}
# 启动Kafka Connect
bin/connect-standalone.sh config/connect-standalone.properties \config/file-source-connector.properties

7.5 Kafka性能优化

生产者优化
// 批量发送
props.put("batch.size", 16384);
props.put("linger.ms", 10);// 压缩
props.put("compression.type", "snappy");// 缓冲区大小
props.put("buffer.memory", 33554432);
消费者优化
// 批量拉取
props.put("fetch.min.bytes", 1024);
props.put("fetch.max.wait.ms", 500);// 单次拉取最大数据量
props.put("max.poll.records", 500);// Session超时设置
props.put("session.timeout.ms", 30000);

第八部分:实战项目与最佳实践

8.1 项目一:构建实时数据处理管道

项目架构
数据源 → Flume → Kafka → Spark Streaming → HBase → 数据可视化采集     缓冲        实时处理        存储       展示
实现步骤

1. Flume采集配置

# flume-kafka.conf
agent.sources = spooldir-source
agent.channels = memory-channel
agent.sinks = kafka-sink# Source配置
agent.sources.spooldir-source.type = spooldir
agent.sources.spooldir-source.spoolDir = /var/log/flume-spooling
agent.sources.spooldir-source.channels = memory-channel# Channel配置
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000
agent.channels.memory-channel.transactionCapacity = 1000# Sink配置
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafka-sink.kafka.topic = log-topic
agent.sinks.kafka-sink.channel = memory-channel

2. Spark Streaming处理

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._val sparkConf = new SparkConf().setAppName("RealTimeAnalysis")
val ssc = new StreamingContext(sparkConf, Seconds(5))val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "spark-streaming-group","auto.offset.reset" -> "latest"
)val topics = Array("log-topic")
val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams)
)// 实时处理逻辑
stream.map(record => record.value()).flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreachRDD { rdd =>// 存储到HBaserdd.foreachPartition { partition =>val hbaseConnection = HBaseConnection.create()partition.foreach { case (word, count) =>hbaseConnection.put("word_count", word, "count", count.toString)}hbaseConnection.close()}}ssc.start()
ssc.awaitTermination()

8.2 项目二:离线数据仓库建设

数据仓库分层架构
ODS层(操作数据存储)↓ 数据清洗
DWD层(明细数据层)↓ 轻度汇总
DWS层(服务数据层)↓ 主题汇总
ADS层(应用数据层)
实现示例
-- ODS层:原始数据
CREATE EXTERNAL TABLE ods_user_behavior (user_id STRING,item_id STRING,category_id STRING,behavior_type STRING,timestamp BIGINT
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/warehouse/ods/user_behavior';-- DWD层:清洗后的明细数据
CREATE TABLE dwd_user_behavior_detail (user_id STRING,item_id STRING,category_id STRING,behavior_type STRING,behavior_time TIMESTAMP,date_id STRING,hour_id INT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;-- 数据清洗与转换
INSERT OVERWRITE TABLE dwd_user_behavior_detail PARTITION (dt='2024-01-01')
SELECT user_id,item_id,category_id,behavior_type,from_unixtime(timestamp) as behavior_time,from_unixtime(timestamp, 'yyyy-MM-dd') as date_id,hour(from_unixtime(timestamp)) as hour_id
FROM ods_user_behavior
WHERE dt = '2024-01-01'AND user_id IS NOT NULL;-- DWS层:按主题汇总
CREATE TABLE dws_user_behavior_daily (user_id STRING,pv_count BIGINT,buy_count BIGINT,cart_count BIGINT,fav_count BIGINT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;-- 日汇总统计
INSERT OVERWRITE TABLE dws_user_behavior_daily PARTITION (dt='2024-01-01')
SELECT user_id,SUM(CASE WHEN behavior_type = 'pv' THEN 1 ELSE 0 END) as pv_count,SUM(CASE WHEN behavior_type = 'buy' THEN 1 ELSE 0 END) as buy_count,SUM(CASE WHEN behavior_type = 'cart' THEN 1 ELSE 0 END) as cart_count,SUM(CASE WHEN behavior_type = 'fav' THEN 1 ELSE 0 END) as fav_count
FROM dwd_user_behavior_detail
WHERE dt = '2024-01-01'
GROUP BY user_id;-- ADS层:应用层指标
CREATE TABLE ads_user_behavior_stats (stat_date STRING,total_users BIGINT,total_pv BIGINT,total_orders BIGINT,conversion_rate DOUBLE
)
STORED AS PARQUET;-- 计算转化率等核心指标
INSERT OVERWRITE TABLE ads_user_behavior_stats
SELECT '2024-01-01' as stat_date,COUNT(DISTINCT user_id) as total_users,SUM(pv_count) as total_pv,SUM(buy_count) as total_orders,SUM(buy_count) / SUM(pv_count) as conversion_rate
FROM dws_user_behavior_daily
WHERE dt = '2024-01-01';

8.3 性能调优最佳实践

Hadoop集群调优
<!-- hdfs-site.xml -->
<configuration><!-- 增加副本数提高可靠性 --><property><name>dfs.replication</name><value>3</value></property><!-- 增大块大小减少NameNode压力 --><property><name>dfs.blocksize</name><value>268435456</value> <!-- 256MB --></property><!-- 开启短路读取 --><property><name>dfs.client.read.shortcircuit</name><value>true</value></property>
</configuration>
Hive调优参数
-- 开启动态分区
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;-- 开启Map端聚合
SET hive.map.aggr = true;-- 控制Reducer数量
SET mapreduce.job.reduces = 10;-- 开启向量化执行
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;-- 使用CBO优化器
SET hive.cbo.enable = true;
SET hive.compute.query.using.stats = true;
SET hive.stats.fetch.column.stats = true;
Kafka集群调优
# server.properties
# 增加网络线程数
num.network.threads=8# 增加IO线程数
num.io.threads=16# 日志段大小
log.segment.bytes=1073741824  # 1GB# 日志保留时间
log.retention.hours=168  # 7天# 增加socket缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

第九部分:学习路径与资源推荐

9.1 学习路径规划

初级阶段(1-3个月)
  1. 数据库基础

    • SQL语言基础
    • 关系型数据库概念
    • MySQL基础操作
  2. Linux基础

    • 基本命令操作
    • Shell脚本编写
    • 系统管理基础
  3. 编程基础

    • Java/Python基础语法
    • 面向对象编程
    • 基础数据结构
中级阶段(3-6个月)
  1. Hadoop生态系统

    • HDFS原理与实践
    • MapReduce编程
    • YARN资源管理
  2. 数据仓库技术

    • Hive深入学习
    • 数据建模理论
    • ETL流程设计
  3. 实时处理技术

    • Kafka原理与应用
    • Spark基础
    • 流处理概念
高级阶段(6-12个月)
  1. 性能优化

    • SQL优化技巧
    • 大数据作业调优
    • 系统架构优化
  2. 项目实战

    • 完整项目开发
    • 故障排查处理
    • 生产环境部署
  3. 新技术探索

    • Flink实时计算
    • 数据湖技术
    • 云原生大数据

9.2 推荐学习资源

在线课程平台
  • Coursera: 大数据专业课程
  • Udacity: 数据工程纳米学位
  • 极客时间: 专栏课程
  • 慕课网: 实战项目课程
经典书籍推荐

数据库类

  • 《MySQL必知必会》- 入门经典
  • 《高性能MySQL》- 进阶必读
  • 《Oracle Database 12c DBA指南》- Oracle权威指南
  • 《数据库系统概念》- 理论基础

大数据类

  • 《Hadoop权威指南》- Hadoop圣经
  • 《Spark快速大数据分析》- Spark入门
  • 《Kafka权威指南》- Kafka深度解析
  • 《数据密集型应用系统设计》- 架构思维
实践平台
  • 阿里云大数据平台: MaxCompute, E-MapReduce
  • AWS大数据服务: EMR, Redshift, Kinesis
  • 本地虚拟机集群: VMware + CentOS搭建
社区与论坛
  • Stack Overflow: 技术问题解答
  • GitHub: 开源项目学习
  • Apache官网: 官方文档
  • InfoQ: 技术文章与案例

9.3 认证考试

数据库认证
  • MySQL: Oracle Certified MySQL Database Administrator
  • Oracle: Oracle Database SQL Certified Associate
  • MongoDB: MongoDB Certified Developer
大数据认证
  • Cloudera: CCA Spark and Hadoop Developer
  • Databricks: Apache Spark Developer Certification
  • AWS: AWS Certified Big Data - Specialty

9.4 职业发展路径

初级数据工程师(0-2年)↓
中级数据工程师(2-5年)↓
高级数据工程师(5-8年)↓
数据架构师 / 技术专家(8年+)
技能要求演进
级别核心技能软技能
初级SQL、基础编程、Linux学习能力、执行力
中级大数据框架、性能优化问题解决、团队协作
高级架构设计、技术选型项目管理、技术影响力
架构师系统设计、技术战略领导力、业务理解

9.5 持续学习建议

  1. 关注技术趋势

    • 实时计算的演进
    • 数据湖与数据仓库融合
    • 云原生数据平台
    • AI与大数据结合
  2. 参与开源项目

    • 贡献代码
    • 提交Issue
    • 编写文档
    • 分享经验
  3. 建立技术博客

    • 记录学习历程
    • 分享项目经验
    • 总结最佳实践
    • 建立个人品牌
  4. 参加技术会议

    • Hadoop Summit
    • Spark Summit
    • Kafka Summit
    • 国内技术大会

总结

本教程从传统关系型数据库(MySQL、Oracle)到现代大数据技术栈(Hadoop、Hive、Kafka)进行了全面介绍,涵盖了:

  1. 基础知识:数据库原理、SQL语言、数据建模
  2. 核心技术:分布式存储、批处理、流处理
  3. 实战项目:数据管道、数据仓库建设
  4. 优化技巧:性能调优、最佳实践
  5. 学习路径:从入门到精通的完整规划

学习大数据技术是一个循序渐进的过程,需要:

  • 扎实的基础知识
  • 大量的实践经验
  • 持续的学习更新
  • 解决问题的能力
http://www.dtcms.com/a/365721.html

相关文章:

  • ElasticSearch倒排索引原理
  • redis中五大数据类型的操作命令
  • 编程基础-eclipse创建第一个程序
  • 【开题答辩全过程】以 基于java的隔离酒店管理系统设计与开发为例,包含答辩的问题和答案
  • 线程通信机制
  • 记录一下node后端写下载https的文件报错,而浏览器却可以下载。
  • 开源与闭源的再对决:从Grok到中国力量,AI生态走向何方?
  • 并发编程指南 同步操作与强制排序
  • Claude Code初体验:让AI成为你的结对程序员
  • Linux学习——管理基本存储(十八)
  • A股大盘数据-2025093分析
  • Provider中的watch、read、Consumer、ChangeNotifierProvider、ValueNotifierProvider
  • 信息融智学=信息哲学+信息科学+信息技术+信息系统工程+信息处理之智
  • 数据库选择有讲究?SQLite、PostgreSQL还是MySQL?
  • 全渠道 + 低代码:如何打造 “内外协同” 的客服管理系统体系?
  • http和https区别是什么
  • docker 安装 redis 并设置 volumes 并修改 修改密码(三)
  • 【TypeScript】事件循环
  • k8s的SidecarSet配置和initContainers
  • 《四川棒球知识百科》球速最快的运动之一·棒球1号位
  • Omi录屏专家 Screen Recorder Mac中文
  • 如何在私域运营中快速建立信任,三招解决你的烦恼!
  • linux---------------网络基础概念
  • 【IQA技术专题】 无参考自然图像IQA:NIQE
  • 审核问题——一个关于版本号的乌龙事件
  • Elasticsearch面试精讲 Day 6:Query DSL查询语法详解
  • 2025年9月,十大求职神器测评:谁是Offer收割机之王?
  • 玳瑁的嵌入式日记D32-0903(网络编程)
  • Nginx简介
  • 自学嵌入式第三十四天:网络编程-TCP