Class ShuffleStatus
- All Implemented Interfaces:
org.apache.spark.internal.Logging
Helper class used by the MapOutputTrackerMaster to perform bookkeeping for a single ShuffleMapStage.
This class maintains a mapping from map index to MapStatus. It also maintains a cache of serialized map statuses in order to speed up tasks' requests for map output statuses.
All public methods of this class are thread-safe.
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.apache.spark.internal.Logging
org.apache.spark.internal.Logging.LogStringContext, org.apache.spark.internal.Logging.SparkShellLoggingFilter
-
Constructor Summary
ShuffleStatus(int numPartitions, int numReducers)
-
Method Summary
- void addMapOutput(int mapIndex, MapStatus status): Register a map output.
- void addMergeResult(int reduceId, org.apache.spark.scheduler.MergeStatus status): Register a merge result.
- scala.collection.immutable.Seq<Object> findMissingPartitions(): Returns the sequence of partition ids that are missing (i.e. need to be computed).
- scala.Option<MapStatus> getMapStatus(long mapId): Get the map output corresponding to the given mapId.
- scala.collection.immutable.Seq<BlockManagerId> getShufflePushMergerLocations()
- boolean hasCachedSerializedBroadcast()
- void invalidateSerializedMapOutputStatusCache(): Clears the cached serialized map output statuses.
- void invalidateSerializedMergeOutputStatusCache(): Clears the cached serialized merge result statuses.
- MapStatus[] mapStatuses(): MapStatus for each partition.
- MapStatus[] mapStatusesDeleted(): Keeps the previously deleted MapStatus for recovery.
- org.apache.spark.scheduler.MergeStatus[] mergeStatuses(): MergeStatus for each shuffle partition when push-based shuffle is enabled.
- int numAvailableMapOutputs(): Number of partitions that have shuffle map outputs.
- int numAvailableMergeResults(): Number of shuffle partitions that have already been merge finalized when push-based shuffle is enabled.
- void registerShuffleMergerLocations(scala.collection.immutable.Seq<BlockManagerId> shuffleMergers)
- void removeMapOutput(int mapIndex, BlockManagerId bmAddress): Remove the map output which was served by the specified block manager.
- void removeMergeResult(int reduceId, BlockManagerId bmAddress): Remove the merge result which was served by the specified block manager.
- void removeMergeResultsByFilter(scala.Function1<BlockManagerId, Object> f): Removes all shuffle merge results that satisfy the filter.
- void removeOutputsByFilter(scala.Function1<BlockManagerId, Object> f): Removes all shuffle outputs that satisfy the filter.
- void removeOutputsOnExecutor(String execId): Removes all map outputs associated with the specified executor.
- void removeOutputsOnHost(String host): Removes all shuffle outputs associated with this host.
- void removeShuffleMergerLocations()
- scala.Tuple2<byte[],byte[]> serializedMapAndMergeStatus(org.apache.spark.broadcast.BroadcastManager broadcastManager, boolean isLocal, int minBroadcastSize, SparkConf conf): Serializes the mapStatuses and mergeStatuses arrays into an efficient compressed format.
- byte[] serializedMapStatus(org.apache.spark.broadcast.BroadcastManager broadcastManager, boolean isLocal, int minBroadcastSize, SparkConf conf): Serializes the mapStatuses array into an efficient compressed format.
- void updateMapOutput(long mapId, BlockManagerId bmAddress): Update the map output location (e.g. during migration).
- <T> T withMapStatuses(scala.Function1<MapStatus[], T> f): Helper function which provides thread-safe access to the mapStatuses array.
- <T> T withMergeStatuses(scala.Function1<org.apache.spark.scheduler.MergeStatus[], T> f)
Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.spark.internal.Logging
initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, isTraceEnabled, log, logDebug, logDebug, logDebug, logDebug, logError, logError, logError, logError, logInfo, logInfo, logInfo, logInfo, logName, LogStringContext, logTrace, logTrace, logTrace, logTrace, logWarning, logWarning, logWarning, logWarning, org$apache$spark$internal$Logging$$log_, org$apache$spark$internal$Logging$$log__$eq, withLogContext
-
Constructor Details
-
ShuffleStatus
public ShuffleStatus(int numPartitions, int numReducers)
-
-
Method Details
-
mapStatuses
public MapStatus[] mapStatuses()
MapStatus for each partition. The index of the array is the map partition id. Each value in the array is the MapStatus for a partition, or null if the partition is not available. Even though in theory a task may run multiple times (due to speculation, stage retries, etc.), in practice the likelihood of a map output being available at multiple locations is so small that we choose to ignore that case and store only a single location for each output.
- Returns:
(undocumented)
-
mapStatusesDeleted
public MapStatus[] mapStatusesDeleted()
Keeps the previously deleted MapStatus for recovery.
- Returns:
(undocumented)
-
mergeStatuses
public org.apache.spark.scheduler.MergeStatus[] mergeStatuses()
MergeStatus for each shuffle partition when push-based shuffle is enabled. The index of the array is the shuffle partition id (reduce id). Each value in the array is the MergeStatus for a shuffle partition, or null if not available. When push-based shuffle is enabled, this array provides a reducer-oriented view of the shuffle status, specifically for the results of merging shuffle partition blocks into per-partition merged shuffle files.
- Returns:
(undocumented)
-
addMapOutput
public void addMapOutput(int mapIndex, MapStatus status)
Register a map output. If there is already a registered location for the map output, it will be replaced by the new location.
- Parameters:
mapIndex - (undocumented)
status - (undocumented)
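A minimal, self-contained Java model of the replace-on-register behavior described above. SimpleMapStatus and MapOutputRegistry are hypothetical stand-ins, not Spark's classes, and a location string replaces BlockManagerId. The sketch assumes only that the array is indexed by map index and that the available-output count changes when an empty slot is filled.

```java
// Hypothetical stand-in for MapStatus: just a map id and a location string.
final class SimpleMapStatus {
    final long mapId;
    final String location; // stands in for BlockManagerId

    SimpleMapStatus(long mapId, String location) {
        this.mapId = mapId;
        this.location = location;
    }
}

// Simplified model of the mapStatuses bookkeeping, not Spark's implementation.
final class MapOutputRegistry {
    // Index = map partition id; null means the output is not available.
    private final SimpleMapStatus[] mapStatuses;
    private int numAvailable = 0;

    MapOutputRegistry(int numPartitions) {
        mapStatuses = new SimpleMapStatus[numPartitions];
    }

    // Registers an output; a previously registered location is simply replaced.
    synchronized void addMapOutput(int mapIndex, SimpleMapStatus status) {
        if (mapStatuses[mapIndex] == null) {
            numAvailable++; // only filling an empty slot changes the count
        }
        mapStatuses[mapIndex] = status;
    }

    synchronized SimpleMapStatus statusAt(int mapIndex) {
        return mapStatuses[mapIndex];
    }

    synchronized int numAvailableMapOutputs() {
        return numAvailable;
    }
}
```

Registering index 0 twice leaves a single available output, which now points at the second location.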
-
getMapStatus
public scala.Option<MapStatus> getMapStatus(long mapId)
Get the map output corresponding to the given mapId.
- Parameters:
mapId - (undocumented)
- Returns:
(undocumented)
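mapStatuses is indexed by map index, not by mapId, so a lookup by mapId has to search the array. The sketch below assumes a simple linear scan under a lock and uses java.util.Optional in place of scala.Option. IdentifiedStatus and MapStatusLookup are hypothetical names, not Spark code.

```java
import java.util.Optional;

// Hypothetical stand-in for MapStatus, carrying the long mapId.
final class IdentifiedStatus {
    final long mapId;
    final String location;

    IdentifiedStatus(long mapId, String location) {
        this.mapId = mapId;
        this.location = location;
    }
}

final class MapStatusLookup {
    // Indexed by mapIndex, so looking up by mapId requires a scan.
    private final IdentifiedStatus[] mapStatuses;

    MapStatusLookup(int numPartitions) {
        mapStatuses = new IdentifiedStatus[numPartitions];
    }

    synchronized void put(int mapIndex, IdentifiedStatus status) {
        mapStatuses[mapIndex] = status;
    }

    // Returns the first registered status with a matching mapId, if any.
    synchronized Optional<IdentifiedStatus> getMapStatus(long mapId) {
        for (IdentifiedStatus s : mapStatuses) {
            if (s != null && s.mapId == mapId) {
                return Optional.of(s);
            }
        }
        return Optional.empty();
    }
}
```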
-
updateMapOutput
public void updateMapOutput(long mapId, BlockManagerId bmAddress)
Update the map output location (e.g. during migration).
- Parameters:
mapId - (undocumented)
bmAddress - (undocumented)
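Here is a hedged sketch of updating a location in place, for example after a block has been migrated to another executor. MovableStatus and MigrationRegistry are hypothetical. The boolean return value is an addition of this sketch, since the documented method returns void. How Spark reacts to an unknown mapId is not modeled here.

```java
// Hypothetical status whose location can change, e.g. after block migration.
final class MovableStatus {
    final long mapId;
    private volatile String location; // volatile: read outside the registry lock

    MovableStatus(long mapId, String location) {
        this.mapId = mapId;
        this.location = location;
    }

    String location() {
        return location;
    }

    void updateLocation(String newLocation) {
        location = newLocation;
    }
}

final class MigrationRegistry {
    private final MovableStatus[] mapStatuses;

    MigrationRegistry(int numPartitions) {
        mapStatuses = new MovableStatus[numPartitions];
    }

    synchronized void add(int mapIndex, MovableStatus status) {
        mapStatuses[mapIndex] = status;
    }

    // Points the output with this mapId at a new location. The boolean result
    // is an addition of this sketch; the documented method returns void.
    synchronized boolean updateMapOutput(long mapId, String bmAddress) {
        for (MovableStatus s : mapStatuses) {
            if (s != null && s.mapId == mapId) {
                s.updateLocation(bmAddress);
                return true;
            }
        }
        return false;
    }
}
```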
-
removeMapOutput
public void removeMapOutput(int mapIndex, BlockManagerId bmAddress)
Remove the map output which was served by the specified block manager. This is a no-op if there is no registered map output or if the registered output is from a different block manager.
- Parameters:
mapIndex - (undocumented)
bmAddress - (undocumented)
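The no-op rule matters because a stale removal request, for an output that has since been re-registered elsewhere, must not discard the newer location. This model uses strings for block manager addresses, and RemovalRegistry is a hypothetical name. The sketch copies each removed entry into a second array, mirroring the mapStatusesDeleted accessor documented above. That Spark fills that array at exactly this point is an assumption.

```java
// Simplified model: each slot holds the location (a string standing in for
// BlockManagerId) that serves that map output, or null if unavailable.
final class RemovalRegistry {
    private final String[] mapStatuses;
    private final String[] mapStatusesDeleted; // last removed entry per slot
    private int numAvailable = 0;

    RemovalRegistry(int numPartitions) {
        mapStatuses = new String[numPartitions];
        mapStatusesDeleted = new String[numPartitions];
    }

    synchronized void addMapOutput(int mapIndex, String location) {
        if (mapStatuses[mapIndex] == null) {
            numAvailable++;
        }
        mapStatuses[mapIndex] = location;
    }

    // Removes the output only if it is served by bmAddress; otherwise a no-op.
    synchronized void removeMapOutput(int mapIndex, String bmAddress) {
        String current = mapStatuses[mapIndex];
        if (current != null && current.equals(bmAddress)) {
            mapStatusesDeleted[mapIndex] = current; // remembered for recovery
            mapStatuses[mapIndex] = null;
            numAvailable--;
        }
    }

    synchronized String statusAt(int mapIndex) {
        return mapStatuses[mapIndex];
    }

    synchronized String deletedAt(int mapIndex) {
        return mapStatusesDeleted[mapIndex];
    }

    synchronized int numAvailableMapOutputs() {
        return numAvailable;
    }
}
```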
-
addMergeResult
public void addMergeResult(int reduceId, org.apache.spark.scheduler.MergeStatus status)
Register a merge result.
- Parameters:
reduceId - (undocumented)
status - (undocumented)
-
registerShuffleMergerLocations
public void registerShuffleMergerLocations(scala.collection.immutable.Seq<BlockManagerId> shuffleMergers)
-
removeShuffleMergerLocations
public void removeShuffleMergerLocations()
-
removeMergeResult
public void removeMergeResult(int reduceId, BlockManagerId bmAddress)
Remove the merge result which was served by the specified block manager.
- Parameters:
reduceId - (undocumented)
bmAddress - (undocumented)
-
removeOutputsOnHost
public void removeOutputsOnHost(String host)
Removes all shuffle outputs associated with this host. Note that this will also remove outputs which are served by an external shuffle server (if one exists).
- Parameters:
host - (undocumented)
-
removeOutputsOnExecutor
public void removeOutputsOnExecutor(String execId)
Removes all map outputs associated with the specified executor. Note that this will also remove outputs which are served by an external shuffle server (if one exists), as they are still registered with that execId.
- Parameters:
execId - (undocumented)
-
removeOutputsByFilter
public void removeOutputsByFilter(scala.Function1<BlockManagerId, Object> f)
Removes all shuffle outputs that satisfy the filter. Note that this will also remove outputs which are served by an external shuffle server (if one exists).
- Parameters:
f - (undocumented)
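Host-based and executor-based removal can both be expressed as a predicate over each output's location. The Java signatures show scala.Function1<BlockManagerId, Object>, most likely because Scala's Boolean result type appears as Object in the generic signature. This sketch uses java.util.function.Predicate instead. BmId and FilteringRegistry are hypothetical, and routing removeOutputsOnHost and removeOutputsOnExecutor through the filter is a design choice of this sketch.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for BlockManagerId: executor id plus host.
final class BmId {
    final String executorId;
    final String host;

    BmId(String executorId, String host) {
        this.executorId = executorId;
        this.host = host;
    }
}

final class FilteringRegistry {
    private final BmId[] mapStatuses; // location serving each map output
    private int numAvailable = 0;

    FilteringRegistry(int numPartitions) {
        mapStatuses = new BmId[numPartitions];
    }

    synchronized void addMapOutput(int mapIndex, BmId location) {
        if (mapStatuses[mapIndex] == null) {
            numAvailable++;
        }
        mapStatuses[mapIndex] = location;
    }

    // Drops every output whose location matches the predicate.
    synchronized void removeOutputsByFilter(Predicate<BmId> f) {
        for (int i = 0; i < mapStatuses.length; i++) {
            if (mapStatuses[i] != null && f.test(mapStatuses[i])) {
                mapStatuses[i] = null;
                numAvailable--;
            }
        }
    }

    // Host- and executor-based removal expressed as filters.
    void removeOutputsOnHost(String host) {
        removeOutputsByFilter(loc -> loc.host.equals(host));
    }

    void removeOutputsOnExecutor(String execId) {
        removeOutputsByFilter(loc -> loc.executorId.equals(execId));
    }

    synchronized int numAvailableMapOutputs() {
        return numAvailable;
    }
}
```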
-
removeMergeResultsByFilter
public void removeMergeResultsByFilter(scala.Function1<BlockManagerId, Object> f)
Removes all shuffle merge results that satisfy the filter.
- Parameters:
f - (undocumented)
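Merge results are tracked per reduce partition. mergeStatuses is indexed by reduce id, and the constructor's numReducers presumably sizes that array. The sketch models addMergeResult, removeMergeResult and removeMergeResultsByFilter, with strings standing in for merger locations. It assumes that removeMergeResult, like removeMapOutput, acts only when the given block manager serves the result. MergeResults is a hypothetical name.

```java
import java.util.function.Predicate;

// Simplified model of per-reducer merge results; strings stand in for the
// location of the merger that holds each merged shuffle file.
final class MergeResults {
    private final String[] mergeStatuses; // index = reduce id; null = not available
    private int numAvailable = 0;

    MergeResults(int numReducers) {
        mergeStatuses = new String[numReducers];
    }

    synchronized void addMergeResult(int reduceId, String location) {
        if (mergeStatuses[reduceId] == null) {
            numAvailable++;
        }
        mergeStatuses[reduceId] = location;
    }

    // Acts only if the result is served by bmAddress (assumed behavior).
    synchronized void removeMergeResult(int reduceId, String bmAddress) {
        if (mergeStatuses[reduceId] != null && mergeStatuses[reduceId].equals(bmAddress)) {
            mergeStatuses[reduceId] = null;
            numAvailable--;
        }
    }

    // Drops every merge result whose location matches the predicate.
    synchronized void removeMergeResultsByFilter(Predicate<String> f) {
        for (int i = 0; i < mergeStatuses.length; i++) {
            if (mergeStatuses[i] != null && f.test(mergeStatuses[i])) {
                mergeStatuses[i] = null;
                numAvailable--;
            }
        }
    }

    synchronized int numAvailableMergeResults() {
        return numAvailable;
    }
}
```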
-
numAvailableMapOutputs
public int numAvailableMapOutputs()
Number of partitions that have shuffle map outputs.
- Returns:
(undocumented)
-
numAvailableMergeResults
public int numAvailableMergeResults()
Number of shuffle partitions that have already been merge finalized when push-based shuffle is enabled.
- Returns:
(undocumented)
-
findMissingPartitions
public scala.collection.immutable.Seq<Object> findMissingPartitions()
Returns the sequence of partition ids that are missing (i.e. need to be computed).
- Returns:
(undocumented)
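Missing partitions are the empty slots of the mapStatuses array. The missing count and numAvailableMapOutputs should therefore always add up to the number of partitions. The sketch checks that invariant explicitly. PartitionTracker is hypothetical, and List<Integer> stands in for the Seq<Object> in the signature.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionTracker {
    private final Object[] mapStatuses; // null = output missing
    private int numAvailable = 0;

    PartitionTracker(int numPartitions) {
        mapStatuses = new Object[numPartitions];
    }

    synchronized void addMapOutput(int mapIndex, Object status) {
        if (mapStatuses[mapIndex] == null) {
            numAvailable++;
        }
        mapStatuses[mapIndex] = status;
    }

    synchronized int numAvailableMapOutputs() {
        return numAvailable;
    }

    // Partition ids whose output still has to be computed.
    synchronized List<Integer> findMissingPartitions() {
        List<Integer> missing = new ArrayList<>();
        for (int i = 0; i < mapStatuses.length; i++) {
            if (mapStatuses[i] == null) {
                missing.add(i);
            }
        }
        // Sanity check: available + missing must cover every partition.
        if (missing.size() != mapStatuses.length - numAvailable) {
            throw new IllegalStateException("inconsistent availability count");
        }
        return missing;
    }
}
```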
-
serializedMapStatus
public byte[] serializedMapStatus(org.apache.spark.broadcast.BroadcastManager broadcastManager, boolean isLocal, int minBroadcastSize, SparkConf conf)
Serializes the mapStatuses array into an efficient compressed format. See the comments on MapOutputTracker.serializeOutputStatuses() for more details on the serialization format.
This method is designed to be called multiple times and implements caching in order to speed up subsequent requests. If the cache is empty and multiple threads concurrently attempt to serialize the map statuses, serialization will only be performed in a single thread and all other threads will block until the cache is populated.
- Parameters:
broadcastManager - (undocumented)
isLocal - (undocumented)
minBroadcastSize - (undocumented)
conf - (undocumented)
- Returns:
(undocumented)
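The caching contract is: serialize once, make concurrent callers wait, and reuse the bytes until the cache is invalidated. This can be sketched with a single monitor. CachedStatusSerializer is hypothetical and joins strings instead of producing the real compressed format. The sketch deliberately leaves out the broadcastManager, isLocal, minBroadcastSize and conf parameters and any broadcasting of large results.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

final class CachedStatusSerializer {
    private final String[] mapStatuses;
    private byte[] cachedSerialized; // guarded by "this"
    final AtomicInteger serializations = new AtomicInteger();

    CachedStatusSerializer(String[] mapStatuses) {
        this.mapStatuses = mapStatuses.clone();
    }

    // The first caller serializes while holding the monitor; callers arriving
    // meanwhile block on it and then return the cached bytes.
    synchronized byte[] serializedMapStatus() {
        if (cachedSerialized == null) {
            serializations.incrementAndGet();
            // Stand-in for the real compressed format.
            cachedSerialized = String.join(",", mapStatuses).getBytes(StandardCharsets.UTF_8);
        }
        return cachedSerialized;
    }

    // Called after a mutation so that the next request re-serializes.
    synchronized void invalidateSerializedMapOutputStatusCache() {
        cachedSerialized = null;
    }

    // Demo helper: requests the bytes from several threads at once and returns
    // how many times serialization actually ran.
    static int serializeFromThreads(CachedStatusSerializer s, int threads) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(s::serializedMapStatus);
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return s.serializations.get();
    }
}
```

Eight threads requesting the bytes at once trigger a single serialization. After invalidateSerializedMapOutputStatusCache(), the next request serializes again.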
-
serializedMapAndMergeStatus
public scala.Tuple2<byte[],byte[]> serializedMapAndMergeStatus(org.apache.spark.broadcast.BroadcastManager broadcastManager, boolean isLocal, int minBroadcastSize, SparkConf conf)
Serializes the mapStatuses and mergeStatuses arrays into an efficient compressed format. See the comments on MapOutputTracker.serializeOutputStatuses() for more details on the serialization format.
This method is designed to be called multiple times and implements caching in order to speed up subsequent requests. If the cache is empty and multiple threads concurrently attempt to serialize the status arrays, serialization will only be performed in a single thread and all other threads will block until the cache is populated.
- Parameters:
broadcastManager - (undocumented)
isLocal - (undocumented)
minBroadcastSize - (undocumented)
conf - (undocumented)
- Returns:
(undocumented)
-
hasCachedSerializedBroadcast
public boolean hasCachedSerializedBroadcast()
-
withMapStatuses
public <T> T withMapStatuses(scala.Function1<MapStatus[], T> f)
Helper function which provides thread-safe access to the mapStatuses array. The function should NOT mutate the array.
- Parameters:
f - (undocumented)
- Returns:
(undocumented)
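One way to provide this thread-safe, read-only access is to run the caller's function under the read lock of a java.util.concurrent.locks.ReentrantReadWriteLock. Many readers can then proceed together while writers are excluded. GuardedStatuses is hypothetical, and whether Spark uses exactly this kind of lock is an assumption. Java cannot stop f from mutating the array, so the read-only rule is only a convention. Breaking it would race with other readers.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

final class GuardedStatuses {
    private final String[] mapStatuses; // strings stand in for MapStatus
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    GuardedStatuses(int numPartitions) {
        mapStatuses = new String[numPartitions];
    }

    // Writers take the exclusive write lock.
    void set(int mapIndex, String status) {
        lock.writeLock().lock();
        try {
            mapStatuses[mapIndex] = status;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Runs f on the live array under the shared read lock; f must only read it.
    <T> T withMapStatuses(Function<String[], T> f) {
        lock.readLock().lock();
        try {
            return f.apply(mapStatuses);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```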
-
withMergeStatuses
public <T> T withMergeStatuses(scala.Function1<org.apache.spark.scheduler.MergeStatus[], T> f)
-
getShufflePushMergerLocations
public scala.collection.immutable.Seq<BlockManagerId> getShufflePushMergerLocations()
-
invalidateSerializedMapOutputStatusCache
public void invalidateSerializedMapOutputStatusCache()
Clears the cached serialized map output statuses.
-
invalidateSerializedMergeOutputStatusCache
public void invalidateSerializedMergeOutputStatusCache()
Clears the cached serialized merge result statuses.
-