当前位置: 首页 > news >正文

在hadoop中Job提交的流程

/*** 表示一个MapReduce作业,并提供了向集群提交作业、跟踪其进度以及控制执行的方法。* 这是用户与MapReduce框架交互的主要入口点。*/
public class Job implements JobContext {/*** 提交作业至集群,并等待其完成。* 此方法是作业执行的最终触发点,它封装了完整的提交、执行和监控生命周期。** @param verbose 是否在控制台打印详细的进度和状态信息。* @return {@code true} 如果作业成功完成;{@code false} 如果作业失败。* @throws IOException 如果在提交或监控过程中发生I/O异常。* @throws ClassNotFoundException 如果找不到作业所需的类。* @throws InterruptedException 如果等待过程被中断。*/public boolean waitForCompletion(boolean verbose) throws IOException,InterruptedException, ClassNotFoundException {// 1. 提交作业到集群。如果提交失败,则立即返回false。if (state == JobState.DEFINE) {// 内部调用 submit() 方法submit();}// 2. 作业提交后,进入监控循环。if (verbose) {// 监控循环:定期从集群拉取作业的最新状态和进度。while (!isComplete()) {// 2.1 打印进度信息,包括Map和Reduce的完成百分比。log.info("Map: " + getMapProgress() + " Reduce: " + getReduceProgress());// 2.2 线程睡眠一段时间(例如1秒)后再次检查,避免过度频繁查询。Thread.sleep(1000);}} else {// 非详细模式:静默等待,直到作业完成(成功或失败)。while (!isComplete()) {Thread.sleep(1000);}}// 3. 返回作业的最终状态(成功与否)。return isSuccessful();}/*** 提交作业到MapReduce集群。* 这是提交流程的公开入口,内部会进行状态检查并触发实际的提交逻辑。** @throws IOException 如果在提交过程中发生I/O异常。* @throws InterruptedException 如果操作被中断。* @throws ClassNotFoundException 如果找不到作业配置的类。*/public void submit() throws IOException, InterruptedException, ClassNotFoundException {// 确保作业处于“已定义”状态,避免重复提交。ensureState(JobState.DEFINE);// 1.1.1 建立与集群的连接。此步骤至关重要,它决定了作业的运行环境(本地/Yarn)。// 创建一个 Cluster 对象,该对象作为与集群交互的代理。setUseNewAPI();connect();// 获取一个提交器(Submitter)实例,该实例包含了提交作业的核心内部逻辑。final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());// 1.1.2 调用内部提交方法,执行繁重的提交工作。status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {return submitter.submitJobInternal(Job.this, cluster);}});// 提交成功后,将作业状态更新为“运行中”。state = JobState.RUNNING;}/*** 建立与MapReduce集群的连接。* 通过分析配置,初始化一个 Cluster 实例,该实例抽象了底层的运行环境。** @throws IOException 如果连接失败。* @throws InterruptedException 如果连接过程被中断。*/private void connect() throws IOException, InterruptedException {// 创建 Cluster 对象。在构造函数中,会调用 initialize 方法来确定运行环境。// 关键参数:mapreduce.framework.name (设置为 'yarn' 或 'local')cluster = Cluster.getInstance(getConfiguration());}// --- 内部类 JobSubmitter 的核心方法 submitJobInternal 详解 ---/*** JobSubmitter 类的核心方法,负责将作业的所有必要资源准备并提交到集群。* 这是整个提交过程中最复杂、最关键的步骤。** @param job 要提交的Job对象。* @param cluster 代表集群连接的Cluster对象。* @return 作业的初始状态(JobStatus)。* @throws IOException 如果在准备或提交文件时发生I/O异常。* @throws InterruptedException 如果操作被中断。* @throws ClassNotFoundException 如果作业配置的InputFormat等类找不到。*/JobStatus submitJobInternal(Job job, Cluster cluster) throws IOException, InterruptedException, ClassNotFoundException {// 1.1.2.1 输出检查:确保作业配置正确且安全。checkSpecs(job);// 1.1.2.2 创建Staging目录:在集群文件系统(HDFS或本地)上为本次作业创建临时工作区。// 路径示例(Yarn on HDFS):/user/<username>/.stagingPath jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// 1.1.2.3 获取JobID:向集群申请一个全局唯一的作业标识符。// 格式通常为:job_<集群启动时间戳>_<序列号> (e.g., job_1687891234567_0001)JobID jobId = submitClient.getNewJobID();// 1.1.2.4 创建Job专属提交路径:在Staging目录下,以JobID命名创建子目录。// 路径示例:/user/<username>/.staging/job_1687891234567_0001/Path submitJobDir = new Path(jobStagingArea, jobId.toString());// 1.1.2.5 拷贝资源到集群:将作业运行所需的JAR包、依赖库、配置文件等上传到提交目录。// 这包括://   - 作业的JAR包(如果存在)//   - 通过 -libjars 添加的第三方JAR//   - 通过 -files 添加的分布式缓存文件//   - 通过 -archives 添加的归档文件copyAndConfigureFiles(job, submitJobDir);// 定义变量来保存计算出的切片数量,这对决定需要启动多少个Map任务至关重要。int maps;// 1.1.2.6 计算切片并生成切片规划文件:这是并行度的基础。try {// writeSplits 方法内部会调用 writeNewSplitsmaps = writeSplits(job, submitJobDir);}// ... 异常处理// 将计算出的Map任务数量正式设置到作业配置中。conf.setInt(MRJobConfig.NUM_MAPS, maps);// 1.1.2.7 写入作业配置文件:将当前的Configuration对象序列化为XML文件(job.xml)并上传到提交目录。// 这个文件包含了作业运行所需的所有参数,将在任务运行时被分布式加载。writeConf(conf, submitJobFile);// 1.1.2.8 正式提交作业:调用集群客户端(LocalJobRunner 或 YARNRunner)的提交方法。// 此时,所有准备工作已完成,集群开始接管作业的执行。status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());return status;}/*** 检查作业的输出规范是否合法。** @param job 要检查的作业。* @throws IOException 如果输出路径未设置或已存在。*/private void checkSpecs(Job job) throws IOException {// 检查1:输出路径是否已设置。OutputFormat<?, ?> output = ReflectionUtils.newInstance(job.getOutputFormatClass(), conf);output.checkOutputSpecs(job);// OutputFormat的checkOutputSpecs实现通常会://   - 确保 job.getOutputPath() 不为空。//   - 确保 job.getOutputPath() 指向的路径在文件系统中不存在,防止数据被意外覆盖。}/*** 计算输入数据的切片,并生成切片信息文件。** @param job 当前作业。* @param jobSubmitDir 作业提交目录。* @return 计算出的切片数量(即Map任务数量)。* @throws IOException 如果计算切片或写入文件时发生I/O错误。* @throws InterruptedException 如果操作被中断。*/private int writeNewSplits(Job job, Path jobSubmitDir) throws IOException, InterruptedException {// 通过反射实例化用户在作业中配置的 InputFormat 类(如 TextInputFormat, KeyValueTextInputFormat)。InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);// 核心调用:获取输入数据的逻辑切片列表。// 此方法会调用具体InputFormat的getSplits方法,其逻辑大致如下:// 1. 获取输入文件列表。// 2. 对于每个文件://    a. 获取文件长度(fs.getFileStatus(path).getLen())。//    b. 根据配置的切片大小(mapreduce.input.fileinputformat.split.maxsize)计算文件需要被切分成几块。//    c. 每个切片(FileSplit)包含以下元信息://        - 文件路径//        - 切片在文件中的起始偏移量//        - 切片长度//        - 切片所在的宿主(位置信息,用于优化调度)List<InputSplit> splits = input.getSplits(job);// 将切片信息序列化到提交目录下的 'job.split' 文件中。// 同时,将切片的元数据信息写入 'job.splitmetainfo' 文件以供快速访问。JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), splits);// 返回切片数量,这决定了集群需要启动多少个Map任务。return splits.size();}/*** 将作业的配置信息写入XML文件。** @param conf 作业的配置对象。* @param confFile 配置文件的目标路径(通常是提交目录下的 'job.xml')。* @throws IOException 如果写入文件失败。*/private void writeConf(Configuration conf, Path confFile) throws IOException {// 创建一个指向 confFile 的 FSDataOutputStream。// 将Configuration对象中的所有键值对以XML格式序列化到该输出流中。// 这个 job.xml 文件将被分发到每个Map和Reduce任务节点,任务通过加载此文件来获取运行参数。conf.writeXml(out);}
}/*** 代表一个MapReduce集群。根据配置,它可能是本地集群(LocalJobRunner)或远程Yarn集群(YARNRunner)。* 这个类封装了与特定集群环境交互的细节。*/
public class Cluster {/*** 根据配置初始化集群客户端。** @param conf 作业配置。* @throws IOException* @throws InterruptedException*/protected void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException, InterruptedException {// 决策逻辑:// 读取 conf.get(MRConfig.FRAMEWORK_NAME) 的值。// 如果值为 "local" -> 创建 LocalJobRunner 客户端。//    - LocalJobRunner: 在单个JVM进程中顺序执行Map和Reduce任务,用于本地测试和调试。// 如果值为 "yarn" -> 创建 YARNRunner 客户端。//    - YARNRunner: 与Yarn ResourceManager通信,将作业作为Application提交到Yarn集群上执行。this.client = clientProvider.createClient(jobTrackAddr, conf);}// ... 其他方法,如 getNewJobID(), submitJob() 等,实际委托给具体的 client (LocalJobRunner/YARNRunner) 执行。
}

总结:

