Class PushBasedFetchHelper
All Implemented Interfaces:
org.apache.spark.internal.Logging
Helper class for ShuffleBlockFetcherIterator that encapsulates all the push-based functionality to fetch push-merged block metadata and shuffle chunks.
A push-merged block contains multiple shuffle chunks, where each shuffle chunk contains multiple shuffle blocks that belong to the same reduce partition and were merged into that chunk by the external shuffle service.
Nested Class Summary
Nested classes/interfaces inherited from interface org.apache.spark.internal.Logging
org.apache.spark.internal.Logging.LogStringContext, org.apache.spark.internal.Logging.SparkShellLoggingFilter
Constructor Summary
PushBasedFetchHelper(org.apache.spark.storage.ShuffleBlockFetcherIterator iterator, org.apache.spark.network.shuffle.BlockStoreClient shuffleClient, org.apache.spark.storage.BlockManager blockManager, org.apache.spark.MapOutputTracker mapOutputTracker, org.apache.spark.shuffle.ShuffleReadMetricsReporter shuffleMetrics)
Method Summary

void addChunk(ShuffleBlockChunkId blockId, org.roaringbitmap.RoaringBitmap chunkMeta)
This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedLocalMetaFetchResult.

scala.collection.mutable.ArrayBuffer<scala.Tuple3<BlockId,Object,Object>> createChunkBlockInfosFromMetaResponse(int shuffleId, int shuffleMergeId, int reduceId, long blockSize, org.roaringbitmap.RoaringBitmap[] bitmaps)
This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedRemoteMetaFetchResult.

void fetchAllPushMergedLocalBlocks(scala.collection.mutable.LinkedHashSet<BlockId> pushMergedLocalBlocks)
This is executed by the task thread when the iterator is initialized.

scala.Option<org.roaringbitmap.RoaringBitmap> getRoaringBitMap(ShuffleBlockChunkId blockId)
Get the RoaringBitmap for a specific ShuffleBlockChunkId.

int getShuffleChunkCardinality(ShuffleBlockChunkId blockId)
Get the number of map blocks in a ShuffleBlockChunk.

void initiateFallbackFetchForPushMergedBlock(BlockId blockId, BlockManagerId address)
This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type: 1) ShuffleBlockFetcherIterator.SuccessFetchResult, 2) ShuffleBlockFetcherIterator.FallbackOnPushMergedFailureResult, 3) ShuffleBlockFetcherIterator.PushMergedRemoteMetaFailedFetchResult.

boolean isLocalPushMergedBlockAddress(BlockManagerId address)
Returns true if the address is of a push-merged-local block.

boolean isPushMergedShuffleBlockAddress(BlockManagerId address)
Returns true if the address is for a push-merged block.

boolean isRemotePushMergedBlockAddress(BlockManagerId address)
Returns true if the address is of a remote push-merged block.

void removeChunk(ShuffleBlockChunkId blockId)
This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.SuccessFetchResult.

void sendFetchMergedStatusRequest(org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchRequest req)
This is executed by the task thread when the iterator is initialized, and only if it has push-merged blocks for which it needs to fetch the metadata.

Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.spark.internal.Logging
initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, isTraceEnabled, log, logDebug, logDebug, logDebug, logDebug, logError, logError, logError, logError, logInfo, logInfo, logInfo, logInfo, logName, LogStringContext, logTrace, logTrace, logTrace, logTrace, logWarning, logWarning, logWarning, logWarning, org$apache$spark$internal$Logging$$log_, org$apache$spark$internal$Logging$$log__$eq, withLogContext
Constructor Details
PushBasedFetchHelper
public PushBasedFetchHelper(org.apache.spark.storage.ShuffleBlockFetcherIterator iterator, org.apache.spark.network.shuffle.BlockStoreClient shuffleClient, org.apache.spark.storage.BlockManager blockManager, org.apache.spark.MapOutputTracker mapOutputTracker, org.apache.spark.shuffle.ShuffleReadMetricsReporter shuffleMetrics)

Method Details
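addChunk
public void addChunk(ShuffleBlockChunkId blockId, org.roaringbitmap.RoaringBitmap chunkMeta)
This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedLocalMetaFetchResult.
Parameters:
blockId - shuffle chunk id.
chunkMeta - bitmap of the map ids that were merged into the chunk.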
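The chunk metadata works like a map from chunk id to the bitmap of merged map ids. addChunk records it, getRoaringBitMap reads it, and removeChunk drops it. The sketch below illustrates that bookkeeping under stated assumptions and is not Spark's implementation. ChunkId and ChunkMetaRegistry are hypothetical names. java.util.BitSet stands in for org.roaringbitmap.RoaringBitmap, and java.util.Optional stands in for scala.Option.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for ShuffleBlockChunkId: identifies one chunk of a push-merged block.
record ChunkId(int shuffleId, int shuffleMergeId, int reduceId, int chunkId) {}

// Hypothetical registry mirroring addChunk / getRoaringBitMap / removeChunk.
// java.util.BitSet stands in for org.roaringbitmap.RoaringBitmap.
class ChunkMetaRegistry {
  // All calls are assumed to come from the single task thread, so a HashMap is enough.
  private final Map<ChunkId, BitSet> chunksMeta = new HashMap<>();

  // Record the bitmap of map ids that were merged into this chunk.
  void addChunk(ChunkId blockId, BitSet chunkMeta) {
    chunksMeta.put(blockId, chunkMeta);
  }

  // Return the chunk's bitmap, or an empty Optional if none was recorded.
  Optional<BitSet> getBitmap(ChunkId blockId) {
    return Optional.ofNullable(chunksMeta.get(blockId));
  }

  // Forget the chunk's metadata.
  void removeChunk(ChunkId blockId) {
    chunksMeta.remove(blockId);
  }
}
```

Every method here is documented as running on the task thread, so the sketch uses an unsynchronized map.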
createChunkBlockInfosFromMetaResponse
public scala.collection.mutable.ArrayBuffer<scala.Tuple3<BlockId,Object,Object>> createChunkBlockInfosFromMetaResponse(int shuffleId, int shuffleMergeId, int reduceId, long blockSize, org.roaringbitmap.RoaringBitmap[] bitmaps)
This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedRemoteMetaFetchResult.
Parameters:
shuffleId - shuffle id.
shuffleMergeId - (undocumented)
reduceId - reduce id.
blockSize - size of the push-merged block.
bitmaps - chunk bitmaps, where each bitmap contains all the mapIds that were merged into that chunk.
Returns:
shuffle chunks to fetch.
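The following is a minimal sketch of how one bitmap per chunk might be turned into a list of chunks to fetch. The parameters carry only the total blockSize, so this sketch assumes, as a simplification, that the size is split evenly across the chunks. ChunkId, ChunkFetchInfo, ChunkInfoBuilder and NO_SINGLE_MAP_INDEX are hypothetical names, and java.util.BitSet stands in for RoaringBitmap.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ShuffleBlockChunkId.
record ChunkId(int shuffleId, int shuffleMergeId, int reduceId, int chunkId) {}

// Hypothetical analogue of the (BlockId, size, mapIndex) entries in the real return type.
record ChunkFetchInfo(ChunkId id, long approxSize, int mapIndex) {}

class ChunkInfoBuilder {
  // Hypothetical placeholder: a chunk mixes output from many maps, so no single map index applies.
  static final int NO_SINGLE_MAP_INDEX = -1;

  // Chunk id -> bitmap of map ids merged into that chunk.
  final Map<ChunkId, BitSet> chunksMeta = new HashMap<>();

  List<ChunkFetchInfo> createChunkBlockInfos(
      int shuffleId, int shuffleMergeId, int reduceId, long blockSize, BitSet[] bitmaps) {
    List<ChunkFetchInfo> toFetch = new ArrayList<>();
    if (bitmaps.length == 0) {
      return toFetch; // no chunk bitmaps, so there are no chunks to fetch
    }
    // Assumption: only the total size is known, so each chunk gets an even share.
    long approxChunkSize = blockSize / bitmaps.length;
    for (int i = 0; i < bitmaps.length; i++) {
      ChunkId id = new ChunkId(shuffleId, shuffleMergeId, reduceId, i);
      chunksMeta.put(id, bitmaps[i]); // remember which map ids live in chunk i
      toFetch.add(new ChunkFetchInfo(id, approxChunkSize, NO_SINGLE_MAP_INDEX));
    }
    return toFetch;
  }
}
```

The chunk index i doubles as the chunk id, so chunk i's bitmap can be looked up again when its data arrives.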
fetchAllPushMergedLocalBlocks
public void fetchAllPushMergedLocalBlocks(scala.collection.mutable.LinkedHashSet<BlockId> pushMergedLocalBlocks)
This is executed by the task thread when the iterator is initialized. It fetches all the outstanding push-merged local blocks.
Parameters:
pushMergedLocalBlocks - set of identified push-merged local blocks.
getRoaringBitMap
public scala.Option<org.roaringbitmap.RoaringBitmap> getRoaringBitMap(ShuffleBlockChunkId blockId)
Get the RoaringBitmap for a specific ShuffleBlockChunkId.
Parameters:
blockId - shuffle chunk id.
Returns:
(undocumented)
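getShuffleChunkCardinality
public int getShuffleChunkCardinality(ShuffleBlockChunkId blockId)
Get the number of map blocks in a ShuffleBlockChunk.
Parameters:
blockId - shuffle chunk id.
Returns:
the number of map blocks in the chunk.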
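Each chunk's bitmap holds the ids of the maps merged into it, so the number of map blocks in a chunk can be read as the bitmap's cardinality. This hypothetical sketch uses java.util.BitSet.cardinality(); on an org.roaringbitmap.RoaringBitmap the analogous call is getCardinality(). ChunkId and ChunkCardinality are illustrative names. Returning 0 for a chunk with no recorded metadata is an assumption of the sketch.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ShuffleBlockChunkId.
record ChunkId(int shuffleId, int shuffleMergeId, int reduceId, int chunkId) {}

class ChunkCardinality {
  // Chunk id -> bitmap of map ids merged into that chunk.
  final Map<ChunkId, BitSet> chunksMeta = new HashMap<>();

  // Each set bit is one map id merged into the chunk, so the number of set bits
  // is the number of map blocks in it. Unknown chunks report 0 (an assumption).
  int getShuffleChunkCardinality(ChunkId blockId) {
    BitSet meta = chunksMeta.get(blockId);
    return meta == null ? 0 : meta.cardinality();
  }
}
```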
initiateFallbackFetchForPushMergedBlock
public void initiateFallbackFetchForPushMergedBlock(BlockId blockId, BlockManagerId address)
This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type:
1) ShuffleBlockFetcherIterator.SuccessFetchResult
2) ShuffleBlockFetcherIterator.FallbackOnPushMergedFailureResult
3) ShuffleBlockFetcherIterator.PushMergedRemoteMetaFailedFetchResult
This initiates fetching fallback blocks for a push-merged block or a shuffle chunk that failed to fetch. It calls the map output tracker to get the list of original blocks for the given push-merged block or shuffle chunk, splits them into remote and local blocks, and processes them accordingly. It also updates numberOfBlocksToFetch in the iterator as it processes the failed response and finds more push-merged requests to remote hosts. It then updates numberOfBlocksToFetch again with the additional requests for original blocks.
The fallback happens when:
1. There is an exception while creating shuffle chunks from a push-merged-local shuffle block. See fetchLocalBlock.
2. There is a failure when fetching remote shuffle chunks.
3. There is a failure when processing a SuccessFetchResult for a shuffle chunk (local or remote).
4. There is a zero-size buffer when processing a SuccessFetchResult for a shuffle chunk (local or remote).
Parameters:
blockId - (undocumented)
address - (undocumented)
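The split-and-count step described above can be sketched as follows. OriginalBlock, FallbackPlan and FallbackPlanner are hypothetical names. The original blocks are passed in directly rather than obtained from the map output tracker. Only the addition of the new original-block requests to the counter is modeled; the other bookkeeping on numberOfBlocksToFetch is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical description of one original (unmerged) shuffle block behind a failed chunk.
record OriginalBlock(String blockName, String host, long size) {}

// Hypothetical result of the split: blocks to read locally vs. request from other hosts.
record FallbackPlan(List<OriginalBlock> local, List<OriginalBlock> remote) {}

class FallbackPlanner {
  private final String localHost;
  int numBlocksToFetch; // hypothetical analogue of the iterator's block counter

  FallbackPlanner(String localHost, int numBlocksToFetch) {
    this.localHost = localHost;
    this.numBlocksToFetch = numBlocksToFetch;
  }

  // In the real class the originals come from the map output tracker; here they are passed in.
  FallbackPlan planFallback(List<OriginalBlock> originals) {
    List<OriginalBlock> local = new ArrayList<>();
    List<OriginalBlock> remote = new ArrayList<>();
    for (OriginalBlock block : originals) {
      if (block.host().equals(localHost)) {
        local.add(block);
      } else {
        remote.add(block);
      }
    }
    // Every original block is now an additional result the iterator must wait for.
    numBlocksToFetch += originals.size();
    return new FallbackPlan(local, remote);
  }
}
```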
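isLocalPushMergedBlockAddress
public boolean isLocalPushMergedBlockAddress(BlockManagerId address)
Returns true if the address is of a push-merged-local block, false otherwise.
Parameters:
address - block manager address to check.
Returns:
true if the address is of a push-merged-local block, false otherwise.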
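isPushMergedShuffleBlockAddress
public boolean isPushMergedShuffleBlockAddress(BlockManagerId address)
Returns true if the address is for a push-merged block, false otherwise.
Parameters:
address - block manager address to check.
Returns:
true if the address is for a push-merged block, false otherwise.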
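isRemotePushMergedBlockAddress
public boolean isRemotePushMergedBlockAddress(BlockManagerId address)
Returns true if the address is of a remote push-merged block, false otherwise.
Parameters:
address - block manager address to check.
Returns:
true if the address is of a remote push-merged block, false otherwise.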
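The three address predicates can be read as layered checks. In this sketch, a push-merged-local or remote push-merged address is first a push-merged address, and the two differ only in whether the host matches the local one. That relationship, and the idea that push-merged addresses are marked by a reserved executor id, are assumptions of the sketch. Address, PushMergedAddressChecks and MERGER_EXECUTOR_ID are hypothetical names, and Address stands in for BlockManagerId.

```java
// Hypothetical stand-in for BlockManagerId.
record Address(String executorId, String host, int port) {}

class PushMergedAddressChecks {
  // Hypothetical reserved executor id assumed to mark push-merged block addresses.
  static final String MERGER_EXECUTOR_ID = "push-merger";

  private final String localHost;

  PushMergedAddressChecks(String localHost) {
    this.localHost = localHost;
  }

  boolean isPushMergedShuffleBlockAddress(Address address) {
    return MERGER_EXECUTOR_ID.equals(address.executorId());
  }

  // Push-merged and served from this host.
  boolean isLocalPushMergedBlockAddress(Address address) {
    return isPushMergedShuffleBlockAddress(address) && address.host().equals(localHost);
  }

  // Push-merged and served from some other host.
  boolean isRemotePushMergedBlockAddress(Address address) {
    return isPushMergedShuffleBlockAddress(address) && !address.host().equals(localHost);
  }
}
```

Building the local and remote checks on the general one keeps every address in at most one of the two categories.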
removeChunk
public void removeChunk(ShuffleBlockChunkId blockId)
This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.SuccessFetchResult.
Parameters:
blockId - shuffle chunk id.
sendFetchMergedStatusRequest
public void sendFetchMergedStatusRequest(org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchRequest req)
This is executed by the task thread when the iterator is initialized, and only if it has push-merged blocks for which it needs to fetch the metadata.
Parameters:
req - ShuffleBlockFetcherIterator.FetchRequest that only contains requests to fetch metadata of push-merged blocks.