public class ExecutorAllocationManager extends Object implements Logging
The add policy depends on whether there are backlogged tasks waiting to be scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If the queue persists for another M seconds, then more executors are added and so on. The number added in each round increases exponentially from the previous round until an upper bound on the number of executors has been reached. The upper bound is based both on a configured property and on the number of tasks pending: the policy will never increase the number of executor requests past the number needed to handle all pending tasks.
The rationale for the exponential increase is twofold: (1) Executors should be added slowly in the beginning in case the number of extra executors needed turns out to be small. Otherwise, we may add more executors than we need just to remove them later. (2) Executors should be added quickly over time in case the maximum number of executors is very high. Otherwise, it will take a long time to ramp up under heavy workloads.
The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not been scheduled to run any tasks, then it is removed.
There is no retry logic in either case because we make the assumption that the cluster manager will eventually fulfill all requests it receives asynchronously.
The relevant Spark properties include the following:
spark.dynamicAllocation.enabled - Whether this feature is enabled spark.dynamicAllocation.minExecutors - Lower bound on the number of executors spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
spark.dynamicAllocation.schedulerBacklogTimeout (M) - If there are backlogged tasks for this duration, add new executors
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) - If the backlog is sustained for this duration, add more executors This is used only after the initial backlog timeout is exceeded
spark.dynamicAllocation.executorIdleTimeout (K) - If an executor has been idle for this duration, remove it
Constructor and Description |
---|
ExecutorAllocationManager(ExecutorAllocationClient client,
LiveListenerBus listenerBus,
SparkConf conf) |
Modifier and Type | Method and Description |
---|---|
static long |
NOT_SET() |
void |
setClock(Clock newClock)
Use a different clock for this allocation manager.
|
void |
start()
Register for scheduler callbacks to decide when to add and remove executors.
|
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
initializeIfNecessary, initializeLogging, isTraceEnabled, log_, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning
public ExecutorAllocationManager(ExecutorAllocationClient client, LiveListenerBus listenerBus, SparkConf conf)
public static long NOT_SET()
public void setClock(Clock newClock)
public void start()