Summary of the MR Job Submission Process: A Source-Code Analysis

Shared by AIZero on 2020-08-23 20:43:20 · Last reply by AIZero on 2020-08-25 17:32:04 · 475 views

After my teacher criticized my first attempt at this source-code analysis, I went through it again carefully, piecing things together with material found online. The places that use threads still confuse me a little. In particular, I don't know what the key-generation part of submitJobInternal() (the layer-2 core method) is doing, and the details of how, after submission, YARN gets into the MapTask methods are still fuzzy to me. Pointers from more experienced readers are welcome.

Pseudocode summary

waitForCompletion()
  submit();
    // 1. Establish the connection
    connect();
      // Create the proxy used to submit the job
      new Cluster(getConfiguration());
        // Decide between local mode and YARN mode
        initialize(jobTrackAddr, conf);

    // 2. Prepare the job resource (staging) directory
    submitter.submitJobInternal(Job.this, cluster)
      // Create the staging path used to submit data to the cluster
      Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

      // Get a job id and build the job submission path from it
      JobID jobId = submitClient.getNewJobID();

      // Create the resource directory and copy the relevant files into it
      copyAndConfigureFiles(job, submitJobDir);
        rUploader.uploadFiles(job, jobSubmitDir);

      // 3. Compute the input splits and place them in the resource directory
      writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
          input.getSplits(job);

      // 4. Generate job.xml and place it in the resource directory
      writeConf(conf, submitJobFile);
        conf.writeXml(out);

      // 5. Submit the job and return the submission status
      status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
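
For context, here is a minimal sketch of a driver that kicks off this whole chain; MyDriver, MyMapper, MyReducer and the paths are hypothetical placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "my job");
    job.setJarByClass(MyDriver.class);
    job.setMapperClass(MyMapper.class);    // hypothetical Mapper
    job.setReducerClass(MyReducer.class);  // hypothetical Reducer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Everything analyzed below starts from this call:
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}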


Entry point: waitForCompletion()
 public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      // The entire submission process is driven from this method
      submit();
    }
    if (verbose) {
      // Monitor the job and print its status as it runs
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }


Layer 1 core: submit()
public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    // Re-check that the job is still in the DEFINE state
    ensureState(JobState.DEFINE);
    // Use the new API unless the old one was configured explicitly
    setUseNewAPI();
    // Layer 2 secondary core: assigns the Job's cluster field, i.e. establishes the connection
    connect();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    // Layer 2 main core: the real submission work happens in submitJobInternal()
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }


Layer 2 secondary core: connect()

connect() is the method that connects to the cluster when a job is submitted; in essence it constructs the Cluster instance cluster. Inside Cluster, the create() method of a ClientProtocolProvider produces the ClientProtocol instance client that can actually talk to the cluster. client comes in two flavors: YARNRunner for YARN mode and LocalJobRunner for local mode. Inside YARNRunner, resMgrDelegate is the core object for communicating with the YARN cluster.
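
Which of the two flavors you get is driven by the mapreduce.framework.name setting, usually from mapred-site.xml. A minimal sketch, assuming the default configuration files are on the classpath:

Configuration conf = new Configuration();
// Defaults to "local"; setting "yarn" makes the YARN provider win during
// Cluster.initialize(), so client becomes a YARNRunner
conf.set("mapreduce.framework.name", "yarn");
Job job = Job.getInstance(conf);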

T0. Cluster class members

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};

  private ClientProtocolProvider clientProtocolProvider; // creates the protocol
  private ClientProtocol client;    // the client protocol instance
  private UserGroupInformation ugi;
  private Configuration conf;
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;  // where job resources are staged
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();
  }
  ...
}

T1. Level 1

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf); // initialization entry point
  }

T2. Level 2

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          // Each provider inspects the configuration: with a YARN configuration
          // the job runs on the YARN cluster, otherwise it runs locally
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: ", e);
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }
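
A note on where the providers iterated above come from: frameworkLoader is a java.util.ServiceLoader, so implementations are discovered through the standard SPI mechanism, i.e. a resource file named after the interface inside each client jar. Roughly (module names from memory, so treat the exact locations as an assumption):

# META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
org.apache.hadoop.mapred.LocalClientProtocolProvider   (hadoop-mapreduce-client-common)
org.apache.hadoop.mapred.YarnClientProtocolProvider    (hadoop-mapreduce-client-jobclient)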

T3.1 Level 3: local mode returns a LocalJobRunner

public ClientProtocol create(Configuration conf) throws IOException {
        String framework = conf.get("mapreduce.framework.name", "local");
        if (!"local".equals(framework)) {
            return null;
        } else {
            conf.setInt("mapreduce.job.maps", 1);
            return new LocalJobRunner(conf);
        }
    }

T3.2 Level 3: YARN mode returns a YARNRunner

public ClientProtocol create(Configuration conf) throws IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }

T4. Level 4: the YARNRunner class

public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {
    this.conf = conf;
    try {
      // Proxy for the ResourceManager: the core of communication with the YARN cluster
      this.resMgrDelegate = resMgrDelegate; 
      this.clientCache = clientCache;
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }


Layer 2 core: submitJobInternal()

Instance members of JobSubmitter

class JobSubmitter {
  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
  private static final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;
  private ClientProtocol submitClient;
  private String submitHostName;
  private String submitHostAddress;
  ...
}

T0. The core method

JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    // Validate the output specification [see T1]
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    // Add the MR application framework path to the distributed cache
    addMRFrameworkToDistributedCache(conf);
    // Obtain the staging directory for job resources via cluster.getStagingAreaDir() [see T2],
    // e.g. mapred\staging\admin768650134\.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    // Record the submitting host's IP address and hostname in conf
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // Generate the job id
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // Build the job submission path: jobStagingArea plus /jobId
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // Obtain authorization tokens for the submission path
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      // Fetch secret keys and tokens and store them in the TokenCache
      populateTokenCache(conf, job.getCredentials());

      // I don't fully understand this part (pointers welcome); it appears to generate
      // a shuffle secret key, if none exists yet, used to secure the shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }
      // Create the submission directory and copy the required files into it [see T3],
      // e.g. \mapred\staging\admin768650134\.staging\job_local768650134_0001;
      // when running locally this directory is empty
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // Core: input splitting [see T4]. After this call the split metadata
      // is stored in the job resource directory
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write out the configuration: this generates job.xml [see T5]
      writeConf(conf, submitJobFile);

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // The actual submission through the client protocol: YARNRunner or LocalJobRunner [see T6]
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

T1. checkSpecs(job): validate the output specification (for FileOutputFormat this is where the familiar "output directory already exists" error is thrown)

private void checkSpecs(Job job) throws ClassNotFoundException, 
      InterruptedException, IOException {
    JobConf jConf = (JobConf)job.getConfiguration();
    // Check the output specification
    if (jConf.getNumReduceTasks() == 0 ? 
        jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
      org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
        ReflectionUtils.newInstance(job.getOutputFormatClass(),
          job.getConfiguration());
      output.checkOutputSpecs(job);
    } else {
      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
    }
  }

T2. getStagingDir(cluster, conf): resolve the staging directory

public static Path getStagingDir(Cluster cluster, Configuration conf) 
  throws IOException,InterruptedException {
    Path stagingArea = cluster.getStagingAreaDir();
    FileSystem fs = stagingArea.getFileSystem(conf);
    String realUser;
    String currentUser;
    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
    realUser = ugi.getShortUserName();
    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
    if (fs.exists(stagingArea)) {
      FileStatus fsStatus = fs.getFileStatus(stagingArea);
      String owner = fsStatus.getOwner();
      if (!(owner.equals(currentUser) || owner.equals(realUser))) {
         throw new IOException("The ownership on the staging directory " +
                      stagingArea + " is not as expected. " +
                      "It is owned by " + owner + ". The directory must " +
                      "be owned by the submitter " + currentUser + " or " +
                      "by " + realUser);
      }
      if (!fsStatus.getPermission().equals(JOB_DIR_PERMISSION)) {
        LOG.info("Permissions on staging directory " + stagingArea + " are " +
          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
          "to correct value " + JOB_DIR_PERMISSION);
        fs.setPermission(stagingArea, JOB_DIR_PERMISSION);
      }
    } else {
      fs.mkdirs(stagingArea, 
          new FsPermission(JOB_DIR_PERMISSION));
    }
    return stagingArea;
  }
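
For reference, the staging area this resolves to differs by mode; typical values (exact paths depend on configuration, so take these as examples) look like:

local mode: file:/tmp/hadoop/mapred/staging/admin768650134/.staging
YARN mode:  hdfs://.../tmp/hadoop-yarn/staging/admin/.staging
            (from yarn.app.mapreduce.am.staging-dir, default /tmp/hadoop-yarn/staging)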

T3. copyAndConfigureFiles(job, submitJobDir)

private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
  throws IOException {
    JobResourceUploader rUploader = new JobResourceUploader(jtFs);
    rUploader.uploadFiles(job, jobSubmitDir);

    // Get the working directory. If not set, sets it to filesystem working dir
    // This code has been added so that working directory reset before running
    // the job. This is necessary for backward compatibility as other systems
    // might use the public API JobConf#setWorkingDirectory to reset the working
    // directory.
    job.getWorkingDirectory();
  }

T3.1 uploadFiles: create the resource directory and copy the jar and other files into it

public void uploadFiles(Job job, Path submitJobDir) throws IOException {
    Configuration conf = job.getConfiguration();
    short replication =
        (short) conf.getInt(Job.SUBMIT_REPLICATION,
            Job.DEFAULT_SUBMIT_REPLICATION);

    if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
      LOG.warn("Hadoop command-line option parsing not performed. "
          + "Implement the Tool interface and execute your application "
          + "with ToolRunner to remedy this.");
    }

    // get all the command line arguments passed in by the user conf
    String files = conf.get("tmpfiles");
    String libjars = conf.get("tmpjars");
    String archives = conf.get("tmparchives");
    String jobJar = job.getJar();

    // Create a number of filenames in the JobTracker's fs namespace
    LOG.debug("default FileSystem: " + jtFs.getUri());
    if (jtFs.exists(submitJobDir)) {
      throw new IOException("Not submitting job. Job directory " + submitJobDir
          + " already exists!! This is unexpected.Please check what's there in"
          + " that directory");
    }
    submitJobDir = jtFs.makeQualified(submitJobDir);
    submitJobDir = new Path(submitJobDir.toUri().getPath());
    FsPermission mapredSysPerms =
        new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    // Create the submission directory
    FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
    // add all the command line files/ jars and archive
    // first copy them to jobtrackers filesystem

    if (files != null) {
      FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
      String[] fileArr = files.split(",");
      for (String tmpFile : fileArr) {
        URI tmpURI = null;
        try {
          tmpURI = new URI(tmpFile);
        } catch (URISyntaxException e) {
          throw new IllegalArgumentException(e);
        }
        Path tmp = new Path(tmpURI);
        Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
        try {
          URI pathURI = getPathURI(newPath, tmpURI.getFragment());
          DistributedCache.addCacheFile(pathURI, conf);
        } catch (URISyntaxException ue) {
          // should not throw a uri exception
          throw new IOException("Failed to create uri for " + tmpFile, ue);
        }
      }
    }
    ...
  }
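
The tmpfiles / tmpjars / tmparchives values read at the top of uploadFiles are populated by GenericOptionsParser, which is also why the method warns when the Tool/ToolRunner path was not used. A hypothetical invocation (jar and file names made up):

# GenericOptionsParser maps -files -> tmpfiles, -libjars -> tmpjars, -archives -> tmparchives
hadoop jar my-app.jar com.example.MyDriver \
    -files lookup.txt -libjars mysql-connector-java.jar -archives dict.zip \
    /input /output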

T4. writeSplits(job, submitJobDir): compute input splits (with the new API it delegates to writeNewSplits, shown below)

private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // Obtain the InputFormat via reflection. TextInputFormat inherits getSplits
    // from its parent FileInputFormat; different InputFormat types split differently
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // Level 2 entry point
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

T4.1 Level 2: getSplits, the core of split computation

public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // List all files of the input source
    List<FileStatus> files = listStatus(job);
    // Splits are generated file by file in this loop, so a single split never spans two files
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // Proceed only if the file can be split; some cannot, e.g. files in most
        // compressed formats
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // SPLIT_SLOP defaults to 1.1: if the remaining bytes are no more than
          // 1.1x splitSize, they become a single final split instead of two
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            // Level 3 entry: how split metadata is tied to block metadata
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            // Create a split: file path, offset, length, the block's host
            // locations, and the hosts holding cached (in-memory) replicas
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          // Whatever is left over becomes the final split on its own
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          // A non-splittable file becomes a single split covering the whole file
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                     blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }
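
To make the numbers concrete: computeSplitSize returns Math.max(minSize, Math.min(maxSize, blockSize)), so with default bounds the split size equals the block size. A worked example assuming a 128 MB block size:

long minSize = 1L;                   // default lower bound
long maxSize = Long.MAX_VALUE;       // default upper bound
long blockSize = 128L * 1024 * 1024;
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // = 128 MB

// 300 MB file: 300/128 = 2.34 > 1.1  -> cut a 128 MB split (172 MB left)
//              172/128 = 1.34 > 1.1  -> cut a 128 MB split ( 44 MB left)
//               44/128 = 0.34 <= 1.1 -> final 44 MB split   => 3 splits
// 130 MB file: 130/128 = 1.02 <= 1.1 -> one 130 MB split; the 1.1 slop
//              avoids producing a tiny 2 MB trailing split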

T4.1.1 Level 3: tying a split to a block

protected int getBlockIndex(BlockLocation[] blkLocations, 
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
         // The split belongs to the block whose [offset, offset+length) range
         // brackets the split's start offset
         return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

T5. Create job.xml and write the configuration into it

private void writeConf(Configuration conf, Path jobFile) 
      throws IOException {
    // Write job file to JobTracker's fs        
    // Create the job.xml file
    FSDataOutputStream out = 
      FileSystem.create(jtFs, jobFile, 
         new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
    try {
      // Write the configuration into job.xml
      conf.writeXml(out);
    } finally {
      out.close();
    }
  }
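
The job.xml written here is simply a flat dump of every property in the Configuration; a few illustrative entries (values made up for this example):

<configuration>
  <property><name>mapreduce.job.name</name><value>my job</value></property>
  <property><name>mapreduce.job.maps</name><value>3</value></property>
  <property><name>mapreduce.framework.name</name><value>yarn</value></property>
</configuration>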

T6. submitClient.submitJob(): submitting the job

T6.1 Local-mode submission

public JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobid,
      String jobSubmitDir, Credentials credentials) throws IOException {
    // LocalJobRunner.Job extends Thread and its constructor ends with start(),
    // which is why submitJob can return job.status right away: the job is
    // already running on its own thread (the threading mentioned in the intro)
    LocalJobRunner.Job job = new LocalJobRunner.Job(JobID.downgrade(jobid), jobSubmitDir);
    job.job.setCredentials(credentials);
    return job.status;
  }

T6.2 YARN-mode submission

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {

    addHistoryToken(ts);

    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }
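
For orientation (this echoes the reply below): createApplicationSubmissionContext packages everything the ResourceManager needs to launch the MR ApplicationMaster. A much-simplified sketch of what it sets up, NOT the real method body:

// Simplified sketch, not the actual implementation:
ApplicationSubmissionContext ctx =
    Records.newRecord(ApplicationSubmissionContext.class);
ctx.setApplicationId(resMgrDelegate.getApplicationId());
ctx.setApplicationType("MAPREDUCE");
// The ContainerLaunchContext holds the command line that starts the AM class
// org.apache.hadoop.mapreduce.v2.app.MRAppMaster, plus the resources localized
// from jobSubmitDir: the job jar, job.xml and the split files.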


Copyright notice: this is original work. Reposting is allowed, but you must credit the source and author with a hyperlink; otherwise legal action may be taken. From 海牛部落 - AIZero, http://hainiubl.com/topics/75296
Replies: 2
  • 青牛 (among the first big-data practitioners in China; core big-data engineer at Kingsoft)
    2020-08-24 00:11:03

    Good initiative analyzing the source code — tipped 10 yuan.
    Suggestion: the last step the article reaches is launching YARN's AppMaster. The MapTask flow should be: the client uploads the jar to HDFS, the node that will run the MapTask downloads the jar from HDFS, and the MapTask running on that node then actively contacts the AppMaster. So the next thing to read is the MapTask code.

  • AIZero
    2020-08-25 17:32:04

    @青牛 Got it, thank you! I went through the overall flow several times yesterday; I'll finish summarizing the MR source code soon.
