diff --git a/src/contrib/deadline-scheduler/build.xml b/src/contrib/deadline-scheduler/build.xml
new file mode 100644
index 0000000..54262c9
--- /dev/null
+++ b/src/contrib/deadline-scheduler/build.xml
@@ -0,0 +1,4 @@
+<?xml version="1.0"?>
+<project name="deadline-scheduler" default="jar">
+  <import file="../build-contrib.xml"/>
+</project>
diff --git a/src/contrib/deadline-scheduler/ivy.xml b/src/contrib/deadline-scheduler/ivy.xml
new file mode 100644
index 0000000..26aaacf
--- /dev/null
+++ b/src/contrib/deadline-scheduler/ivy.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" ?>
+<ivy-module version="1.0">
+  <info organisation="org.apache.hadoop" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
+    <description>
+        Apache Hadoop
+    </description>
+  </info>
+  <configurations defaultconfmapping="default">
+    <!--these match the Maven configurations-->
+    <conf name="default" extends="master,runtime"/>
+    <conf name="master" description="contains the artifact but no dependencies"/>
+    <conf name="runtime" description="runtime but not the artifact" />
+
+    <conf name="common" visibility="private"
+      extends="runtime"
+      description="artifacts needed to compile/test the application"/>
+    <conf name="test" visibility="private" extends="runtime"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+  <dependencies>
+    <dependency org="commons-logging"
+      name="commons-logging"
+      rev="${commons-logging.version}"
+      conf="common->default"/>
+   <dependency org="junit"
+      name="junit"
+      rev="${junit.version}"
+      conf="common->default"/>
+    <dependency org="log4j"
+      name="log4j"
+      rev="${log4j.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="servlet-api-2.5"
+      rev="${servlet-api-2.5.version}"
+      conf="common->master"/>
+    <dependency org="commons-httpclient"
+      name="commons-httpclient"
+      rev="${commons-httpclient.version}"
+      conf="common->master"/>
+  </dependencies>
+</ivy-module>
diff --git a/src/contrib/deadline-scheduler/ivy/libraries.properties b/src/contrib/deadline-scheduler/ivy/libraries.properties
new file mode 100644
index 0000000..d740528
--- /dev/null
+++ b/src/contrib/deadline-scheduler/ivy/libraries.properties
@@ -0,0 +1,5 @@
+#This properties file lists the versions of the various artifacts used by streaming.
+#It drives ivy and the generation of a maven POM
+
+#Please list the dependencies name with version if they are different from the ones
+#listed in the global libraries.properties file (in alphabetical order)
diff --git a/src/contrib/deadline-scheduler/src/java/org/apache/hadoop/mapred/DeadlineScheduler.java b/src/contrib/deadline-scheduler/src/java/org/apache/hadoop/mapred/DeadlineScheduler.java
new file mode 100644
index 0000000..2e7deaa
--- /dev/null
+++ b/src/contrib/deadline-scheduler/src/java/org/apache/hadoop/mapred/DeadlineScheduler.java
@@ -0,0 +1,326 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.PriorityQueue;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.EagerTaskInitializationListener;
+
+class DeadlineScheduler extends TaskScheduler {
+
+  public static final Log LOG
+    = LogFactory.getLog("org.apache.hadoop.mapred.DeadlineScheduler");
+
+  private EagerTaskInitializationListener eagerInitListener;
+  private DeadlineJobListener jobListener = new DeadlineJobListener();
+
+  private Map<JobInProgress, JobInfo> jobs
+    = new HashMap<JobInProgress, JobInfo>();
+
+  // Specifies whether the scheduler should maximize or minimize resource
+  // utilization.
+  private boolean maxAllocation = false;
+
+  public enum TaskType {
+    MAP, REDUCE
+  }
+
+  public enum Status {
+    NODATA, // Not enough data to estimate the job's requirements
+    ADJUST, // It is able to decide whether it needs more or less slots
+    UNDEAD  // Past the deadline
+  }
+
+  public class DeadlineJobListener extends JobInProgressListener {
+
+    public void jobAdded(JobInProgress job) {
+      synchronized (DeadlineScheduler.this) {
+        JobInfo info = new JobInfo(job);
+        jobs.put(job, info);
+      }
+    }
+
+    public void jobRemoved(JobInProgress job) {
+      synchronized (DeadlineScheduler.this) {
+        jobs.remove(job);
+      }
+    }
+
+    public void jobUpdated(JobChangeEvent event) { }
+
+  }
+
+  public void start() {
+    try {
+      super.start();
+      Configuration conf = this.getConf();
+      this.eagerInitListener = new EagerTaskInitializationListener(conf);
+      eagerInitListener.start();
+      taskTrackerManager.addJobInProgressListener(eagerInitListener);
+      taskTrackerManager.addJobInProgressListener(jobListener);
+      maxAllocation = conf.getBoolean("mapred.deadlinescheduler.maximize",
+                                      false);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start DeadlineScheduler", e);
+    }
+    LOG.info("Successfully configured DeadlineScheduler");
+  }
+
+  public class MapComparator implements Comparator<JobInfo> {
+
+    public int compare(JobInfo info1, JobInfo info2) {
+      Status s1 = info1.status;
+      Status s2 = info2.status;
+      JobInProgress j1 = info1.job;
+      JobInProgress j2 = info2.job;
+
+      if (s1 == Status.UNDEAD && s2 == Status.UNDEAD)
+        // TODO: Force some kind of order between UNDEAD jobs
+
+      if (s1 == Status.UNDEAD)
+        return -1;
+
+      if (s2 == Status.UNDEAD)
+        return 1;
+
+      if (s1 == Status.NODATA && s2 == Status.NODATA) {
+        long r1 = j1.runningMaps();
+        long r2 = j2.runningMaps();
+        return r1 > r2 ? 1 : -1;
+      }
+
+      if (s1 == Status.NODATA)
+        return -1;
+
+      if (s2 == Status.NODATA)
+        return 1;
+
+      if (s1 == Status.ADJUST && s2 == Status.ADJUST) {
+        double e1 = info1.estimatedMapSlots;
+        double e2 = info2.estimatedMapSlots;
+        return e1 > e2 ? -1 : 1;
+      }
+
+      return -1;
+    }
+
+  }
+
+  public class ReduceComparator implements Comparator<JobInfo> {
+
+    public int compare(JobInfo info1, JobInfo info2) {
+      Status s1 = info1.status;
+      Status s2 = info2.status;
+      JobInProgress j1 = info1.job;
+      JobInProgress j2 = info2.job;
+
+      if (s1 == Status.UNDEAD)
+        return -1;
+
+      if (s2 == Status.UNDEAD)
+        return 1;
+
+      if (s1 == Status.NODATA)
+        return -1;
+
+      if (s2 == Status.NODATA)
+        return 1;
+
+      if (s1 == Status.ADJUST && s2 == Status.ADJUST) {
+        double e1 = info1.job.pendingReduces();
+        double e2 = info2.job.pendingReduces();
+
+        if (info1.isReduceEstimable())
+          e1 = info1.getEstimatedReduceSlots();
+
+        if (info2.isReduceEstimable())
+          e2 = info2.getEstimatedReduceSlots();
+
+        return e1 > e2 ? -1 : 1;
+      }
+
+      return -1;
+    }
+
+  }
+
+  /*
+   * Returns the tasks to be executed on the TaskTracker right now.
+   */
+  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+    throws IOException {
+    ArrayList<Task> assigned = new ArrayList<Task>();
+    PriorityQueue<JobInfo> queueMap =
+      new PriorityQueue<JobInfo>(11, new MapComparator());
+    PriorityQueue<JobInfo> queueExtra =
+      new PriorityQueue<JobInfo>(11, new MapComparator());
+    PriorityQueue<JobInfo> queueReduce =
+      new PriorityQueue<JobInfo>(11, new ReduceComparator());
+
+    final int trackerMapCapacity = taskTracker.getMaxMapTasks();
+    final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
+    final int trackerRunningMaps = taskTracker.countMapTasks();
+    final int trackerRunningReduces = taskTracker.countReduceTasks();
+    int availableMapSlots = trackerMapCapacity - trackerRunningMaps;
+    int availableReduceSlots = trackerReduceCapacity - trackerRunningReduces;
+
+    if (availableMapSlots < 1 && availableReduceSlots < 1)
+      return null;
+
+    // Update status and sort available map tasks
+    for (JobInProgress job: this.jobs.keySet()) {
+      JobInfo info = this.jobs.get(job);
+      int remainingMaps = job.pendingMaps();
+
+      // Skip non-initialized and (theoretically) finished jobs
+      if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+          remainingMaps < 1) {
+        continue;
+      }
+
+      if (info.status == Status.NODATA && info.isEstimable()) {
+        info.status = Status.ADJUST;
+      }
+
+      if (info.isPastDeadline()) {
+        info.status = Status.UNDEAD;
+        info.maxMapSlots = remainingMaps;
+      }
+
+      if (info.status == Status.ADJUST) {
+        info.estimatedMapSlots = info.getEstimatedMapSlots();
+        info.maxMapSlots = (int) Math.floor(info.estimatedMapSlots) -
+                           job.runningMaps();
+      }
+
+      if (info.maxMapSlots > 0) {
+        queueMap.add(info);
+      } else if (maxAllocation) {
+        info.maxMapSlots = Math.min(remainingMaps, availableMapSlots);
+        queueExtra.add(info);
+      }
+    }
+
+    // Update and sort reduce tasks
+    for (JobInProgress job: this.jobs.keySet()) {
+      JobInfo info = this.jobs.get(job);
+      int remainingReduces = job.pendingReduces();
+
+      if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+          job.finishedMaps() < 1 || remainingReduces < 1) {
+        continue;
+      }
+
+      if (info.isReduceEstimable()) {
+        info.estimatedReduceSlots = info.getEstimatedReduceSlots();
+        info.maxReduceSlots = (int) Math.floor(info.estimatedReduceSlots) -
+                              job.runningReduces();
+      }
+
+      if (info.isPastDeadline()) {
+        info.status = Status.UNDEAD;
+        info.maxReduceSlots = remainingReduces;
+      }
+
+      if (info.maxReduceSlots > 0) {
+        queueReduce.add(info);
+      }
+    }
+
+    // Assign the minimum amount of map slots.
+    for (int i = 0; i < availableMapSlots; i++) {
+      Task task = this.getTopTask(taskTracker, queueMap, TaskType.MAP);
+      if (task == null)
+        break;
+      assigned.add(task);
+    }
+
+    // Assign the remaining map slots to ensure maximum utilization of the
+    // cluster if maxAllocation is enabled.
+    availableMapSlots = availableMapSlots - assigned.size();
+    for (int i = 0; i < availableMapSlots; i++) {
+      Task task = this.getTopTask(taskTracker, queueExtra, TaskType.MAP);
+      if (task == null)
+        break;
+      assigned.add(task);
+    }
+
+    // Assign as many reducers possible.
+    for (int i = 0; i < availableReduceSlots; i++) {
+      Task task = this.getTopTask(taskTracker, queueReduce, TaskType.REDUCE);
+      if (task == null)
+        break;
+      assigned.add(task);
+    }
+
+    return assigned.isEmpty() ? null : assigned;
+  }
+
+  /*
+   * Returns a Task of the Job with the highest need of CPU to meet its
+   * deadline, and updates the queue removing previous jobs that don't need
+   * more resources until the next run of the scheduler.
+   *
+   * The Task selection favours data locality on the given TaskTracker.
+   */
+  private Task getTopTask(TaskTrackerStatus taskTracker,
+                          PriorityQueue<JobInfo> queue,
+                          TaskType type)
+    throws IOException {
+
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    final int numTaskTrackers = clusterStatus.getTaskTrackers();
+    final int numHosts = taskTrackerManager.getNumberOfUniqueHosts();
+
+    Task task = null;
+    JobInfo info = queue.peek();
+
+    // No jobs left in the queue
+    if (info == null)
+      return task;
+
+    int maxSlots = info.getMaxSlots(type);
+    JobInProgress job = info.job;
+
+    int pendingTasks = 0;
+    if (type == TaskType.MAP)
+      pendingTasks = job.pendingMaps();
+    else
+      pendingTasks = job.pendingReduces();
+
+    // Current jobs needs at least one task
+    if (maxSlots > 0 && pendingTasks > 0) {
+      if (type == TaskType.MAP) {
+        task = job.obtainNewMapTask(taskTracker, numTaskTrackers, numHosts);
+      } else {
+        task = job.obtainNewReduceTask(taskTracker, numTaskTrackers, numHosts);
+      }
+
+      if (task != null) {
+        info.decreaseMaxSlots(type);
+        return task;
+      }
+    }
+
+    // If there is no need to assign more resources to the current job,
+    // remove it from the queue to avoid going over it again if there are
+    // more available slots.
+    queue.poll();
+    return getTopTask(taskTracker, queue, type);
+  }
+
+  public Collection<JobInProgress> getJobs(String queueName) {
+    return this.jobs.keySet();
+  }
+
+}
+
+// vim: et sw=2 ts=2
diff --git a/src/contrib/deadline-scheduler/src/java/org/apache/hadoop/mapred/JobInfo.java b/src/contrib/deadline-scheduler/src/java/org/apache/hadoop/mapred/JobInfo.java
new file mode 100644
index 0000000..b14913e
--- /dev/null
+++ b/src/contrib/deadline-scheduler/src/java/org/apache/hadoop/mapred/JobInfo.java
@@ -0,0 +1,184 @@
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+import java.util.Vector;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.mapred.DeadlineScheduler.Status;
+import org.apache.hadoop.mapred.DeadlineScheduler.TaskType;
+
+public class JobInfo {
+
+  public Status status = Status.NODATA;
+  public JobInProgress job;
+
+  // Maximum number of slots that may be assigned to this job during the
+  // next assignTasks() execution.
+  //
+  // At the beginning of a new assignTasks() iteration, it is roughly
+  // equivalent to getEstimatedMapSlots(), but it decreases at each
+  // step until it reaches 0.
+  public int maxMapSlots;
+  public int maxReduceSlots;
+
+  // Cache slot status as of last update
+  public double estimatedMapSlots = 0.0;
+  public double estimatedReduceSlots = 0.0;
+
+  // Used to cache average{Map,Reduce}Time() until the number of finished maps
+  // or reduces changes
+  private int finishedMaps = 0;
+  private long averageMapTime = 0;
+  private int finishedReduces = 0;
+  private long averageReduceTime = 0;
+
+  public JobInfo(JobInProgress job) {
+    this.status = Status.NODATA;
+    this.job = job;
+
+    // New jobs are assigned the maximum amount of slots, as we can't make
+    // assumptions about it
+    this.maxMapSlots = job.desiredMaps();
+    this.maxReduceSlots = job.desiredReduces();
+  }
+
+  public int getMaxSlots(TaskType type) {
+    return (type == TaskType.MAP) ? this.maxMapSlots : this.maxReduceSlots;
+  }
+
+  public void decreaseMaxSlots(TaskType type) {
+    if (type == TaskType.MAP) {
+      this.maxMapSlots--;
+    } else {
+      this.maxReduceSlots--;
+    }
+  }
+
+  /**
+   * Returns the average time it takes to finish a map. The result is based
+   * on all previously completed map tasks.
+   */
+  private long getAverageMapTime() {
+    Vector<TaskInProgress> completedMaps
+      = this.job.reportTasksInProgress(true, true);
+    int numCompletedMaps = completedMaps.size();
+
+    // No need to loop over the list if the number of completed maps hasn't
+    // changed, simply return the cached result
+    if (this.finishedMaps == numCompletedMaps)
+      return this.averageMapTime;
+
+    long totalMapTime = getAverageTaskTime(completedMaps);
+    this.finishedMaps = numCompletedMaps;
+    this.averageMapTime = totalMapTime / numCompletedMaps;
+    return this.averageMapTime;
+  }
+
+  /**
+   * Returns the average time it takes to finish a reduce. The result is based
+   * on all previously completed reduce tasks.
+   */
+  private long getAverageReduceTime() {
+    Vector<TaskInProgress> completedReduces
+      = this.job.reportTasksInProgress(false, true);
+    int numCompletedReduces = completedReduces.size();
+
+    // No need to loop over the list if the number of completed maps hasn't
+    // changed, simply return the cached result
+    if (this.finishedReduces == numCompletedReduces)
+      return this.averageReduceTime;
+
+    long totalReduceTime = getAverageTaskTime(completedReduces);
+    this.finishedReduces = numCompletedReduces;
+    this.averageReduceTime = totalReduceTime / numCompletedReduces;
+    return this.averageReduceTime;
+  }
+
+  private long getAverageTaskTime(Vector<TaskInProgress> tasks) {
+    long total = 0;
+    for (TaskInProgress tip: tasks) {
+      long start = tip.getExecStartTime();
+      long finish = tip.getExecFinishTime();
+      total += finish - start;
+    }
+    return total / tasks.size();
+  }
+
+  public boolean isEstimable() {
+    return this.job.finishedMaps() > 0 &&
+           this.job.finishedMaps() < this.job.desiredMaps();
+  }
+
+  public boolean isReduceEstimable() {
+    return this.job.finishedReduces() > 0 &&
+           this.job.finishedReduces() < this.job.desiredReduces();
+  }
+
+  public boolean isPastDeadline() {
+    long currentTime = System.currentTimeMillis();
+    long deadlineTime = job.getLaunchTime() + job.getDeadline() * 1000;
+    return currentTime > deadlineTime;
+  }
+
+  public double getEstimatedMapSlots() {
+    long currentTime = System.currentTimeMillis();
+    long deadlineTime = this.job.getLaunchTime() + this.job.getDeadline() * 1000;
+    long averageTime = this.getAverageMapTime();
+
+    long reducePadding = this.getReducePadding();
+    long remainingTime = deadlineTime - reducePadding - currentTime;
+
+    int remainingMaps = this.job.pendingMaps();
+
+    if (remainingTime < 0)
+      return remainingMaps;
+
+    double turns = (double) remainingTime / averageTime;
+
+    Vector<TaskInProgress> uncompletedMaps
+      = this.job.reportTasksInProgress(true, false);
+    int runningMaps = 0;
+    long runningTime = 0;
+    for (TaskInProgress tip: uncompletedMaps) {
+      if (tip.isRunning() && !tip.isComplete()) {
+        long start = tip.getExecStartTime();
+        long elapsed = currentTime - start;
+        runningMaps += 1;
+        runningTime += elapsed;
+      }
+    }
+
+    double partial = ((averageTime * runningMaps) - runningTime) / averageTime;
+    partial = partial < 0.0 ? remainingMaps : partial + (double) remainingMaps;
+    double slots = partial / turns;
+
+    return slots;
+  }
+
+  public double getEstimatedReduceSlots() {
+    long currentTime = System.currentTimeMillis();
+    long deadlineTime = this.job.getLaunchTime() + this.job.getDeadline() * 1000;
+    long averageTime = this.getAverageReduceTime();
+
+    long remainingTime = deadlineTime - currentTime;
+    int remainingReduces = this.job.desiredReduces() -
+                           this.job.finishedReduces();
+
+    double turns = Math.floor((double) remainingTime / averageTime);
+    double slots = (double) remainingReduces / turns;
+    return slots;
+  }
+
+  /**
+   * Get the estimated padding required for reducers.
+   */
+  public long getReducePadding() {
+    long averageTime = this.getAverageMapTime();
+    int reducePercent = this.job.getReducePercent();
+    return Math.round((averageTime * reducePercent) / 100);
+  }
+
+}
+
+// vim: et sw=2 ts=2
diff --git a/src/mapred/org/apache/hadoop/mapred/JobConf.java b/src/mapred/org/apache/hadoop/mapred/JobConf.java
index d313f4d..b2edb30 100644
--- a/src/mapred/org/apache/hadoop/mapred/JobConf.java
+++ b/src/mapred/org/apache/hadoop/mapred/JobConf.java
@@ -1066,6 +1066,22 @@ public class JobConf extends Configuration {
     set("mapred.job.name", name);
   }
   
