在hadoop中Job提交的流程
/*** 表示一个MapReduce作业,并提供了向集群提交作业、跟踪其进度以及控制执行的方法。* 这是用户与MapReduce框架交互的主要入口点。*/
public class Job implements JobContext {/*** 提交作业至集群,并等待其完成。* 此方法是作业执行的最终触发点,它封装了完整的提交、执行和监控生命周期。** @param verbose 是否在控制台打印详细的进度和状态信息。* @return {@code true} 如果作业成功完成;{@code false} 如果作业失败。* @throws IOException 如果在提交或监控过程中发生I/O异常。* @throws ClassNotFoundException 如果找不到作业所需的类。* @throws InterruptedException 如果等待过程被中断。*/public boolean waitForCompletion(boolean verbose) throws IOException,InterruptedException, ClassNotFoundException {// 1. 提交作业到集群。如果提交失败,则立即返回false。if (state == JobState.DEFINE) {// 内部调用 submit() 方法submit();}// 2. 作业提交后,进入监控循环。if (verbose) {// 监控循环:定期从集群拉取作业的最新状态和进度。while (!isComplete()) {// 2.1 打印进度信息,包括Map和Reduce的完成百分比。log.info("Map: " + getMapProgress() + " Reduce: " + getReduceProgress());// 2.2 线程睡眠一段时间(例如1秒)后再次检查,避免过度频繁查询。Thread.sleep(1000);}} else {// 非详细模式:静默等待,直到作业完成(成功或失败)。while (!isComplete()) {Thread.sleep(1000);}}// 3. 返回作业的最终状态(成功与否)。return isSuccessful();}/*** 提交作业到MapReduce集群。* 这是提交流程的公开入口,内部会进行状态检查并触发实际的提交逻辑。** @throws IOException 如果在提交过程中发生I/O异常。* @throws InterruptedException 如果操作被中断。* @throws ClassNotFoundException 如果找不到作业配置的类。*/public void submit() throws IOException, InterruptedException, ClassNotFoundException {// 确保作业处于“已定义”状态,避免重复提交。ensureState(JobState.DEFINE);// 1.1.1 建立与集群的连接。此步骤至关重要,它决定了作业的运行环境(本地/Yarn)。// 创建一个 Cluster 对象,该对象作为与集群交互的代理。setUseNewAPI();connect();// 获取一个提交器(Submitter)实例,该实例包含了提交作业的核心内部逻辑。final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());// 1.1.2 调用内部提交方法,执行繁重的提交工作。status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {return submitter.submitJobInternal(Job.this, cluster);}});// 提交成功后,将作业状态更新为“运行中”。state = JobState.RUNNING;}/*** 建立与MapReduce集群的连接。* 通过分析配置,初始化一个 Cluster 实例,该实例抽象了底层的运行环境。** @throws IOException 如果连接失败。* @throws InterruptedException 如果连接过程被中断。*/private void connect() throws IOException, InterruptedException {// 创建 Cluster 对象。在构造函数中,会调用 initialize 方法来确定运行环境。// 关键参数:mapreduce.framework.name (设置为 'yarn' 或 'local')cluster = Cluster.getInstance(getConfiguration());}// --- 内部类 JobSubmitter 的核心方法 submitJobInternal 详解 ---/*** JobSubmitter 类的核心方法,负责将作业的所有必要资源准备并提交到集群。* 这是整个提交过程中最复杂、最关键的步骤。** @param job 要提交的Job对象。* @param cluster 代表集群连接的Cluster对象。* @return 作业的初始状态(JobStatus)。* @throws IOException 如果在准备或提交文件时发生I/O异常。* @throws InterruptedException 如果操作被中断。* @throws ClassNotFoundException 如果作业配置的InputFormat等类找不到。*/JobStatus submitJobInternal(Job job, Cluster cluster) throws IOException, InterruptedException, ClassNotFoundException {// 1.1.2.1 输出检查:确保作业配置正确且安全。checkSpecs(job);// 1.1.2.2 创建Staging目录:在集群文件系统(HDFS或本地)上为本次作业创建临时工作区。// 路径示例(Yarn on HDFS):/user/<username>/.stagingPath jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// 1.1.2.3 获取JobID:向集群申请一个全局唯一的作业标识符。// 格式通常为:job_<集群启动时间戳>_<序列号> (e.g., job_1687891234567_0001)JobID jobId = submitClient.getNewJobID();// 1.1.2.4 创建Job专属提交路径:在Staging目录下,以JobID命名创建子目录。// 路径示例:/user/<username>/.staging/job_1687891234567_0001/Path submitJobDir = new Path(jobStagingArea, jobId.toString());// 1.1.2.5 拷贝资源到集群:将作业运行所需的JAR包、依赖库、配置文件等上传到提交目录。// 这包括:// - 作业的JAR包(如果存在)// - 通过 -libjars 添加的第三方JAR// - 通过 -files 添加的分布式缓存文件// - 通过 -archives 添加的归档文件copyAndConfigureFiles(job, submitJobDir);// 定义变量来保存计算出的切片数量,这对决定需要启动多少个Map任务至关重要。int maps;// 1.1.2.6 计算切片并生成切片规划文件:这是并行度的基础。try {// writeSplits 方法内部会调用 writeNewSplitsmaps = writeSplits(job, submitJobDir);}// ... 异常处理// 将计算出的Map任务数量正式设置到作业配置中。conf.setInt(MRJobConfig.NUM_MAPS, maps);// 1.1.2.7 写入作业配置文件:将当前的Configuration对象序列化为XML文件(job.xml)并上传到提交目录。// 这个文件包含了作业运行所需的所有参数,将在任务运行时被分布式加载。writeConf(conf, submitJobFile);// 1.1.2.8 正式提交作业:调用集群客户端(LocalJobRunner 或 YARNRunner)的提交方法。// 此时,所有准备工作已完成,集群开始接管作业的执行。status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());return status;}/*** 检查作业的输出规范是否合法。** @param job 要检查的作业。* @throws IOException 如果输出路径未设置或已存在。*/private void checkSpecs(Job job) throws IOException {// 检查1:输出路径是否已设置。OutputFormat<?, ?> output = ReflectionUtils.newInstance(job.getOutputFormatClass(), conf);output.checkOutputSpecs(job);// OutputFormat的checkOutputSpecs实现通常会:// - 确保 job.getOutputPath() 不为空。// - 确保 job.getOutputPath() 指向的路径在文件系统中不存在,防止数据被意外覆盖。}/*** 计算输入数据的切片,并生成切片信息文件。** @param job 当前作业。* @param jobSubmitDir 作业提交目录。* @return 计算出的切片数量(即Map任务数量)。* @throws IOException 如果计算切片或写入文件时发生I/O错误。* @throws InterruptedException 如果操作被中断。*/private int writeNewSplits(Job job, Path jobSubmitDir) throws IOException, InterruptedException {// 通过反射实例化用户在作业中配置的 InputFormat 类(如 TextInputFormat, KeyValueTextInputFormat)。InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);// 核心调用:获取输入数据的逻辑切片列表。// 此方法会调用具体InputFormat的getSplits方法,其逻辑大致如下:// 1. 获取输入文件列表。// 2. 对于每个文件:// a. 获取文件长度(fs.getFileStatus(path).getLen())。// b. 根据配置的切片大小(mapreduce.input.fileinputformat.split.maxsize)计算文件需要被切分成几块。// c. 每个切片(FileSplit)包含以下元信息:// - 文件路径// - 切片在文件中的起始偏移量// - 切片长度// - 切片所在的宿主(位置信息,用于优化调度)List<InputSplit> splits = input.getSplits(job);// 将切片信息序列化到提交目录下的 'job.split' 文件中。// 同时,将切片的元数据信息写入 'job.splitmetainfo' 文件以供快速访问。JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), splits);// 返回切片数量,这决定了集群需要启动多少个Map任务。return splits.size();}/*** 将作业的配置信息写入XML文件。** @param conf 作业的配置对象。* @param confFile 配置文件的目标路径(通常是提交目录下的 'job.xml')。* @throws IOException 如果写入文件失败。*/private void writeConf(Configuration conf, Path confFile) throws IOException {// 创建一个指向 confFile 的 FSDataOutputStream。// 将Configuration对象中的所有键值对以XML格式序列化到该输出流中。// 这个 job.xml 文件将被分发到每个Map和Reduce任务节点,任务通过加载此文件来获取运行参数。conf.writeXml(out);}
}/*** 代表一个MapReduce集群。根据配置,它可能是本地集群(LocalJobRunner)或远程Yarn集群(YARNRunner)。* 这个类封装了与特定集群环境交互的细节。*/
public class Cluster {/*** 根据配置初始化集群客户端。** @param conf 作业配置。* @throws IOException* @throws InterruptedException*/protected void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException, InterruptedException {// 决策逻辑:// 读取 conf.get(MRConfig.FRAMEWORK_NAME) 的值。// 如果值为 "local" -> 创建 LocalJobRunner 客户端。// - LocalJobRunner: 在单个JVM进程中顺序执行Map和Reduce任务,用于本地测试和调试。// 如果值为 "yarn" -> 创建 YARNRunner 客户端。// - YARNRunner: 与Yarn ResourceManager通信,将作业作为Application提交到Yarn集群上执行。this.client = clientProvider.createClient(jobTrackAddr, conf);}// ... 其他方法,如 getNewJobID(), submitJob() 等,实际委托给具体的 client (LocalJobRunner/YARNRunner) 执行。
}
总结:
环境探测与连接建立:首先确定作业的运行环境(本地或Yarn),并建立相应的连接代理(
Cluster
)。安全性与规范性检查:检查输出路径,防止数据覆盖等错误。
资源准备与上传:在集群文件系统上创建工作目录(Staging Area),并将作业的JAR包、依赖库、配置文件等资源上传至此。
并行度规划(核心):通过用户指定的
InputFormat
对输入数据进行逻辑切片(Splitting),切片数量直接决定了Map任务的数量,这是MapReduce并行计算的基石。配置序列化:将作业的所有配置参数写入XML文件,以便在任务运行时被分发和加载。
最终提交与执行:将所有准备就绪的文件路径等信息传递给集群客户端(
LocalJobRunner
/YARNRunner
),由集群正式接管并调度执行该作业。持续监控:作业提交后,客户端进入监控循环,不断从集群获取状态和进度,直到作业完成。