public class PushBasedFetchHelper
extends Object
implements org.apache.spark.internal.Logging
Helper class for ShuffleBlockFetcherIterator
that encapsulates all the push-based
functionality to fetch push-merged block metadata and shuffle chunks.
A push-merged block contains multiple shuffle chunks, where each shuffle chunk contains multiple
shuffle blocks that belong to the common reduce partition and were merged by the
external shuffle service into that chunk.

Constructor and Description |
---|
PushBasedFetchHelper(org.apache.spark.storage.ShuffleBlockFetcherIterator iterator, org.apache.spark.network.shuffle.BlockStoreClient shuffleClient, org.apache.spark.storage.BlockManager blockManager, org.apache.spark.MapOutputTracker mapOutputTracker, org.apache.spark.shuffle.ShuffleReadMetricsReporter shuffleMetrics) |

Modifier and Type | Method and Description |
---|---|
void | addChunk(ShuffleBlockChunkId blockId, org.roaringbitmap.RoaringBitmap chunkMeta): This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedLocalMetaFetchResult. |
scala.collection.mutable.ArrayBuffer<scala.Tuple3<BlockId,Object,Object>> | createChunkBlockInfosFromMetaResponse(int shuffleId, int shuffleMergeId, int reduceId, long blockSize, org.roaringbitmap.RoaringBitmap[] bitmaps): This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedRemoteMetaFetchResult. |
void | fetchAllPushMergedLocalBlocks(scala.collection.mutable.LinkedHashSet<BlockId> pushMergedLocalBlocks): This is executed by the task thread when the iterator is initialized. |
scala.Option<org.roaringbitmap.RoaringBitmap> | getRoaringBitMap(ShuffleBlockChunkId blockId): Get the RoaringBitmap for a specific ShuffleBlockChunkId. |
int | getShuffleChunkCardinality(ShuffleBlockChunkId blockId): Get the number of map blocks in a ShuffleBlockChunk. |
void | initiateFallbackFetchForPushMergedBlock(BlockId blockId, BlockManagerId address): This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type: 1) ShuffleBlockFetcherIterator.SuccessFetchResult 2) ShuffleBlockFetcherIterator.FallbackOnPushMergedFailureResult 3) ShuffleBlockFetcherIterator.PushMergedRemoteMetaFailedFetchResult |
boolean | isLocalPushMergedBlockAddress(BlockManagerId address): Returns true if the address is of a push-merged-local block. |
boolean | isPushMergedShuffleBlockAddress(BlockManagerId address): Returns true if the address is for a push-merged block. |
boolean | isRemotePushMergedBlockAddress(BlockManagerId address): Returns true if the address is of a remote push-merged block. |
void | removeChunk(ShuffleBlockChunkId blockId): This is executed by the task thread when iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.SuccessFetchResult. |
void | sendFetchMergedStatusRequest(org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchRequest req): This is executed by the task thread when the iterator is initialized and only if it has push-merged blocks for which it needs to fetch the metadata. |

Methods inherited from class Object: equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.spark.internal.Logging: $init$, initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, initLock, isTraceEnabled, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning, org$apache$spark$internal$Logging$$log__$eq, org$apache$spark$internal$Logging$$log_, uninitialize

public PushBasedFetchHelper(org.apache.spark.storage.ShuffleBlockFetcherIterator iterator, org.apache.spark.network.shuffle.BlockStoreClient shuffleClient, org.apache.spark.storage.BlockManager blockManager, org.apache.spark.MapOutputTracker mapOutputTracker, org.apache.spark.shuffle.ShuffleReadMetricsReporter shuffleMetrics)

public void addChunk(ShuffleBlockChunkId blockId, org.roaringbitmap.RoaringBitmap chunkMeta)
This is executed by the task thread when iterator.next() is invoked and the iterator
processes a response of type ShuffleBlockFetcherIterator.PushMergedLocalMetaFetchResult.
blockId - shuffle chunk id.
chunkMeta - (undocumented)

public scala.collection.mutable.ArrayBuffer<scala.Tuple3<BlockId,Object,Object>> createChunkBlockInfosFromMetaResponse(int shuffleId, int shuffleMergeId, int reduceId, long blockSize, org.roaringbitmap.RoaringBitmap[] bitmaps)
This is executed by the task thread when iterator.next() is invoked and the iterator
processes a response of type ShuffleBlockFetcherIterator.PushMergedRemoteMetaFetchResult.
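
As a rough illustration of turning a metadata response into per-chunk fetch entries, the sketch below builds one entry per chunk bitmap. It is not Spark's implementation: `java.util.BitSet` stands in for `RoaringBitmap`, the `ChunkInfo` class and the string id format are hypothetical, and splitting the block size evenly across chunks is an assumption made only for this example.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class ChunkInfoSketch {
    /** Hypothetical stand-in for the (BlockId, size, ...) tuples the real method returns. */
    static final class ChunkInfo {
        final String chunkId;   // illustrative string id, not Spark's ShuffleBlockChunkId
        final long approxSize;  // estimated number of bytes in this chunk
        final int mapCount;     // number of map ids recorded in the chunk bitmap

        ChunkInfo(String chunkId, long approxSize, int mapCount) {
            this.chunkId = chunkId;
            this.approxSize = approxSize;
            this.mapCount = mapCount;
        }
    }

    static List<ChunkInfo> createChunkInfos(
            int shuffleId, int shuffleMergeId, int reduceId, long blockSize, BitSet[] bitmaps) {
        List<ChunkInfo> infos = new ArrayList<>();
        if (bitmaps.length == 0) {
            return infos; // no chunks reported, nothing to fetch
        }
        // Assumption: individual chunk sizes are unknown, so estimate each one
        // as an equal share of the push-merged block size.
        long approxChunkSize = blockSize / bitmaps.length;
        for (int chunkIndex = 0; chunkIndex < bitmaps.length; chunkIndex++) {
            String id = "chunk_" + shuffleId + "_" + shuffleMergeId + "_" + reduceId + "_" + chunkIndex;
            infos.add(new ChunkInfo(id, approxChunkSize, bitmaps[chunkIndex].cardinality()));
        }
        return infos;
    }

    public static void main(String[] args) {
        BitSet first = new BitSet();
        first.set(0);
        first.set(1);
        BitSet second = new BitSet();
        second.set(2);
        for (ChunkInfo info : createChunkInfos(5, 0, 3, 1000L, new BitSet[] {first, second})) {
            System.out.println(info.chunkId + " size~" + info.approxSize + " maps=" + info.mapCount);
        }
    }
}
```

Each bitmap yields exactly one entry, so the number of entries always equals the number of chunks reported in the metadata response.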
shuffleId - shuffle id.
reduceId - reduce id.
blockSize - size of the push-merged block.
bitmaps - chunk bitmaps, where each bitmap contains all the mapIds that were merged
to that chunk.
shuffleMergeId - (undocumented)

public void fetchAllPushMergedLocalBlocks(scala.collection.mutable.LinkedHashSet<BlockId> pushMergedLocalBlocks)
pushMergedLocalBlocks - set of identified merged local blocks and their sizes.

public scala.Option<org.roaringbitmap.RoaringBitmap> getRoaringBitMap(ShuffleBlockChunkId blockId)
blockId - shuffle chunk id.

public int getShuffleChunkCardinality(ShuffleBlockChunkId blockId)
blockId - (undocumented)

public void initiateFallbackFetchForPushMergedBlock(BlockId blockId, BlockManagerId address)
This is executed by the task thread when iterator.next() is invoked and the iterator
processes a response of type:
1) ShuffleBlockFetcherIterator.SuccessFetchResult
2) ShuffleBlockFetcherIterator.FallbackOnPushMergedFailureResult
3) ShuffleBlockFetcherIterator.PushMergedRemoteMetaFailedFetchResult

This initiates fetching fallback blocks for a push-merged block or a shuffle chunk that failed to fetch. It calls the map output tracker to get the list of original blocks for the given push-merged block or shuffle chunk, splits them into remote and local blocks, and processes them accordingly. It also updates numberOfBlocksToFetch in the iterator as it processes the failed response and finds more remote push-merged requests, and updates it again with the additional requests for original blocks.

The fallback happens when:
1. There is an exception while creating shuffle chunks from a push-merged-local shuffle block. See fetchLocalBlock.
2. There is a failure when fetching remote shuffle chunks.
3. There is a failure when processing a SuccessFetchResult for a shuffle chunk (local or remote).
4. There is a zero-size buffer when processing a SuccessFetchResult for a shuffle chunk (local or remote).
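
The split of original blocks into local and remote groups described above can be pictured with the following minimal sketch. It is not Spark's code: `OriginalBlock`, `FallbackPlan`, and `planFallback` are hypothetical names, the list of original blocks is passed in directly instead of being looked up in the map output tracker, and comparing host names is an assumption about how "local" is decided.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FallbackSplitSketch {
    /** Hypothetical description of one original (unmerged) shuffle block and the host serving it. */
    static final class OriginalBlock {
        final String blockId;
        final String host;

        OriginalBlock(String blockId, String host) {
            this.blockId = blockId;
            this.host = host;
        }
    }

    /** Hypothetical result of the split: which blocks to read locally and which to request remotely. */
    static final class FallbackPlan {
        final List<String> localBlocks = new ArrayList<>();
        final List<String> remoteBlocks = new ArrayList<>();

        /** Number of original blocks a caller would add to its pending-block counter. */
        int additionalRequests() {
            return localBlocks.size() + remoteBlocks.size();
        }
    }

    static FallbackPlan planFallback(List<OriginalBlock> originalBlocks, String localHost) {
        FallbackPlan plan = new FallbackPlan();
        for (OriginalBlock block : originalBlocks) {
            // Assumption: a block is local when it is served from the reader's own host.
            if (block.host.equals(localHost)) {
                plan.localBlocks.add(block.blockId);
            } else {
                plan.remoteBlocks.add(block.blockId);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        List<OriginalBlock> originals = Arrays.asList(
            new OriginalBlock("shuffle_5_0_3", "host-a"),
            new OriginalBlock("shuffle_5_1_3", "host-b"),
            new OriginalBlock("shuffle_5_2_3", "host-a"));
        FallbackPlan plan = planFallback(originals, "host-a");
        System.out.println("local=" + plan.localBlocks + " remote=" + plan.remoteBlocks
            + " additional=" + plan.additionalRequests());
    }
}
```

Keeping the two groups separate mirrors the description: local blocks can be read directly, while remote blocks turn into new fetch requests that the iterator must count.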
blockId - (undocumented)
address - (undocumented)

public boolean isLocalPushMergedBlockAddress(BlockManagerId address)
address - (undocumented)

public boolean isPushMergedShuffleBlockAddress(BlockManagerId address)
address - (undocumented)

public boolean isRemotePushMergedBlockAddress(BlockManagerId address)
address - (undocumented)

public void removeChunk(ShuffleBlockChunkId blockId)
This is executed by the task thread when iterator.next() is invoked and the iterator
processes a response of type ShuffleBlockFetcherIterator.SuccessFetchResult.
blockId - shuffle chunk id.

public void sendFetchMergedStatusRequest(org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchRequest req)
req - ShuffleBlockFetcherIterator.FetchRequest that only contains requests to fetch
metadata of push-merged blocks.
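
The chunk bookkeeping methods documented above (addChunk, removeChunk, getRoaringBitMap, getShuffleChunkCardinality) can be pictured as operations on a map from chunk id to chunk bitmap. The sketch below is only an illustration under stated assumptions: `java.util.BitSet` stands in for `RoaringBitmap`, plain strings stand in for `ShuffleBlockChunkId`, `Optional` stands in for `scala.Option`, and returning a cardinality of 0 for an unknown chunk is a choice made for this example.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ChunkMetaRegistrySketch {
    // Chunk id -> bitmap of the map ids merged into that chunk.
    // A plain HashMap suffices here because all calls are assumed to come from one task thread.
    private final Map<String, BitSet> chunksMeta = new HashMap<>();

    /** Record the bitmap of a chunk once its metadata is known. */
    void addChunk(String chunkId, BitSet chunkMeta) {
        chunksMeta.put(chunkId, chunkMeta);
    }

    /** Forget a chunk, for example after it has been fetched successfully. */
    void removeChunk(String chunkId) {
        chunksMeta.remove(chunkId);
    }

    /** Look up the bitmap of a chunk, if it is still tracked. */
    Optional<BitSet> getBitmap(String chunkId) {
        return Optional.ofNullable(chunksMeta.get(chunkId));
    }

    /** Number of map blocks in the chunk; 0 when the chunk is not tracked (an assumption of this sketch). */
    int getChunkCardinality(String chunkId) {
        return getBitmap(chunkId).map(BitSet::cardinality).orElse(0);
    }

    public static void main(String[] args) {
        ChunkMetaRegistrySketch registry = new ChunkMetaRegistrySketch();
        BitSet mapIds = new BitSet();
        mapIds.set(4);
        mapIds.set(7);
        registry.addChunk("chunk_5_0_3_0", mapIds);
        System.out.println(registry.getChunkCardinality("chunk_5_0_3_0"));
        registry.removeChunk("chunk_5_0_3_0");
        System.out.println(registry.getBitmap("chunk_5_0_3_0").isPresent());
    }
}
```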