[转载]Hadoop的任务调度

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException {

ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
int numTaskTrackers = clusterStatus.getTaskTrackers();

Collection<JobInProgress> jobQueue =
jobQueueJobInProgressListener.getJobQueue();

//
// Get map + reduce counts for the current tracker.
//
int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
int numMaps = taskTracker.countMapTasks();
int numReduces = taskTracker.countReduceTasks();

//
// Compute average map and reduce task numbers across pool
//
int remainingReduceLoad = 0;
int remainingMapLoad = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
int totalMapTasks = job.desiredMaps();
int totalReduceTasks = job.desiredReduces();
remainingMapLoad += (totalMapTasks - job.finishedMaps());
remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
}
}
}

// find out the maximum number of maps or reduces that we are willing
// to run on any node.
int maxMapLoad = 0;
int maxReduceLoad = 0;
if (numTaskTrackers > 0) {
maxMapLoad = Math.min(maxCurrentMapTasks,
(int) Math.ceil((double) remainingMapLoad /
numTaskTrackers));
maxReduceLoad = Math.min(maxCurrentReduceTasks,
(int) Math.ceil((double) remainingReduceLoad
/ numTaskTrackers));
}

int totalMaps = clusterStatus.getMapTasks();
int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
int totalReduces = clusterStatus.getReduceTasks();
int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();

//
// In the below steps, we allocate first a map task (if appropriate),
// and then a reduce task if appropriate. We go through all jobs
// in order of job arrival; jobs only get serviced if their
// predecessors are serviced, too.
//

//
// We hand a task to the current taskTracker if the given machine
// has a workload that's less than the maximum load of that kind of
// task.
//

if (numMaps < maxMapLoad) {

int totalNeededMaps = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
//这里是取得Task的地方,需要到job中去取
Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
return Collections.singletonList(t);
}

//
// Beyond the highest-priority task, reserve a little
// room for failures and speculative executions; don't
// schedule tasks to the hilt.
//
totalNeededMaps += job.desiredMaps();
int padding = 0;
if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
padding = Math.min(maxCurrentMapTasks,
(int)(totalNeededMaps * padFraction));
}
if (totalMaps + padding >= totalMapTaskCapacity) {
break;
}
}
}
}

//
// Same thing, but for reduce tasks
//
if (numReduces < maxReduceLoad) {

int totalNeededReduces = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING ||
job.numReduceTasks == 0) {
continue;
}

Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
return Collections.singletonList(t);
}

//
// Beyond the highest-priority task, reserve a little
// room for failures and speculative executions; don't
// schedule tasks to the hilt.
//
totalNeededReduces += job.desiredReduces();
int padding = 0;
if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
padding =
Math.min(maxCurrentReduceTasks,
(int) (totalNeededReduces * padFraction));
}
if (totalReduces + padding >= totalReduceTaskCapacity) {
break;
}
}
}
}
return null;
}
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean initialContact, boolean acceptNewTasks, short responseId)
throws IOException {
.............
//如果是接受新任务的话,让JotTracker去进行调度,这里会调用taskScheduler的assignTasks
if (acceptNewTasks) {
TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
//这里是准备assignTask的地方,由配置的调度器来决定怎样调度
if (tasks == null ) {
tasks = taskScheduler.assignTasks(taskTrackerStatus);
}
if (tasks != null) {
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
actions.add(new LaunchTaskAction(task));
}
}
}
}

mark一下,供调度器做参考。
转载自:http://jiwenke.javaeye.com/blog/334146 和 http://jiwenke.javaeye.com/blog/335093
花了许多功夫把Hadoop的mapreduce实现过了一遍,基本线索理清楚了:
1. 任务的运行时TaskTracker通过heartbeat取得
2. TaskTracker得到hearbeatresponse之后,会根据封装在response里的action来决定行为
3. 如果是launchaction的话,调用TasklLauncher,在startNewTasks中的localizeJob调用 launchTaskForJob然后再TaskInProgress中launchTask让runner.start(); - 这里面的startNewTasks是在线程中的run方法中,而TaskLauncher的notifyall会把线程唤醒:

public void addToTaskQueue(LaunchTaskAction action) {
synchronized (tasksToLaunch) {
TaskInProgress tip = registerTask(action, this);
tasksToLaunch.add(tip);
tasksToLaunch.notifyAll();
}
}

而这个addToTaskQueue方法是在offerService中调用的,这样就整过过程就街上了:

if (action instanceof LaunchTaskAction) {
addToTaskQueue((LaunchTaskAction)action);
} else if (action instanceof CommitTaskAction)

4. 这个时候就回到TaskRunner.run中去launchJVM,当然要把这个JVM的参数构造好,这个JVM就是我们看到map任务运行的JVM
5. 然后JVMRunner会spawn JVM, 这是通过shexe Child这个类来实现的,这个Child的main就是新起的JVM的主函数入口
6.在Child.main中会对任务的类型进行判断,调用相应的MapTask.run和ReduceTask.run
7.这个时候就可以看到mapper.map入口啦,然后就开始执行用户定义的mapper!

所以Child启动以后的log输出和前面TaskTracker的输出不在同一个文件里,因为已经不是一个虚拟机了。
--------------------------------------
上一节看到TaskTracker启动新任务的过程,这里接着看看在JobTracker中是怎样响应和调度的,在hadoop中,我们看到采用的是pull的方式拿到任务。

HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted, askForNewTask,
heartbeatResponseId);

这里是TaskTracker想JobTracker发送heartbeat的地方 - 使用的是RPC,这样我们你就来到JobTracker了:

这个taskScheduler采用的是默认的

taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass,conf);

这是在配置文件中指定的,"mapred.jobtracker.taskScheduler",常常是JobQueueTaskScheduler是hadoop的实现,FIFO类型的调度器,让我们看看这个调度器是怎样assignTasks的:

task的取得就要到JobInProgress中去obtainNewReduceTask了,需要对集群的状态进行查询处理了。

1 thought on “[转载]Hadoop的任务调度

  1. Morgan

    关于ssh的多主机key-based登陆,其实你可以用一个for循环,比如你的机器有3台,servername-{1,2,3},那么你可以在3个机器上运行下面的命令(放入脚本),这样你也不需要去修改authorized_keys文件的权限,创建.ssh目录等繁琐操作:#!/usr/bin/env bash# Description: Auto deploy ssh key to pvriode auto-login.echo StrictHostKeyChecking no' >>$HOME/.ssh/configssh-keygen -q -t rsa -N -f $HOME/.ssh/id_rsafor node in servername-{1,2,3} ; do ssh-copy-id -i $HOME/.ssh/id_rsa.pub root@$nodedone#Done脚本中echo的那一句是你作为ssh的client时,第一次ssh到一个server时要建立known_hosts时还要输入yes,比较麻烦。

    Reply

Leave a Reply

Your email address will not be published. Required fields are marked *