  1. 环境探测与连接建立:首先确定作业的运行环境(本地或Yarn),并建立相应的连接代理(Cluster)。

  2. 安全性与规范性检查:检查输出路径,防止数据覆盖等错误。

  3. 资源准备与上传:在集群文件系统上创建工作目录(Staging Area),并将作业的JAR包、依赖库、配置文件等资源上传至此。

  4. 并行度规划(核心):通过用户指定的 InputFormat 对输入数据进行逻辑切片(Splitting),切片数量直接决定了Map任务的数量,这是MapReduce并行计算的基石。

  5. 配置序列化:将作业的所有配置参数写入XML文件,以便在任务运行时被分发和加载。

  6. 最终提交与执行:将所有准备就绪的文件路径等信息传递给集群客户端(LocalJobRunner/YARNRunner),由集群正式接管并调度执行该作业。

  7. 持续监控:作业提交后,客户端进入监控循环,不断从集群获取状态和进度,直到作业完成。

http://www.dtcms.com/a/405766.html

相关文章:

  • 基于Qt和FFmpeg的安卓监控模拟器/手机摄像头模拟成onvif和28181设备
  • 01MemoryOS环境搭建 python3.10
  • 建设部网站职责划定html精美登录界面源码
  • 网站建设基本步骤顺序网站的整体风格
  • Leetcode 146. LRU 缓存 哈希表 + 双向链表
  • VideollaMA 3论文阅读
  • Android 14 系统 ANR (Application Not Responding) 深度分析与解决指南
  • 《红色脉络:一部PLMN在中国的演进史诗 (1G-6G)》 第11篇 | 核心网演进终局:从EPC到5GC——微服务与“云原生”
  • k8s中的NetworkPolicy
  • 【大语言模型】大模型后训练入门指南
  • 【初学】使用 node 编写 MCP Server
  • 阿里云云原生挑战官方用例SPL
  • 销售管理软件免费版什么叫seo优化
  • Apache POI 在 Linux 无图形界面环境下因字体配置问题导致Excel导出失败的解决方案
  • 咨询顾问进阶——146页PPT详解麦肯锡-企业管理整合咨询-组织设计方案【附全文阅读】
  • 力扣995. K 连续位的最小翻转次数
  • Resources$NotFoundException
  • pg下使用 TimescaleDB并创建1亿数据
  • 自动化脚本的操作逻辑与实现
  • UVa12418 Game of 999
  • 基于51单片机的音乐弹奏系统
  • 负载均衡式的在线OJ项目编写(二)
  • 美篇在哪个网站做的外链代发工具
  • Linux高级技巧之集群部署(七)
  • 外贸做那种网站wordpress获取图片的绝对地址
  • 【自然语言处理与大模型】RAFT(Retrieval Augmented Fine Tuning)方法
  • 湖南网站建设公司 找磐石网络一流跨境电商平台app排名
  • 动态IP使用中 报错407 怎么办???
  • 手机百度建设网站台州企业网站建设
  • 鞍山网站建设制作新潮远网站建设