public class StratifiedSamplingUtils
extends Object
Essentially, when an exact sample size is required, we make additional passes over the RDD to compute the exact threshold value to use for each stratum, which guarantees the exact sample size with high probability. This is achieved by maintaining a waitlist of size O(sqrt(s)), where s is the desired sample size for each stratum.
As in simple random sampling, we generate a random value for each item from the uniform distribution on [0.0, 1.0]. All items with values less than or equal to min(values of items in the waitlist) are accepted into the sample instantly. The threshold for instant acceptance is designed so that s - numAccepted = O(sqrt(s)), where s is again the desired sample size. Thus, by maintaining a waitlist of size O(sqrt(s)), we can create a sample of exactly size s by adding a portion of the waitlist to the set of items that were instantly accepted. The exact threshold is computed by sorting the values in the waitlist and picking the value at position (s - numAccepted).
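The accept/waitlist/threshold procedure described above can be sketched for a single stratum as follows. This is a minimal illustration, not Spark's implementation: the class `WaitlistThresholdSketch`, its method names, and the two bounds passed in as parameters are all hypothetical, and the sketch assumes the target size falls strictly inside the waitlist range.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of Spark): picks the exact threshold for one stratum.
final class WaitlistThresholdSketch {

    /**
     * keys          one uniform random value per item in the stratum
     * acceptBound   assumed input: keys below it are accepted instantly
     * waitlistBound assumed input: keys in [acceptBound, waitlistBound) are waitlisted
     */
    static double exactThreshold(double[] keys, int sampleSize,
                                 double acceptBound, double waitlistBound) {
        int numAccepted = 0;
        List<Double> waitlist = new ArrayList<>();
        for (double key : keys) {
            if (key < acceptBound) {
                numAccepted++;            // instantly accepted, value not stored
            } else if (key < waitlistBound) {
                waitlist.add(key);        // candidate, value kept for sorting
            }                             // otherwise rejected outright
        }
        int needed = sampleSize - numAccepted;
        if (needed < 1 || needed > waitlist.size()) {
            // This sketch does not handle targets outside the waitlist range.
            throw new IllegalStateException("sample size falls outside the waitlist range");
        }
        Collections.sort(waitlist);
        // The needed-th smallest waitlisted value admits exactly `needed` extra items.
        return waitlist.get(needed - 1);
    }

    // Counts the items that a given threshold would admit into the sample.
    static int countAtOrBelow(double[] keys, double threshold) {
        int count = 0;
        for (double key : keys) {
            if (key <= threshold) {
                count++;
            }
        }
        return count;
    }
}
```

Only the waitlisted values are stored and sorted, which is why the memory cost is tied to the waitlist size rather than to the stratum size.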
Note that since we use the same seed for the RNG when computing the thresholds and the actual sample, our computed thresholds are guaranteed to produce the desired sample size.
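The role of the shared seed can be illustrated with a two-pass sketch: the first pass derives the threshold, and the second pass replays the same random sequence and filters on it. The class and method names are hypothetical, `java.util.Random` stands in for whatever generator Spark uses, and the bounds are supplied by the caller rather than derived statistically.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical two-pass sketch (not Spark code) for a single stratum of numItems items.
final class SameSeedReplaySketch {

    static List<Integer> exactSample(int numItems, int sampleSize, long seed,
                                     double acceptBound, double waitlistBound) {
        // Pass 1: count instant accepts and collect the waitlist.
        Random rng = new Random(seed);
        int numAccepted = 0;
        List<Double> waitlist = new ArrayList<>();
        for (int i = 0; i < numItems; i++) {
            double key = rng.nextDouble();
            if (key < acceptBound) {
                numAccepted++;
            } else if (key < waitlistBound) {
                waitlist.add(key);
            }
        }
        int needed = sampleSize - numAccepted;
        if (needed < 1 || needed > waitlist.size()) {
            throw new IllegalStateException("sample size falls outside the waitlist range");
        }
        Collections.sort(waitlist);
        double threshold = waitlist.get(needed - 1);

        // Pass 2: re-seeding reproduces the same key for every item, so the
        // threshold from pass 1 selects exactly the intended items.
        Random replay = new Random(seed);
        List<Integer> sample = new ArrayList<>();
        for (int i = 0; i < numItems; i++) {
            if (replay.nextDouble() <= threshold) {
                sample.add(i);   // record the item's index
            }
        }
        return sample;
    }
}
```

If the second pass used a different seed, the threshold would only give the right sample size in expectation, not exactly.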
For more theoretical background on the sampling techniques used here, please refer to http://jmlr.org/proceedings/papers/v28/meng13a.html
Constructor and Description |
---|
StratifiedSamplingUtils() |
Modifier and Type | Method and Description |
---|---|
static <K> scala.collection.Map<K,Object> |
computeThresholdByKey(scala.collection.Map<K,org.apache.spark.util.random.AcceptanceResult> finalResult,
scala.collection.Map<K,Object> fractions)
Given the result returned by getAcceptanceResults, determine the threshold for accepting items to
generate an exact sample size.
|
static <K,V> scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult> |
getAcceptanceResults(RDD<scala.Tuple2<K,V>> rdd,
boolean withReplacement,
scala.collection.Map<K,Object> fractions,
scala.Option<scala.collection.Map<K,Object>> counts,
long seed)
Count the number of items instantly accepted and generate the waitlist for each stratum.
|
static <K,V> scala.Function2<Object,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<scala.Tuple2<K,V>>> |
getBernoulliSamplingFunction(RDD<scala.Tuple2<K,V>> rdd,
scala.collection.Map<K,Object> fractions,
boolean exact,
long seed)
Return the per partition sampling function used for sampling without replacement.
|
static <K> scala.Function2<scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>> |
getCombOp()
Returns the function used to combine results returned by seqOp from different partitions.
|
static <K,V> scala.Function2<Object,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<scala.Tuple2<K,V>>> |
getPoissonSamplingFunction(RDD<scala.Tuple2<K,V>> rdd,
scala.collection.Map<K,Object> fractions,
boolean exact,
long seed,
scala.reflect.ClassTag<K> evidence$1,
scala.reflect.ClassTag<V> evidence$2)
Return the per partition sampling function used for sampling with replacement.
|
static <K,V> scala.Function2<scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.Tuple2<K,V>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>> |
getSeqOp(boolean withReplacement,
scala.collection.Map<K,Object> fractions,
org.apache.spark.util.random.StratifiedSamplingUtils.RandomDataGenerator rng,
scala.Option<scala.collection.Map<K,Object>> counts)
Returns the function used by aggregate to collect sampling statistics for each partition.
|
static void |
org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1) |
static org.slf4j.Logger |
org$apache$spark$internal$Logging$$log_() |
public static <K,V> scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult> getAcceptanceResults(RDD<scala.Tuple2<K,V>> rdd, boolean withReplacement, scala.collection.Map<K,Object> fractions, scala.Option<scala.collection.Map<K,Object>> counts, long seed)
This is only invoked when exact sample size is required.
rdd - (undocumented)
withReplacement - (undocumented)
fractions - (undocumented)
counts - (undocumented)
seed - (undocumented)
public static <K,V> scala.Function2<scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.Tuple2<K,V>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>> getSeqOp(boolean withReplacement, scala.collection.Map<K,Object> fractions, org.apache.spark.util.random.StratifiedSamplingUtils.RandomDataGenerator rng, scala.Option<scala.collection.Map<K,Object>> counts)
withReplacement - (undocumented)
fractions - (undocumented)
rng - (undocumented)
counts - (undocumented)
public static <K> scala.Function2<scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>,scala.collection.mutable.Map<K,org.apache.spark.util.random.AcceptanceResult>> getCombOp()
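As a rough illustration of how a per-partition seqOp and a cross-partition combOp might fit together in an aggregate, consider the sketch below. The `Stats` class is a hypothetical stand-in for `AcceptanceResult`, whose fields are not documented on this page; the field names, the bounds passed as parameters, and the plain-Java signatures are all assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Spark code) of the seqOp/combOp pair used by an aggregate.
final class AggregateSketch {

    // Assumed per-stratum statistics; stands in for AcceptanceResult.
    static final class Stats {
        long numItems;
        long numAccepted;
        final List<Double> waitlist = new ArrayList<>();

        void merge(Stats other) {
            numItems += other.numItems;
            numAccepted += other.numAccepted;
            waitlist.addAll(other.waitlist);
        }
    }

    // seqOp: fold one item (its key and its random value) into a partition's statistics.
    static <K> Map<K, Stats> seqOp(Map<K, Stats> acc, K key, double randomValue,
                                   double acceptBound, double waitlistBound) {
        Stats stats = acc.computeIfAbsent(key, k -> new Stats());
        stats.numItems++;
        if (randomValue < acceptBound) {
            stats.numAccepted++;
        } else if (randomValue < waitlistBound) {
            stats.waitlist.add(randomValue);
        }
        return acc;
    }

    // combOp: merge the statistics of two partitions key by key.
    static <K> Map<K, Stats> combOp(Map<K, Stats> left, Map<K, Stats> right) {
        for (Map.Entry<K, Stats> entry : right.entrySet()) {
            left.computeIfAbsent(entry.getKey(), k -> new Stats()).merge(entry.getValue());
        }
        return left;
    }

    public static void main(String[] args) {
        Map<String, Stats> p1 = new HashMap<>();
        seqOp(p1, "a", 0.05, 0.1, 0.5);
        Map<String, Stats> p2 = new HashMap<>();
        seqOp(p2, "a", 0.2, 0.1, 0.5);
        Map<String, Stats> merged = combOp(p1, p2);
        System.out.println(merged.get("a").numItems);
    }
}
```

Both functions mutate and return their first argument, which mirrors the mutable map types in the signatures above.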
public static <K> scala.collection.Map<K,Object> computeThresholdByKey(scala.collection.Map<K,org.apache.spark.util.random.AcceptanceResult> finalResult, scala.collection.Map<K,Object> fractions)
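Given the result returned by getAcceptanceResults, determine the threshold for accepting items to generate an exact sample size. To do so, we compute sampleSize = math.ceil(size * samplingRate) for each stratum and compare it to the number of items that were accepted instantly and the number of items in the waitlist for that stratum.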
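The sample-size computation and the comparison against the first-pass counts can be sketched as follows. The class, the `Outcome` names, and the method signatures are hypothetical; the sketch only classifies a stratum and does not say what Spark does in the two unusual cases.

```java
// Hypothetical sketch (not Spark code): compares the target sample size of one
// stratum with the counts gathered on the first pass.
final class SampleSizeCheckSketch {

    enum Outcome { WITHIN_WAITLIST, TOO_MANY_ACCEPTED, WAITLIST_TOO_SHORT }

    // sampleSize = ceil(size * samplingRate), as described in the text.
    static long sampleSize(long stratumSize, double samplingRate) {
        return (long) Math.ceil(stratumSize * samplingRate);
    }

    static Outcome classify(long stratumSize, double samplingRate,
                            long numAccepted, long numWaitlisted) {
        long target = sampleSize(stratumSize, samplingRate);
        if (target < numAccepted) {
            return Outcome.TOO_MANY_ACCEPTED;      // instant accepts already exceed the target
        }
        if (target > numAccepted + numWaitlisted) {
            return Outcome.WAITLIST_TOO_SHORT;     // even the whole waitlist is not enough
        }
        return Outcome.WITHIN_WAITLIST;            // threshold can be read from the sorted waitlist
    }
}
```

For example, a stratum of 10 items sampled at rate 0.25 has a target of 3 items, so 2 instant accepts plus a waitlist of 3 puts the target inside the waitlist range.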
Most of the time, numAccepted <= sampleSize <= (numAccepted + numWaitlisted), which means we need to sort the elements in the waitlist by their associated values in order to find the value T such that |{elements in the stratum whose associated values <= T}| = sampleSize.
Note that all elements in the waitlist have values greater than or equal to the bound for instant acceptance, so a T value in the waitlist range allows all elements that were instantly accepted on the first pass to be included in the sample.
finalResult - (undocumented)
fractions - (undocumented)
public static <K,V> scala.Function2<Object,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<scala.Tuple2<K,V>>> getBernoulliSamplingFunction(RDD<scala.Tuple2<K,V>> rdd, scala.collection.Map<K,Object> fractions, boolean exact, long seed)
When exact sample size is required, we make an additional pass over the RDD to determine the exact sampling rate that guarantees sample size with high confidence.
The sampling function has a unique seed per partition.
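A per-partition Bernoulli sampling function of this shape can be sketched in plain Java. This covers only the non-exact case and is not Spark's implementation: the class name is hypothetical, `Map.Entry` stands in for `Tuple2`, and deriving the partition seed as `seed + partitionIndex` is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;

// Hypothetical sketch (not Spark code): sampling without replacement, per partition.
final class BernoulliPartitionSketch {

    static <K, V> BiFunction<Integer, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>>
            samplingFunction(Map<K, Double> fractions, long seed) {
        return (partitionIndex, items) -> {
            // Assumption: each partition gets its own seed derived from the base seed.
            Random rng = new Random(seed + partitionIndex);
            List<Map.Entry<K, V>> kept = new ArrayList<>();
            while (items.hasNext()) {
                Map.Entry<K, V> item = items.next();
                // Assumes every key present in the data has a fraction.
                double fraction = fractions.get(item.getKey());
                if (rng.nextDouble() < fraction) {
                    kept.add(item);   // each item is kept at most once
                }
            }
            return kept.iterator();
        };
    }
}
```

Because every partition draws from a differently seeded generator, two partitions with identical contents do not make identical keep/drop decisions.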
rdd - (undocumented)
fractions - (undocumented)
exact - (undocumented)
seed - (undocumented)
public static <K,V> scala.Function2<Object,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<scala.Tuple2<K,V>>> getPoissonSamplingFunction(RDD<scala.Tuple2<K,V>> rdd, scala.collection.Map<K,Object> fractions, boolean exact, long seed, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2)
When an exact sample size is required, we make two additional passes over the RDD to determine the exact sampling rate that guarantees the sample size with high confidence. The first pass counts the number of items in each stratum (group of items with the same key) in the RDD, and the second pass uses the counts to determine exact sampling rates.
The sampling function has a unique seed per partition.
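For sampling with replacement, one common approach is to emit each item a Poisson-distributed number of times with mean equal to its stratum's fraction. The sketch below illustrates that idea for the non-exact case only. It is not Spark's implementation: the class and method names are hypothetical, the Poisson draw uses Knuth's multiplication method (suitable for small means), and the `seed + partitionIndex` derivation is an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;

// Hypothetical sketch (not Spark code): sampling with replacement, per partition.
final class PoissonPartitionSketch {

    // Knuth's method: multiply uniforms until the product drops to exp(-mean) or below.
    static int nextPoisson(Random rng, double mean) {
        double limit = Math.exp(-mean);
        int count = 0;
        double product = rng.nextDouble();
        while (product > limit) {
            count++;
            product *= rng.nextDouble();
        }
        return count;
    }

    static <K, V> BiFunction<Integer, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>>
            samplingFunction(Map<K, Double> fractions, long seed) {
        return (partitionIndex, items) -> {
            // Assumption: each partition gets its own seed derived from the base seed.
            Random rng = new Random(seed + partitionIndex);
            List<Map.Entry<K, V>> out = new ArrayList<>();
            while (items.hasNext()) {
                Map.Entry<K, V> item = items.next();
                // Assumes every key present in the data has a fraction.
                int copies = nextPoisson(rng, fractions.get(item.getKey()));
                for (int i = 0; i < copies; i++) {
                    out.add(item);   // the same item may appear several times
                }
            }
            return out.iterator();
        };
    }
}
```

Unlike the Bernoulli case, a fraction greater than 1.0 is meaningful here, since it is the expected number of copies per item rather than a probability.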
rdd - (undocumented)
fractions - (undocumented)
exact - (undocumented)
seed - (undocumented)
evidence$1 - (undocumented)
evidence$2 - (undocumented)
public static org.slf4j.Logger org$apache$spark$internal$Logging$$log_()
public static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)