01 - Hadoop Introduction and Ecosystem
1.1 What Is Hadoop
1.1.1 Definition
Apache Hadoop is an open-source framework for distributed storage and computation, designed to process large-scale datasets. It implements the ideas from Google's MapReduce and Google File System (GFS) papers and provides a reliable, scalable, distributed foundation for big data processing.
1.1.2 Core Characteristics
Distributed storage
- HDFS (Hadoop Distributed File System): a distributed file system
- Data redundancy: blocks are automatically replicated to keep data safe (a simplified placement model is sketched after this list)
- Fault tolerance: data is recovered automatically when nodes fail
Distributed computation
- MapReduce: a parallel computation framework
- Task scheduling: automatic task assignment and management
- Load balancing: compute work is spread across cluster resources
Scalability
- Horizontal scaling: add nodes to increase capacity and throughput
- Near-linear scaling: performance grows roughly in proportion to the number of nodes
- Elasticity: cluster size can be adjusted to match demand
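To make the block and replica numbers above concrete, here is a minimal, self-contained sketch that estimates how many 128 MB blocks a file occupies and round-robins each block's replicas over a handful of hypothetical DataNodes. This is only a mental model; real HDFS placement is rack-aware, and the function and node names are illustrative.

```python
import math

def plan_blocks(file_size_bytes: int,
                block_size: int = 128 * 1024 * 1024,
                replication: int = 3,
                datanodes: int = 5) -> dict:
    """Estimate how HDFS would split and replicate a file (simplified model)."""
    num_blocks = max(1, math.ceil(file_size_bytes / block_size))
    # Round-robin placement over DataNodes; real HDFS also considers rack topology.
    placement = {
        f"block_{i}": [f"datanode{(i + r) % datanodes + 1}" for r in range(replication)]
        for i in range(num_blocks)
    }
    return {
        "blocks": num_blocks,
        "replicas": num_blocks * replication,
        "raw_storage_bytes": file_size_bytes * replication,
        "placement": placement,
    }

if __name__ == "__main__":
    plan = plan_blocks(1 * 1024 ** 3)          # a 1 GB file
    print(plan["blocks"], "blocks,", plan["replicas"], "replicas")
    print(plan["placement"]["block_0"])
```

For a 1 GB file this yields 8 blocks and 24 stored replicas, i.e. 3 GB of raw disk for 1 GB of logical data.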
1.1.3 Advantages of Hadoop
1. Cost effectiveness
- Runs on commodity hardware, keeping costs low
- Open-source software with no licensing fees
- A cost-effective platform for big data workloads
2. Reliability
- Automatic data replication (3 replicas by default)
- Automatic failure detection and recovery
- No single point of failure in the overall design
3. Scalability
- Stores petabytes of data
- Supports clusters with thousands of nodes
- Near-linear performance scaling
4. Flexibility
- Handles structured, semi-structured, and unstructured data
- Supports many data formats
- Backed by a rich ecosystem of tools
1.2 History of Hadoop
1.2.1 Timeline
2003: Google publishes the GFS paper
2004: Google publishes the MapReduce paper
2005: Doug Cutting and Mike Cafarella implement the GFS/MapReduce ideas inside the Nutch project
2006: Hadoop splits out of Nutch as an Apache subproject; Yahoo! begins large-scale adoption
2008: Hadoop becomes an Apache top-level project; Yahoo! runs production clusters with thousands of nodes
2011: Hadoop 1.0 released
2012: YARN (MapReduce 2.0) ships with the Hadoop 2.0 alpha releases
2013: Hadoop 2.x reaches general availability
2017: Hadoop 3.0 released
2020: Hadoop 3.3 released (the current stable line)
1.2.2 Version Evolution
Hadoop 1.x
- Core components: HDFS + MapReduce
- Strengths: simple architecture that is easy to understand
- Limitations: the JobTracker is a single point of failure and limits scalability
Hadoop 2.x
- Major change: the YARN resource manager is introduced
- Architectural shift: resource management is separated from MapReduce
- New capability: multiple computation frameworks can share one cluster
Hadoop 3.x
- Better performance and stability
- New features: erasure coding and support for multiple standby NameNodes (a storage-overhead comparison is sketched below)
- Cloud friendly: improved support for cloud environments
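One way to see why erasure coding matters: under 3x replication a block costs 200% extra storage, while a Reed-Solomon 6+3 layout (the kind of policy Hadoop 3 ships, e.g. RS-6-3) stores 6 data blocks plus 3 parity blocks for only 50% overhead. The sketch below just does that arithmetic; the numbers are the standard ratios, not measurements from a cluster.

```python
def storage_overhead(data_bytes: int) -> dict:
    """Compare raw storage cost of 3x replication vs. RS(6,3) erasure coding."""
    replication_cost = data_bytes * 3              # 3 full copies -> 200% overhead
    ec_cost = data_bytes * (6 + 3) / 6             # 6 data + 3 parity blocks -> 50% overhead
    return {
        "replication_bytes": replication_cost,
        "erasure_coding_bytes": ec_cost,
        "savings_ratio": 1 - ec_cost / replication_cost,
    }

if __name__ == "__main__":
    one_tb = 1024 ** 4
    print(storage_overhead(one_tb))   # EC needs 1.5 TB vs 3 TB -> ~50% savings
```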
1.3 Core Components
1.3.1 HDFS (Hadoop Distributed File System)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
HDFS architecture example.
Demonstrates basic HDFS concepts and operations via the WebHDFS REST API
(requires the third-party `hdfs` package: pip install hdfs).
"""
import json
from typing import Any, Dict, List

from hdfs import InsecureClient


class HDFSManager:
    """Thin wrapper around an HDFS (WebHDFS) client."""

    def __init__(self, namenode_url: str = "http://localhost:9870"):
        """Connect to the NameNode's WebHDFS endpoint."""
        self.client = InsecureClient(namenode_url)
        self.replication_factor = 3                  # default replication factor
        self.block_size = 128 * 1024 * 1024          # default block size: 128 MB

    def create_directory(self, path: str) -> bool:
        """Create an HDFS directory (including parents)."""
        try:
            self.client.makedirs(path)
            print(f"Directory created: {path}")
            return True
        except Exception as e:
            print(f"Failed to create directory: {e}")
            return False

    def upload_file(self, local_path: str, hdfs_path: str, overwrite: bool = False) -> bool:
        """Upload a local file to HDFS."""
        try:
            with open(local_path, "rb") as local_file:
                self.client.write(hdfs_path, local_file, overwrite=overwrite)
            print(f"Upload succeeded: {local_path} -> {hdfs_path}")
            return True
        except Exception as e:
            print(f"Upload failed: {e}")
            return False

    def download_file(self, hdfs_path: str, local_path: str) -> bool:
        """Download a file from HDFS to the local filesystem."""
        try:
            with self.client.read(hdfs_path) as hdfs_file:
                with open(local_path, "wb") as local_file:
                    local_file.write(hdfs_file.read())
            print(f"Download succeeded: {hdfs_path} -> {local_path}")
            return True
        except Exception as e:
            print(f"Download failed: {e}")
            return False

    def list_directory(self, path: str = "/") -> List[Dict[str, Any]]:
        """List the contents of an HDFS directory."""
        try:
            files = self.client.list(path, status=True)
            result = []
            for file_name, file_status in files:
                result.append({
                    "name": file_name,
                    "type": "directory" if file_status["type"] == "DIRECTORY" else "file",
                    "size": file_status.get("length", 0),
                    "replication": file_status.get("replication", 0),
                    "block_size": file_status.get("blockSize", 0),
                    "modification_time": file_status.get("modificationTime", 0),
                    "permission": file_status.get("permission", ""),
                    "owner": file_status.get("owner", ""),
                    "group": file_status.get("group", ""),
                })
            return result
        except Exception as e:
            print(f"Failed to list directory: {e}")
            return []

    def get_file_info(self, path: str) -> Dict[str, Any]:
        """Return status information for a single file or directory."""
        try:
            status = self.client.status(path)
            return {
                "path": path,
                "type": status["type"],
                "size": status.get("length", 0),
                "replication": status.get("replication", 0),
                "block_size": status.get("blockSize", 0),
                "modification_time": status.get("modificationTime", 0),
                "access_time": status.get("accessTime", 0),
                "permission": status.get("permission", ""),
                "owner": status.get("owner", ""),
                "group": status.get("group", ""),
            }
        except Exception as e:
            print(f"Failed to get file info: {e}")
            return {}

    def delete_file(self, path: str, recursive: bool = False) -> bool:
        """Delete an HDFS file or directory."""
        try:
            self.client.delete(path, recursive=recursive)
            print(f"Deleted: {path}")
            return True
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    def get_cluster_info(self) -> Dict[str, Any]:
        """Return basic cluster information (simulated values for illustration)."""
        try:
            self.client.status("/")  # verify the NameNode is reachable
            # Simulated metrics; a real implementation would query the NameNode's
            # JMX/WebHDFS APIs for capacity figures and DataNode reports.
            return {
                "cluster_id": "hadoop-cluster-001",
                "namenode_address": self.client.url,
                "total_capacity": "1TB",
                "used_capacity": "256GB",
                "available_capacity": "768GB",
                "total_files": 10000,
                "total_blocks": 50000,
                "missing_blocks": 0,
                "corrupt_blocks": 0,
                "under_replicated_blocks": 0,
                "datanodes": [
                    {"hostname": "datanode1", "status": "live"},
                    {"hostname": "datanode2", "status": "live"},
                    {"hostname": "datanode3", "status": "live"},
                ],
            }
        except Exception as e:
            print(f"Failed to get cluster info: {e}")
            return {}

    def check_file_health(self, path: str) -> Dict[str, Any]:
        """Check whether a file meets the expected replication factor."""
        try:
            file_info = self.get_file_info(path)
            if not file_info:
                return {"status": "not_found"}

            replication = file_info.get("replication", 0)
            expected = self.replication_factor
            size = file_info.get("size", 0)

            health_status = {
                "path": path,
                "status": "healthy",
                "replication": {
                    "current": replication,
                    "expected": expected,
                    "status": "ok" if replication >= expected else "under_replicated",
                },
                "size": size,
                "blocks": max(1, -(-size // self.block_size)),   # ceiling division
                "last_modified": file_info.get("modification_time", 0),
            }
            if replication < expected:
                health_status["status"] = "warning"
                health_status["issues"] = ["under_replicated"]
            return health_status
        except Exception as e:
            print(f"Health check failed: {e}")
            return {"status": "error", "message": str(e)}


# Usage example
if __name__ == "__main__":
    hdfs = HDFSManager("http://localhost:9870")

    hdfs.create_directory("/user/hadoop/data")
    # hdfs.upload_file("local_file.txt", "/user/hadoop/data/file.txt")

    files = hdfs.list_directory("/user/hadoop")
    print("Directory contents:")
    for file_info in files:
        print(f"  {file_info['name']} ({file_info['type']}) - {file_info['size']} bytes")

    cluster_info = hdfs.get_cluster_info()
    print(f"\nCluster info: {json.dumps(cluster_info, indent=2, ensure_ascii=False)}")
```
1.3.2 The MapReduce Computation Framework
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MapReduce programming examples.
Simulates the map / shuffle / reduce phases in-process to illustrate the
programming model; a real job would run distributed across a cluster.
"""
import re
from collections import defaultdict
from typing import Any, Dict, Iterator, List, Tuple


class MapReduceFramework:
    """A minimal, single-process MapReduce simulator."""

    def map_phase(self, input_data: List[str], mapper_func) -> Dict[str, List[Any]]:
        """Map phase: apply the mapper to every input record."""
        intermediate_results = defaultdict(list)
        for data in input_data:
            for key, value in mapper_func(data):
                intermediate_results[key].append(value)
        return dict(intermediate_results)

    def shuffle_phase(self, intermediate_data: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
        """Shuffle phase: group intermediate results and sort them by key.

        In real Hadoop this phase involves network transfer and external sorting.
        """
        return {key: intermediate_data[key] for key in sorted(intermediate_data.keys())}

    def reduce_phase(self, shuffled_data: Dict[str, List[Any]], reducer_func) -> Dict[str, Any]:
        """Reduce phase: aggregate the values of each key."""
        final_results = {}
        for key, values in shuffled_data.items():
            result = reducer_func(key, values)
            if result is not None:
                final_results[key] = result
        return final_results

    def run_job(self, input_data: List[str], mapper_func, reducer_func) -> Dict[str, Any]:
        """Run a complete (simulated) MapReduce job."""
        print("Starting MapReduce job...")

        print("Running map phase...")
        intermediate_data = self.map_phase(input_data, mapper_func)
        print(f"Map phase done, {len(intermediate_data)} intermediate keys")

        print("Running shuffle phase...")
        shuffled_data = self.shuffle_phase(intermediate_data)
        print("Shuffle phase done")

        print("Running reduce phase...")
        final_results = self.reduce_phase(shuffled_data, reducer_func)
        print(f"Reduce phase done, {len(final_results)} final results")

        return final_results


# --- WordCount example --------------------------------------------------------

def word_count_mapper(line: str) -> Iterator[Tuple[str, int]]:
    """Emit (word, 1) for every word in the line."""
    for word in re.findall(r"\b\w+\b", line.lower()):
        yield (word, 1)


def word_count_reducer(word: str, counts: List[int]) -> int:
    """Sum the counts of a word."""
    return sum(counts)


# --- Log analysis example -----------------------------------------------------

def log_analysis_mapper(log_line: str) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Parse an Apache access-log line and emit (ip, request_info).

    Expected format:
    192.168.1.1 - - [10/Oct/2023:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 2326
    """
    parts = log_line.split()
    if len(parts) >= 10:
        ip = parts[0]
        timestamp = parts[3] + " " + parts[4]
        method = parts[5].strip('"')
        url = parts[6]
        status_code = parts[8]
        response_size = parts[9] if parts[9].isdigit() else 0
        yield (ip, {
            "timestamp": timestamp,
            "method": method,
            "url": url,
            "status_code": int(status_code),
            "response_size": int(response_size),
            "request_count": 1,
        })


def log_analysis_reducer(ip: str, requests: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate per-IP request statistics."""
    total_requests = len(requests)
    total_bytes = sum(req["response_size"] for req in requests)
    status_codes = defaultdict(int)
    methods = defaultdict(int)
    urls = defaultdict(int)
    for req in requests:
        status_codes[req["status_code"]] += 1
        methods[req["method"]] += 1
        urls[req["url"]] += 1
    return {
        "total_requests": total_requests,
        "total_bytes": total_bytes,
        "avg_response_size": total_bytes / total_requests if total_requests > 0 else 0,
        "status_codes": dict(status_codes),
        "methods": dict(methods),
        "top_urls": dict(sorted(urls.items(), key=lambda x: x[1], reverse=True)[:5]),
    }


# --- Sales analysis example ---------------------------------------------------

def sales_analysis_mapper(sales_record: str) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Parse a CSV sales record and emit (category, sale_info).

    Expected CSV columns: date,product_id,category,quantity,price,customer_id
    """
    parts = sales_record.strip().split(",")
    if len(parts) == 6:
        date, product_id, category, quantity, price, customer_id = parts
        try:
            quantity = int(quantity)
            price = float(price)
            yield (category, {
                "date": date,
                "product_id": product_id,
                "quantity": quantity,
                "price": price,
                "revenue": quantity * price,
                "customer_id": customer_id,
            })
        except ValueError:
            pass  # skip malformed records


def sales_analysis_reducer(category: str, sales: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate per-category sales statistics."""
    total_quantity = sum(sale["quantity"] for sale in sales)
    total_revenue = sum(sale["revenue"] for sale in sales)
    return {
        "total_orders": len(sales),
        "total_quantity": total_quantity,
        "total_revenue": total_revenue,
        "unique_customers": len(set(sale["customer_id"] for sale in sales)),
        "unique_products": len(set(sale["product_id"] for sale in sales)),
        "avg_price": sum(sale["price"] for sale in sales) / len(sales),
        "avg_quantity_per_order": total_quantity / len(sales),
        "avg_revenue_per_order": total_revenue / len(sales),
    }


# Usage example
if __name__ == "__main__":
    mr_framework = MapReduceFramework()

    print("=== WordCount example ===")
    text_data = [
        "Hello world hello hadoop",
        "Hadoop is great for big data",
        "Big data processing with hadoop",
        "Hello big data world",
    ]
    word_counts = mr_framework.run_job(text_data, word_count_mapper, word_count_reducer)
    print("\nWord counts:")
    for word, count in sorted(word_counts.items()):
        print(f"  {word}: {count}")

    print("\n=== Log analysis example ===")
    log_data = [
        '192.168.1.1 - - [10/Oct/2023:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 2326',
        '192.168.1.1 - - [10/Oct/2023:13:55:37 +0000] "GET /about.html HTTP/1.1" 200 1024',
        '192.168.1.2 - - [10/Oct/2023:13:55:38 +0000] "POST /api/data HTTP/1.1" 201 512',
        '192.168.1.1 - - [10/Oct/2023:13:55:39 +0000] "GET /contact.html HTTP/1.1" 404 256',
        '192.168.1.3 - - [10/Oct/2023:13:55:40 +0000] "GET /index.html HTTP/1.1" 200 2326',
    ]
    log_analysis = mr_framework.run_job(log_data, log_analysis_mapper, log_analysis_reducer)
    print("\nLog analysis results:")
    for ip, stats in log_analysis.items():
        print(f"  IP {ip}:")
        print(f"    total requests: {stats['total_requests']}")
        print(f"    total bytes: {stats['total_bytes']}")
        print(f"    status codes: {stats['status_codes']}")
        print(f"    methods: {stats['methods']}")

    print("\n=== Sales analysis example ===")
    sales_data = [
        "2023-10-01,P001,Electronics,2,299.99,C001",
        "2023-10-01,P002,Books,1,19.99,C002",
        "2023-10-01,P003,Electronics,1,599.99,C003",
        "2023-10-02,P004,Books,3,29.99,C001",
        "2023-10-02,P005,Clothing,2,49.99,C004",
        "2023-10-02,P001,Electronics,1,299.99,C005",
    ]
    sales_analysis = mr_framework.run_job(sales_data, sales_analysis_mapper, sales_analysis_reducer)
    print("\nSales analysis results:")
    for category, stats in sales_analysis.items():
        print(f"  Category {category}:")
        print(f"    total orders: {stats['total_orders']}")
        print(f"    total quantity: {stats['total_quantity']}")
        print(f"    total revenue: ${stats['total_revenue']:.2f}")
        print(f"    unique customers: {stats['unique_customers']}")
        print(f"    avg revenue per order: ${stats['avg_revenue_per_order']:.2f}")
```
1.3.3 The YARN Resource Manager
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
YARN resource management example.
Simulates resource scheduling and application management with a toy
ResourceManager, NodeManagers, containers, and pluggable schedulers.
"""
import queue
import random
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional


class ApplicationState(Enum):
    """Application lifecycle states."""
    NEW = "NEW"
    SUBMITTED = "SUBMITTED"
    ACCEPTED = "ACCEPTED"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"
    KILLED = "KILLED"


class ContainerState(Enum):
    """Container lifecycle states."""
    NEW = "NEW"
    ALLOCATED = "ALLOCATED"
    RUNNING = "RUNNING"
    COMPLETE = "COMPLETE"
    FAILED = "FAILED"


@dataclass
class Resource:
    """A resource vector: memory (MB) and virtual cores."""
    memory_mb: int
    vcores: int

    def __add__(self, other: "Resource") -> "Resource":
        return Resource(self.memory_mb + other.memory_mb, self.vcores + other.vcores)

    def __sub__(self, other: "Resource") -> "Resource":
        return Resource(self.memory_mb - other.memory_mb, self.vcores - other.vcores)

    def can_satisfy(self, required: "Resource") -> bool:
        """Return True if this resource can cover the requested amount."""
        return self.memory_mb >= required.memory_mb and self.vcores >= required.vcores


@dataclass
class Container:
    """A unit of allocated resources on a node."""
    container_id: str
    node_id: str
    resource: Resource
    state: ContainerState
    application_id: str
    start_time: Optional[datetime] = None
    finish_time: Optional[datetime] = None
    exit_code: int = 0
    diagnostics: str = ""


@dataclass
class Application:
    """A submitted application and its containers."""
    application_id: str
    application_name: str
    application_type: str
    user: str
    queue: str
    state: ApplicationState
    resource_request: Resource
    submit_time: datetime
    start_time: Optional[datetime] = None
    finish_time: Optional[datetime] = None
    progress: float = 0.0
    containers: List[Container] = field(default_factory=list)


@dataclass
class NodeManager:
    """A worker node that hosts containers."""
    node_id: str
    hostname: str
    total_resource: Resource
    available_resource: Resource
    used_resource: Resource
    containers: List[Container]
    last_heartbeat: datetime


class ResourceScheduler:
    """Pluggable scheduling policies: fair, capacity, or FIFO."""

    def __init__(self, scheduler_type: str = "fair"):
        self.scheduler_type = scheduler_type
        self.queues = {
            "default": {"capacity": 0.5, "max_capacity": 1.0},
            "production": {"capacity": 0.3, "max_capacity": 0.6},
            "development": {"capacity": 0.2, "max_capacity": 0.4},
        }

    def schedule_application(self, application: Application,
                             available_nodes: List[NodeManager]) -> Optional[NodeManager]:
        """Pick a node for the application according to the scheduling policy."""
        if self.scheduler_type == "fair":
            return self._fair_scheduler(application, available_nodes)
        if self.scheduler_type == "capacity":
            return self._capacity_scheduler(application, available_nodes)
        return self._fifo_scheduler(application, available_nodes)

    def _fair_scheduler(self, application, available_nodes):
        """Fair scheduling: choose the least-utilized node that fits the request."""
        best_node, min_utilization = None, float("inf")
        for node in available_nodes:
            if node.available_resource.can_satisfy(application.resource_request):
                memory_util = node.used_resource.memory_mb / node.total_resource.memory_mb
                cpu_util = node.used_resource.vcores / node.total_resource.vcores
                avg_util = (memory_util + cpu_util) / 2
                if avg_util < min_utilization:
                    min_utilization, best_node = avg_util, node
        return best_node

    def _capacity_scheduler(self, application, available_nodes):
        """Capacity scheduling (simplified): first node that fits; real YARN is queue-aware."""
        self.queues.get(application.queue, self.queues["default"])
        for node in available_nodes:
            if node.available_resource.can_satisfy(application.resource_request):
                return node
        return None

    def _fifo_scheduler(self, application, available_nodes):
        """FIFO scheduling: first node that fits the request."""
        for node in available_nodes:
            if node.available_resource.can_satisfy(application.resource_request):
                return node
        return None


class YARNResourceManager:
    """A toy ResourceManager: tracks nodes, applications, and containers."""

    def __init__(self, scheduler_type: str = "fair"):
        self.applications: Dict[str, Application] = {}
        self.nodes: Dict[str, NodeManager] = {}
        self.containers: Dict[str, Container] = {}
        self.scheduler = ResourceScheduler(scheduler_type)
        self.application_queue: "queue.Queue[str]" = queue.Queue()
        self.running = False
        self.scheduler_thread: Optional[threading.Thread] = None

    def register_node(self, node_id: str, hostname: str, memory_mb: int, vcores: int) -> bool:
        """Register a NodeManager with the ResourceManager."""
        try:
            self.nodes[node_id] = NodeManager(
                node_id=node_id,
                hostname=hostname,
                total_resource=Resource(memory_mb, vcores),
                available_resource=Resource(memory_mb, vcores),
                used_resource=Resource(0, 0),
                containers=[],
                last_heartbeat=datetime.now(),
            )
            print(f"Node registered: {node_id} ({hostname}) - {memory_mb} MB, {vcores} cores")
            return True
        except Exception as e:
            print(f"Node registration failed: {e}")
            return False

    def submit_application(self, application_name: str, application_type: str,
                           user: str, queue_name: str, memory_mb: int, vcores: int) -> str:
        """Submit an application and enqueue it for scheduling; returns its ID."""
        application_id = f"application_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        self.applications[application_id] = Application(
            application_id=application_id,
            application_name=application_name,
            application_type=application_type,
            user=user,
            queue=queue_name,
            state=ApplicationState.SUBMITTED,
            resource_request=Resource(memory_mb, vcores),
            submit_time=datetime.now(),
        )
        self.application_queue.put(application_id)
        print(f"Application submitted: {application_id} ({application_name})")
        return application_id

    def allocate_container(self, application_id: str, node: NodeManager) -> Optional[Container]:
        """Allocate one container for the application on the given node."""
        application = self.applications.get(application_id)
        if not application:
            return None
        if not node.available_resource.can_satisfy(application.resource_request):
            return None

        container_id = f"container_{application_id}_{len(application.containers) + 1}"
        container = Container(
            container_id=container_id,
            node_id=node.node_id,
            resource=application.resource_request,
            state=ContainerState.ALLOCATED,
            application_id=application_id,
            start_time=datetime.now(),
        )

        # Update node bookkeeping
        node.available_resource = node.available_resource - application.resource_request
        node.used_resource = node.used_resource + application.resource_request
        node.containers.append(container)

        # Update application bookkeeping
        application.containers.append(container)
        if application.state == ApplicationState.ACCEPTED:
            application.state = ApplicationState.RUNNING
            application.start_time = datetime.now()

        self.containers[container_id] = container
        print(f"Container allocated: {container_id} on node {node.node_id}")
        return container

    def start_container(self, container_id: str) -> bool:
        """Mark a container as running."""
        container = self.containers.get(container_id)
        if not container:
            return False
        container.state = ContainerState.RUNNING
        print(f"Container started: {container_id}")
        return True

    def complete_container(self, container_id: str, exit_code: int = 0, diagnostics: str = "") -> bool:
        """Mark a container as finished and release its resources."""
        container = self.containers.get(container_id)
        if not container:
            return False

        container.state = ContainerState.COMPLETE if exit_code == 0 else ContainerState.FAILED
        container.finish_time = datetime.now()
        container.exit_code = exit_code
        container.diagnostics = diagnostics

        # Release resources on the node
        node = self.nodes.get(container.node_id)
        if node:
            node.available_resource = node.available_resource + container.resource
            node.used_resource = node.used_resource - container.resource
            node.containers.remove(container)

        # Finish the application once all of its containers are done
        application = self.applications.get(container.application_id)
        if application:
            completed = sum(1 for c in application.containers
                            if c.state in (ContainerState.COMPLETE, ContainerState.FAILED))
            if completed == len(application.containers):
                application.state = ApplicationState.FINISHED
                application.finish_time = datetime.now()
                application.progress = 1.0

        print(f"Container finished: {container_id} (exit code: {exit_code})")
        return True

    def kill_application(self, application_id: str) -> bool:
        """Kill an application and all of its running containers."""
        application = self.applications.get(application_id)
        if not application:
            return False
        for container in application.containers:
            if container.state == ContainerState.RUNNING:
                self.complete_container(container.container_id, exit_code=-1,
                                        diagnostics="Application killed")
        application.state = ApplicationState.KILLED
        application.finish_time = datetime.now()
        print(f"Application killed: {application_id}")
        return True

    def start_scheduler(self):
        """Start the background scheduling loop."""
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop)
        self.scheduler_thread.start()
        print("Scheduler started")

    def stop_scheduler(self):
        """Stop the background scheduling loop."""
        self.running = False
        if self.scheduler_thread:
            self.scheduler_thread.join()
        print("Scheduler stopped")

    def _scheduler_loop(self):
        """Main scheduling loop: accept applications and place containers."""
        while self.running:
            try:
                application_id = self.application_queue.get(timeout=1)
                application = self.applications.get(application_id)
                if application and application.state == ApplicationState.SUBMITTED:
                    application.state = ApplicationState.ACCEPTED
                    available_nodes = [
                        node for node in self.nodes.values()
                        if node.available_resource.can_satisfy(application.resource_request)
                    ]
                    if available_nodes:
                        selected_node = self.scheduler.schedule_application(application, available_nodes)
                        if selected_node:
                            container = self.allocate_container(application_id, selected_node)
                            if container:
                                self.start_container(container.container_id)
                                # Simulate the container's workload in a separate thread
                                threading.Thread(
                                    target=self._simulate_container_execution,
                                    args=(container.container_id,),
                                ).start()
                    else:
                        # No capacity right now: requeue the application and retry later
                        application.state = ApplicationState.SUBMITTED
                        self.application_queue.put(application_id)
                        time.sleep(1)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Scheduler error: {e}")

    def _simulate_container_execution(self, container_id: str):
        """Pretend the container runs for 5-15 seconds and usually succeeds."""
        time.sleep(random.randint(5, 15))
        exit_code = 0 if random.random() < 0.9 else 1   # 90% success rate
        self.complete_container(container_id, exit_code)

    def get_cluster_metrics(self) -> Dict[str, Any]:
        """Aggregate cluster-wide resource and application metrics."""
        total_memory = sum(n.total_resource.memory_mb for n in self.nodes.values())
        total_vcores = sum(n.total_resource.vcores for n in self.nodes.values())
        used_memory = sum(n.used_resource.memory_mb for n in self.nodes.values())
        used_vcores = sum(n.used_resource.vcores for n in self.nodes.values())
        running_apps = sum(1 for a in self.applications.values() if a.state == ApplicationState.RUNNING)
        completed_apps = sum(1 for a in self.applications.values() if a.state == ApplicationState.FINISHED)
        failed_apps = sum(1 for a in self.applications.values() if a.state == ApplicationState.FAILED)
        return {
            "cluster_id": "yarn-cluster-001",
            "total_nodes": len(self.nodes),
            "active_nodes": len([n for n in self.nodes.values()
                                 if (datetime.now() - n.last_heartbeat).seconds < 60]),
            "total_memory_mb": total_memory,
            "used_memory_mb": used_memory,
            "available_memory_mb": total_memory - used_memory,
            "memory_utilization": used_memory / total_memory if total_memory > 0 else 0,
            "total_vcores": total_vcores,
            "used_vcores": used_vcores,
            "available_vcores": total_vcores - used_vcores,
            "vcores_utilization": used_vcores / total_vcores if total_vcores > 0 else 0,
            "total_applications": len(self.applications),
            "running_applications": running_apps,
            "completed_applications": completed_apps,
            "failed_applications": failed_apps,
            "total_containers": len(self.containers),
            "running_containers": sum(1 for c in self.containers.values()
                                      if c.state == ContainerState.RUNNING),
        }

    def get_application_info(self, application_id: str) -> Optional[Dict[str, Any]]:
        """Return a serializable summary of one application."""
        application = self.applications.get(application_id)
        if not application:
            return None
        return {
            "application_id": application.application_id,
            "application_name": application.application_name,
            "application_type": application.application_type,
            "user": application.user,
            "queue": application.queue,
            "state": application.state.value,
            "progress": application.progress,
            "submit_time": application.submit_time.isoformat(),
            "start_time": application.start_time.isoformat() if application.start_time else None,
            "finish_time": application.finish_time.isoformat() if application.finish_time else None,
            "resource_request": {
                "memory_mb": application.resource_request.memory_mb,
                "vcores": application.resource_request.vcores,
            },
            "containers": [
                {
                    "container_id": c.container_id,
                    "node_id": c.node_id,
                    "state": c.state.value,
                    "resource": {"memory_mb": c.resource.memory_mb, "vcores": c.resource.vcores},
                }
                for c in application.containers
            ],
        }


# Usage example
if __name__ == "__main__":
    yarn_rm = YARNResourceManager(scheduler_type="fair")

    # Register worker nodes
    yarn_rm.register_node("node1", "worker1.example.com", 8192, 4)
    yarn_rm.register_node("node2", "worker2.example.com", 8192, 4)
    yarn_rm.register_node("node3", "worker3.example.com", 16384, 8)

    yarn_rm.start_scheduler()

    # Submit applications to different queues
    app1_id = yarn_rm.submit_application("WordCount Job", "MapReduce", "user1", "default", 2048, 1)
    app2_id = yarn_rm.submit_application("Data Processing", "Spark", "user2", "production", 4096, 2)
    app3_id = yarn_rm.submit_application("ML Training", "TensorFlow", "user3", "development", 8192, 4)

    # Let the applications run for a while
    time.sleep(10)

    metrics = yarn_rm.get_cluster_metrics()
    print("\n=== Cluster metrics ===")
    print(f"Total nodes: {metrics['total_nodes']}")
    print(f"Active nodes: {metrics['active_nodes']}")
    print(f"Memory utilization: {metrics['memory_utilization']:.2%}")
    print(f"vCore utilization: {metrics['vcores_utilization']:.2%}")
    print(f"Running applications: {metrics['running_applications']}")
    print(f"Running containers: {metrics['running_containers']}")

    print("\n=== Application info ===")
    for app_id in (app1_id, app2_id, app3_id):
        app_info = yarn_rm.get_application_info(app_id)
        if app_info:
            print(f"Application {app_info['application_name']} ({app_id}):")
            print(f"  state: {app_info['state']}")
            print(f"  progress: {app_info['progress']:.2%}")
            print(f"  containers: {len(app_info['containers'])}")

    # Wait for the applications to finish, then stop the scheduler
    time.sleep(20)
    yarn_rm.stop_scheduler()

    final_metrics = yarn_rm.get_cluster_metrics()
    print("\n=== Final state ===")
    print(f"Completed applications: {final_metrics['completed_applications']}")
    print(f"Failed applications: {final_metrics['failed_applications']}")
```
1.4 The Hadoop Ecosystem
1.4.1 Core Components
1.4.2 Ecosystem Components in Detail
Data storage layer
HDFS (Hadoop Distributed File System)
- Role: distributed file system providing highly reliable storage
- Characteristics: high fault tolerance, high throughput, well suited to large files
- Typical uses: big data storage, backups, archiving
HBase
- Role: distributed NoSQL database built on top of HDFS
- Characteristics: real-time reads and writes, column-family storage, automatic sharding (see the client sketch below)
- Typical uses: real-time lookups, time-series data, user profiles
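As a taste of the HBase access pattern (row-key point reads and prefix scans), here is a hedged sketch using the third-party happybase client against HBase's Thrift gateway. It assumes a Thrift server on localhost:9090 and a pre-created 'user_profile' table with an 'info' column family; both names are illustrative.

```python
import happybase  # third-party client for HBase's Thrift gateway

# Assumes an HBase Thrift server on localhost:9090 and an existing table
# 'user_profile' with column family 'info' (placeholder names).
connection = happybase.Connection("localhost", port=9090)
table = connection.table("user_profile")

# Write a row: row key + column-family:qualifier -> value (all bytes)
table.put(b"user_001", {b"info:name": b"alice", b"info:city": b"Beijing"})

# Point read by row key, HBase's primary access pattern
row = table.row(b"user_001")
print(row[b"info:name"])

# Range scan over a row-key prefix
for key, data in table.scan(row_prefix=b"user_"):
    print(key, data)

connection.close()
```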
Data processing layer
MapReduce
- Role: distributed batch computation framework
- Characteristics: strong fault tolerance, simple programming model
- Typical uses: ETL, data cleansing, batch computation
Spark
- Role: in-memory computation engine supporting both batch and stream processing
- Characteristics: high performance, ease of use, rich APIs (see the PySpark sketch below)
- Typical uses: machine learning, graph processing, near-real-time analytics
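The classic word count looks like this in PySpark. The sketch assumes a local pyspark installation and builds an in-memory RDD; on a Hadoop cluster the same code can read hdfs:// paths and run on YARN when submitted there.

```python
from pyspark.sql import SparkSession

# Local sketch; on a cluster, read from HDFS (e.g. hdfs:///user/hadoop/data/*.txt)
# and submit to YARN instead of using local[*].
spark = SparkSession.builder.appName("WordCount").master("local[*]").getOrCreate()

lines = spark.sparkContext.parallelize([
    "Hello world hello hadoop",
    "Hadoop is great for big data",
])

counts = (lines.flatMap(lambda line: line.lower().split())   # line -> words
               .map(lambda word: (word, 1))                  # word -> (word, 1)
               .reduceByKey(lambda a, b: a + b))              # sum counts per word

for word, count in counts.collect():
    print(word, count)

spark.stop()
```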
Hive
- Role: data warehouse software providing a SQL query interface over Hadoop
- Characteristics: SQL compatibility, metadata management, query optimization
- Typical uses: data analysis, reporting, OLAP queries
Data transport layer
Kafka
- Role: distributed message queue for high-throughput data streams
- Characteristics: high performance, durable storage, partitioning (see the producer/consumer sketch below)
- Typical uses: log collection, real-time data pipelines, event-driven systems
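A minimal produce/consume round trip with the kafka-python client, assuming a broker at localhost:9092 and a topic named access-logs (both placeholders):

```python
from kafka import KafkaConsumer, KafkaProducer  # kafka-python client

# Produce one log line to the (placeholder) 'access-logs' topic
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("access-logs", b'192.168.1.1 GET /index.html 200')
producer.flush()

# Consume from the beginning of the topic; stop if idle for 5 seconds
consumer = KafkaConsumer(
    "access-logs",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for message in consumer:
    print(message.partition, message.offset, message.value)
```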
Flume
- Role: log collection system for reliable data transport
- Characteristics: reliability, scalability, flexible configuration
- Typical uses: log collection, data ingestion, near-real-time transport
Resource management layer
YARN (Yet Another Resource Negotiator)
- Role: cluster resource manager and unified scheduler
- Characteristics: multi-tenancy, resource isolation, high availability
- Typical uses: resource scheduling, running multiple frameworks, cluster management
Coordination layer
ZooKeeper
- Role: distributed coordination service and configuration store
- Characteristics: consistency, reliability, high performance
- Typical uses: configuration management, service discovery, distributed locks (see the kazoo sketch below)
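The two ZooKeeper use cases named above, configuration storage and distributed locking, look like this with the kazoo Python client. The znode paths and the localhost:2181 address are illustrative.

```python
from kazoo.client import KazooClient  # kazoo: Python client for ZooKeeper

# Assumes a ZooKeeper server at localhost:2181 (placeholder address).
zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

# Configuration management: store and read a small config value under a znode
zk.ensure_path("/app/config")
zk.set("/app/config", b"batch.size=1000")
value, stat = zk.get("/app/config")
print(value.decode(), "version:", stat.version)

# Distributed lock: only one client at a time enters the critical section
lock = zk.Lock("/app/locks/job-1", identifier="worker-42")
with lock:
    print("lock acquired, doing exclusive work")

zk.stop()
```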
1.4.3 Technology Selection Guide
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Technology selection guide for the Hadoop ecosystem.
Maps data type, processing type, and data volume to candidate technologies.
"""
from enum import Enum
from typing import Any, Dict, List


class DataType(Enum):
    """Data type categories."""
    STRUCTURED = "structured"
    SEMI_STRUCTURED = "semi_structured"
    UNSTRUCTURED = "unstructured"


class ProcessingType(Enum):
    """Processing styles."""
    BATCH = "batch"
    STREAM = "stream"
    INTERACTIVE = "interactive"
    REAL_TIME = "real_time"


class ScaleLevel(Enum):
    """Data volume levels."""
    SMALL = "small"            # < 1 TB
    MEDIUM = "medium"          # 1 TB - 100 TB
    LARGE = "large"            # 100 TB - 1 PB
    VERY_LARGE = "very_large"  # > 1 PB


class TechnologySelector:
    """Recommends storage/processing technologies and an architecture pattern."""

    def __init__(self):
        self.recommendations = {
            "storage": {
                "structured": {
                    "small": ["MySQL", "PostgreSQL"],
                    "medium": ["HBase", "Cassandra"],
                    "large": ["HDFS + Parquet", "HBase"],
                    "very_large": ["HDFS + ORC", "Kudu"],
                },
                "semi_structured": {
                    "small": ["MongoDB", "Elasticsearch"],
                    "medium": ["HDFS + JSON", "HBase"],
                    "large": ["HDFS + Avro", "HBase"],
                    "very_large": ["HDFS + Parquet", "Kudu"],
                },
                "unstructured": {
                    "small": ["Local FileSystem", "S3"],
                    "medium": ["HDFS", "S3"],
                    "large": ["HDFS", "Object Storage"],
                    "very_large": ["HDFS", "Distributed Object Storage"],
                },
            },
            "processing": {
                "batch": {
                    "small": ["Pandas", "Local Processing"],
                    "medium": ["Spark", "MapReduce"],
                    "large": ["Spark", "MapReduce"],
                    "very_large": ["Spark", "Flink Batch"],
                },
                "stream": {
                    "small": ["Kafka Streams", "Storm"],
                    "medium": ["Spark Streaming", "Flink"],
                    "large": ["Flink", "Spark Streaming"],
                    "very_large": ["Flink", "Storm"],
                },
                "interactive": {
                    "small": ["Jupyter", "R Studio"],
                    "medium": ["Spark SQL", "Presto"],
                    "large": ["Spark SQL", "Impala"],
                    "very_large": ["Presto", "Drill"],
                },
            },
        }

    def recommend_storage(self, data_type: DataType, scale: ScaleLevel) -> List[str]:
        """Recommend storage technologies."""
        return self.recommendations["storage"][data_type.value][scale.value]

    def recommend_processing(self, processing_type: ProcessingType, scale: ScaleLevel) -> List[str]:
        """Recommend processing technologies."""
        return self.recommendations["processing"][processing_type.value][scale.value]

    def get_architecture_recommendation(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
        """Build a full recommendation from a requirements dictionary."""
        data_volume = requirements.get("data_volume", "small")
        data_types = requirements.get("data_types", ["structured"])
        processing_types = requirements.get("processing_types", ["batch"])
        latency_requirement = requirements.get("latency", "normal")  # low, normal, high

        scale = ScaleLevel(data_volume)
        recommendations = {
            "storage_layer": {},
            "processing_layer": {},
            "architecture_pattern": "",
            "key_considerations": [],
        }

        # Storage-layer recommendations per data type
        for data_type in data_types:
            recommendations["storage_layer"][data_type] = self.recommend_storage(
                DataType(data_type), scale)

        # Processing-layer recommendations per processing type
        for processing_type in processing_types:
            recommendations["processing_layer"][processing_type] = self.recommend_processing(
                ProcessingType(processing_type), scale)

        # Architecture pattern
        if "stream" in processing_types and latency_requirement == "low":
            recommendations["architecture_pattern"] = "Lambda Architecture"
        elif "stream" in processing_types:
            recommendations["architecture_pattern"] = "Kappa Architecture"
        else:
            recommendations["architecture_pattern"] = "Batch Architecture"

        # Key considerations
        if scale in (ScaleLevel.LARGE, ScaleLevel.VERY_LARGE):
            recommendations["key_considerations"].extend([
                "Plan a data partitioning strategy",
                "Apply data compression",
                "Design for fault tolerance",
            ])
        if "stream" in processing_types:
            recommendations["key_considerations"].extend([
                "Design backpressure handling",
                "Consider data consistency guarantees",
                "Use checkpointing",
            ])
        return recommendations


# Usage example
if __name__ == "__main__":
    selector = TechnologySelector()

    requirements = {
        "data_volume": "large",
        "data_types": ["structured", "semi_structured"],
        "processing_types": ["batch", "stream"],
        "latency": "low",
    }
    recommendations = selector.get_architecture_recommendation(requirements)

    print("=== Technology recommendations ===")
    print(f"Architecture pattern: {recommendations['architecture_pattern']}")
    print("\nStorage layer:")
    for data_type, technologies in recommendations["storage_layer"].items():
        print(f"  {data_type}: {', '.join(technologies)}")
    print("\nProcessing layer:")
    for processing_type, technologies in recommendations["processing_layer"].items():
        print(f"  {processing_type}: {', '.join(technologies)}")
    print("\nKey considerations:")
    for consideration in recommendations["key_considerations"]:
        print(f"  - {consideration}")
```
1.5 Hadoop Application Scenarios
1.5.1 Typical Scenarios
1. Big data analytics
- Log analysis: web access logs, application logs, system logs
- User behavior analysis: clickstream analysis, user profiling, recommendation systems
- Business intelligence: sales analysis, financial reporting, operational metrics
2. Data warehousing
- ETL: data extraction, transformation, and loading
- Data marts: department-level data warehouses
- OLAP: multidimensional analysis
3. Machine learning
- Feature engineering: data preprocessing and feature extraction
- Model training: large-scale machine learning algorithms
- Model evaluation: cross-validation and performance metrics
4. Real-time processing
- Stream processing: real-time monitoring and alerting
- Event processing: complex event processing (CEP)
- Real-time recommendation: personalized recommendation systems
1.5.2 Industry Case Studies
E-commerce
Big data architecture for e-commerce
Data sources
- User behavior data (clicks, page views, purchases)
- Product data (prices, inventory, attributes)
- Transaction data (orders, payments, logistics)
- External data (weather, holidays, competitors)
Technology stack
- **Data collection**: Flume, Kafka
- **Data storage**: HDFS, HBase
- **Data processing**: Spark, Hive
- **Real-time computation**: Spark Streaming, Flink
- **Data serving**: Presto, Kylin
Application scenarios
- Real-time recommendation
- User profiling
- Inventory optimization
- Pricing strategy
- Fraud detection
Finance
Big data architecture for finance
Data sources
- Trading data (equities, bonds, foreign exchange)
- Customer data (accounts, assets, behavior)
- Market data (quotes, news, research reports)
- Risk data (credit, market, operational)
Technology stack
- **Data collection**: Kafka, Flume
- **Data storage**: HDFS, HBase, Kudu
- **Data processing**: Spark, Flink
- **Real-time computation**: Storm, Spark Streaming
- **Data analysis**: Hive, Impala
Application scenarios
- Risk management
- Anti-money-laundering monitoring
- Algorithmic trading
- Customer analytics
- Regulatory reporting
Telecommunications
Big data architecture for telecom
Data sources
- Call detail records (CDRs)
- Network performance data
- Customer service data
- Location data (LBS)
Technology stack
- **Data collection**: Flume, Sqoop
- **Data storage**: HDFS, HBase
- **Data processing**: MapReduce, Spark
- **Real-time analytics**: Storm, Flink
- **Data mining**: Mahout, MLlib
Application scenarios
- Network optimization
- Customer churn prediction
- Targeted marketing
- Fraud detection
- Location-based services
1.6 Advantages and Challenges
1.6.1 Key Advantages
1. Cost effectiveness
- Low hardware cost: commodity servers cost far less than specialized appliances
- Open-source software: no license fees, lowering total cost of ownership
- Operations: a high degree of automation reduces manual intervention
2. Scalability
- Horizontal scaling: adding nodes increases capacity close to linearly
- Storage scaling: petabyte-scale storage
- Compute scaling: parallel computation across thousands of nodes
3. Reliability
- Data redundancy: multiple replicas keep data safe
- Failure recovery: failed nodes are detected and recovered automatically
- No single point of failure: the distributed design avoids single points of failure
4. Flexibility
- Data formats: structured, semi-structured, and unstructured data
- Processing modes: batch, streaming, and interactive queries
- Programming languages: Java, Python, Scala, R, and more (see the Hadoop Streaming sketch below)
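Language flexibility mostly comes from Hadoop Streaming, which runs any executable that reads records on stdin and writes key/value pairs to stdout. Below is a sketch of a streaming word-count mapper and reducer in one Python script; submission is typically done with the hadoop-streaming jar via -mapper/-reducer/-input/-output options, but treat the exact invocation as environment-specific.

```python
#!/usr/bin/env python3
# Word-count for Hadoop Streaming: the mapper emits "word<TAB>1" lines, and
# Hadoop sorts the mapper output by key before piping it into the reducer.
import sys

def mapper() -> None:
    for line in sys.stdin:
        for word in line.strip().lower().split():
            print(f"{word}\t1")

def reducer() -> None:
    # Input is sorted by key, so equal words arrive on consecutive lines.
    current_word, current_count = None, 0
    for line in sys.stdin:
        word, count = line.rstrip("\n").split("\t", 1)
        if word == current_word:
            current_count += int(count)
        else:
            if current_word is not None:
                print(f"{current_word}\t{current_count}")
            current_word, current_count = word, int(count)
    if current_word is not None:
        print(f"{current_word}\t{current_count}")

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "map":
        mapper()
    else:
        reducer()
```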
1.6.2 Key Challenges
1. Complexity
- Architectural complexity: many components and a steep learning curve
- Operational complexity: requires an experienced operations team
- Tuning difficulty: performance tuning demands deep understanding of the stack
2. Latency
- Batch orientation: classic MapReduce is not suited to real-time processing
- Startup overhead: jobs take a long time to launch
- End-to-end delay: there is a lag between data arrival and results
3. The small-files problem
- NameNode pressure: large numbers of small files inflate NameNode memory usage (estimated in the sketch below)
- Storage efficiency: small files waste storage capacity
- Processing performance: jobs over many small files run poorly
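A rough way to quantify the NameNode pressure: each file, directory, and block is commonly estimated at roughly 150 bytes of NameNode heap, so metadata cost is driven by object count rather than data volume. The sketch below applies that rule of thumb; treat the 150-byte figure as an order-of-magnitude estimate, not a guarantee.

```python
def namenode_heap_estimate(num_files: int, blocks_per_file: int = 1,
                           bytes_per_object: int = 150) -> float:
    """Rough NameNode heap needed to track file + block metadata, in GB.

    Uses the commonly cited ~150 bytes per namespace object (file or block);
    treat the result as an order of magnitude, not an exact sizing.
    """
    objects = num_files * (1 + blocks_per_file)
    return objects * bytes_per_object / 1024 ** 3

if __name__ == "__main__":
    # 100 million small files (1 block each) vs. 1 million large files (8 blocks each)
    print(f"{namenode_heap_estimate(100_000_000):.1f} GB")   # ~28 GB of heap
    print(f"{namenode_heap_estimate(1_000_000, 8):.1f} GB")  # ~1.3 GB of heap
```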
4. Security
- Authentication and authorization require additional frameworks (e.g. Kerberos)
- Encryption of data at rest and in transit must be configured separately
- Complete audit logging of operations is needed
1.6.3 Trends
1. Cloud native
- Containerized deployment: Docker and Kubernetes support
- Cloud service integration: AWS, Azure, and GCP
- Elastic scaling: resources adjust automatically to load
2. Real time
- Unified stream and batch processing frameworks
- Low latency: millisecond-level processing
- Event-driven architectural patterns
3. Intelligence
- Automatic tuning: machine-learning-driven performance optimization
- Intelligent operations: predictive maintenance and fault diagnosis
- Adaptivity: configuration adjusts to the workload automatically
4. Ecosystem integration
- Multi-cloud support: processing data across cloud platforms
- API standardization: unified data access interfaces
- Tool integration: deep integration with BI and ML tools
1.7 Suggested Learning Path
1.7.1 Foundation Stage (1-2 months)
Theory
- Distributed systems basics: the CAP theorem, consistency, partition tolerance
- Hadoop core concepts: HDFS, MapReduce, YARN
- Linux basics: command-line operation, shell scripting
Practice
- Environment setup: a single-node pseudo-distributed cluster
- Basic operations: HDFS file operations, MapReduce jobs
- Simple applications: WordCount, log analysis
1.7.2 Intermediate Stage (2-3 months)
Deeper study
- Cluster deployment: building and configuring multi-node clusters
- Performance tuning: parameter tuning, resource configuration
- Monitoring and operations: cluster monitoring, troubleshooting
Ecosystem
- Hive: data warehousing and SQL queries
- HBase: working with a NoSQL database
- Spark: in-memory computation and machine learning
1.7.3 Advanced Stage (3-6 months)
Architecture design
- Solution design: designing big data architectures from business requirements
- Technology selection: choosing an appropriate stack
- Capacity planning: sizing clusters and resources (a rough sizing sketch follows this list)
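For storage-bound clusters, a first-pass capacity estimate is just arithmetic: logical data times replication times headroom, divided by usable disk per node. The sketch below encodes that back-of-the-envelope calculation; the per-node capacity, headroom, and growth figures are placeholder assumptions to be replaced with your own.

```python
import math

def estimate_datanodes(raw_data_tb: float,
                       daily_growth_tb: float,
                       retention_days: int,
                       replication: int = 3,
                       usable_tb_per_node: float = 24.0,
                       headroom: float = 0.25) -> int:
    """Back-of-the-envelope DataNode count for a storage-bound cluster.

    Assumptions (adjust to your environment): 3x replication, a fixed amount
    of usable disk per node, and ~25% free-space headroom for temporary data
    and rebalancing.
    """
    logical_tb = raw_data_tb + daily_growth_tb * retention_days
    physical_tb = logical_tb * replication * (1 + headroom)
    return math.ceil(physical_tb / usable_tb_per_node)

if __name__ == "__main__":
    # 100 TB today, 0.5 TB/day of new data kept for 365 days
    print(estimate_datanodes(100, 0.5, 365), "DataNodes")  # -> 45 under these assumptions
```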
Project practice
- End-to-end projects: complete big data pipelines
- Performance optimization: in-depth tuning
- Operations automation: automated deployment and monitoring
Chapter Summary
This chapter introduced Hadoop's basic concepts, core components, ecosystem, and application scenarios. As a foundational platform for big data processing, Hadoop offers a reliable, scalable, and cost-effective solution. By learning the three core components (HDFS, MapReduce, and YARN) together with the surrounding ecosystem tools, you can build a complete big data platform.
In practice, choose the technology stack and architecture pattern that fit your business requirements and technical constraints, weigh Hadoop's strengths against its challenges, and plan your learning and rollout accordingly.
The next chapter covers Hadoop installation and configuration and walks through setting up your own Hadoop development environment.