
Source Code for Module pyspark.rdd

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one or more 
   3  # contributor license agreements.  See the NOTICE file distributed with 
   4  # this work for additional information regarding copyright ownership. 
   5  # The ASF licenses this file to You under the Apache License, Version 2.0 
   6  # (the "License"); you may not use this file except in compliance with 
   7  # the License.  You may obtain a copy of the License at 
   8  # 
   9  #    http://www.apache.org/licenses/LICENSE-2.0 
  10  # 
  11  # Unless required by applicable law or agreed to in writing, software 
  12  # distributed under the License is distributed on an "AS IS" BASIS, 
  13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  14  # See the License for the specific language governing permissions and 
  15  # limitations under the License. 
  16  # 
  17   
  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from collections import namedtuple 
  22  from itertools import chain, ifilter, imap 
  23  import operator 
  24  import os 
  25  import sys 
  26  import shlex 
  27  import traceback 
  28  from subprocess import Popen, PIPE 
  29  from tempfile import NamedTemporaryFile 
  30  from threading import Thread 
  31  import warnings 
  32  import heapq 
  33  import bisect 
  34  from random import Random 
  35  from math import sqrt, log, isinf, isnan 
  36   
  37  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  38      BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ 
  39      PickleSerializer, pack_long, CompressedSerializer 
  40  from pyspark.join import python_join, python_left_outer_join, \ 
  41      python_right_outer_join, python_cogroup 
  42  from pyspark.statcounter import StatCounter 
  43  from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler 
  44  from pyspark.storagelevel import StorageLevel 
  45  from pyspark.resultiterable import ResultIterable 
  46  from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \ 
  47      get_used_memory 
  48   
  49  from py4j.java_collections import ListConverter, MapConverter 
  50   
  51  __all__ = ["RDD"] 
  52   
  53   
  54  # TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized 
  55  # hash for string 
  56  def portable_hash(x): 
  57      """ 
  58      This function returns a consistent hash code for builtin types, especially 
  59      for None and for tuples containing None. 
  60   
  61      The algorithm is similar to the one used by CPython 2.7 
  62   
  63      >>> portable_hash(None) 
  64      0 
  65      >>> portable_hash((None, 1)) 
  66      219750521 
  67      """ 
  68      if x is None: 
  69          return 0 
  70      if isinstance(x, tuple): 
  71          h = 0x345678 
  72          for i in x: 
  73              h ^= portable_hash(i) 
  74              h *= 1000003 
  75              h &= 0xffffffff 
  76          h ^= len(x) 
  77          if h == -1: 
  78              h = -2 
  79          return h 
  80      return hash(x) 
81
  82   
  83  def _extract_concise_traceback(): 
  84      """ 
  85      This function returns the traceback info for a callsite as a namedtuple 
  86      containing the function name, file name and line number. 
  87      """ 
  88      tb = traceback.extract_stack() 
  89      callsite = namedtuple("Callsite", "function file linenum") 
  90      if len(tb) == 0: 
  91          return None 
  92      file, line, module, what = tb[len(tb) - 1] 
  93      sparkpath = os.path.dirname(file) 
  94      first_spark_frame = len(tb) - 1 
  95      for i in range(0, len(tb)): 
  96          file, line, fun, what = tb[i] 
  97          if file.startswith(sparkpath): 
  98              first_spark_frame = i 
  99              break 
 100      if first_spark_frame == 0: 
 101          file, line, fun, what = tb[0] 
 102          return callsite(function=fun, file=file, linenum=line) 
 103      sfile, sline, sfun, swhat = tb[first_spark_frame] 
 104      ufile, uline, ufun, uwhat = tb[first_spark_frame - 1] 
 105      return callsite(function=sfun, file=ufile, linenum=uline) 
 106   
 107  _spark_stack_depth = 0 
 108   
 109   
 110  class _JavaStackTrace(object): 
 111   
 112      def __init__(self, sc): 
 113          tb = _extract_concise_traceback() 
 114          if tb is not None: 
 115              self._traceback = "%s at %s:%s" % ( 
 116                  tb.function, tb.file, tb.linenum) 
 117          else: 
 118              self._traceback = "Error! Could not extract traceback info" 
 119          self._context = sc 
 120   
 121      def __enter__(self): 
 122          global _spark_stack_depth 
 123          if _spark_stack_depth == 0: 
 124              self._context._jsc.setCallSite(self._traceback) 
 125          _spark_stack_depth += 1 
 126   
 127      def __exit__(self, type, value, tb): 
 128          global _spark_stack_depth 
 129          _spark_stack_depth -= 1 
 130          if _spark_stack_depth == 0: 
 131              self._context._jsc.setCallSite(None) 
 132   
133 134 -class MaxHeapQ(object):
135 136 """ 137 An implementation of MaxHeap. 138 139 >>> import pyspark.rdd 140 >>> heap = pyspark.rdd.MaxHeapQ(5) 141 >>> [heap.insert(i) for i in range(10)] 142 [None, None, None, None, None, None, None, None, None, None] 143 >>> sorted(heap.getElements()) 144 [0, 1, 2, 3, 4] 145 >>> heap = pyspark.rdd.MaxHeapQ(5) 146 >>> [heap.insert(i) for i in range(9, -1, -1)] 147 [None, None, None, None, None, None, None, None, None, None] 148 >>> sorted(heap.getElements()) 149 [0, 1, 2, 3, 4] 150 >>> heap = pyspark.rdd.MaxHeapQ(1) 151 >>> [heap.insert(i) for i in range(9, -1, -1)] 152 [None, None, None, None, None, None, None, None, None, None] 153 >>> heap.getElements() 154 [0] 155 """ 156
157 - def __init__(self, maxsize):
158 # We start from q[1], so its children are always 2 * k 159 self.q = [0] 160 self.maxsize = maxsize
161
162 - def _swim(self, k):
163 while (k > 1) and (self.q[k / 2] < self.q[k]): 164 self._swap(k, k / 2) 165 k = k / 2
166
167 - def _swap(self, i, j):
168 t = self.q[i] 169 self.q[i] = self.q[j] 170 self.q[j] = t
171
172 - def _sink(self, k):
173 N = self.size() 174 while 2 * k <= N: 175 j = 2 * k 176 # Here we test if both children are greater than parent 177 # if not swap with larger one. 178 if j < N and self.q[j] < self.q[j + 1]: 179 j = j + 1 180 if(self.q[k] > self.q[j]): 181 break 182 self._swap(k, j) 183 k = j
184
185 - def size(self):
186 return len(self.q) - 1
187
188 - def insert(self, value):
189 if (self.size()) < self.maxsize: 190 self.q.append(value) 191 self._swim(self.size()) 192 else: 193 self._replaceRoot(value)
194
195 - def getElements(self):
196 return self.q[1:]
197
198 - def _replaceRoot(self, value):
199 if(self.q[1] > value): 200 self.q[1] = value 201 self._sink(1)
202
 203   
 204  def _parse_memory(s): 
 205      """ 
 206      Parse a memory string in the format supported by Java (e.g. 1g, 200m) and 
 207      return the value in MB 
 208   
 209      >>> _parse_memory("256m") 
 210      256 
 211      >>> _parse_memory("2g") 
 212      2048 
 213      """ 
 214      units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024} 
 215      if s[-1].lower() not in units: 
 216          raise ValueError("invalid format: " + s) 
 217      return int(float(s[:-1]) * units[s[-1].lower()]) 
218
 219   
 220  class RDD(object): 
 221   
 222      """ 
 223      A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
 224      Represents an immutable, partitioned collection of elements that can be 
 225      operated on in parallel. 
 226      """ 
 227   
228 - def __init__(self, jrdd, ctx, jrdd_deserializer):
229 self._jrdd = jrdd 230 self.is_cached = False 231 self.is_checkpointed = False 232 self.ctx = ctx 233 self._jrdd_deserializer = jrdd_deserializer 234 self._id = jrdd.id()
235
236 - def _toPickleSerialization(self):
237 if (self._jrdd_deserializer == PickleSerializer() or 238 self._jrdd_deserializer == BatchedSerializer(PickleSerializer())): 239 return self 240 else: 241 return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
242
243 - def id(self):
244 """ 245 A unique ID for this RDD (within its SparkContext). 246 """ 247 return self._id
248
249 - def __repr__(self):
250 return self._jrdd.toString()
251 252 @property
253 - def context(self):
254 """ 255 The L{SparkContext} that this RDD was created on. 256 """ 257 return self.ctx
258
259 - def cache(self):
260 """ 261 Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}). 262 """ 263 self.is_cached = True 264 self.persist(StorageLevel.MEMORY_ONLY_SER) 265 return self
266
 267      def persist(self, storageLevel): 
 268          """ 
 269          Set this RDD's storage level to persist its values across operations 
 270          after the first time it is computed. This can only be used to assign 
 271          a new storage level if the RDD does not have a storage level set yet. 
 272          """ 
 273          self.is_cached = True 
 274          javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) 
 275          self._jrdd.persist(javaStorageLevel) 
 276          return self 
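# Illustrative sketch (not part of the original source): how persist() might be
# used with an explicit storage level, assuming an active SparkContext `sc` as
# in the doctests elsewhere in this module.
#
# >>> rdd = sc.parallelize([1, 2, 3])
# >>> rdd.persist(StorageLevel.MEMORY_AND_DISK) is rdd
# True
# >>> rdd.is_cached
# True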
277
278 - def unpersist(self):
279 """ 280 Mark the RDD as non-persistent, and remove all blocks for it from 281 memory and disk. 282 """ 283 self.is_cached = False 284 self._jrdd.unpersist() 285 return self
286
287 - def checkpoint(self):
288 """ 289 Mark this RDD for checkpointing. It will be saved to a file inside the 290 checkpoint directory set with L{SparkContext.setCheckpointDir()} and 291 all references to its parent RDDs will be removed. This function must 292 be called before any job has been executed on this RDD. It is strongly 293 recommended that this RDD is persisted in memory, otherwise saving it 294 on a file will require recomputation. 295 """ 296 self.is_checkpointed = True 297 self._jrdd.rdd().checkpoint()
298
299 - def isCheckpointed(self):
300 """ 301 Return whether this RDD has been checkpointed or not 302 """ 303 return self._jrdd.rdd().isCheckpointed()
304
 305      def getCheckpointFile(self): 
 306          """ 
 307          Gets the name of the file to which this RDD was checkpointed 
 308          """ 
 309          checkpointFile = self._jrdd.rdd().getCheckpointFile() 
 310          if checkpointFile.isDefined(): 
 311              return checkpointFile.get() 
 312          else: 
 313              return None 
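# Illustrative sketch (not part of the original source): the checkpoint
# workflow, assuming an active SparkContext `sc` and a writable directory
# "/tmp/spark-checkpoint" (both are assumptions for illustration only).
#
# >>> sc.setCheckpointDir("/tmp/spark-checkpoint")
# >>> rdd = sc.parallelize([1, 2, 3])
# >>> rdd.checkpoint()
# >>> rdd.count()          # running a job materializes the checkpoint
# 3
# >>> rdd.isCheckpointed()
# True
# >>> rdd.getCheckpointFile() is not None
# True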
314
315 - def map(self, f, preservesPartitioning=False):
316 """ 317 Return a new RDD by applying a function to each element of this RDD. 318 319 >>> rdd = sc.parallelize(["b", "a", "c"]) 320 >>> sorted(rdd.map(lambda x: (x, 1)).collect()) 321 [('a', 1), ('b', 1), ('c', 1)] 322 """ 323 def func(_, iterator): 324 return imap(f, iterator)
325 return self.mapPartitionsWithIndex(func, preservesPartitioning)
326
327 - def flatMap(self, f, preservesPartitioning=False):
328 """ 329 Return a new RDD by first applying a function to all elements of this 330 RDD, and then flattening the results. 331 332 >>> rdd = sc.parallelize([2, 3, 4]) 333 >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) 334 [1, 1, 1, 2, 2, 3] 335 >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) 336 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] 337 """ 338 def func(s, iterator): 339 return chain.from_iterable(imap(f, iterator))
340 return self.mapPartitionsWithIndex(func, preservesPartitioning) 341
342 - def mapPartitions(self, f, preservesPartitioning=False):
343 """ 344 Return a new RDD by applying a function to each partition of this RDD. 345 346 >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 347 >>> def f(iterator): yield sum(iterator) 348 >>> rdd.mapPartitions(f).collect() 349 [3, 7] 350 """ 351 def func(s, iterator): 352 return f(iterator)
353 return self.mapPartitionsWithIndex(func) 354
355 - def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
356 """ 357 Return a new RDD by applying a function to each partition of this RDD, 358 while tracking the index of the original partition. 359 360 >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 361 >>> def f(splitIndex, iterator): yield splitIndex 362 >>> rdd.mapPartitionsWithIndex(f).sum() 363 6 364 """ 365 return PipelinedRDD(self, f, preservesPartitioning)
366
367 - def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
368 """ 369 Deprecated: use mapPartitionsWithIndex instead. 370 371 Return a new RDD by applying a function to each partition of this RDD, 372 while tracking the index of the original partition. 373 374 >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 375 >>> def f(splitIndex, iterator): yield splitIndex 376 >>> rdd.mapPartitionsWithSplit(f).sum() 377 6 378 """ 379 warnings.warn("mapPartitionsWithSplit is deprecated; " 380 "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) 381 return self.mapPartitionsWithIndex(f, preservesPartitioning)
382
 383      def getNumPartitions(self): 
 384          """ 
 385          Returns the number of partitions in this RDD 
 386   
 387          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 388          >>> rdd.getNumPartitions() 
 389          2 
 390          """ 
 391          return self._jrdd.partitions().size() 
392
393 - def filter(self, f):
394 """ 395 Return a new RDD containing only the elements that satisfy a predicate. 396 397 >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 398 >>> rdd.filter(lambda x: x % 2 == 0).collect() 399 [2, 4] 400 """ 401 def func(iterator): 402 return ifilter(f, iterator)
403 return self.mapPartitions(func) 404
405 - def distinct(self):
406 """ 407 Return a new RDD containing the distinct elements in this RDD. 408 409 >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) 410 [1, 2, 3] 411 """ 412 return self.map(lambda x: (x, None)) \ 413 .reduceByKey(lambda x, _: x) \ 414 .map(lambda (x, _): x)
415
 416      def sample(self, withReplacement, fraction, seed=None): 
 417          """ 
 418          Return a sampled subset of this RDD (relies on numpy and falls back 
 419          on default random generator if numpy is unavailable). 
 420          """ 
 421          assert fraction >= 0.0, "Negative fraction value: %s" % fraction 
 422          return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) 
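# Illustrative sketch (not part of the original source): sample() returns a
# random subset whose size is only approximately fraction * count, so this
# example checks a property rather than an exact value (assumes `sc`).
#
# >>> rdd = sc.parallelize(range(100), 4)
# >>> sampled = rdd.sample(False, 0.1, seed=42).collect()
# >>> set(sampled).issubset(set(range(100)))
# True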
423 424 # this is ported from scala/spark/RDD.scala
425 - def takeSample(self, withReplacement, num, seed=None):
426 """ 427 Return a fixed-size sampled subset of this RDD (currently requires 428 numpy). 429 430 >>> rdd = sc.parallelize(range(0, 10)) 431 >>> len(rdd.takeSample(True, 20, 1)) 432 20 433 >>> len(rdd.takeSample(False, 5, 2)) 434 5 435 >>> len(rdd.takeSample(False, 15, 3)) 436 10 437 """ 438 numStDev = 10.0 439 440 if num < 0: 441 raise ValueError("Sample size cannot be negative.") 442 elif num == 0: 443 return [] 444 445 initialCount = self.count() 446 if initialCount == 0: 447 return [] 448 449 rand = Random(seed) 450 451 if (not withReplacement) and num >= initialCount: 452 # shuffle current RDD and return 453 samples = self.collect() 454 rand.shuffle(samples) 455 return samples 456 457 maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint)) 458 if num > maxSampleSize: 459 raise ValueError( 460 "Sample size cannot be greater than %d." % maxSampleSize) 461 462 fraction = RDD._computeFractionForSampleSize( 463 num, initialCount, withReplacement) 464 samples = self.sample(withReplacement, fraction, seed).collect() 465 466 # If the first sample didn't turn out large enough, keep trying to take samples; 467 # this shouldn't happen often because we use a big multiplier for their initial size. 468 # See: scala/spark/RDD.scala 469 while len(samples) < num: 470 # TODO: add log warning for when more than one iteration was run 471 seed = rand.randint(0, sys.maxint) 472 samples = self.sample(withReplacement, fraction, seed).collect() 473 474 rand.shuffle(samples) 475 476 return samples[0:num]
 477   
 478      @staticmethod 
 479      def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement): 
 480          """ 
 481          Returns a sampling rate that guarantees a sample of 
 482          size >= sampleSizeLowerBound 99.99% of the time. 
 483   
 484          How the sampling rate is determined: 
 485          Let p = num / total, where num is the sample size and total is the 
 486          total number of data points in the RDD. We're trying to compute 
 487          q > p such that 
 488            - when sampling with replacement, we're drawing each data point 
 489              with prob_i ~ Pois(q), where we want to guarantee 
 490              Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to 
 491              total), i.e. the failure rate of not having a sufficiently large 
 492              sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient 
 493              to guarantee 0.9999 success rate for num > 12, but we need a 
 494              slightly larger q (9 empirically determined). 
 495            - when sampling without replacement, we're drawing each data point 
 496              with prob_i ~ Binomial(total, fraction) and our choice of q 
 497              guarantees 1-delta, or 0.9999 success rate, where success rate is 
 498              defined the same as in sampling with replacement. 
 499          """ 
 500          fraction = float(sampleSizeLowerBound) / total 
 501          if withReplacement: 
 502              numStDev = 5 
 503              if (sampleSizeLowerBound < 12): 
 504                  numStDev = 9 
 505              return fraction + numStDev * sqrt(fraction / total) 
 506          else: 
 507              delta = 0.00005 
 508              gamma = - log(delta) / total 
 509              return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction)) 
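# Illustrative sketch (not part of the original source): for num=10 and
# total=1000 without replacement, fraction = 0.01 and gamma = -log(5e-5)/1000
# ~= 0.0099, so the returned rate is roughly 0.037 -- noticeably above the
# naive 1% rate, which is what gives the 99.99% success guarantee.
#
# >>> f = RDD._computeFractionForSampleSize(10, 1000, False)
# >>> 0.01 < f < 1.0
# True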
510
511 - def union(self, other):
512 """ 513 Return the union of this RDD and another one. 514 515 >>> rdd = sc.parallelize([1, 1, 2, 3]) 516 >>> rdd.union(rdd).collect() 517 [1, 1, 2, 3, 1, 1, 2, 3] 518 """ 519 if self._jrdd_deserializer == other._jrdd_deserializer: 520 rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, 521 self._jrdd_deserializer) 522 return rdd 523 else: 524 # These RDDs contain data in different serialized formats, so we 525 # must normalize them to the default serializer. 526 self_copy = self._reserialize() 527 other_copy = other._reserialize() 528 return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, 529 self.ctx.serializer)
530
531 - def intersection(self, other):
532 """ 533 Return the intersection of this RDD and another one. The output will 534 not contain any duplicate elements, even if the input RDDs did. 535 536 Note that this method performs a shuffle internally. 537 538 >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) 539 >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) 540 >>> rdd1.intersection(rdd2).collect() 541 [1, 2, 3] 542 """ 543 return self.map(lambda v: (v, None)) \ 544 .cogroup(other.map(lambda v: (v, None))) \ 545 .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ 546 .keys()
547
548 - def _reserialize(self, serializer=None):
549 serializer = serializer or self.ctx.serializer 550 if self._jrdd_deserializer == serializer: 551 return self 552 else: 553 converted = self.map(lambda x: x, preservesPartitioning=True) 554 converted._jrdd_deserializer = serializer 555 return converted
556
557 - def __add__(self, other):
558 """ 559 Return the union of this RDD and another one. 560 561 >>> rdd = sc.parallelize([1, 1, 2, 3]) 562 >>> (rdd + rdd).collect() 563 [1, 1, 2, 3, 1, 1, 2, 3] 564 """ 565 if not isinstance(other, RDD): 566 raise TypeError 567 return self.union(other)
568
569 - def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
570 """ 571 Sorts this RDD, which is assumed to consist of (key, value) pairs. 572 # noqa 573 574 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 575 >>> sc.parallelize(tmp).sortByKey().first() 576 ('1', 3) 577 >>> sc.parallelize(tmp).sortByKey(True, 1).collect() 578 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 579 >>> sc.parallelize(tmp).sortByKey(True, 2).collect() 580 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 581 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] 582 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) 583 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() 584 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)] 585 """ 586 if numPartitions is None: 587 numPartitions = self._defaultReducePartitions() 588 589 def sortPartition(iterator): 590 return iter(sorted(iterator, key=lambda (k, v): keyfunc(k), reverse=not ascending))
591 592 if numPartitions == 1: 593 if self.getNumPartitions() > 1: 594 self = self.coalesce(1) 595 return self.mapPartitions(sortPartition) 596 597 # first compute the boundary of each part via sampling: we want to partition 598 # the key-space into bins such that the bins have roughly the same 599 # number of (key, value) pairs falling into them 600 rddSize = self.count() 601 if not rddSize: 602 return self # empty RDD 603 maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner 604 fraction = min(maxSampleSize / max(rddSize, 1), 1.0) 605 samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() 606 samples = sorted(samples, reverse=(not ascending), key=keyfunc) 607 608 # we have numPartitions many parts but one of the them has 609 # an implicit boundary 610 bounds = [samples[len(samples) * (i + 1) / numPartitions] 611 for i in range(0, numPartitions - 1)] 612 613 def rangePartitioner(k): 614 p = bisect.bisect_left(bounds, keyfunc(k)) 615 if ascending: 616 return p 617 else: 618 return numPartitions - 1 - p 619 620 return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True) 621
622 - def sortBy(self, keyfunc, ascending=True, numPartitions=None):
623 """ 624 Sorts this RDD by the given keyfunc 625 626 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 627 >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect() 628 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 629 >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect() 630 [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 631 """ 632 return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
633
634 - def glom(self):
635 """ 636 Return an RDD created by coalescing all elements within each partition 637 into a list. 638 639 >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 640 >>> sorted(rdd.glom().collect()) 641 [[1, 2], [3, 4]] 642 """ 643 def func(iterator): 644 yield list(iterator)
645 return self.mapPartitions(func) 646
647 - def cartesian(self, other):
648 """ 649 Return the Cartesian product of this RDD and another one, that is, the 650 RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and 651 C{b} is in C{other}. 652 653 >>> rdd = sc.parallelize([1, 2]) 654 >>> sorted(rdd.cartesian(rdd).collect()) 655 [(1, 1), (1, 2), (2, 1), (2, 2)] 656 """ 657 # Due to batching, we can't use the Java cartesian method. 658 deserializer = CartesianDeserializer(self._jrdd_deserializer, 659 other._jrdd_deserializer) 660 return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
661
662 - def groupBy(self, f, numPartitions=None):
663 """ 664 Return an RDD of grouped items. 665 666 >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) 667 >>> result = rdd.groupBy(lambda x: x % 2).collect() 668 >>> sorted([(x, sorted(y)) for (x, y) in result]) 669 [(0, [2, 8]), (1, [1, 1, 3, 5])] 670 """ 671 return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
672
673 - def pipe(self, command, env={}):
674 """ 675 Return an RDD created by piping elements to a forked external process. 676 677 >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() 678 ['1', '2', '', '3'] 679 """ 680 def func(iterator): 681 pipe = Popen( 682 shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 683 684 def pipe_objs(out): 685 for obj in iterator: 686 out.write(str(obj).rstrip('\n') + '\n') 687 out.close()
688 Thread(target=pipe_objs, args=[pipe.stdin]).start() 689 return (x.rstrip('\n') for x in iter(pipe.stdout.readline, '')) 690 return self.mapPartitions(func) 691
692 - def foreach(self, f):
693 """ 694 Applies a function to all elements of this RDD. 695 696 >>> def f(x): print x 697 >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) 698 """ 699 def processPartition(iterator): 700 for x in iterator: 701 f(x) 702 yield None
703 self.mapPartitions(processPartition).collect() # Force evaluation 704
705 - def foreachPartition(self, f):
706 """ 707 Applies a function to each partition of this RDD. 708 709 >>> def f(iterator): 710 ... for x in iterator: 711 ... print x 712 ... yield None 713 >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) 714 """ 715 self.mapPartitions(f).collect() # Force evaluation
716
 717      def collect(self): 
 718          """ 
 719          Return a list that contains all of the elements in this RDD. 
 720          """ 
 721          with _JavaStackTrace(self.context) as st: 
 722              bytesInJava = self._jrdd.collect().iterator() 
 723          return list(self._collect_iterator_through_file(bytesInJava)) 
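# Illustrative sketch (not part of the original source), assuming an active
# SparkContext `sc` as in the doctests in this module:
#
# >>> sc.parallelize([1, 2, 3, 4, 5]).collect()
# [1, 2, 3, 4, 5]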
724
725 - def _collect_iterator_through_file(self, iterator):
726 # Transferring lots of data through Py4J can be slow because 727 # socket.readline() is inefficient. Instead, we'll dump the data to a 728 # file and read it back. 729 tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) 730 tempFile.close() 731 self.ctx._writeToFile(iterator, tempFile.name) 732 # Read the data into Python and deserialize it: 733 with open(tempFile.name, 'rb') as tempFile: 734 for item in self._jrdd_deserializer.load_stream(tempFile): 735 yield item 736 os.unlink(tempFile.name)
737
738 - def reduce(self, f):
739 """ 740 Reduces the elements of this RDD using the specified commutative and 741 associative binary operator. Currently reduces partitions locally. 742 743 >>> from operator import add 744 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 745 15 746 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 747 10 748 """ 749 def func(iterator): 750 acc = None 751 for obj in iterator: 752 if acc is None: 753 acc = obj 754 else: 755 acc = f(obj, acc) 756 if acc is not None: 757 yield acc
758 vals = self.mapPartitions(func).collect() 759 return reduce(f, vals) 760
761 - def fold(self, zeroValue, op):
762 """ 763 Aggregate the elements of each partition, and then the results for all 764 the partitions, using a given associative function and a neutral "zero 765 value." 766 767 The function C{op(t1, t2)} is allowed to modify C{t1} and return it 768 as its result value to avoid object allocation; however, it should not 769 modify C{t2}. 770 771 >>> from operator import add 772 >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) 773 15 774 """ 775 def func(iterator): 776 acc = zeroValue 777 for obj in iterator: 778 acc = op(obj, acc) 779 yield acc
780 vals = self.mapPartitions(func).collect() 781 return reduce(op, vals, zeroValue) 782
 783      def aggregate(self, zeroValue, seqOp, combOp): 
 784          """ 
 785          Aggregate the elements of each partition, and then the results for all 
 786          the partitions, using the given combine functions and a neutral "zero 
 787          value." 
 788   
 789          The functions C{op(t1, t2)} are allowed to modify C{t1} and return it 
 790          as their result value to avoid object allocation; however, they should 
 791          not modify C{t2}. 
 792   
 793          The first function (seqOp) can return a different result type, U, than 
 794          the type of this RDD. Thus, we need one operation for merging a T into 
 795          a U and one operation for merging two U's. 
 796   
 797          >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) 
 798          >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) 
 799          >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) 
 800          (10, 4) 
 801          >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp) 
 802          (0, 0) 
 803          """ 
 804          def func(iterator): 
 805              acc = zeroValue 
 806              for obj in iterator: 
 807                  acc = seqOp(acc, obj) 
 808              yield acc 
 809   
 810          return self.mapPartitions(func).fold(zeroValue, combOp) 
 811   
812 - def max(self):
813 """ 814 Find the maximum item in this RDD. 815 816 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max() 817 43.0 818 """ 819 return self.reduce(max)
820
821 - def min(self):
822 """ 823 Find the minimum item in this RDD. 824 825 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min() 826 1.0 827 """ 828 return self.reduce(min)
829
830 - def sum(self):
831 """ 832 Add up the elements in this RDD. 833 834 >>> sc.parallelize([1.0, 2.0, 3.0]).sum() 835 6.0 836 """ 837 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
838
839 - def count(self):
840 """ 841 Return the number of elements in this RDD. 842 843 >>> sc.parallelize([2, 3, 4]).count() 844 3 845 """ 846 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
847
 848      def stats(self): 
 849          """ 
 850          Return a L{StatCounter} object that captures the mean, variance 
 851          and count of the RDD's elements in one operation. 
 852          """ 
 853          def redFunc(left_counter, right_counter): 
 854              return left_counter.mergeStats(right_counter) 
 855   
 856          return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) 
 857   
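# Illustrative sketch (not part of the original source): the returned
# StatCounter exposes count(), mean(), stdev() and friends (assumes `sc`).
#
# >>> s = sc.parallelize([1, 2, 3, 4]).stats()
# >>> s.count()
# 4
# >>> s.mean()
# 2.5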
 858      def histogram(self, buckets): 
 859          """ 
 860          Compute a histogram using the provided buckets. The buckets 
 861          are all open to the right except for the last which is closed. 
 862          e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], 
 863          which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 
 864          and 50 we would have a histogram of 1,0,1. 
 865   
 866          If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), 
 867          this can be switched from an O(log n) insertion to O(1) per 
 868          element (where n = # buckets). 
 869   
 870          Buckets must be sorted, must not contain any duplicates, and must 
 871          have at least two elements. 
 872   
 873          If `buckets` is a number, it will generate buckets which are 
 874          evenly spaced between the minimum and maximum of the RDD. For 
 875          example, if the min value is 0 and the max is 100, given buckets 
 876          as 2, the resulting buckets will be [0,50) [50,100]. buckets must 
 877          be at least 1. An exception is thrown if the RDD contains infinity. 
 878          If the elements in the RDD do not vary (max == min), a single 
 879          bucket is always returned. 
 880   
 881          It will return a tuple of buckets and histogram. 
 882   
 883          >>> rdd = sc.parallelize(range(51)) 
 884          >>> rdd.histogram(2) 
 885          ([0, 25, 50], [25, 26]) 
 886          >>> rdd.histogram([0, 5, 25, 50]) 
 887          ([0, 5, 25, 50], [5, 20, 26]) 
 888          >>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets 
 889          ([0, 15, 30, 45, 60], [15, 15, 15, 6]) 
 890          >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"]) 
 891          >>> rdd.histogram(("a", "b", "c")) 
 892          (('a', 'b', 'c'), [2, 2]) 
 893          """ 
 894   
 895          if isinstance(buckets, (int, long)): 
 896              if buckets < 1: 
 897                  raise ValueError("number of buckets must be >= 1") 
 898   
 899              # filter out non-comparable elements 
 900              def comparable(x): 
 901                  if x is None: 
 902                      return False 
 903                  if type(x) is float and isnan(x): 
 904                      return False 
 905                  return True 
906 907 filtered = self.filter(comparable) 908 909 # faster than stats() 910 def minmax(a, b): 911 return min(a[0], b[0]), max(a[1], b[1]) 912 try: 913 minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax) 914 except TypeError as e: 915 if " empty " in str(e): 916 raise ValueError("can not generate buckets from empty RDD") 917 raise 918 919 if minv == maxv or buckets == 1: 920 return [minv, maxv], [filtered.count()] 921 922 try: 923 inc = (maxv - minv) / buckets 924 except TypeError: 925 raise TypeError("Can not generate buckets with non-number in RDD") 926 927 if isinf(inc): 928 raise ValueError("Can not generate buckets with infinite value") 929 930 # keep them as integer if possible 931 if inc * buckets != maxv - minv: 932 inc = (maxv - minv) * 1.0 / buckets 933 934 buckets = [i * inc + minv for i in range(buckets)] 935 buckets.append(maxv) # fix accumulated error 936 even = True 937 938 elif isinstance(buckets, (list, tuple)): 939 if len(buckets) < 2: 940 raise ValueError("buckets should have more than one value") 941 942 if any(i is None or isinstance(i, float) and isnan(i) for i in buckets): 943 raise ValueError("can not have None or NaN in buckets") 944 945 if sorted(buckets) != list(buckets): 946 raise ValueError("buckets should be sorted") 947 948 if len(set(buckets)) != len(buckets): 949 raise ValueError("buckets should not contain duplicated values") 950 951 minv = buckets[0] 952 maxv = buckets[-1] 953 even = False 954 inc = None 955 try: 956 steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)] 957 except TypeError: 958 pass # objects in buckets do not support '-' 959 else: 960 if max(steps) - min(steps) < 1e-10: # handle precision errors 961 even = True 962 inc = (maxv - minv) / (len(buckets) - 1) 963 964 else: 965 raise TypeError("buckets should be a list or tuple or number(int or long)") 966 967 def histogram(iterator): 968 counters = [0] * len(buckets) 969 for i in iterator: 970 if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv: 971 continue 972 t = (int((i - minv) / inc) if even 973 else bisect.bisect_right(buckets, i) - 1) 974 counters[t] += 1 975 # add last two together 976 last = counters.pop() 977 counters[-1] += last 978 return [counters] 979 980 def mergeCounters(a, b): 981 return [i + j for i, j in zip(a, b)] 982 983 return buckets, self.mapPartitions(histogram).reduce(mergeCounters) 984
985 - def mean(self):
986 """ 987 Compute the mean of this RDD's elements. 988 989 >>> sc.parallelize([1, 2, 3]).mean() 990 2.0 991 """ 992 return self.stats().mean()
993
994 - def variance(self):
995 """ 996 Compute the variance of this RDD's elements. 997 998 >>> sc.parallelize([1, 2, 3]).variance() 999 0.666... 1000 """ 1001 return self.stats().variance()
1002
1003 - def stdev(self):
1004 """ 1005 Compute the standard deviation of this RDD's elements. 1006 1007 >>> sc.parallelize([1, 2, 3]).stdev() 1008 0.816... 1009 """ 1010 return self.stats().stdev()
1011
1012 - def sampleStdev(self):
1013 """ 1014 Compute the sample standard deviation of this RDD's elements (which 1015 corrects for bias in estimating the standard deviation by dividing by 1016 N-1 instead of N). 1017 1018 >>> sc.parallelize([1, 2, 3]).sampleStdev() 1019 1.0 1020 """ 1021 return self.stats().sampleStdev()
1022
1023 - def sampleVariance(self):
1024 """ 1025 Compute the sample variance of this RDD's elements (which corrects 1026 for bias in estimating the variance by dividing by N-1 instead of N). 1027 1028 >>> sc.parallelize([1, 2, 3]).sampleVariance() 1029 1.0 1030 """ 1031 return self.stats().sampleVariance()
1032
1033 - def countByValue(self):
1034 """ 1035 Return the count of each unique value in this RDD as a dictionary of 1036 (value, count) pairs. 1037 1038 >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) 1039 [(1, 2), (2, 3)] 1040 """ 1041 def countPartition(iterator): 1042 counts = defaultdict(int) 1043 for obj in iterator: 1044 counts[obj] += 1 1045 yield counts
1046 1047 def mergeMaps(m1, m2): 1048 for (k, v) in m2.iteritems(): 1049 m1[k] += v 1050 return m1 1051 return self.mapPartitions(countPartition).reduce(mergeMaps) 1052
1053 - def top(self, num):
1054 """ 1055 Get the top N elements from a RDD. 1056 1057 Note: It returns the list sorted in descending order. 1058 >>> sc.parallelize([10, 4, 2, 12, 3]).top(1) 1059 [12] 1060 >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2) 1061 [6, 5] 1062 """ 1063 def topIterator(iterator): 1064 q = [] 1065 for k in iterator: 1066 if len(q) < num: 1067 heapq.heappush(q, k) 1068 else: 1069 heapq.heappushpop(q, k) 1070 yield q
1071 1072 def merge(a, b): 1073 return next(topIterator(a + b)) 1074 1075 return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True) 1076
1077 - def takeOrdered(self, num, key=None):
1078 """ 1079 Get the N elements from a RDD ordered in ascending order or as 1080 specified by the optional key function. 1081 1082 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) 1083 [1, 2, 3, 4, 5, 6] 1084 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) 1085 [10, 9, 7, 6, 5, 4] 1086 """ 1087 1088 def topNKeyedElems(iterator, key_=None): 1089 q = MaxHeapQ(num) 1090 for k in iterator: 1091 if key_ is not None: 1092 k = (key_(k), k) 1093 q.insert(k) 1094 yield q.getElements()
1095 1096 def unKey(x, key_=None): 1097 if key_ is not None: 1098 x = [i[1] for i in x] 1099 return x 1100 1101 def merge(a, b): 1102 return next(topNKeyedElems(a + b)) 1103 result = self.mapPartitions( 1104 lambda i: topNKeyedElems(i, key)).reduce(merge) 1105 return sorted(unKey(result, key), key=key) 1106
1107 - def take(self, num):
1108 """ 1109 Take the first num elements of the RDD. 1110 1111 It works by first scanning one partition, and use the results from 1112 that partition to estimate the number of additional partitions needed 1113 to satisfy the limit. 1114 1115 Translated from the Scala implementation in RDD#take(). 1116 1117 >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) 1118 [2, 3] 1119 >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) 1120 [2, 3, 4, 5, 6] 1121 >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3) 1122 [91, 92, 93] 1123 """ 1124 items = [] 1125 totalParts = self._jrdd.partitions().size() 1126 partsScanned = 0 1127 1128 while len(items) < num and partsScanned < totalParts: 1129 # The number of partitions to try in this iteration. 1130 # It is ok for this number to be greater than totalParts because 1131 # we actually cap it at totalParts in runJob. 1132 numPartsToTry = 1 1133 if partsScanned > 0: 1134 # If we didn't find any rows after the previous iteration, 1135 # quadruple and retry. Otherwise, interpolate the number of 1136 # partitions we need to try, but overestimate it by 50%. 1137 if len(items) == 0: 1138 numPartsToTry = partsScanned * 4 1139 else: 1140 numPartsToTry = int(1.5 * num * partsScanned / len(items)) 1141 1142 left = num - len(items) 1143 1144 def takeUpToNumLeft(iterator): 1145 taken = 0 1146 while taken < left: 1147 yield next(iterator) 1148 taken += 1
1149 1150 p = range( 1151 partsScanned, min(partsScanned + numPartsToTry, totalParts)) 1152 res = self.context.runJob(self, takeUpToNumLeft, p, True) 1153 1154 items += res 1155 partsScanned += numPartsToTry 1156 1157 return items[:num] 1158
1159 - def first(self):
1160 """ 1161 Return the first element in this RDD. 1162 1163 >>> sc.parallelize([2, 3, 4]).first() 1164 2 1165 """ 1166 return self.take(1)[0]
1167
1168 - def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
1169 """ 1170 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 1171 system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are 1172 converted for output using either user specified converters or, by default, 1173 L{org.apache.spark.api.python.JavaToWritableConverter}. 1174 1175 @param conf: Hadoop job configuration, passed in as a dict 1176 @param keyConverter: (None by default) 1177 @param valueConverter: (None by default) 1178 """ 1179 jconf = self.ctx._dictToJavaMap(conf) 1180 pickledRDD = self._toPickleSerialization() 1181 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 1182 self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf, 1183 keyConverter, valueConverter, True)
1184
1185 - def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, 1186 keyConverter=None, valueConverter=None, conf=None):
1187 """ 1188 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 1189 system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types 1190 will be inferred if not specified. Keys and values are converted for output using either 1191 user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The 1192 C{conf} is applied on top of the base Hadoop conf associated with the SparkContext 1193 of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. 1194 1195 @param path: path to Hadoop file 1196 @param outputFormatClass: fully qualified classname of Hadoop OutputFormat 1197 (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") 1198 @param keyClass: fully qualified classname of key Writable class 1199 (e.g. "org.apache.hadoop.io.IntWritable", None by default) 1200 @param valueClass: fully qualified classname of value Writable class 1201 (e.g. "org.apache.hadoop.io.Text", None by default) 1202 @param keyConverter: (None by default) 1203 @param valueConverter: (None by default) 1204 @param conf: Hadoop job configuration, passed in as a dict (None by default) 1205 """ 1206 jconf = self.ctx._dictToJavaMap(conf) 1207 pickledRDD = self._toPickleSerialization() 1208 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 1209 self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path, 1210 outputFormatClass, 1211 keyClass, valueClass, 1212 keyConverter, valueConverter, jconf)
1213
1214 - def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
1215 """ 1216 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 1217 system, using the old Hadoop OutputFormat API (mapred package). Keys/values are 1218 converted for output using either user specified converters or, by default, 1219 L{org.apache.spark.api.python.JavaToWritableConverter}. 1220 1221 @param conf: Hadoop job configuration, passed in as a dict 1222 @param keyConverter: (None by default) 1223 @param valueConverter: (None by default) 1224 """ 1225 jconf = self.ctx._dictToJavaMap(conf) 1226 pickledRDD = self._toPickleSerialization() 1227 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 1228 self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf, 1229 keyConverter, valueConverter, False)
1230
1231 - def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, 1232 keyConverter=None, valueConverter=None, conf=None, 1233 compressionCodecClass=None):
1234 """ 1235 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 1236 system, using the old Hadoop OutputFormat API (mapred package). Key and value types 1237 will be inferred if not specified. Keys and values are converted for output using either 1238 user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The 1239 C{conf} is applied on top of the base Hadoop conf associated with the SparkContext 1240 of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. 1241 1242 @param path: path to Hadoop file 1243 @param outputFormatClass: fully qualified classname of Hadoop OutputFormat 1244 (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat") 1245 @param keyClass: fully qualified classname of key Writable class 1246 (e.g. "org.apache.hadoop.io.IntWritable", None by default) 1247 @param valueClass: fully qualified classname of value Writable class 1248 (e.g. "org.apache.hadoop.io.Text", None by default) 1249 @param keyConverter: (None by default) 1250 @param valueConverter: (None by default) 1251 @param conf: (None by default) 1252 @param compressionCodecClass: (None by default) 1253 """ 1254 jconf = self.ctx._dictToJavaMap(conf) 1255 pickledRDD = self._toPickleSerialization() 1256 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 1257 self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path, 1258 outputFormatClass, 1259 keyClass, valueClass, 1260 keyConverter, valueConverter, 1261 jconf, compressionCodecClass)
1262
1263      def saveAsSequenceFile(self, path, compressionCodecClass=None): 
1264          """ 
1265          Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 
1266          system, using the L{org.apache.hadoop.io.Writable} types that we convert from the 
1267          RDD's key and value types. The mechanism is as follows: 
1268              1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects. 
1269              2. Keys and values of this Java RDD are converted to Writables and written out. 
1270   
1271          @param path: path to sequence file 
1272          @param compressionCodecClass: (None by default) 
1273          """ 
1274          pickledRDD = self._toPickleSerialization() 
1275          batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 
1276          self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched, 
1277                                                     path, compressionCodecClass) 
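# Illustrative sketch (not part of the original source): round-tripping a small
# pair RDD through a SequenceFile, reusing the NamedTemporaryFile pattern from
# other doctests in this module and reading back with sc.sequenceFile(); the
# exact read-back types depend on the configured converters.
#
# >>> tmpFile = NamedTemporaryFile(delete=True)
# >>> tmpFile.close()
# >>> sc.parallelize([(1, "a"), (2, "b")]).saveAsSequenceFile(tmpFile.name)
# >>> sorted(sc.sequenceFile(tmpFile.name).collect())
# [(1, u'a'), (2, u'b')]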
1278
1279 - def saveAsPickleFile(self, path, batchSize=10):
1280 """ 1281 Save this RDD as a SequenceFile of serialized objects. The serializer 1282 used is L{pyspark.serializers.PickleSerializer}, default batch size 1283 is 10. 1284 1285 >>> tmpFile = NamedTemporaryFile(delete=True) 1286 >>> tmpFile.close() 1287 >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3) 1288 >>> sorted(sc.pickleFile(tmpFile.name, 5).collect()) 1289 [1, 2, 'rdd', 'spark'] 1290 """ 1291 self._reserialize(BatchedSerializer(PickleSerializer(), 1292 batchSize))._jrdd.saveAsObjectFile(path)
1293
1294 - def saveAsTextFile(self, path):
1295 """ 1296 Save this RDD as a text file, using string representations of elements. 1297 1298 >>> tempFile = NamedTemporaryFile(delete=True) 1299 >>> tempFile.close() 1300 >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) 1301 >>> from fileinput import input 1302 >>> from glob import glob 1303 >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) 1304 '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' 1305 1306 Empty lines are tolerated when saving to text files. 1307 1308 >>> tempFile2 = NamedTemporaryFile(delete=True) 1309 >>> tempFile2.close() 1310 >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) 1311 >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) 1312 '\\n\\n\\nbar\\nfoo\\n' 1313 """ 1314 def func(split, iterator): 1315 for x in iterator: 1316 if not isinstance(x, basestring): 1317 x = unicode(x) 1318 if isinstance(x, unicode): 1319 x = x.encode("utf-8") 1320 yield x
1321 keyed = self.mapPartitionsWithIndex(func) 1322 keyed._bypass_serializer = True 1323 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) 1324 1325 # Pair functions 1326
1327 - def collectAsMap(self):
1328 """ 1329 Return the key-value pairs in this RDD to the master as a dictionary. 1330 1331 >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() 1332 >>> m[1] 1333 2 1334 >>> m[3] 1335 4 1336 """ 1337 return dict(self.collect())
1338
1339 - def keys(self):
1340 """ 1341 Return an RDD with the keys of each tuple. 1342 1343 >>> m = sc.parallelize([(1, 2), (3, 4)]).keys() 1344 >>> m.collect() 1345 [1, 3] 1346 """ 1347 return self.map(lambda (k, v): k)
1348
1349 - def values(self):
1350 """ 1351 Return an RDD with the values of each tuple. 1352 1353 >>> m = sc.parallelize([(1, 2), (3, 4)]).values() 1354 >>> m.collect() 1355 [2, 4] 1356 """ 1357 return self.map(lambda (k, v): v)
1358
1359 - def reduceByKey(self, func, numPartitions=None):
1360 """ 1361 Merge the values for each key using an associative reduce function. 1362 1363 This will also perform the merging locally on each mapper before 1364 sending results to a reducer, similarly to a "combiner" in MapReduce. 1365 1366 Output will be hash-partitioned with C{numPartitions} partitions, or 1367 the default parallelism level if C{numPartitions} is not specified. 1368 1369 >>> from operator import add 1370 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1371 >>> sorted(rdd.reduceByKey(add).collect()) 1372 [('a', 2), ('b', 1)] 1373 """ 1374 return self.combineByKey(lambda x: x, func, func, numPartitions)
1375
1376 - def reduceByKeyLocally(self, func):
1377 """ 1378 Merge the values for each key using an associative reduce function, but 1379 return the results immediately to the master as a dictionary. 1380 1381 This will also perform the merging locally on each mapper before 1382 sending results to a reducer, similarly to a "combiner" in MapReduce. 1383 1384 >>> from operator import add 1385 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1386 >>> sorted(rdd.reduceByKeyLocally(add).items()) 1387 [('a', 2), ('b', 1)] 1388 """ 1389 def reducePartition(iterator): 1390 m = {} 1391 for (k, v) in iterator: 1392 m[k] = v if k not in m else func(m[k], v) 1393 yield m
1394 1395 def mergeMaps(m1, m2): 1396 for (k, v) in m2.iteritems(): 1397 m1[k] = v if k not in m1 else func(m1[k], v) 1398 return m1 1399 return self.mapPartitions(reducePartition).reduce(mergeMaps) 1400
1401 - def countByKey(self):
1402 """ 1403 Count the number of elements for each key, and return the result to the 1404 master as a dictionary. 1405 1406 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1407 >>> sorted(rdd.countByKey().items()) 1408 [('a', 2), ('b', 1)] 1409 """ 1410 return self.map(lambda x: x[0]).countByValue()
1411
1412 - def join(self, other, numPartitions=None):
1413 """ 1414 Return an RDD containing all pairs of elements with matching keys in 1415 C{self} and C{other}. 1416 1417 Each pair of elements will be returned as a (k, (v1, v2)) tuple, where 1418 (k, v1) is in C{self} and (k, v2) is in C{other}. 1419 1420 Performs a hash join across the cluster. 1421 1422 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1423 >>> y = sc.parallelize([("a", 2), ("a", 3)]) 1424 >>> sorted(x.join(y).collect()) 1425 [('a', (1, 2)), ('a', (1, 3))] 1426 """ 1427 return python_join(self, other, numPartitions)
1428
1429 - def leftOuterJoin(self, other, numPartitions=None):
1430 """ 1431 Perform a left outer join of C{self} and C{other}. 1432 1433 For each element (k, v) in C{self}, the resulting RDD will either 1434 contain all pairs (k, (v, w)) for w in C{other}, or the pair 1435 (k, (v, None)) if no elements in other have key k. 1436 1437 Hash-partitions the resulting RDD into the given number of partitions. 1438 1439 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1440 >>> y = sc.parallelize([("a", 2)]) 1441 >>> sorted(x.leftOuterJoin(y).collect()) 1442 [('a', (1, 2)), ('b', (4, None))] 1443 """ 1444 return python_left_outer_join(self, other, numPartitions)
1445
1446 - def rightOuterJoin(self, other, numPartitions=None):
1447 """ 1448 Perform a right outer join of C{self} and C{other}. 1449 1450 For each element (k, w) in C{other}, the resulting RDD will either 1451 contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) 1452 if no elements in C{self} have key k. 1453 1454 Hash-partitions the resulting RDD into the given number of partitions. 1455 1456 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1457 >>> y = sc.parallelize([("a", 2)]) 1458 >>> sorted(y.rightOuterJoin(x).collect()) 1459 [('a', (2, 1)), ('b', (None, 4))] 1460 """ 1461 return python_right_outer_join(self, other, numPartitions)
1462 1463 # TODO: add option to control map-side combining 1464 # portable_hash is used as default, because builtin hash of None is different 1465 # cross machines.
1466 - def partitionBy(self, numPartitions, partitionFunc=portable_hash):
1467 """ 1468 Return a copy of the RDD partitioned using the specified partitioner. 1469 1470 >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 1471 >>> sets = pairs.partitionBy(2).glom().collect() 1472 >>> set(sets[0]).intersection(set(sets[1])) 1473 set([]) 1474 """ 1475 if numPartitions is None: 1476 numPartitions = self._defaultReducePartitions() 1477 1478 # Transferring O(n) objects to Java is too expensive. 1479 # Instead, we'll form the hash buckets in Python, 1480 # transferring O(numPartitions) objects to Java. 1481 # Each object is a (splitNumber, [objects]) pair. 1482 # In order to avoid too huge objects, the objects are 1483 # grouped into chunks. 1484 outputSerializer = self.ctx._unbatched_serializer 1485 1486 limit = (_parse_memory(self.ctx._conf.get( 1487 "spark.python.worker.memory", "512m")) / 2) 1488 1489 def add_shuffle_key(split, iterator): 1490 1491 buckets = defaultdict(list) 1492 c, batch = 0, min(10 * numPartitions, 1000) 1493 1494 for (k, v) in iterator: 1495 buckets[partitionFunc(k) % numPartitions].append((k, v)) 1496 c += 1 1497 1498 # check used memory and avg size of chunk of objects 1499 if (c % 1000 == 0 and get_used_memory() > limit 1500 or c > batch): 1501 n, size = len(buckets), 0 1502 for split in buckets.keys(): 1503 yield pack_long(split) 1504 d = outputSerializer.dumps(buckets[split]) 1505 del buckets[split] 1506 yield d 1507 size += len(d) 1508 1509 avg = (size / n) >> 20 1510 # let 1M < avg < 10M 1511 if avg < 1: 1512 batch *= 1.5 1513 elif avg > 10: 1514 batch = max(batch / 1.5, 1) 1515 c = 0 1516 1517 for (split, items) in buckets.iteritems(): 1518 yield pack_long(split) 1519 yield outputSerializer.dumps(items)
1520 1521 keyed = self.mapPartitionsWithIndex(add_shuffle_key) 1522 keyed._bypass_serializer = True 1523 with _JavaStackTrace(self.context) as st: 1524 pairRDD = self.ctx._jvm.PairwiseRDD( 1525 keyed._jrdd.rdd()).asJavaPairRDD() 1526 partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 1527 id(partitionFunc)) 1528 jrdd = pairRDD.partitionBy(partitioner).values() 1529 rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 1530 # This is required so that id(partitionFunc) remains unique, 1531 # even if partitionFunc is a lambda: 1532 rdd._partitionFunc = partitionFunc 1533 return rdd 1534 1535 # TODO: add control over map-side aggregation
1536 - def combineByKey(self, createCombiner, mergeValue, mergeCombiners, 1537 numPartitions=None):
1538 """ 1539 Generic function to combine the elements for each key using a custom 1540 set of aggregation functions. 1541 1542 Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 1543 type" C. Note that V and C can be different -- for example, one might 1544 group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). 1545 1546 Users provide three functions: 1547 1548 - C{createCombiner}, which turns a V into a C (e.g., creates 1549 a one-element list) 1550 - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 1551 a list) 1552 - C{mergeCombiners}, to combine two C's into a single one. 1553 1554 In addition, users can control the partitioning of the output RDD. 1555 1556 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1557 >>> def f(x): return x 1558 >>> def add(a, b): return a + str(b) 1559 >>> sorted(x.combineByKey(str, add, add).collect()) 1560 [('a', '11'), ('b', '1')] 1561 """ 1562 if numPartitions is None: 1563 numPartitions = self._defaultReducePartitions() 1564 1565 serializer = self.ctx.serializer 1566 spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() 1567 == 'true') 1568 memory = _parse_memory(self.ctx._conf.get( 1569 "spark.python.worker.memory", "512m")) 1570 agg = Aggregator(createCombiner, mergeValue, mergeCombiners) 1571 1572 def combineLocally(iterator): 1573 merger = ExternalMerger(agg, memory * 0.9, serializer) \ 1574 if spill else InMemoryMerger(agg) 1575 merger.mergeValues(iterator) 1576 return merger.iteritems()
1577 1578 locally_combined = self.mapPartitions(combineLocally) 1579 shuffled = locally_combined.partitionBy(numPartitions) 1580 1581 def _mergeCombiners(iterator): 1582 merger = ExternalMerger(agg, memory, serializer) \ 1583 if spill else InMemoryMerger(agg) 1584 merger.mergeCombiners(iterator) 1585 return merger.iteritems() 1586 1587 return shuffled.mapPartitions(_mergeCombiners) 1588
1589      def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): 
1590          """ 
1591          Aggregate the values of each key, using given combine functions and a neutral 
1592          "zero value". This function can return a different result type, U, than the type 
1593          of the values in this RDD, V. Thus, we need one operation for merging a V into 
1594          a U and one operation for merging two U's. The former operation is used for merging 
1595          values within a partition, and the latter is used for merging values between 
1596          partitions. To avoid memory allocation, both of these functions are 
1597          allowed to modify and return their first argument instead of creating a new U. 
1598          """ 
1599          def createZero(): 
1600              return copy.deepcopy(zeroValue) 
1601   
1602          return self.combineByKey( 
1603              lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions) 
1604   
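# Illustrative sketch (not part of the original source): computing a per-key
# (sum, count) pair with aggregateByKey, assuming an active SparkContext `sc`.
#
# >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
# >>> seqFunc = (lambda acc, v: (acc[0] + v, acc[1] + 1))
# >>> combFunc = (lambda a, b: (a[0] + b[0], a[1] + b[1]))
# >>> sorted(rdd.aggregateByKey((0, 0), seqFunc, combFunc).collect())
# [('a', (3, 2)), ('b', (1, 1))]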
1605 - def foldByKey(self, zeroValue, func, numPartitions=None):
1606 """ 1607 Merge the values for each key using an associative function "func" 1608 and a neutral "zeroValue" which may be added to the result an 1609 arbitrary number of times, and must not change the result 1610 (e.g., 0 for addition, or 1 for multiplication.). 1611 1612 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1613 >>> from operator import add 1614 >>> rdd.foldByKey(0, add).collect() 1615 [('a', 2), ('b', 1)] 1616 """ 1617 def createZero(): 1618 return copy.deepcopy(zeroValue)
1619 1620 return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions) 1621 1622 # TODO: support variant with custom partitioner
1623      def groupByKey(self, numPartitions=None): 
1624          """ 
1625          Group the values for each key in the RDD into a single sequence. 
1626          Hash-partitions the resulting RDD into numPartitions partitions. 
1627   
1628          Note: If you are grouping in order to perform an aggregation (such as a 
1629          sum or average) over each key, using reduceByKey will provide much 
1630          better performance. 
1631   
1632          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1633          >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) 
1634          [('a', [1, 1]), ('b', [1])] 
1635          """ 
1636   
1637          def createCombiner(x): 
1638              return [x] 
1639 1640 def mergeValue(xs, x): 1641 xs.append(x) 1642 return xs 1643 1644 def mergeCombiners(a, b): 1645 a.extend(b) 1646 return a 1647 1648 return self.combineByKey(createCombiner, mergeValue, mergeCombiners, 1649 numPartitions).mapValues(lambda x: ResultIterable(x)) 1650 1651 # TODO: add tests
1652 - def flatMapValues(self, f):
1653 """ 1654 Pass each value in the key-value pair RDD through a flatMap function 1655 without changing the keys; this also retains the original RDD's 1656 partitioning. 1657 1658 >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) 1659 >>> def f(x): return x 1660 >>> x.flatMapValues(f).collect() 1661 [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 1662 """ 1663 flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) 1664 return self.flatMap(flat_map_fn, preservesPartitioning=True)
1665
1666 - def mapValues(self, f):
1667 """ 1668 Pass each value in the key-value pair RDD through a map function 1669 without changing the keys; this also retains the original RDD's 1670 partitioning. 1671 1672 >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) 1673 >>> def f(x): return len(x) 1674 >>> x.mapValues(f).collect() 1675 [('a', 3), ('b', 1)] 1676 """ 1677 map_values_fn = lambda (k, v): (k, f(v)) 1678 return self.map(map_values_fn, preservesPartitioning=True)
1679
1680 - def groupWith(self, other, *others):
1681 """ 1682 Alias for cogroup but with support for multiple RDDs. 1683 1684 >>> w = sc.parallelize([("a", 5), ("b", 6)]) 1685 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1686 >>> y = sc.parallelize([("a", 2)]) 1687 >>> z = sc.parallelize([("b", 42)]) 1688 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \ 1689 sorted(list(w.groupWith(x, y, z).collect()))) 1690 [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))] 1691 1692 """ 1693 return python_cogroup((self, other) + others, numPartitions=None)
1694 1695 # TODO: add variant with custom partitioner
1696 - def cogroup(self, other, numPartitions=None):
1697 """ 1698 For each key k in C{self} or C{other}, return a resulting RDD that 1699 contains a tuple with the list of values for that key in C{self} as 1700 well as C{other}. 1701 1702 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1703 >>> y = sc.parallelize([("a", 2)]) 1704 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) 1705 [('a', ([1], [2])), ('b', ([4], []))] 1706 """ 1707 return python_cogroup((self, other), numPartitions)
1708
1709 - def sampleByKey(self, withReplacement, fractions, seed=None):
1710 """ 1711 Return a subset of this RDD sampled by key (via stratified sampling). 1712 Create a sample of this RDD using variable sampling rates for 1713 different keys as specified by fractions, a key to sampling rate map. 1714 1715 >>> fractions = {"a": 0.2, "b": 0.1} 1716 >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000))) 1717 >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect()) 1718 >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150 1719 True 1720 >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0 1721 True 1722 >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0 1723 True 1724 """ 1725 for fraction in fractions.values(): 1726 assert fraction >= 0.0, "Negative fraction value: %s" % fraction 1727 return self.mapPartitionsWithIndex( 1728 RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
1729
1730 - def subtractByKey(self, other, numPartitions=None):
1731 """ 1732 Return each (key, value) pair in C{self} that has no pair with matching 1733 key in C{other}. 1734 1735 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 1736 >>> y = sc.parallelize([("a", 3), ("c", None)]) 1737 >>> sorted(x.subtractByKey(y).collect()) 1738 [('b', 4), ('b', 5)] 1739 """ 1740 def filter_func((key, vals)): 1741 return len(vals[0]) > 0 and len(vals[1]) == 0
1742 map_func = lambda (key, vals): [(key, val) for val in vals[0]] 1743 return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) 1744
1745 - def subtract(self, other, numPartitions=None):
1746 """ 1747 Return each value in C{self} that is not contained in C{other}. 1748 1749 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 1750 >>> y = sc.parallelize([("a", 3), ("c", None)]) 1751 >>> sorted(x.subtract(y).collect()) 1752 [('a', 1), ('b', 4), ('b', 5)] 1753 """ 1754 # note: here 'True' is just a placeholder 1755 rdd = other.map(lambda x: (x, True)) 1756 return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])
1757
1758 - def keyBy(self, f):
1759 """ 1760 Creates tuples of the elements in this RDD by applying C{f}. 1761 1762 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 1763 >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 1764 >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) 1765 [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] 1766 """ 1767 return self.map(lambda x: (f(x), x))
1768
1769 - def repartition(self, numPartitions):
1770 """ 1771 Return a new RDD that has exactly numPartitions partitions. 1772 1773 Can increase or decrease the level of parallelism in this RDD. 1774 Internally, this uses a shuffle to redistribute data. 1775 If you are decreasing the number of partitions in this RDD, consider 1776 using `coalesce`, which can avoid performing a shuffle. 1777 1778 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 1779 >>> sorted(rdd.glom().collect()) 1780 [[1], [2, 3], [4, 5], [6, 7]] 1781 >>> len(rdd.repartition(2).glom().collect()) 1782 2 1783 >>> len(rdd.repartition(10).glom().collect()) 1784 10 1785 """ 1786 jrdd = self._jrdd.repartition(numPartitions) 1787 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1788
1789 - def coalesce(self, numPartitions, shuffle=False):
1790 """ 1791 Return a new RDD that is reduced into `numPartitions` partitions. 1792 1793 >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 1794 [[1], [2, 3], [4, 5]] 1795 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 1796 [[1, 2, 3, 4, 5]] 1797 """ 1798 jrdd = self._jrdd.coalesce(numPartitions) 1799 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1800
1801 - def zip(self, other):
1802 """ 1803 Zips this RDD with another one, returning key-value pairs with the 1804 first element in each RDD second element in each RDD, etc. Assumes 1805 that the two RDDs have the same number of partitions and the same 1806 number of elements in each partition (e.g. one was made through 1807 a map on the other). 1808 1809 >>> x = sc.parallelize(range(0,5)) 1810 >>> y = sc.parallelize(range(1000, 1005)) 1811 >>> x.zip(y).collect() 1812 [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 1813 """ 1814 if self.getNumPartitions() != other.getNumPartitions(): 1815 raise ValueError("Can only zip with RDD which has the same number of partitions") 1816 1817 def get_batch_size(ser): 1818 if isinstance(ser, BatchedSerializer): 1819 return ser.batchSize 1820 return 0
1821 1822 def batch_as(rdd, batchSize): 1823 ser = rdd._jrdd_deserializer 1824 if isinstance(ser, BatchedSerializer): 1825 ser = ser.serializer 1826 return rdd._reserialize(BatchedSerializer(ser, batchSize)) 1827 1828 my_batch = get_batch_size(self._jrdd_deserializer) 1829 other_batch = get_batch_size(other._jrdd_deserializer) 1830 if my_batch != other_batch: 1831 # use the greatest batchSize to batch the other one. 1832 if my_batch > other_batch: 1833 other = batch_as(other, my_batch) 1834 else: 1835 self = batch_as(self, other_batch) 1836 1837 # There will be an Exception in JVM if there are different number 1838 # of items in each partitions. 1839 pairRDD = self._jrdd.zip(other._jrdd) 1840 deserializer = PairDeserializer(self._jrdd_deserializer, 1841 other._jrdd_deserializer) 1842 return RDD(pairRDD, self.ctx, deserializer) 1843
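# Editorial usage sketch (not part of the original source, assuming the
# doctest SparkContext `sc`): the safe way to satisfy zip's requirements is
# to zip an RDD with a transformation of itself, which preserves both the
# number of partitions and the number of elements per partition.
#
#     rdd = sc.parallelize(range(5), 2)
#     rdd.zip(rdd.map(lambda x: x * 10)).collect()
#     # [(0, 0), (1, 10), (2, 20), (3, 30), (4, 40)]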
1844 - def zipWithIndex(self):
1845 """ 1846 Zips this RDD with its element indices. 1847 1848 The ordering is first based on the partition index and then the 1849 ordering of items within each partition. So the first item in 1850 the first partition gets index 0, and the last item in the last 1851 partition receives the largest index. 1852 1853 This method needs to trigger a spark job when this RDD contains 1854 more than one partitions. 1855 1856 >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect() 1857 [('a', 0), ('b', 1), ('c', 2), ('d', 3)] 1858 """ 1859 starts = [0] 1860 if self.getNumPartitions() > 1: 1861 nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect() 1862 for i in range(len(nums) - 1): 1863 starts.append(starts[-1] + nums[i]) 1864 1865 def func(k, it): 1866 for i, v in enumerate(it, starts[k]): 1867 yield v, i
1868 1869 return self.mapPartitionsWithIndex(func) 1870
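# Editorial sketch of the offset computation above: given the per-partition
# element counts gathered by the collect(), the starting index of partition
# k is the sum of the counts of partitions 0..k-1.
#
#     nums = [2, 1, 1]            # e.g. counts for three partitions
#     starts = [0]
#     for n in nums[:-1]:
#         starts.append(starts[-1] + n)
#     # starts == [0, 2, 3]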
1871 - def zipWithUniqueId(self):
1872 """ 1873 Zips this RDD with generated unique Long ids. 1874 1875 Items in the kth partition will get ids k, n+k, 2*n+k, ..., where 1876 n is the number of partitions. So there may exist gaps, but this 1877 method won't trigger a spark job, which is different from 1878 L{zipWithIndex} 1879 1880 >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect() 1881 [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)] 1882 """ 1883 n = self.getNumPartitions() 1884 1885 def func(k, it): 1886 for i, v in enumerate(it): 1887 yield v, i * n + k
1888 1889 return self.mapPartitionsWithIndex(func) 1890
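# Editorial sketch of the id formula above: with n = 3 partitions holding
# [["a"], ["b", "c"], ["d", "e"]], the i-th item of partition k gets id
# i * n + k, which reproduces the doctest output.
#
#     parts = [["a"], ["b", "c"], ["d", "e"]]
#     n = len(parts)
#     [(v, i * n + k) for k, part in enumerate(parts)
#                     for i, v in enumerate(part)]
#     # [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]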
1891 - def name(self):
1892 """ 1893 Return the name of this RDD. 1894 """ 1895 name_ = self._jrdd.name() 1896 if not name_: 1897 return None 1898 return name_.encode('utf-8')
1899
1900 - def setName(self, name):
1901 """ 1902 Assign a name to this RDD. 1903 1904 >>> rdd1 = sc.parallelize([1,2]) 1905 >>> rdd1.setName('RDD1') 1906 >>> rdd1.name() 1907 'RDD1' 1908 """ 1909 self._jrdd.setName(name)
1910
1911 - def toDebugString(self):
1912 """ 1913 A description of this RDD and its recursive dependencies for debugging. 1914 """ 1915 debug_string = self._jrdd.toDebugString() 1916 if not debug_string: 1917 return None 1918 return debug_string.encode('utf-8')
1919
1920 - def getStorageLevel(self):
1921 """ 1922 Get the RDD's current storage level. 1923 1924 >>> rdd1 = sc.parallelize([1,2]) 1925 >>> rdd1.getStorageLevel() 1926 StorageLevel(False, False, False, False, 1) 1927 >>> print(rdd1.getStorageLevel()) 1928 Serialized 1x Replicated 1929 """ 1930 java_storage_level = self._jrdd.getStorageLevel() 1931 storage_level = StorageLevel(java_storage_level.useDisk(), 1932 java_storage_level.useMemory(), 1933 java_storage_level.useOffHeap(), 1934 java_storage_level.deserialized(), 1935 java_storage_level.replication()) 1936 return storage_level
1937
1938 - def _defaultReducePartitions(self):
1939 """ 1940 Returns the default number of partitions to use during reduce tasks (e.g., groupBy). 1941 If spark.default.parallelism is set, then we'll use the value from SparkContext 1942 defaultParallelism, otherwise we'll use the number of partitions in this RDD. 1943 1944 This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce 1945 the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will 1946 be inherent. 1947 """ 1948 if self.ctx._conf.contains("spark.default.parallelism"): 1949 return self.ctx.defaultParallelism 1950 else: 1951 return self.getNumPartitions()
1952
1953 # TODO: `lookup` is disabled because we can't make direct comparisons based 1954 # on the key; we need to compare the hash of the key to the hash of the 1955 # keys in the pairs. This could be an expensive operation, since those 1956 # hashes aren't retained. 1957 1958 1959 -class PipelinedRDD(RDD):
1960 1961 """ 1962 Pipelined maps: 1963 1964 >>> rdd = sc.parallelize([1, 2, 3, 4]) 1965 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 1966 [4, 8, 12, 16] 1967 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 1968 [4, 8, 12, 16] 1969 1970 Pipelined reduces: 1971 >>> from operator import add 1972 >>> rdd.map(lambda x: 2 * x).reduce(add) 1973 20 1974 >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 1975 20 1976 """ 1977
1978 - def __init__(self, prev, func, preservesPartitioning=False):
1979 if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 1980 # This transformation is the first in its stage: 1981 self.func = func 1982 self.preservesPartitioning = preservesPartitioning 1983 self._prev_jrdd = prev._jrdd 1984 self._prev_jrdd_deserializer = prev._jrdd_deserializer 1985 else: 1986 prev_func = prev.func 1987 1988 def pipeline_func(split, iterator): 1989 return func(split, prev_func(split, iterator))
1990 self.func = pipeline_func 1991 self.preservesPartitioning = \ 1992 prev.preservesPartitioning and preservesPartitioning 1993 self._prev_jrdd = prev._prev_jrdd # maintain the pipeline 1994 self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 1995 self.is_cached = False 1996 self.is_checkpointed = False 1997 self.ctx = prev.ctx 1998 self.prev = prev 1999 self._jrdd_val = None 2000 self._jrdd_deserializer = self.ctx.serializer 2001 self._bypass_serializer = False
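# Editorial sketch of the pipelining above: successive narrow
# transformations are fused into a single Python function, so each
# partition's iterator is traversed only once.
#
#     def prev_func(split, iterator):      # e.g. an earlier map
#         return (x * 2 for x in iterator)
#     def func(split, iterator):           # e.g. a later filter
#         return (x for x in iterator if x > 2)
#     def pipeline_func(split, iterator):
#         return func(split, prev_func(split, iterator))
#     list(pipeline_func(0, [1, 2, 3]))
#     # [4, 6]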
2002 2003 @property
2004 - def _jrdd(self):
2005 if self._jrdd_val: 2006 return self._jrdd_val 2007 if self._bypass_serializer: 2008 self._jrdd_deserializer = NoOpSerializer() 2009 command = (self.func, self._prev_jrdd_deserializer, 2010 self._jrdd_deserializer) 2011 ser = CloudPickleSerializer() 2012 pickled_command = ser.dumps(command) 2013 broadcast_vars = ListConverter().convert( 2014 [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], 2015 self.ctx._gateway._gateway_client) 2016 self.ctx._pickled_broadcast_vars.clear() 2017 env = MapConverter().convert(self.ctx.environment, 2018 self.ctx._gateway._gateway_client) 2019 includes = ListConverter().convert(self.ctx._python_includes, 2020 self.ctx._gateway._gateway_client) 2021 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 2022 bytearray(pickled_command), 2023 env, includes, self.preservesPartitioning, 2024 self.ctx.pythonExec, 2025 broadcast_vars, self.ctx._javaAccumulator) 2026 self._jrdd_val = python_rdd.asJavaRDD() 2027 return self._jrdd_val
2028
2029 - def _is_pipelinable(self):
2030 return not (self.is_cached or self.is_checkpointed)
2031
2032 2033 -def _test():
2034 import doctest 2035 from pyspark.context import SparkContext 2036 globs = globals().copy() 2037 # The small batch size here ensures that we see multiple batches, 2038 # even in these small test examples: 2039 globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 2040 (failure_count, test_count) = doctest.testmod( 2041 globs=globs, optionflags=doctest.ELLIPSIS) 2042 globs['sc'].stop() 2043 if failure_count: 2044 exit(-1)
2045 2046 2047 if __name__ == "__main__": 2048 _test() 2049