org.apache.spark.rdd
Class PartitionCoalescer

Object
  extended by org.apache.spark.rdd.PartitionCoalescer

public class PartitionCoalescer
extends Object

Coalesces the partitions of a parent RDD (prev) into fewer partitions, so that each partition of this RDD computes one or more of the parent ones. It produces exactly maxPartitions partitions if the parent has more than maxPartitions, or fewer if the parent has fewer.

This transformation is useful when an RDD with many partitions gets filtered into a smaller one, or to avoid having a large number of small tasks when processing a directory with many files.

If the parent has no locality information (no preferredLocations), the coalescing is very simple: parent partitions that are adjacent in the partitions array are grouped into chunks. If there is locality information, the coalescer packs the parent partitions with the following four goals:

(1) Balance the groups so they have roughly the same number of parent partitions.
(2) Achieve locality per partition, i.e. find one machine that most parent partitions prefer.
(3) Be efficient, i.e. an O(n) algorithm for n parent partitions (the problem is likely NP-hard).
(4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine.
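
The no-locality case described above can be illustrated with a minimal Java sketch. It is not Spark's implementation: the class name ChunkSketch, the method chunk, and the use of plain integer indices in place of Partition objects are assumptions made for illustration. It splits the parent indices into contiguous ranges of nearly equal size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of org.apache.spark.rdd.
class ChunkSketch {
    // Splits parent indices 0..parentCount-1 into contiguous groups.
    // Produces min(maxPartitions, parentCount) groups, matching the
    // "fewer if the parent had fewer" behaviour described above.
    static List<List<Integer>> chunk(int parentCount, int maxPartitions) {
        int n = Math.min(maxPartitions, parentCount);
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            // long arithmetic avoids overflow for very large parent counts
            int start = (int) (((long) i * parentCount) / n);
            int end = (int) ((((long) i + 1) * parentCount) / n);
            List<Integer> group = new ArrayList<>();
            for (int j = start; j < end; j++) {
                group.add(j);
            }
            groups.add(group);
        }
        return groups;
    }

    public static void main(String[] args) {
        // prints [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
        System.out.println(chunk(10, 3));
    }
}
```

Adjacent parents stay together, so each group covers one contiguous range and group sizes differ by at most one.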

Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000. We assume the final number of desired partitions is small, e.g. less than 1000.

The algorithm tries to assign a unique preferred machine to each partition. If the number of desired partitions is greater than the number of preferred machines (which can happen), it must start picking duplicate preferred machines. When to do so is determined using a coupon collector estimate (2n log(n)). Load balancing uses randomized power-of-two-choices balls-into-bins with one twist: it also tries to achieve locality. This is done by allowing a slack (balanceSlack) between two bins. If two bins are within the slack in terms of balance, the algorithm assigns partitions according to locality. (contact alig for questions)
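
The interplay between power-of-two-choices balancing and the locality slack can be sketched as follows. This is an assumption-laden illustration, not the library's code: PickBinSketch, its parameters, and the representation of bins as an array of sizes are hypothetical, and this page does not state how the integer slack is derived from balanceSlack.

```java
import java.util.Random;

// Hypothetical illustration only; not part of org.apache.spark.rdd.
class PickBinSketch {
    // binSizes[i]  : current number of parent partitions in bin i
    // preferredBin : least loaded bin on a preferred machine, or -1 if none
    // r1, r2       : two candidate bin indices (normally drawn at random)
    // slack        : imbalance tolerated in exchange for locality
    static int pickBin(int[] binSizes, int preferredBin, int r1, int r2, int slack) {
        // power of two choices: keep the emptier of the two candidates
        int minOfTwo = binSizes[r1] < binSizes[r2] ? r1 : r2;
        if (preferredBin < 0) {
            return minOfTwo; // no locality information: balance only
        }
        if (binSizes[minOfTwo] + slack <= binSizes[preferredBin]) {
            return minOfTwo; // imbalance exceeds the slack: prefer balance
        }
        return preferredBin; // within the slack: prefer locality
    }

    // Convenience overload that draws the two candidates at random.
    static int pickBin(int[] binSizes, int preferredBin, int slack, Random rnd) {
        int r1 = rnd.nextInt(binSizes.length);
        int r2 = rnd.nextInt(binSizes.length);
        return pickBin(binSizes, preferredBin, r1, r2, slack);
    }

    public static void main(String[] args) {
        int[] sizes = {5, 1, 9};
        System.out.println(pickBin(sizes, 2, 0, 1, 3));  // prints 1 (balance wins)
        System.out.println(pickBin(sizes, 2, 0, 1, 10)); // prints 2 (locality wins)
    }
}
```

A larger slack makes the sketch favour the preferred machine more often, while a slack of zero chooses locality only when the preferred bin is strictly less loaded than the emptier random candidate.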


Nested Class Summary
 class PartitionCoalescer.LocationIterator
           
 
Constructor Summary
PartitionCoalescer(int maxPartitions, RDD<?> prev, double balanceSlack)
           
 
Method Summary
 boolean addPartToPGroup(Partition part, PartitionGroup pgroup)
           
 boolean compare(scala.Option<PartitionGroup> o1, scala.Option<PartitionGroup> o2)
           
 boolean compare(PartitionGroup o1, PartitionGroup o2)
           
 scala.collection.Seq<String> currPrefLocs(Partition part)
           
 scala.Option<PartitionGroup> getLeastGroupHash(String key)
          Sorts and gets the least element of the list associated with key in groupHash. The returned PartitionGroup is the least loaded of all groups that represent the machine "key".
 PartitionGroup[] getPartitions()
           
 scala.collection.mutable.ArrayBuffer<PartitionGroup> groupArr()
           
 scala.collection.mutable.Map<String,scala.collection.mutable.ArrayBuffer<PartitionGroup>> groupHash()
           
 scala.collection.mutable.Set<Partition> initialHash()
           
 boolean noLocality()
           
 PartitionGroup pickBin(Partition p)
          Takes a parent RDD partition and decides which of the partition groups to put it in. Takes locality into account, but also uses power of 2 choices to load balance. It strikes a balance between the two using the balanceSlack variable.
 scala.util.Random rnd()
           
 PartitionGroup[] run()
          Runs the packing algorithm and returns an array of PartitionGroups that, if possible, are load balanced and grouped by locality.
 void setupGroups(int targetLen)
          Initializes targetLen partition groups and assigns each a preferredLocation. This uses a coupon collector estimate of how many preferredLocations it must rotate through until it has seen most of the preferred locations (2 * n log(n)).
 int slack()
           
 void throwBalls()
           
 
Methods inherited from class Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

PartitionCoalescer

public PartitionCoalescer(int maxPartitions,
                          RDD<?> prev,
                          double balanceSlack)
Method Detail

compare

public boolean compare(PartitionGroup o1,
                       PartitionGroup o2)

compare

public boolean compare(scala.Option<PartitionGroup> o1,
                       scala.Option<PartitionGroup> o2)

rnd

public scala.util.Random rnd()

groupArr

public scala.collection.mutable.ArrayBuffer<PartitionGroup> groupArr()

groupHash

public scala.collection.mutable.Map<String,scala.collection.mutable.ArrayBuffer<PartitionGroup>> groupHash()

initialHash

public scala.collection.mutable.Set<Partition> initialHash()

slack

public int slack()

noLocality

public boolean noLocality()

currPrefLocs

public scala.collection.Seq<String> currPrefLocs(Partition part)

getLeastGroupHash

public scala.Option<PartitionGroup> getLeastGroupHash(String key)
Sorts and gets the least element of the list associated with key in groupHash. The returned PartitionGroup is the least loaded of all groups that represent the machine "key".

Parameters:
key - string representing a partitioned group on preferred machine key
Returns:
Option of PartitionGroup that has least elements for key
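
As a rough illustration of the lookup described here, the sketch below returns the smallest group registered for a machine key. The names LeastGroupSketch and leastGroup are hypothetical, groups are modelled as lists of parent partition indices rather than PartitionGroup objects, and a linear minimum scan stands in for the sort mentioned in the description.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; not part of org.apache.spark.rdd.
class LeastGroupSketch {
    // Returns the group with the fewest elements for the given machine key,
    // or Optional.empty() when the key has no groups.
    static Optional<List<Integer>> leastGroup(
            Map<String, List<List<Integer>>> groupHash, String key) {
        List<List<Integer>> groups = groupHash.get(key);
        if (groups == null) {
            return Optional.empty();
        }
        List<Integer> best = null;
        for (List<Integer> g : groups) {
            if (best == null || g.size() < best.size()) {
                best = g;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        Map<String, List<List<Integer>>> groupHash = new HashMap<>();
        groupHash.put("host1", List.of(List.of(1, 2, 3), List.of(4)));
        System.out.println(leastGroup(groupHash, "host1")); // prints Optional[[4]]
    }
}
```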

addPartToPGroup

public boolean addPartToPGroup(Partition part,
                               PartitionGroup pgroup)

setupGroups

public void setupGroups(int targetLen)
Initializes targetLen partition groups and assigns each a preferredLocation. This uses a coupon collector estimate of how many preferredLocations it must rotate through until it has seen most of the preferred locations (2 * n log(n)).

Parameters:
targetLen - number of partition groups to initialize
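
The coupon collector estimate mentioned for setupGroups can be sketched as below. This is an illustration under stated assumptions rather than the actual method: SetupGroupsSketch, couponBound and pickDistinctHosts are hypothetical names, the logarithm is assumed to be the natural log, the bound is clamped to at least n (an assumption so that n = 1 still allows one draw), and hosts are plain strings supplied in rotation order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of org.apache.spark.rdd.
class SetupGroupsSketch {
    // Upper bound on draws: about 2 * n * ln(n), never less than n.
    static int couponBound(int n) {
        return Math.max(n, (int) Math.ceil(2.0 * n * Math.log(n)));
    }

    // Rotates through hostStream, collecting hosts not seen before, until
    // targetLen distinct hosts are found or the draw budget is exhausted.
    static List<String> pickDistinctHosts(List<String> hostStream, int targetLen) {
        int bound = couponBound(targetLen);
        Set<String> seen = new LinkedHashSet<>();
        int tries = 0;
        for (String host : hostStream) {
            if (seen.size() >= targetLen || tries >= bound) {
                break;
            }
            seen.add(host); // duplicates are ignored by the set
            tries++;
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(couponBound(10)); // prints 47
        // prints [a, b, c]
        System.out.println(pickDistinctHosts(List.of("a", "a", "b", "a", "c", "d"), 3));
    }
}
```

If the budget runs out before targetLen distinct hosts are seen, the remaining groups would have to reuse hosts, which corresponds to the duplicate preferred machines described in the class overview.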

pickBin

public PartitionGroup pickBin(Partition p)
Takes a parent RDD partition and decides which of the partition groups to put it in. Takes locality into account, but also uses power of 2 choices to load balance. It strikes a balance between the two using the balanceSlack variable.

Parameters:
p - partition (ball to be thrown)
Returns:
partition group (bin to be put in)

throwBalls

public void throwBalls()

getPartitions

public PartitionGroup[] getPartitions()

run

public PartitionGroup[] run()
Runs the packing algorithm and returns an array of PartitionGroups that, if possible, are load balanced and grouped by locality.

Returns:
array of partition groups