+  public long getDeadline() {
+    return getLong("mapred.job.deadline", 0);
+  }
+
+  public void setDeadline(long deadline) {
+    setLong("mapred.job.deadline", deadline);
+  }
+
+  public int getReducePercent() {
+    return getInt("mapred.job.reduce.percent", 100);
+  }
+
+  public void setReducePercent(int ratio) {
+    setInt("mapred.job.reduce.percent", ratio);
+  }
+
   /**
    * Get the user-specified session identifier. The default is the empty string.
    *
diff --git a/src/mapred/org/apache/hadoop/mapred/JobInProgress.java b/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
index 8aea17e..cf37d5c 100644
--- a/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
+++ b/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
@@ -634,6 +634,14 @@ class JobInProgress {
     JobHistory.JobInfo.logJobPriority(jobId, priority);
   }
 
+  public long getDeadline() {
+    return this.conf.getDeadline();
+  }
+
+  public int getReducePercent() {
+    return this.conf.getReducePercent();
+  }
+
   // Update the job start/launch time (upon restart) and log to history
   synchronized void updateJobInfo(long startTime, long launchTime) {
     // log and change to the job's start/launch time
diff --git a/src/mapred/org/apache/hadoop/mapreduce/Job.java b/src/mapred/org/apache/hadoop/mapreduce/Job.java
index f7414e5..4ee42af 100644
--- a/src/mapred/org/apache/hadoop/mapreduce/Job.java
+++ b/src/mapred/org/apache/hadoop/mapreduce/Job.java
@@ -254,6 +254,10 @@ public class Job extends JobContext {
     conf.setJobName(name);
   }
 
+  public void setDeadline(long deadline) throws IllegalStateException {
+    conf.setDeadline(deadline);
+  }
+
   /**
    * Turn speculative execution on or off for this job. 
    * 
diff --git a/src/mapred/org/apache/hadoop/mapreduce/JobContext.java b/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
index 0800c05..d367b7a 100644
--- a/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
+++ b/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
@@ -134,6 +134,10 @@ public class JobContext {
     return conf.getJobName();
   }
 
+  public long getDeadline() {
+    return conf.getDeadline();
+  }
+
   /**
    * Get the {@link InputFormat} class for the job.
    * 
