Class StratifiedSamplingUtils

Object
org.apache.spark.util.random.StratifiedSamplingUtils

public class StratifiedSamplingUtils extends Object
Auxiliary functions and data structures for the sampleByKey method in PairRDDFunctions.

Essentially, when an exact sample size is required, we make additional passes over the RDD to compute the exact threshold value to use for each stratum, which guarantees the exact sample size with high probability. This is achieved by maintaining a waitlist of size O(sqrt(s)), where s is the desired sample size for each stratum.

Like in simple random sampling, we generate a random value for each item from the uniform distribution [0.0, 1.0]. All items with values less than or equal to min(values of items in the waitlist) are accepted into the sample instantly. The threshold for instant accept is designed so that s - numAccepted = O(sqrt(s)), where s is again the desired sample size. Thus, by maintaining a waitlist size = O(sqrt(s)), we will be able to create a sample of the exact size s by adding a portion of the waitlist to the set of items that are instantly accepted. The exact threshold is computed by sorting the values in the waitlist and picking the value at (s - numAccepted).

Note that since we use the same seed for the RNG when computing the thresholds and the actual sample, our computed thresholds are guaranteed to produce the desired sample size.
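
The two-pass idea described above can be sketched for a single stratum in plain Java, without Spark. This is a minimal illustration, not Spark's implementation: the class name `ExactSampleSketch`, the fixed bounds `ACCEPT_BOUND` and `WAITLIST_BOUND`, and the use of `java.util.Random` are all assumptions made for the example. The first pass counts instant accepts and collects waitlisted values; the second pass replays the same seed so the chosen threshold selects exactly the requested number of items.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class ExactSampleSketch {
    // Hypothetical hand-picked bounds around a 10% sampling rate; not Spark's formulas.
    static final double ACCEPT_BOUND = 0.08;
    static final double WAITLIST_BOUND = 0.12;

    /** Pass 1: count instant accepts, collect waitlisted values, pick the threshold. */
    static double computeThreshold(int numItems, int sampleSize, long seed) {
        Random rng = new Random(seed);
        int numAccepted = 0;
        List<Double> waitList = new ArrayList<>();
        for (int i = 0; i < numItems; i++) {
            double x = rng.nextDouble();
            if (x <= ACCEPT_BOUND) {
                numAccepted++;
            } else if (x <= WAITLIST_BOUND) {
                waitList.add(x);
            }
        }
        int needed = sampleSize - numAccepted;
        if (needed < 0 || needed > waitList.size()) {
            throw new IllegalStateException("bounds too tight for this draw");
        }
        if (needed == 0) {
            return ACCEPT_BOUND;
        }
        Collections.sort(waitList);
        // The needed-th smallest waitlisted value admits exactly `needed` waitlisted items.
        return waitList.get(needed - 1);
    }

    /** Pass 2: replay the same random stream and keep items whose value is <= threshold. */
    static List<Integer> sample(int numItems, double threshold, long seed) {
        Random rng = new Random(seed);
        List<Integer> kept = new ArrayList<>();
        for (int i = 0; i < numItems; i++) {
            if (rng.nextDouble() <= threshold) {
                kept.add(i);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        int numItems = 10_000;
        int sampleSize = 1_000;
        long seed = 42L;
        double threshold = computeThreshold(numItems, sampleSize, seed);
        System.out.println(sample(numItems, threshold, seed).size());
    }
}
```

Because both passes draw from a generator created with the same seed, item i receives the same value each time, which is why the threshold found in the first pass carries over to the second.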

For more theoretical background on the sampling techniques used here, please refer to http://jmlr.org/proceedings/papers/v28/meng13a.html

  • Constructor Summary

    Constructors
    Constructor
    Description
    StratifiedSamplingUtils()
     
  • Method Summary

    Modifier and Type
    Method
    Description
    static <K> scala.collection.Map<K,Object>
    computeThresholdByKey(scala.collection.Map<K,org.apache.spark.util.random.AcceptanceResult> finalResult, scala.collection.Map<K,Object> fractions)
    Given the result returned by getAcceptanceResults, determine the threshold for accepting items to generate exact sample size.
    static <K, V> scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>
    getAcceptanceResults(RDD<scala.Tuple2<K,V>> rdd, boolean withReplacement, scala.collection.Map<K,Object> fractions, scala.Option<scala.collection.Map<K,Object>> counts, long seed)
    Count the number of items instantly accepted and generate the waitlist for each stratum.
    static <K, V> scala.Function2<Object,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<scala.Tuple2<K,V>>>
    getBernoulliSamplingFunction(RDD<scala.Tuple2<K,V>> rdd, scala.collection.Map<K,Object> fractions, boolean exact, long seed)
    Return the per partition sampling function used for sampling without replacement.
    static <K> scala.Function2<scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>>
    getCombOp()
    Returns the function used to combine results returned by seqOp from different partitions.
    static <K, V> scala.Function2<Object,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<scala.Tuple2<K,V>>>
    getPoissonSamplingFunction(RDD<scala.Tuple2<K,V>> rdd, scala.collection.Map<K,Object> fractions, boolean exact, long seed, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2)
    Return the per partition sampling function used for sampling with replacement.
    static <K, V> scala.Function2<scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.Tuple2<K,V>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>>
    getSeqOp(boolean withReplacement, scala.collection.Map<K,Object> fractions, org.apache.spark.util.random.StratifiedSamplingUtils.RandomDataGenerator rng, scala.Option<scala.collection.Map<K,Object>> counts)
    Returns the function used by aggregate to collect sampling statistics for each partition.
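    static org.slf4j.Logger
    org$apache$spark$internal$Logging$$log_()
     
    static void
    org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)
     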

    Methods inherited from class java.lang.Object

    equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • StratifiedSamplingUtils

      public StratifiedSamplingUtils()
  • Method Details

    • getAcceptanceResults

      public static <K, V> scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult> getAcceptanceResults(RDD<scala.Tuple2<K,V>> rdd, boolean withReplacement, scala.collection.Map<K,Object> fractions, scala.Option<scala.collection.Map<K,Object>> counts, long seed)
      Count the number of items instantly accepted and generate the waitlist for each stratum.

      This is only invoked when exact sample size is required.

      Parameters:
      rdd - (undocumented)
      withReplacement - (undocumented)
      fractions - (undocumented)
      counts - (undocumented)
      seed - (undocumented)
      Returns:
      (undocumented)
    • getSeqOp

      public static <K, V> scala.Function2<scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.Tuple2<K,V>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>> getSeqOp(boolean withReplacement, scala.collection.Map<K,Object> fractions, org.apache.spark.util.random.StratifiedSamplingUtils.RandomDataGenerator rng, scala.Option<scala.collection.Map<K,Object>> counts)
      Returns the function used by aggregate to collect sampling statistics for each partition.
      Parameters:
      withReplacement - (undocumented)
      fractions - (undocumented)
      rng - (undocumented)
      counts - (undocumented)
      Returns:
      (undocumented)
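
To make the role of a sequence operator concrete, here is a hedged plain-Java sketch of a function that folds one key-value item into a per-key statistics map. Everything in it is hypothetical: `SeqOpSketch`, its `Stats` class, and the fixed `MARGIN` around the target fraction are stand-ins invented for illustration, and Spark derives its accept and waitlist bounds differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.DoubleSupplier;

final class SeqOpSketch {
    /** Hypothetical per-stratum statistics; not Spark's AcceptanceResult. */
    static final class Stats {
        long numItems;
        long numAccepted;
        final List<Double> waitList = new ArrayList<>();
    }

    // Hypothetical fixed margin around the target fraction, chosen for illustration only.
    static final double MARGIN = 0.05;

    /** Builds a fold step: (statistics so far, next item) -> updated statistics. */
    static <K, V> BiFunction<Map<K, Stats>, Map.Entry<K, V>, Map<K, Stats>>
            seqOp(Map<K, Double> fractions, DoubleSupplier uniform) {
        return (acc, item) -> {
            Stats stats = acc.computeIfAbsent(item.getKey(), k -> new Stats());
            double fraction = fractions.get(item.getKey());
            double x = uniform.getAsDouble();
            stats.numItems++;
            if (x <= fraction - MARGIN) {
                stats.numAccepted++;      // instantly accepted
            } else if (x <= fraction + MARGIN) {
                stats.waitList.add(x);    // undecided until the threshold is known
            }                             // otherwise rejected
            return acc;
        };
    }
}
```

Passing the random source as a `DoubleSupplier` is a design choice for the sketch: it lets a caller plug in a seeded generator, or fixed values when checking the classification by hand.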
    • getCombOp

      public static <K> scala.Function2<scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>> getCombOp()
      Returns the function used to combine results returned by seqOp from different partitions.
      Returns:
      (undocumented)
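
A combine operator of this kind can be sketched in plain Java as a merge of two per-key maps. The sketch below is an assumption-laden illustration rather than Spark's code: `CombOpSketch` and its `Stats` class are hypothetical, and the merge rule shown (add the counters, concatenate the waitlists) is the natural one for these statistics, not a quotation of the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class CombOpSketch {
    /** Hypothetical per-stratum statistics; not Spark's AcceptanceResult. */
    static final class Stats {
        long numItems;
        long numAccepted;
        final List<Double> waitList = new ArrayList<>();

        Stats(long numItems, long numAccepted, double... waitListed) {
            this.numItems = numItems;
            this.numAccepted = numAccepted;
            for (double x : waitListed) {
                waitList.add(x);
            }
        }

        void merge(Stats other) {
            numItems += other.numItems;
            numAccepted += other.numAccepted;
            waitList.addAll(other.waitList);
        }
    }

    /** Merges the right-hand partition result into the left-hand one, key by key. */
    static <K> BinaryOperator<Map<K, Stats>> combOp() {
        return (left, right) -> {
            for (Map.Entry<K, Stats> e : right.entrySet()) {
                Stats existing = left.get(e.getKey());
                if (existing == null) {
                    left.put(e.getKey(), e.getValue());
                } else {
                    existing.merge(e.getValue());
                }
            }
            return left;
        };
    }
}
```

The sketch mutates and returns the left map to avoid copying, which suits an aggregation where intermediate results are not reused.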
    • computeThresholdByKey

      public static <K> scala.collection.Map<K,Object> computeThresholdByKey(scala.collection.Map<K,org.apache.spark.util.random.AcceptanceResult> finalResult, scala.collection.Map<K,Object> fractions)
      Given the result returned by getAcceptanceResults, determine the threshold for accepting items to generate exact sample size.

      To do so, we compute sampleSize = math.ceil(size * samplingRate) for each stratum and compare it to the number of items that were accepted instantly and the number of items in the waitlist for that stratum.

      Most of the time,

       numAccepted <= sampleSize <= (numAccepted + numWaitlisted)

      which means we need to sort the elements in the waitlist by their associated values in order to find the value T such that

       |{elements in the stratum whose associated values <= T}| = sampleSize

      Note that all elements in the waitlist have values greater than or equal to the bound for instant acceptance, so a T value in the waitlist range would allow all elements that were instantly accepted on the first pass to be included in the sample.
      Parameters:
      finalResult - (undocumented)
      fractions - (undocumented)
      Returns:
      (undocumented)
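
The threshold selection described above can be sketched per key in plain Java. This is a minimal illustration under stated assumptions: `ThresholdByKeySketch` and its `Stats` class are hypothetical stand-ins for the per-stratum statistics, and the two fallback branches (too many instant accepts, or a waitlist that is too short) are one reasonable way to handle the cases outside "most of the time", not a statement of what the library does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ThresholdByKeySketch {
    /** Hypothetical per-stratum statistics; not Spark's AcceptanceResult. */
    static final class Stats {
        final long numItems;
        final long numAccepted;
        final List<Double> waitList;
        final double acceptBound;
        final double waitListBound;

        Stats(long numItems, long numAccepted, List<Double> waitList,
              double acceptBound, double waitListBound) {
            this.numItems = numItems;
            this.numAccepted = numAccepted;
            this.waitList = waitList;
            this.acceptBound = acceptBound;
            this.waitListBound = waitListBound;
        }
    }

    static <K> Map<K, Double> thresholds(Map<K, Stats> stats, Map<K, Double> fractions) {
        Map<K, Double> out = new HashMap<>();
        for (Map.Entry<K, Stats> e : stats.entrySet()) {
            Stats s = e.getValue();
            long sampleSize = (long) Math.ceil(s.numItems * fractions.get(e.getKey()));
            long needed = sampleSize - s.numAccepted;
            double threshold;
            if (needed <= 0) {
                // At or over the target already: admit only the instant accepts.
                threshold = s.acceptBound;
            } else if (needed > s.waitList.size()) {
                // Waitlist too short: admit the whole waitlist and fall short of the target.
                threshold = s.waitListBound;
            } else {
                List<Double> sorted = new ArrayList<>(s.waitList);
                Collections.sort(sorted);
                // The needed-th smallest waitlisted value, so values <= T reach sampleSize.
                threshold = sorted.get((int) needed - 1);
            }
            out.put(e.getKey(), threshold);
        }
        return out;
    }
}
```

For example, a stratum with 10 items, a fraction of 0.5, 3 instant accepts, and waitlisted values 0.47, 0.41, 0.55, 0.44 needs 2 more items, so the sketch returns the second smallest waitlisted value, 0.44.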
    • getBernoulliSamplingFunction

      public static <K, V> scala.Function2<Object,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<scala.Tuple2<K,V>>> getBernoulliSamplingFunction(RDD<scala.Tuple2<K,V>> rdd, scala.collection.Map<K,Object> fractions, boolean exact, long seed)
      Return the per partition sampling function used for sampling without replacement.

      When exact sample size is required, we make an additional pass over the RDD to determine the exact sampling rate that guarantees sample size with high confidence.

      The sampling function has a unique seed per partition.

      Parameters:
      rdd - (undocumented)
      fractions - (undocumented)
      exact - (undocumented)
      seed - (undocumented)
      Returns:
      (undocumented)
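
The shape of a per-partition sampling function without replacement can be sketched in plain Java as follows. This covers only the non-exact case and rests on assumptions: `BernoulliByKeySketch` is a hypothetical name, `Map.Entry` stands in for `scala.Tuple2`, and deriving the partition seed as `seed + partitionIndex` is just one simple way to give each partition its own seed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;

final class BernoulliByKeySketch {
    /** Returns (partitionIndex, items) -> sampled items, using a per-key keep probability. */
    static <K, V> BiFunction<Integer, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>>
            samplingFunction(Map<K, Double> fractions, long seed) {
        return (partitionIndex, items) -> {
            // Assumed seeding scheme: offset the base seed by the partition index.
            Random rng = new Random(seed + partitionIndex);
            List<Map.Entry<K, V>> kept = new ArrayList<>();
            while (items.hasNext()) {
                Map.Entry<K, V> item = items.next();
                // One Bernoulli trial per item, so an item appears at most once.
                if (rng.nextDouble() < fractions.get(item.getKey())) {
                    kept.add(item);
                }
            }
            return kept.iterator();
        };
    }
}
```

The sketch buffers the kept items in a list for simplicity; a streaming implementation would filter the iterator lazily instead.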
    • getPoissonSamplingFunction

      public static <K, V> scala.Function2<Object,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<scala.Tuple2<K,V>>> getPoissonSamplingFunction(RDD<scala.Tuple2<K,V>> rdd, scala.collection.Map<K,Object> fractions, boolean exact, long seed, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2)
      Return the per partition sampling function used for sampling with replacement.

      When exact sample size is required, we make two additional passes over the RDD to determine the exact sampling rate that guarantees sample size with high confidence. The first pass counts the number of items in each stratum (group of items with the same key) in the RDD, and the second pass uses the counts to determine exact sampling rates.

      The sampling function has a unique seed per partition.

      Parameters:
      rdd - (undocumented)
      fractions - (undocumented)
      exact - (undocumented)
      seed - (undocumented)
      evidence$1 - (undocumented)
      evidence$2 - (undocumented)
      Returns:
      (undocumented)
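
Sampling with replacement can be illustrated by emitting each item a Poisson-distributed number of times, with the key's fraction as the mean. The plain-Java sketch below covers only the non-exact case and is built on assumptions: `PoissonByKeySketch` is a hypothetical name, `Map.Entry` stands in for `scala.Tuple2`, the `seed + partitionIndex` seeding is illustrative, and the Poisson draw uses Knuth's multiplication method as a dependency-free substitute for whatever sampler the library uses.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;

final class PoissonByKeySketch {
    /** Knuth's method: multiply uniforms until the product drops to exp(-mean) or below. */
    static int poisson(double mean, Random rng) {
        double limit = Math.exp(-mean);
        double product = rng.nextDouble();
        int count = 0;
        while (product > limit) {
            count++;
            product *= rng.nextDouble();
        }
        return count;
    }

    /** Returns (partitionIndex, items) -> items repeated Poisson(fraction) times each. */
    static <K, V> BiFunction<Integer, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>>
            samplingFunction(Map<K, Double> fractions, long seed) {
        return (partitionIndex, items) -> {
            // Assumed seeding scheme: offset the base seed by the partition index.
            Random rng = new Random(seed + partitionIndex);
            List<Map.Entry<K, V>> out = new ArrayList<>();
            while (items.hasNext()) {
                Map.Entry<K, V> item = items.next();
                int copies = poisson(fractions.get(item.getKey()), rng);
                for (int c = 0; c < copies; c++) {
                    out.add(item);
                }
            }
            return out.iterator();
        };
    }
}
```

Knuth's method is adequate for the small means typical of sampling fractions; for large means its running time grows linearly with the mean, so a different sampler would be preferable there.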
    • org$apache$spark$internal$Logging$$log_

      public static org.slf4j.Logger org$apache$spark$internal$Logging$$log_()
    • org$apache$spark$internal$Logging$$log__$eq

      public static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)