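Once the JobTracker and the TaskTrackers have started (see "JobTracker startup: a source-level analysis" and "TaskTracker startup: a source-level analysis"), each TaskTracker talks to the JobTracker through heartbeats and receives the tasks assigned to it. When a user submits a job to the JobTracker, the job is placed in the appropriate data structures and waits to be scheduled. The article "MapReduce job submission: a source-level analysis (part 3)" covered the last steps of submission: constructing the job's JobInProgress, adding it to jobs, and notifying every JobInProgressListener.

  The default scheduler creates two listeners, JobQueueJobInProgressListener and EagerTaskInitializationListener. A submitted job is wrapped in a JobInProgress object (job) and handed to both listeners.

  1. JobQueueJobInProgressListener.jobAdded(job) puts the JobInProgress into Map<JobSchedulingInfo, JobInProgress> jobQueue.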

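  2. EagerTaskInitializationListener.jobAdded(job) appends the JobInProgress to List<JobInProgress> jobInitQueue, calls resortInitQueue() to sort the list by priority and, for equal priorities, by start time, and then wakes every thread waiting on the list's monitor with jobInitQueue.notifyAll(). EagerTaskInitializationListener.start() has already run when the scheduler started. It creates a JobInitManager thread (JobInitManager implements Runnable) whose run method watches jobInitQueue: as soon as the list is non-empty it takes the first JobInProgress, wraps it in an InitJob (InitJob implements Runnable), and submits it to ExecutorService threadPool (the pool is created in the EagerTaskInitializationListener constructor). InitJob.run is a single statement, ttm.initJob(job), which calls JobTracker.initJob(job); that in turn calls JobInProgress.initTasks() to initialize the job. The code of initTasks() is as follows: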

  /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    */
   // Tasks come in two kinds, MapTask and ReduceTask; both are managed through TaskInProgress objects.
   public synchronized void initTasks()
   throws IOException, KillInterruptedException, UnknownHostException {
     if (tasksInited || isComplete()) {
       return;
     }
     synchronized(jobInitKillStatus){
       if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
         return;
       }
       jobInitKillStatus.initStarted = true;
     }

     LOG.info("Initializing " + jobId);
     final long startTimeFinal = this.startTime;
     // log job info as the user running the job
     try {
     userUGI.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {
         JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
             startTimeFinal, hasRestarted());
         return null;
       }
     });
     } catch(InterruptedException ie) {
       throw new IOException(ie);
     }

     // log the job priority
     setPriority(this.priority);

     //
     // generate security keys needed by Tasks
     //
     generateAndStoreTokens();

     //
     // read input splits and create a map per a split
     //
     TaskSplitMetaInfo[] splits = createSplits(jobId);
     if (numMapTasks != splits.length) {
       throw new IOException("Number of maps in JobConf doesn't match number of " +
               "recieved splits for job " + jobId + "! " +
               "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
     }
     numMapTasks = splits.length; // the number of map tasks equals the number of input splits

     // Sanity check the locations so we don't create/initialize unnecessary tasks
     for (TaskSplitMetaInfo split : splits) {
       NetUtils.verifyHostnames(split.getLocations());
     }

     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
     this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
     this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);

     maps = new TaskInProgress[numMapTasks]; // one TaskInProgress per map task, each handling one input split
     for(int i=0; i < numMapTasks; ++i) {
       inputLength += splits[i].getInputDataLength();
       maps[i] = new TaskInProgress(jobId, jobFile,         // this is a map task
                                    splits[i],
                                    jobtracker, conf, this, i, numSlotsPerMap);
     }
     LOG.info("Input size for job " + jobId + " = " + inputLength
         + ". Number of splits = " + splits.length);

     // Set localityWaitFactor before creating cache
     localityWaitFactor =
       conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
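     /* Map tasks are placed in nonRunningMapCache, a Map<Node, List<TaskInProgress>>, so that a map task is preferably
     assigned to a Node that holds its input split. Here a Node represents a datanode, a rack, or a data center.
     nonRunningMapCache is consulted when the JobTracker assigns map tasks to a TaskTracker.
     */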
     if (numMapTasks > 0) {
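         // createCache() builds nonRunningMapCache, a map-backed cache of the TaskInProgress objects that have not run yet.
         // When a TaskTracker on a slave node sends a heartbeat to the master, tasks can be picked straight from this cache.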
       nonRunningMapCache = createCache(splits, maxLevel);
     }

     // set the launch time
     this.launchTime = jobtracker.getClock().getTime();

     //
     // Create reduce tasks
     //
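     // Next, JobInProgress creates the tracking objects for the reduces. This is simpler: the count is the
     // number of reduces set in JobConf, which defaults to 1. Reduce tasks are also tracked and scheduled
     // through TaskInProgress, but via a different constructor; TaskInProgress creates a MapTask or a
     // ReduceTask depending on its arguments. Reduces get no locality cache: they go into nonRunningReduces.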
     this.reduces = new TaskInProgress[numReduceTasks];
     for (int i = 0; i < numReduceTasks; i++) {
       reduces[i] = new TaskInProgress(jobId, jobFile,     // this is a reduce task
                                       numMapTasks, i,
                                       jobtracker, conf, this, numSlotsPerReduce);
       /* Reduce tasks go into nonRunningReduces, which is consulted when the JobTracker assigns reduce tasks to a TaskTracker. */
       nonRunningReduces.add(reduces[i]);
     }

     // Calculate the minimum number of maps to be complete before
     // we should start scheduling reduces
     completedMapsForReduceSlowstart =
       (int)Math.ceil(
           (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
            numMapTasks));

     // ... use the same for estimating the total output of all maps
     resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);

     // create two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];

     // cleanup map tip. This map doesn't use any splits. Just assign an empty
     // split.
     TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
             jobtracker, conf, this, numMapTasks, 1);
     cleanup[0].setJobCleanupTask();

     // cleanup reduce tip.
     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks, jobtracker, conf, this, 1);
     cleanup[1].setJobCleanupTask();

     // create two setup tips, one map and one reduce.
     setup = new TaskInProgress[2];

     // setup map tip. This map doesn't use any split. Just assign an empty
     // split.
     setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
             jobtracker, conf, this, numMapTasks + 1, 1);
     setup[0].setJobSetupTask();

     // setup reduce tip.
     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks + 1, jobtracker, conf, this, 1);
     setup[1].setJobSetupTask();

     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
       if(jobInitKillStatus.killed) {
         throw new KillInterruptedException("Job " + jobId + " killed in init");
       }
     }
     // Once the TaskInProgress objects exist, JobInProgress flags the job as initialized
     // and calls JobHistory.JobInfo.logInited() to write the job-history record.
     tasksInited = true;
     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
                                  numMapTasks, numReduceTasks);

    // Log the number of map and reduce tasks
    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
             + " map tasks and " + numReduceTasks + " reduce tasks.");
   }

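  The main job of initTasks is to read the uploaded split metadata, validate the splits, check that their count equals numMapTasks from the configuration, and then create numMapTasks TaskInProgress objects as the map tasks. createCache puts maps whose split has no location into nonLocalMaps. For the others it looks up the nodes holding each split and records each node together with the maps for the splits on it in a Map<Node, List<TaskInProgress>> cache. Note that, depending on the configured topology depth, each map is also recorded under the parent node (which may have several children), so a parent entry holds the maps of all its children. This is how locality is implemented. The cache is assigned to nonRunningMapCache, the maps that have not run yet. Next come the reduce tasks: numReduceTasks TaskInProgress objects are created and added to nonRunningReduces. Since maps and reduces are both TaskInProgress objects, how are they told apart later? The two kinds use different constructors, and the distinguishing field is splitInfo: a map task sets it, while a reduce task leaves it as null. initTasks then computes the minimum number of map tasks that must complete before reduce tasks may be scheduled. It creates two cleanup tasks, cleanup = new TaskInProgress[2], one that runs as a map task and one that runs as a reduce task; the last constructor argument (the slot count) is 1 for both, and the map one uses JobSplit.EMPTY_TASK_SPLIT as its splitInfo. It likewise creates two setup tasks, setup = new TaskInProgress[2], one map and one reduce. Each of these four TaskInProgress objects sets a flag that marks its type. Finally, a flag (tasksInited) is set to indicate that initialization is complete.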
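  To make the cache layout concrete, here is a minimal sketch of the idea behind createCache. It is not the Hadoop implementation: topology nodes are plain path strings such as "/rack1/host1", tasks are identified by their index, and the names LocalityCacheSketch, createCache, and parentOf are invented for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LocalityCacheSketch {
    /**
     * splitPaths[i] lists the topology paths of the hosts holding split i.
     * Each task index is registered under its host and under up to
     * maxLevel - 1 ancestors; splits without a location go to nonLocalMaps.
     */
    static Map<String, List<Integer>> createCache(String[][] splitPaths, int maxLevel,
                                                  List<Integer> nonLocalMaps) {
        Map<String, List<Integer>> cache = new HashMap<>();
        for (int i = 0; i < splitPaths.length; i++) {
            if (splitPaths[i].length == 0) {
                nonLocalMaps.add(i);
                continue;
            }
            for (String path : splitPaths[i]) {
                String node = path;
                for (int level = 0; level < maxLevel && node != null; level++) {
                    List<Integer> tasks = cache.computeIfAbsent(node, k -> new ArrayList<>());
                    if (!tasks.contains(i)) {
                        tasks.add(i); // a rack lists each task once, even with two replicas in it
                    }
                    node = parentOf(node);
                }
            }
        }
        return cache;
    }

    /** "/rack1/host1" -> "/rack1"; a top-level node has no parent (null). */
    static String parentOf(String path) {
        int idx = path.lastIndexOf('/');
        return idx <= 0 ? null : path.substring(0, idx);
    }
}
```

  With maxLevel = 2, a split stored on two hosts of rack1 appears once under each host and once under "/rack1", which is what later lets the scheduler fall back from node-local to rack-local.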

  At this point EagerTaskInitializationListener has finished initializing the job on the JobTracker side, and all of its tasks wait to be scheduled in response to TaskTracker heartbeats.
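  The listener's threading pattern (a queue guarded by its own monitor, a manager thread that waits on it, and a pool that does the actual initialization) can be sketched as follows. This is a simplified illustration, not the Hadoop class: job IDs stand in for JobInProgress objects, adding the ID to a list stands in for ttm.initJob(job), and EagerInitSketch with all of its members is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EagerInitSketch {
    private final List<String> initQueue = new ArrayList<>();
    private final List<String> initialized = new CopyOnWriteArrayList<>();
    // Daemon workers, so a forgotten stop() cannot keep the JVM alive.
    private final ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "initJob");
        t.setDaemon(true);
        return t;
    });
    private final Thread manager = new Thread(this::managerLoop, "jobInitManager");

    void start() {
        manager.setDaemon(true);
        manager.start();
    }

    /** Listener callback: enqueue the job and wake the manager thread. */
    void jobAdded(String jobId) {
        synchronized (initQueue) {
            initQueue.add(jobId);
            initQueue.notifyAll();
        }
    }

    /** Blocks until a job is queued, then removes and returns the first one. */
    private String takeFirst() throws InterruptedException {
        synchronized (initQueue) {
            while (initQueue.isEmpty()) {
                initQueue.wait();
            }
            return initQueue.remove(0);
        }
    }

    private void managerLoop() {
        try {
            while (true) {
                final String job = takeFirst();
                // Stands in for ttm.initJob(job), which ends in initTasks().
                pool.execute(() -> initialized.add(job));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop() was called
        }
    }

    /** Polls until n jobs are initialized or the timeout elapses. */
    boolean awaitInitialized(int n, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (initialized.size() < n) {
                if (System.currentTimeMillis() > deadline) {
                    return false;
                }
                Thread.sleep(5);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> initializedJobs() {
        return new ArrayList<>(initialized);
    }

    void stop() {
        manager.interrupt();
        pool.shutdown();
    }
}
```

  Handing the work to a pool keeps split computation off the thread that accepts submissions, which is the point made by the comment at the top of initTasks().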

  Now look at JobTracker.heartbeat, the method through which a TaskTracker reports its status:

   /**
    * The periodic heartbeat mechanism between the {@link TaskTracker} and
    * the {@link JobTracker}.
    *
    * The {@link JobTracker} processes the status information sent by the
    * {@link TaskTracker} and responds with instructions to start/stop
    * tasks or jobs, and also 'reset' instructions during contingencies.
    */
   public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                   boolean restarted,
                                                   boolean initialContact,
                                                   boolean acceptNewTasks,
                                                   short responseId)
     throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got heartbeat from: " + status.getTrackerName() +
                 " (restarted: " + restarted +
                 " initialContact: " + initialContact +
                 " acceptNewTasks: " + acceptNewTasks + ")" +
                 " with responseId: " + responseId);
     }

     // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
     if (!acceptTaskTracker(status)) {
       throw new DisallowedTaskTrackerException(status);
     }

     // First check if the last heartbeat response got through
     String trackerName = status.getTrackerName();
     long now = clock.getTime();
     if (restarted) {
       faultyTrackers.markTrackerHealthy(status.getHost());
     } else {
       faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
     }

     HeartbeatResponse prevHeartbeatResponse =
       trackerToHeartbeatResponseMap.get(trackerName);
     boolean addRestartInfo = false;

     if (initialContact != true) {
       // If this isn't the 'initial contact' from the tasktracker,
       // there is something seriously wrong if the JobTracker has
       // no record of the 'previous heartbeat'; if so, ask the
       // tasktracker to re-initialize itself.
       if (prevHeartbeatResponse == null) {
         // This is the first heartbeat from the old tracker to the newly
         // started JobTracker
         if (hasRestarted()) {
           addRestartInfo = true;
           // inform the recovery manager about this tracker joining back
           recoveryManager.unMarkTracker(trackerName);
         } else {
           // Jobtracker might have restarted but no recovery is needed
           // otherwise this code should not be reached
           LOG.warn("Serious problem, cannot find record of 'previous' " +
                    "heartbeat for '" + trackerName +
                    "'; reinitializing the tasktracker");
           return new HeartbeatResponse(responseId,
               new TaskTrackerAction[] {new ReinitTrackerAction()});
         }

       } else {

         // It is completely safe to not process a 'duplicate' heartbeat from a
         // {@link TaskTracker} since it resends the heartbeat when rpcs are
         // lost see {@link TaskTracker.transmitHeartbeat()};
         // acknowledge it by re-sending the previous response to let the
         // {@link TaskTracker} go forward.
         if (prevHeartbeatResponse.getResponseId() != responseId) {
           LOG.info("Ignoring 'duplicate' heartbeat from '" +
               trackerName + "'; resending the previous 'lost' response");
           return prevHeartbeatResponse;
         }
       }
     }

     // Process this heartbeat
     short newResponseId = (short)(responseId + 1);  // increment the response ID
     status.setLastSeen(now);
     if (!processHeartbeat(status, initialContact, now)) {
       if (prevHeartbeatResponse != null) {
         trackerToHeartbeatResponseMap.remove(trackerName);
       }
       return new HeartbeatResponse(newResponseId,
                    new TaskTrackerAction[] {new ReinitTrackerAction()});
     }

     // Initialize the response to be sent for the heartbeat
     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
     boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
     // Check for new tasks to be executed on the tasktracker
     if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
           // setup and cleanup tasks have the highest priority
         List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
         if (tasks == null ) {
             // let the task scheduler assign tasks
           tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));    // assign map or reduce tasks
         }

         if (tasks != null) {
           for (Task task : tasks) {
             // add the task to the actions list that is returned to the TaskTracker
             expireLaunchingTasks.addNewTask(task.getTaskID());
             if(LOG.isDebugEnabled()) {
               LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
             }
             actions.add(new LaunchTaskAction(task));
           }
         }
       }
     }

     // Check for tasks to be killed
     List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
     if (killTasksList != null) {
       actions.addAll(killTasksList);
     }

     // Check for jobs to be killed/cleanedup
     List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
     if (killJobsList != null) {
       actions.addAll(killJobsList);
     }

     // Check for tasks whose outputs can be saved
     List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
     if (commitTasksList != null) {
       actions.addAll(commitTasksList);
     }

     // calculate next heartbeat interval and put in heartbeat response
     int nextInterval = getNextHeartbeatInterval();
     response.setHeartbeatInterval(nextInterval);
     response.setActions(
                         actions.toArray(new TaskTrackerAction[actions.size()]));

     // check if the restart info is req
     if (addRestartInfo) {
       response.setRecoveredJobs(recoveryManager.getJobsToRecover());
     }

     // Update the trackerToHeartbeatResponseMap
     trackerToHeartbeatResponseMap.put(trackerName, response);

     // Done processing the hearbeat, now remove 'marked' tasks
     removeMarkedTasks(trackerName);

     return response;
   }

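  1. The method takes five parameters. (A) status encapsulates the TaskTracker's state: the TaskTracker name; its host name; its HTTP port; the total number of failed tasks on it; the status of each running task; the time of the last heartbeat; the number of map slots, i.e. how many map tasks can run concurrently; the number of reduce slots; the TaskTracker's health status; and its resource (memory, CPU) information. (B) restarted indicates whether the TaskTracker has just restarted. (C) initialContact indicates whether this is the TaskTracker's first contact with the JobTracker. (D) acceptNewTasks indicates whether the TaskTracker can accept new tasks, which usually depends on whether it has free slots and on the node's health. (E) responseId is the heartbeat response ID, used to detect duplicate heartbeats; it is incremented by 1 each time a heartbeat is processed.

  2. acceptTaskTracker(status) checks that the heartbeat comes from a TaskTracker the JobTracker allows. A TaskTracker may connect if its host is in the host list named by mapred.hosts (the include list of legitimate nodes; only nodes on it may connect to the JobTracker) and is not in the host list named by mapred.hosts.exclude (the exclude list; nodes on it cannot connect to the JobTracker). Both lists are empty by default; they are configured in mapred-site.xml and can be reloaded dynamically.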
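  The admission rule can be written down in a few lines. The sketch below is an illustration rather than the JobTracker code; TrackerAdmissionSketch and acceptTracker are hypothetical names, and treating an empty include list as "allow every host" is an assumption that matches the empty-by-default lists described above.

```java
import java.util.Set;

final class TrackerAdmissionSketch {
    /**
     * A host is accepted when it is on the include list (or the include
     * list is empty, assumed here to mean "no restriction") and it is
     * not on the exclude list.
     */
    static boolean acceptTracker(String host, Set<String> include, Set<String> exclude) {
        boolean included = include.isEmpty() || include.contains(host);
        return included && !exclude.contains(host);
    }
}
```

  Note that the exclude list wins: a host that appears on both lists is rejected.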

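  3. If the TaskTracker has restarted, it is marked healthy and removed from the blacklist (Hadoop lets users supply a script that checks whether a TaskTracker is healthy; the result is sent to the JobTracker with the heartbeat, and when a TaskTracker is found unhealthy the JobTracker blacklists it and assigns it no tasks until the check reports healthy again) or from the graylist (the JobTracker records how many times each TaskTracker has been blacklisted by jobs, #blacklist, and adds it to its graylist once a threshold is reached). Otherwise the TaskTracker fault-tolerance check (checkTrackerFaultTimeout) runs to determine whether the tracker is healthy.

  4. Fetch the HeartbeatResponse last sent to this TaskTracker and check it. If this is not the TaskTracker's first contact and the stored HeartbeatResponse is null (the JobTracker has no record of it, either because the TaskTracker misbehaved or because the JobTracker restarted), there are two cases: if the JobTracker has restarted, the trackerName is unmarked in recoveryManager and restart information is attached to the response; otherwise a ReinitTrackerAction is returned, telling the TaskTracker to reinitialize. If the stored HeartbeatResponse is not null, the heartbeat may be a duplicate: when the stored response ID differs from the responseId carried by the heartbeat, the previous HeartbeatResponse is resent.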
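  The duplicate check is easier to follow in isolation. The following sketch keeps only the response-ID bookkeeping; the restart and reinitialization branches are left out, and HeartbeatDedupSketch, Response, and the method shape are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

final class HeartbeatDedupSketch {
    /** Minimal stand-in for HeartbeatResponse: only the ID is kept. */
    static final class Response {
        final short id;
        Response(short id) {
            this.id = id;
        }
    }

    private final Map<String, Response> lastResponse = new HashMap<>();

    /**
     * responseId is the ID of the last response the tracker received.
     * If it does not match the ID of the response we stored, our latest
     * response was lost, so the stored response is resent unchanged.
     */
    Response heartbeat(String tracker, short responseId, boolean initialContact) {
        Response prev = lastResponse.get(tracker);
        if (!initialContact && prev != null && prev.id != responseId) {
            return prev; // duplicate heartbeat: resend the previous response
        }
        Response next = new Response((short) (responseId + 1));
        lastResponse.put(tracker, next);
        return next;
    }
}
```

  Resending the stored response instead of processing the heartbeat again keeps the exchange idempotent when an RPC reply is lost.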

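  5. Increment the response ID; record the heartbeat time with status.setLastSeen(now); then call processHeartbeat(status, initialContact, now) to process the heartbeat. It first calls updateTaskTrackerStatus to update the resource statistics and replace the old TaskTracker status. If this is the initial contact and the JobTracker already has a record of this TaskTracker (the TT restarted), everything associated with the TaskTracker is cleared. If it is not the initial contact and the JobTracker has no earlier record of the TaskTracker, processHeartbeat returns false. On initial contact, if the tracker is blacklisted, the count of blacklisted trackers is incremented, and the tracker is then added to trackerExpiryQueue and hostnameToTaskTracker. updateTaskStatuses(trackerStatus) updates the task statuses (this is complicated and is left for a later analysis), updateNodeHealthStatus(trackerStatus, timeStamp) updates the node's health status, and the method returns true. If processHeartbeat returns false, the trackerName entry is removed from trackerToHeartbeatResponseMap and a ReinitTrackerAction is returned to the TaskTracker.

  6. Build the heartbeat response. First fetch any setup and cleanup tasks; if tasks == null, ask the scheduler (JobQueueTaskScheduler by default) to assign tasks with tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)), which yields map tasks or reduce tasks. The code of assignTasks is as follows: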

 // assignTasks is the most important method of JobQueueTaskScheduler; it implements the scheduling.
   @Override
   public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
     TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     final int numTaskTrackers = clusterStatus.getTaskTrackers();
     final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
     final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

     Collection<JobInProgress> jobQueue =
       jobQueueJobInProgressListener.getJobQueue();
     // It first checks how many more map and reduce tasks the TaskTracker can take, and whether the number
     // to hand out would exceed that or the cluster's average remaining load. If not, tasks are assigned.
     //
     // Get map + reduce counts for the current tracker.
     //
     final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
     final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
     final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
     final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

     // Assigned tasks
     List<Task> assignedTasks = new ArrayList<Task>();

     //
     // Compute (running + pending) map and reduce task numbers across pool
     //
   // compute the remaining map and reduce workload
     int remainingReduceLoad = 0;
     int remainingMapLoad = 0;
     synchronized (jobQueue) {
       for (JobInProgress job : jobQueue) {
         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
           remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
           if (job.scheduleReduces()) {
             remainingReduceLoad +=
               (job.desiredReduces() - job.finishedReduces());
           }
         }
       }
     }

     // Compute the 'load factor' for maps and reduces
     double mapLoadFactor = 0.0;
     if (clusterMapCapacity > 0) {
       mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
     }
     double reduceLoadFactor = 0.0;
     if (clusterReduceCapacity > 0) {
       reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
     }

     //
     // In the below steps, we allocate first map tasks (if appropriate),
     // and then reduce tasks if appropriate.  We go through all jobs
     // in order of job arrival; jobs only get serviced if their
     // predecessors are serviced, too.
     //

     //
     // We assign tasks to the current taskTracker if the given machine
     // has a workload that's less than the maximum load of that kind of
     // task.
     // However, if the cluster is close to getting loaded i.e. we don't
     // have enough _padding_ for speculative executions etc., we only
     // schedule the "highest priority" task i.e. the task from the job
     // with the highest priority.
     //

     final int trackerCurrentMapCapacity =
       Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
                               trackerMapCapacity);
     int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
     boolean exceededMapPadding = false;
     if (availableMapSlots > 0) {
       exceededMapPadding =
         exceededPadding(true, clusterStatus, trackerMapCapacity);
     }
     int numLocalMaps = 0;
     int numNonLocalMaps = 0;
     scheduleMaps:
     for (int i=0; i < availableMapSlots; ++i) {
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;
           }

           Task t = null;

           // Try to schedule a node-local or rack-local Map task
           t =
             job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
                 numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
             assignedTasks.add(t);
             ++numLocalMaps;

             // Don't assign map tasks to the hilt!
             // Leave some free slots in the cluster for future task-failures,
             // speculative tasks etc. beyond the highest priority job
             if (exceededMapPadding) {
               break scheduleMaps;
             }

             // Try all jobs again for the next Map task
             break;
           }

           // Try to schedule an off-switch or speculative Map task.
           // obtainNewNonLocalMapTask(), like the other map-task methods of JobInProgress,
           // ends up in JobInProgress.findNewMapTask(), which consults nonRunningMapCache.
           t =
             job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                    taskTrackerManager.getNumberOfUniqueHosts());

           if (t != null) {
             assignedTasks.add(t);
             ++numNonLocalMaps;

             // We assign at most 1 off-switch or speculative task
             // This is to prevent TaskTrackers from stealing local-tasks
             // from other TaskTrackers.
             break scheduleMaps;
           }
         }
       }
     }
     int assignedMaps = assignedTasks.size();

     //
     // Same thing, but for reduce tasks
     // However we _never_ assign more than 1 reduce task per heartbeat
    // After the map tasks, assign reduce tasks
     final int trackerCurrentReduceCapacity =
       Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
                trackerReduceCapacity);
     final int availableReduceSlots =
       Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
     boolean exceededReducePadding = false;
     if (availableReduceSlots > 0) {
       exceededReducePadding = exceededPadding(false, clusterStatus,
                                               trackerReduceCapacity);
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
           if (job.getStatus().getRunState() != JobStatus.RUNNING ||
               job.numReduceTasks == 0) {
             continue;
           }
           // Uses JobInProgress.obtainNewReduceTask(), which ends up in
           // JobInProgress.findNewReduceTask() and consults nonRunningReduces
           Task t =
             job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                                     taskTrackerManager.getNumberOfUniqueHosts()
                                     );
           if (t != null) {
             assignedTasks.add(t);
             break;
           }

           // Don't assign reduce tasks to the hilt!
           // Leave some free slots in the cluster for future task-failures,
           // speculative tasks etc. beyond the highest priority job
           if (exceededReducePadding) {
             break;
           }
         }
       }
     }

     if (LOG.isDebugEnabled()) {
       LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
                 "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
                 trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
                 (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
                 assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
                 ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
                 trackerCurrentReduceCapacity + "," + trackerRunningReduces +
                 "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
                 ", " + (assignedTasks.size()-assignedMaps) + "]");
     }

     return assignedTasks;
   }

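  The method first gathers basic cluster information (the capacities) and then basic information about this TaskTracker (its slots and the number of running tasks). It sums the remaining work of all running jobs (remainingMapLoad and remainingReduceLoad) and computes the map and reduce load factors, mapLoadFactor and reduceLoadFactor, each being the remaining work of that type divided by the cluster's maximum capacity for it. Next it computes trackerCurrentMapCapacity, which tends to even out the load across all TaskTrackers: in Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity), the term mapLoadFactor * trackerMapCapacity keeps this node's current map capacity close to the overall cluster load. From that it derives the TaskTracker's available map slots: a TaskTracker that is already above the cluster's current load level gets no tasks, otherwise it has free slots waiting for tasks. It then picks a map task for each available map slot, and this selection is quite involved. It iterates over the jobs in jobQueue, skipping every JobInProgress that is not in the RUNNING state, and calls JobInProgress.obtainNewNodeOrRackLocalMapTask to get a node-local or rack-local map task; obtainNewNodeOrRackLocalMapTask calls findNewMapTask to get the task's index in the maps array.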
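  A small worked example shows how the load factor throttles a tracker. The arithmetic is copied from assignTasks above; only the wrapper class MapCapacitySketch and its method name are invented for illustration.

```java
final class MapCapacitySketch {
    /**
     * Returns how many map slots assignTasks would try to fill on one
     * tracker. A result of zero or less means no map task is assigned.
     */
    static int availableMapSlots(int remainingMapLoad, int clusterMapCapacity,
                                 int trackerMapCapacity, int trackerRunningMaps) {
        double mapLoadFactor = 0.0;
        if (clusterMapCapacity > 0) {
            mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity;
        }
        int trackerCurrentMapCapacity =
            Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity);
        return trackerCurrentMapCapacity - trackerRunningMaps;
    }
}
```

  With 30 maps remaining on a cluster of 100 map slots, a tracker with 4 slots is capped at ceil(0.3 * 4) = 2 concurrent maps; if it already runs 1, one slot is available. Once the remaining load exceeds the cluster capacity, the cap is simply the tracker's own 4 slots.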

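  (1) First try the failed tasks and return a suitable one directly. findNewMapTask uses findTaskFromList to pick a suitable failed map from failedMaps. A task qualifies if either (A) the TaskInProgress has not run on this TaskTracker, or (B) the number of nodes on which the TaskInProgress has failed is at least the number of hosts running TaskTrackers. If a suitable failed map task exists, scheduleMap(tip) adds it to nonLocalRunningMaps (when the task has no split location) or to runningMapCache (under every Node that stores its split, and under each such Node's parent Nodes as well), and the task's index in the maps array is returned to obtainNewNodeOrRackLocalMapTask. Data locality is not considered when choosing among failed tasks.

  final SortedSet<TaskInProgress> failedMaps is the set of TaskInProgress objects sorted by the number of failed task attempts.

  Set<TaskInProgress> nonLocalRunningMaps is the set of running TaskInProgress objects that have no locality information.

  Map<Node, Set<TaskInProgress>> runningMapCache maps each Node to its set of running TaskInProgress objects; a TaskInProgress is added as soon as the task is scheduled.

  (2) If no failed task is suitable, get the Node of the current TaskTracker and search "level by level, from near to far, until a suitable TaskInProgress is found" by repeatedly moving to the parent Node. At each level, fetch the Node's list of map tasks from nonRunningMapCache; if the list is not empty, call findTaskFromList to pick a suitable TaskInProgress from it. If tip != null, call scheduleMap(tip) (described above), then check whether the list is now empty and, if so, remove the Node's entry from nonRunningMapCache, and return the task's index in the maps array to obtainNewNodeOrRackLocalMapTask. If no suitable task is found after the maximum number of topology levels, -1 is returned to obtainNewNodeOrRackLocalMapTask: when findNewMapTask's maxCacheLevel argument is greater than 0, only node-local and rack-local tasks are requested and the later cases are not considered. In other words, maps whose split resides on the TaskTracker's own Node come first (node-local), then those under the parent Node (the same rack, rack-local), then everything else (across racks, off-switch; this depends on the configured topology depth and is only considered when it is greater than 2). Searching from near to far shortens the distance the data has to be copied and reduces network overhead.

  Map<Node, List<TaskInProgress>> nonRunningMapCache maps each Node to its list of TaskInProgress objects that have not run yet; it can be built directly from the job's InputFormat.

  (3) Next, take the Nodes at the cache's maximum topology level and the highest-level parent Node of the TaskTracker's Node. The rest is similar to (2), except that the search is now across racks (or one level higher, depending on the configured topology depth). Pick a cross-rack task, call scheduleMap(tip), and return the task's index in the maps array to obtainNewNodeOrRackLocalMapTask.

  (4) Then look in nonLocalMaps for a suitable task. These tasks have no input data, so locality does not matter. Call scheduleMap(tip) and return the task's index in the maps array to obtainNewNodeOrRackLocalMapTask.

  final List<TaskInProgress> nonLocalMaps holds compute-intensive tasks, such as those of the PI job in the Hadoop examples.

  (5) If there are straggler tasks (hasSpeculativeMaps == true), walk runningMapCache and pick a suitable straggler, trying node-local, rack-local, and off-switch in that order, and return the task's index in the maps array to obtainNewNodeOrRackLocalMapTask. scheduleMap(tip) is not needed here, because the task is obviously already in runningMapCache.

  (6) Look for a straggler in nonLocalRunningMaps, that is, a compute-intensive task that is lagging, and return the task's index in the maps array to obtainNewNodeOrRackLocalMapTask.

  (7) If nothing is found, return -1.

  obtainNewNodeOrRackLocalMapTask only runs through step (2): it returns either a MapTask or null (when findNewMapTask returns -1) and does not continue with the later steps.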
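  Step (2), the near-to-far search, is the heart of the locality logic. The sketch below reproduces only that walk under simplifying assumptions: topology nodes are path strings, tasks are integer indices, a set of running task indices replaces scheduleMap(tip), and stale entries are pruned lazily while scanning. LocalMapLookupSketch and its methods are hypothetical names, not Hadoop APIs.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class LocalMapLookupSketch {
    /**
     * Walks from the tracker's own node up through at most maxLevel
     * levels and returns the first task that is not running yet,
     * or -1 if no node-local or rack-local task exists.
     */
    static int findLocalTask(Map<String, List<Integer>> nonRunningCache, Set<Integer> running,
                             String trackerPath, int maxLevel) {
        String node = trackerPath;
        for (int level = 0; level < maxLevel && node != null; level++) {
            List<Integer> candidates = nonRunningCache.get(node);
            if (candidates != null) {
                Iterator<Integer> it = candidates.iterator();
                while (it.hasNext()) {
                    int task = it.next();
                    it.remove(); // the entry is either stale or about to be scheduled
                    if (running.add(task)) { // true only if the task was not running yet
                        if (candidates.isEmpty()) {
                            nonRunningCache.remove(node);
                        }
                        return task;
                    }
                }
                nonRunningCache.remove(node); // list exhausted
            }
            node = parentOf(node);
        }
        return -1;
    }

    /** "/rack1/host1" -> "/rack1"; a top-level node has no parent (null). */
    static String parentOf(String path) {
        int idx = path.lastIndexOf('/');
        return idx <= 0 ? null : path.substring(0, idx);
    }
}
```

  Because a task is listed under its host and under the host's rack, scheduling it from one list leaves a stale entry in the other; the sketch drops such entries when it meets them instead of searching every list up front.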

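  Back in obtainNewMapTask: once it has the index into the maps array, it obtains the Task of that TaskInProgress (which could be a MapTask or a ReduceTask; here it is a MapTask) and returns it to assignTasks, which adds it to assignedTasks and breaks out of the inner for loop to look for a MapTask for the next map slot. If there is no suitable node-local or rack-local MapTask, assignTasks calls obtainNewNonLocalMapTask (which runs every step above except (2)) to get a MapTask, adds it to assignedTasks, and then leaves the map-scheduling loop altogether (break scheduleMaps), so at most one such task is assigned per heartbeat.

  Reduce tasks are assigned next, at most one per heartbeat. The procedure resembles the one for map slots, but for at most one reduce slot: assignTasks iterates over jobQueue and calls obtainNewReduceTask to get a suitable ReduceTask. obtainNewReduceTask first performs the same checks as for map tasks, on node reliability and disk space. It then checks whether enough of the job's maps have finished for reduces to be scheduled and returns null if not, and otherwise calls findNewReduceTask to get the reduce's index. findNewReduceTask first checks whether the job has any reduces, returning -1 if not, and whether this TaskTracker may run reduce tasks. It then calls findTaskFromList to pick a suitable TaskInProgress from nonRunningReduces, puts it into runningReduces, and returns its index to obtainNewReduceTask. If none is suitable, it calls findSpeculativeTask to look for a straggler among runningReduces and returns that index; failing that, it returns -1. Back in obtainNewReduceTask, the corresponding ReduceTask is obtained and returned to assignTasks, which adds it to assignedTasks.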
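  The "have enough maps finished" test is the slow-start threshold computed in initTasks(). A minimal sketch follows; the formula is the one shown in initTasks(), while the class and method names are invented and the fraction values in the example are arbitrary.

```java
final class ReduceSlowstartSketch {
    /** Number of maps that must finish before reduces may be scheduled. */
    static int completedMapsForSlowstart(float slowstartFraction, int numMapTasks) {
        return (int) Math.ceil(slowstartFraction * numMapTasks);
    }

    /** True once the job has finished at least the threshold number of maps. */
    static boolean scheduleReduces(int finishedMaps, float slowstartFraction, int numMapTasks) {
        return finishedMaps >= completedMapsForSlowstart(slowstartFraction, numMapTasks);
    }
}
```

  For example, with mapred.reduce.slowstart.completed.maps set to 0.5 and 7 map tasks, the threshold is ceil(3.5) = 4, so reduces become schedulable only after the fourth map completes.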

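  Both assignment loops test a padding flag, exceededMapPadding for maps and exceededReducePadding for reduces, and both flags come from the exceededPadding method. When the cluster has many TaskTracker nodes, JobQueueTaskScheduler always tries to keep some slots (compute capacity) free on a number of TaskTrackers, so that tasks of higher-priority jobs or failed tasks can be handled quickly and jobs that are already scheduled can complete. exceededPadding decides whether the cluster currently needs to reserve part of its map or reduce capacity for failed, urgent, or special tasks.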
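  The listing above does not include exceededPadding, so the following is only a sketch of the kind of test it performs, written from the description in this article. The parameter names, the cluster-size threshold, and the padding fraction are assumptions made for illustration, not values taken from the Hadoop source.

```java
final class PaddingSketch {
    /**
     * Returns true when running tasks plus a reserved padding would fill
     * the cluster. No padding is reserved on small clusters, and the
     * padding never exceeds one tracker's slot count.
     */
    static boolean exceededPadding(int runningTasks, int clusterCapacity, int neededTasks,
                                   int maxTrackerSlots, int numTrackers,
                                   int minClusterSizeForPadding, float padFraction) {
        int padding = 0;
        if (numTrackers > minClusterSizeForPadding) {
            padding = Math.min(maxTrackerSlots, (int) (neededTasks * padFraction));
        }
        return runningTasks + padding >= clusterCapacity;
    }
}
```

  When the flag is true, the map loop in assignTasks stops after the first local task it assigns (break scheduleMaps), which leaves the remaining slots free.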

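  Also note that for every slot the tasks (map or reduce) of the first job in jobQueue are considered first; other jobs are considered only if nothing can be assigned from it, so the first job is favored as much as possible.

  assignTasks finally returns the list assignedTasks. The scheduler assigns only MapTasks and ReduceTasks; a job's auxiliary tasks, such as the JobSetup, JobCleanup, and TaskCleanup tasks, are scheduled by the JobTracker itself.

  The scheduling principles of JobQueueTaskScheduler can be summarized as follows:
     1. Jobs with higher priority are scheduled first; jobs of equal priority are served first in, first out.
     2. Keep the load balanced across all TaskTrackers as far as possible (balanced in the number of tasks, not in the actual amount of work).
     3. Assign a job's local tasks to a TaskTracker wherever possible, though not necessarily as early as possible; assign at most one non-local task per heartbeat (first to preserve task parallelism, second to avoid stealing other TaskTrackers' local tasks) and at most one reduce task.
     4. Reserve some slots for high-priority or urgent tasks.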
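  The first principle (priority first, then first in, first out) amounts to a comparator over the queued jobs. Here is a minimal sketch, assuming that a smaller priority value means a more urgent job and that the job ID breaks remaining ties; JobInfo and FifoOrderSketch are hypothetical stand-ins for JobSchedulingInfo and the listener's queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class FifoOrderSketch {
    /** Hypothetical job descriptor; a smaller priority value is more urgent. */
    static final class JobInfo {
        final int priority;
        final long startTime;
        final String jobId;

        JobInfo(int priority, long startTime, String jobId) {
            this.priority = priority;
            this.startTime = startTime;
            this.jobId = jobId;
        }
    }

    /** Priority first, then start time (FIFO), then job ID as a tie-breaker. */
    static final Comparator<JobInfo> FIFO_ORDER =
        Comparator.comparingInt((JobInfo j) -> j.priority)
                  .thenComparingLong(j -> j.startTime)
                  .thenComparing(j -> j.jobId);

    /** Returns the job IDs in the order the scheduler would consider them. */
    static List<String> schedulingOrder(List<JobInfo> jobs) {
        List<JobInfo> sorted = new ArrayList<>(jobs);
        sorted.sort(FIFO_ORDER);
        List<String> ids = new ArrayList<>();
        for (JobInfo j : sorted) {
            ids.add(j.jobId);
        }
        return ids;
    }
}
```

  Since every slot is offered to jobs in this order, the job at the head of the queue is always tried first.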

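  7. Iterate over tasks: register each task with expireLaunchingTasks via expireLaunchingTasks.addNewTask(task.getTaskID()), so that launch expiry is monitored, and then add it to the response with actions.add(new LaunchTaskAction(task)).

  8. Check all tasks of this TaskTracker for ones that must be killed, together with the tasks recorded for this TaskTracker in trackerToTasksToCleanup that need cleanup; wrap them in KillTaskAction objects and add them to actions.

  9. Fetch all jobs recorded for this TaskTracker in trackerToJobsToCleanup, wrap them in KillJobAction objects, and add them to actions.

  10. Find the tasks on this TaskTracker whose state is TaskStatus.State.COMMIT_PENDING, wrap them in CommitTaskAction objects, and add them to actions. This tells the task that its output may be saved.

  11. Compute the next heartbeat interval and add it, together with actions, to response.

  12. If the JobTracker has restarted, add the list of jobs to recover to response: response.setRecoveredJobs(recoveryManager.getJobsToRecover()).

  13. Store trackerName and its response in trackerToHeartbeatResponseMap.

  14. Now that the heartbeat has been processed, some JobTracker data structures need to be updated. removeMarkedTasks(trackerName) removes the marked tasks of trackerName from the related structures, such as trackerToMarkedTasksMap, taskidToTrackerMap, trackerToTaskMap, and taskidToTIPMap.

  15. Finally, return response.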

References:

1. 董西成 (Dong Xicheng), 《Hadoop技术内幕: 深入理解MapReduce架构设计与实现原理》

2. http://blog.csdn.net/xhh198781/article/details/7046389
