Package pyspark :: Module rdd
[frames | no frames]

Source Code for Module pyspark.rdd

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one or more 
   3  # contributor license agreements.  See the NOTICE file distributed with 
   4  # this work for additional information regarding copyright ownership. 
   5  # The ASF licenses this file to You under the Apache License, Version 2.0 
   6  # (the "License"); you may not use this file except in compliance with 
   7  # the License.  You may obtain a copy of the License at 
   8  # 
   9  #    http://www.apache.org/licenses/LICENSE-2.0 
  10  # 
  11  # Unless required by applicable law or agreed to in writing, software 
  12  # distributed under the License is distributed on an "AS IS" BASIS, 
  13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  14  # See the License for the specific language governing permissions and 
  15  # limitations under the License. 
  16  # 
  17   
  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from collections import namedtuple 
  22  from itertools import chain, ifilter, imap 
  23  import operator 
  24  import os 
  25  import sys 
  26  import shlex 
  27  import traceback 
  28  from subprocess import Popen, PIPE 
  29  from tempfile import NamedTemporaryFile 
  30  from threading import Thread 
  31  import warnings 
  32  import heapq 
  33  import bisect 
  34  from random import Random 
  35  from math import sqrt, log, isinf, isnan 
  36   
  37  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  38      BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ 
  39      PickleSerializer, pack_long, CompressedSerializer 
  40  from pyspark.join import python_join, python_left_outer_join, \ 
  41      python_right_outer_join, python_cogroup 
  42  from pyspark.statcounter import StatCounter 
  43  from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler 
  44  from pyspark.storagelevel import StorageLevel 
  45  from pyspark.resultiterable import ResultIterable 
  46  from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \ 
  47      get_used_memory 
  48   
  49  from py4j.java_collections import ListConverter, MapConverter 
  50   
  51  __all__ = ["RDD"] 
# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
# hash for string
def portable_hash(x):
    """
    Return a consistent hash code for builtin types, especially for None
    and for tuples that contain None.

    The algorithm is similar to the one used by CPython 2.7.

    >>> portable_hash(None)
    0
    >>> portable_hash((None, 1))
    219750521
    """
    if x is None:
        return 0
    if not isinstance(x, tuple):
        return hash(x)

    acc = 0x345678
    for item in x:
        # Mix in each element's hash, then truncate to 32 bits.
        acc = ((acc ^ portable_hash(item)) * 1000003) & 0xffffffff
    acc ^= len(x)
    # -1 is remapped to -2, as in the original implementation.
    if acc == -1:
        acc = -2
    return acc
81
82 83 -def _extract_concise_traceback():
84 """ 85 This function returns the traceback info for a callsite, returns a dict 86 with function name, file name and line number 87 """ 88 tb = traceback.extract_stack() 89 callsite = namedtuple("Callsite", "function file linenum") 90 if len(tb) == 0: 91 return None 92 file, line, module, what = tb[len(tb) - 1] 93 sparkpath = os.path.dirname(file) 94 first_spark_frame = len(tb) - 1 95 for i in range(0, len(tb)): 96 file, line, fun, what = tb[i] 97 if file.startswith(sparkpath): 98 first_spark_frame = i 99 break 100 if first_spark_frame == 0: 101 file, line, fun, what = tb[0] 102 return callsite(function=fun, file=file, linenum=line) 103 sfile, sline, sfun, swhat = tb[first_spark_frame] 104 ufile, uline, ufun, uwhat = tb[first_spark_frame - 1] 105 return callsite(function=sfun, file=ufile, linenum=uline)
# Nesting depth of active _JavaStackTrace contexts: __enter__ sets the JVM
# call site only when this is 0, and __exit__ clears it once it is 0 again.
_spark_stack_depth = 0
class _JavaStackTrace(object):
    """
    Context manager that passes a Python call-site description to
    C{sc._jsc.setCallSite} on entry of the outermost ``with`` block and
    resets it to None when that outermost block exits.
    """

    def __init__(self, sc):
        callsite = _extract_concise_traceback()
        if callsite is None:
            self._traceback = "Error! Could not extract traceback info"
        else:
            self._traceback = "%s at %s:%s" % (
                callsite.function, callsite.file, callsite.linenum)
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        is_outermost = (_spark_stack_depth == 0)
        if is_outermost:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        # Only the outermost context clears the call site.
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)
132
class MaxHeapQ(object):

    """
    A bounded max-heap: it keeps at most C{maxsize} values and, once
    full, replaces its largest value whenever a smaller one is inserted,
    so it ends up holding the smallest values seen.

    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
        # q[0] is an unused placeholder: the heap starts at q[1], so the
        # children of q[k] are q[2 * k] and q[2 * k + 1].
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        """Move q[k] up until its parent is no longer smaller."""
        # Floor division keeps the parent index an int even when '/'
        # means true division.
        while (k > 1) and (self.q[k // 2] < self.q[k]):
            self._swap(k, k // 2)
            k = k // 2

    def _swap(self, i, j):
        """Exchange the values stored at indices i and j."""
        self.q[i], self.q[j] = self.q[j], self.q[i]

    def _sink(self, k):
        """Move q[k] down until it is larger than both of its children."""
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
            # Pick the larger of the two children (if there are two).
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if self.q[k] > self.q[j]:
                break
            self._swap(k, j)
            k = j

    def size(self):
        """Return the number of stored values (the placeholder excluded)."""
        return len(self.q) - 1

    def insert(self, value):
        """Add value, evicting the current maximum if the heap is full."""
        if self.size() < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        """Return the stored values as a list in heap order."""
        return self.q[1:]

    def _replaceRoot(self, value):
        """Replace the maximum with value when value is smaller."""
        # A zero-capacity heap has no root; inserting is then a no-op.
        if self.size() > 0 and self.q[1] > value:
            self.q[1] = value
            self._sink(1)
202
203 204 -def _parse_memory(s):
205 """ 206 Parse a memory string in the format supported by Java (e.g. 1g, 200m) and 207 return the value in MB 208 209 >>> _parse_memory("256m") 210 256 211 >>> _parse_memory("2g") 212 2048 213 """ 214 units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024} 215 if s[-1] not in units: 216 raise ValueError("invalid format: " + s) 217 return int(float(s[:-1]) * units[s[-1].lower()])
218
219 220 -class RDD(object):
221 222 """ 223 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 224 Represents an immutable, partitioned collection of elements that can be 225 operated on in parallel. 226 """ 227
def __init__(self, jrdd, ctx, jrdd_deserializer):
    """
    Wrap a Java-side RDD handle.

    jrdd must expose C{id()}; its result is stored as this RDD's id.
    ctx and jrdd_deserializer are stored unchanged.
    """
    self.ctx = ctx
    self._jrdd = jrdd
    self._jrdd_deserializer = jrdd_deserializer
    self._id = jrdd.id()
    # Neither cached nor checkpointed until the caller asks for it.
    self.is_cached = False
    self.is_checkpointed = False
235
def _toPickleSerialization(self):
    """
    Return this RDD if it already uses a (batched) PickleSerializer,
    otherwise a copy reserialized with a batched PickleSerializer
    (batch size 10).
    """
    current = self._jrdd_deserializer
    already_pickled = (current == PickleSerializer() or
                       current == BatchedSerializer(PickleSerializer()))
    if already_pickled:
        return self
    return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
242
243 - def id(self):
244 """ 245 A unique ID for this RDD (within its SparkContext). 246 """ 247 return self._id
248
def __repr__(self):
    """Return the string produced by the underlying Java RDD's toString()."""
    description = self._jrdd.toString()
    return description
251 252 @property
253 - def context(self):
254 """ 255 The L{SparkContext} that this RDD was created on. 256 """ 257 return self.ctx
258
def cache(self):
    """
    Persist this RDD with the default storage level
    (C{MEMORY_ONLY_SER}) and return it, so calls can be chained.
    """
    default_level = StorageLevel.MEMORY_ONLY_SER
    self.is_cached = True
    self.persist(default_level)
    return self
266
def persist(self, storageLevel):
    """
    Set this RDD's storage level so that its values are kept across
    operations after the first time it is computed. This can only be
    used to assign a new storage level if the RDD does not have a
    storage level set yet.

    Returns this RDD.
    """
    self.is_cached = True
    # Translate the Python-side level into its Java counterpart.
    self._jrdd.persist(self.ctx._getJavaStorageLevel(storageLevel))
    return self
277
def unpersist(self):
    """
    Mark the RDD as non-persistent, and remove all blocks for it from
    memory and disk.

    Returns this RDD.
    """
    self._jrdd.unpersist()
    self.is_cached = False
    return self
286
def checkpoint(self):
    """
    Mark this RDD for checkpointing. It will be saved to a file inside
    the checkpoint directory set with L{SparkContext.setCheckpointDir()}
    and all references to its parent RDDs will be removed. Call this
    before any job has been executed on this RDD. Persisting the RDD in
    memory first is strongly recommended; otherwise saving it to a file
    requires recomputation.
    """
    self.is_checkpointed = True
    underlying = self._jrdd.rdd()
    underlying.checkpoint()
298
def isCheckpointed(self):
    """
    Return whether this RDD has been checkpointed, as reported by the
    underlying Java RDD.
    """
    underlying = self._jrdd.rdd()
    return underlying.isCheckpointed()
304
def getCheckpointFile(self):
    """
    Return the name of the file to which this RDD was checkpointed, or
    None if no checkpoint file is defined.
    """
    maybe_file = self._jrdd.rdd().getCheckpointFile()
    return maybe_file.get() if maybe_file.isDefined() else None
314
def map(self, f, preservesPartitioning=False):
    """
    Return a new RDD obtained by applying f to every element of this RDD.

    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> sorted(rdd.map(lambda x: (x, 1)).collect())
    [('a', 1), ('b', 1), ('c', 1)]
    """
    def apply_to_partition(_, elements):
        # The partition index is not needed for a plain map.
        return imap(f, elements)

    return self.mapPartitionsWithIndex(apply_to_partition, preservesPartitioning)
326
def flatMap(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying f to every element of this RDD and
    flattening the resulting iterables.

    >>> rdd = sc.parallelize([2, 3, 4])
    >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
    [1, 1, 1, 2, 2, 3]
    >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
    [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
    """
    def flatten_partition(_, elements):
        per_element = imap(f, elements)
        return chain.from_iterable(per_element)

    return self.mapPartitionsWithIndex(flatten_partition, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD.

    f receives an iterator over one partition's elements and returns an
    iterable of results. preservesPartitioning is forwarded to
    L{mapPartitionsWithIndex}.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
    """
    def func(s, iterator):
        # The partition index `s` is ignored.
        return f(iterator)
    # Forward the flag; it used to be accepted but silently dropped.
    return self.mapPartitionsWithIndex(func, preservesPartitioning)
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying f to each partition of this RDD; f is
    also given the index of the original partition.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6
    """
    pipelined = PipelinedRDD(self, f, preservesPartitioning)
    return pipelined
366
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
    """
    Deprecated: use mapPartitionsWithIndex instead.

    Emits a DeprecationWarning and then behaves exactly like
    mapPartitionsWithIndex.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithSplit(f).sum()
    6
    """
    message = ("mapPartitionsWithSplit is deprecated; "
               "use mapPartitionsWithIndex instead")
    # stacklevel=2 attributes the warning to the caller of this method.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return self.mapPartitionsWithIndex(f, preservesPartitioning)
382
def getNumPartitions(self):
    """
    Return the number of partitions in this RDD.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> rdd.getNumPartitions()
    2
    """
    partitions = self._jrdd.partitions()
    return partitions.size()
392
def filter(self, f):
    """
    Return a new RDD containing only the elements for which the
    predicate f is true.

    >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]
    """
    def keep_matching(elements):
        return ifilter(f, elements)

    return self.mapPartitions(keep_matching)
def distinct(self):
    """
    Return a new RDD containing the distinct elements in this RDD.

    >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
    [1, 2, 3]
    """
    # Key every element by itself, collapse duplicates per key, then
    # drop the placeholder value again.
    keyed = self.map(lambda x: (x, None))
    deduplicated = keyed.reduceByKey(lambda x, _: x)
    return deduplicated.map(lambda pair: pair[0])
415
def sample(self, withReplacement, fraction, seed=None):
    """
    Return a sampled subset of this RDD (relies on numpy and falls back
    on the default random generator if numpy is unavailable).

    fraction must not be negative.

    >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
    [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
    """
    assert fraction >= 0.0, "Negative fraction value: %s" % fraction
    sampler = RDDSampler(withReplacement, fraction, seed)
    return self.mapPartitionsWithIndex(sampler.func, True)
426 427 # this is ported from scala/spark/RDD.scala
428 - def takeSample(self, withReplacement, num, seed=None):
429 """ 430 Return a fixed-size sampled subset of this RDD (currently requires 431 numpy). 432 433 >>> rdd = sc.parallelize(range(0, 10)) 434 >>> len(rdd.takeSample(True, 20, 1)) 435 20 436 >>> len(rdd.takeSample(False, 5, 2)) 437 5 438 >>> len(rdd.takeSample(False, 15, 3)) 439 10 440 """ 441 numStDev = 10.0 442 443 if num < 0: 444 raise ValueError("Sample size cannot be negative.") 445 elif num == 0: 446 return [] 447 448 initialCount = self.count() 449 if initialCount == 0: 450 return [] 451 452 rand = Random(seed) 453 454 if (not withReplacement) and num >= initialCount: 455 # shuffle current RDD and return 456 samples = self.collect() 457 rand.shuffle(samples) 458 return samples 459 460 maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint)) 461 if num > maxSampleSize: 462 raise ValueError( 463 "Sample size cannot be greater than %d." % maxSampleSize) 464 465 fraction = RDD._computeFractionForSampleSize( 466 num, initialCount, withReplacement) 467 samples = self.sample(withReplacement, fraction, seed).collect() 468 469 # If the first sample didn't turn out large enough, keep trying to take samples; 470 # this shouldn't happen often because we use a big multiplier for their initial size. 471 # See: scala/spark/RDD.scala 472 while len(samples) < num: 473 # TODO: add log warning for when more than one iteration was run 474 seed = rand.randint(0, sys.maxint) 475 samples = self.sample(withReplacement, fraction, seed).collect() 476 477 rand.shuffle(samples) 478 479 return samples[0:num]
480 481 @staticmethod
482 - def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
483 """ 484 Returns a sampling rate that guarantees a sample of 485 size >= sampleSizeLowerBound 99.99% of the time. 486 487 How the sampling rate is determined: 488 Let p = num / total, where num is the sample size and total is the 489 total number of data points in the RDD. We're trying to compute 490 q > p such that 491 - when sampling with replacement, we're drawing each data point 492 with prob_i ~ Pois(q), where we want to guarantee 493 Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to 494 total), i.e. the failure rate of not having a sufficiently large 495 sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient 496 to guarantee 0.9999 success rate for num > 12, but we need a 497 slightly larger q (9 empirically determined). 498 - when sampling without replacement, we're drawing each data point 499 with prob_i ~ Binomial(total, fraction) and our choice of q 500 guarantees 1-delta, or 0.9999 success rate, where success rate is 501 defined the same as in sampling with replacement. 502 """ 503 fraction = float(sampleSizeLowerBound) / total 504 if withReplacement: 505 numStDev = 5 506 if (sampleSizeLowerBound < 12): 507 numStDev = 9 508 return fraction + numStDev * sqrt(fraction / total) 509 else: 510 delta = 0.00005 511 gamma = - log(delta) / total 512 return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))
513
def union(self, other):
    """
    Return the union of this RDD and another one.

    >>> rdd = sc.parallelize([1, 1, 2, 3])
    >>> rdd.union(rdd).collect()
    [1, 1, 2, 3, 1, 1, 2, 3]
    """
    same_format = self._jrdd_deserializer == other._jrdd_deserializer
    if same_format:
        return RDD(self._jrdd.union(other._jrdd), self.ctx,
                   self._jrdd_deserializer)

    # The two RDDs hold data in different serialized formats, so both are
    # normalized to the context's default serializer first.
    left = self._reserialize()
    right = other._reserialize()
    return RDD(left._jrdd.union(right._jrdd), self.ctx,
               self.ctx.serializer)
533
def intersection(self, other):
    """
    Return the intersection of this RDD and another one. The output
    will not contain any duplicate elements, even if the input RDDs did.

    Note that this method performs a shuffle internally.

    >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
    >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
    >>> rdd1.intersection(rdd2).collect()
    [1, 2, 3]
    """
    def present_in_both(entry):
        groups = entry[1]
        return (len(groups[0]) != 0) and (len(groups[1]) != 0)

    mine = self.map(lambda v: (v, None))
    theirs = other.map(lambda v: (v, None))
    return mine.cogroup(theirs).filter(present_in_both).keys()
550
def _reserialize(self, serializer=None):
    """
    Return an RDD whose deserializer is `serializer` (the context's
    default serializer when none is given). This RDD itself is returned
    when it already uses that serializer.
    """
    serializer = serializer or self.ctx.serializer
    if self._jrdd_deserializer == serializer:
        return self
    # An identity map produces a new RDD whose deserializer can be set.
    converted = self.map(lambda x: x, preservesPartitioning=True)
    converted._jrdd_deserializer = serializer
    return converted
559
560 - def __add__(self, other):
561 """ 562 Return the union of this RDD and another one. 563 564 >>> rdd = sc.parallelize([1, 1, 2, 3]) 565 >>> (rdd + rdd).collect() 566 [1, 1, 2, 3, 1, 1, 2, 3] 567 """ 568 if not isinstance(other, RDD): 569 raise TypeError 570 return self.union(other)
571
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """
    Sorts this RDD, which is assumed to consist of (key, value) pairs.
    # noqa

    Keys are compared through keyfunc. With more than one output
    partition, the key space is split into ranges using boundaries
    estimated from a sample, and each partition is then sorted locally.

    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortByKey().first()
    ('1', 3)
    >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
    >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
    >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
    [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
    """
    if numPartitions is None:
        numPartitions = self._defaultReducePartitions()

    def sortPartition(iterator):
        return iter(sorted(iterator, key=lambda kv: keyfunc(kv[0]), reverse=not ascending))

    if numPartitions == 1:
        if self.getNumPartitions() > 1:
            self = self.coalesce(1)
        return self.mapPartitions(sortPartition)

    # first compute the boundary of each part via sampling: we want to
    # partition the key-space into bins such that the bins have roughly
    # the same number of (key, value) pairs falling into them
    rddSize = self.count()
    maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
    fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
    samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()

    # bisect needs an ascending list, so the boundaries are always sorted
    # ascending (the descending case is handled by flipping the partition
    # index below). They are kept in keyfunc space because that is what
    # rangePartitioner looks up.
    sampleKeys = sorted(keyfunc(k) for k in samples)

    # we have numPartitions many parts but one of them has an implicit
    # boundary; with no samples everything falls into a single range
    if sampleKeys:
        bounds = [sampleKeys[len(sampleKeys) * (i + 1) // numPartitions]
                  for i in range(0, numPartitions - 1)]
    else:
        bounds = []

    def rangePartitioner(k):
        p = bisect.bisect_left(bounds, keyfunc(k))
        if ascending:
            return p
        else:
            return numPartitions - 1 - p

    return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
    """
    Sort this RDD by the key that keyfunc computes for each element.

    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
    [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    """
    keyed = self.keyBy(keyfunc)
    ordered = keyed.sortByKey(ascending, numPartitions)
    return ordered.values()
634
def glom(self):
    """
    Return an RDD created by coalescing all elements within each
    partition into a list.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> sorted(rdd.glom().collect())
    [[1, 2], [3, 4]]
    """
    def as_single_list(elements):
        # One output element per partition: the list of its contents.
        yield list(elements)

    return self.mapPartitions(as_single_list)
def cartesian(self, other):
    """
    Return the Cartesian product of this RDD and another one, that is,
    the RDD of all pairs of elements C{(a, b)} where C{a} is in C{self}
    and C{b} is in C{other}.

    >>> rdd = sc.parallelize([1, 2])
    >>> sorted(rdd.cartesian(rdd).collect())
    [(1, 1), (1, 2), (2, 1), (2, 2)]
    """
    # NOTE(review): the original comment says batching prevents using the
    # Java cartesian result directly; pairing is presumably left to
    # CartesianDeserializer - confirm against pyspark.serializers.
    pair_deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                              other._jrdd_deserializer)
    product = self._jrdd.cartesian(other._jrdd)
    return RDD(product, self.ctx, pair_deserializer)
662
def groupBy(self, f, numPartitions=None):
    """
    Return an RDD of items grouped by the key that f computes for each
    element.

    >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
    >>> result = rdd.groupBy(lambda x: x % 2).collect()
    >>> sorted([(x, sorted(y)) for (x, y) in result])
    [(0, [2, 8]), (1, [1, 1, 3, 5])]
    """
    keyed = self.map(lambda x: (f(x), x))
    return keyed.groupByKey(numPartitions)
673
def pipe(self, command, env={}):
    """
    Return an RDD created by piping elements to a forked external
    process.

    command is split with shlex; env is passed to the subprocess as its
    environment. Each element is written to the process as one line and
    each output line becomes one element of the result.

    >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
    ['1', '2', '', '3']
    """
    def run_partition(elements):
        argv = shlex.split(command)
        process = Popen(argv, env=env, stdin=PIPE, stdout=PIPE)

        def feed(sink):
            for element in elements:
                sink.write(str(element).rstrip('\n') + '\n')
            sink.close()

        # Feed stdin from a separate thread while stdout is being read.
        Thread(target=feed, args=[process.stdin]).start()
        return (line.rstrip('\n') for line in iter(process.stdout.readline, ''))

    return self.mapPartitions(run_partition)
def foreach(self, f):
    """
    Apply f to every element of this RDD; results of f are discarded.

    >>> def f(x): print x
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
    """
    def run_on_partition(elements):
        for element in elements:
            f(element)
        # A single placeholder per partition; there is no real output.
        yield None

    self.mapPartitions(run_on_partition).collect()  # Force evaluation
def foreachPartition(self, f):
    """
    Applies a function to each partition of this RDD.

    f receives an iterator over one partition. It may be a generator
    (as in the example) or a plain function; a non-iterable return value
    such as None is treated as producing no output.

    >>> def f(iterator):
    ...      for x in iterator:
    ...           print x
    ...      yield None
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
    """
    def func(iterator):
        result = f(iterator)
        try:
            return iter(result)
        except TypeError:
            # f returned something non-iterable (typically None).
            return iter([])

    self.mapPartitions(func).collect()  # Force evaluation
717
def collect(self):
    """
    Return a list that contains all of the elements in this RDD.
    """
    # The Java-side collect runs inside the call-site context.
    with _JavaStackTrace(self.context):
        bytesInJava = self._jrdd.collect().iterator()
    elements = self._collect_iterator_through_file(bytesInJava)
    return list(elements)
725
def _collect_iterator_through_file(self, iterator):
    """
    Yield the deserialized items of a Java-side iterator by writing them
    to a temporary file and reading that file back.

    The temporary file is removed when iteration finishes, when writing
    or deserializing fails, or when the generator is closed early.
    """
    # Transferring lots of data through Py4J can be slow because
    # socket.readline() is inefficient. Instead, we'll dump the data to a
    # file and read it back.
    tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
    tempFile.close()
    path = tempFile.name
    try:
        self.ctx._writeToFile(iterator, path)
        # Read the data into Python and deserialize it:
        with open(path, 'rb') as stream:
            for item in self._jrdd_deserializer.load_stream(stream):
                yield item
    finally:
        # Runs after the stream is closed; covers the error and
        # early-exit paths too.
        os.unlink(path)
738
def reduce(self, f):
    """
    Reduces the elements of this RDD using the specified commutative and
    associative binary operator. Currently reduces partitions locally.

    Empty partitions contribute nothing. When every partition is empty,
    the final builtin reduce() raises TypeError.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
    15
    >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
    10
    """
    def func(iterator):
        iterator = iter(iterator)
        # Seed with the first element instead of using None as a "no
        # value yet" marker, so None elements and None partial results
        # are not silently dropped.
        try:
            acc = next(iterator)
        except StopIteration:
            return
        for obj in iterator:
            acc = f(obj, acc)
        yield acc

    vals = self.mapPartitions(func).collect()
    return reduce(f, vals)
def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results for
    all the partitions, using a given associative function and a neutral
    "zero value."

    The function C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should
    not modify C{t2}.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15
    """
    def fold_partition(elements):
        running = zeroValue
        for element in elements:
            running = op(element, running)
        yield running

    per_partition = self.mapPartitions(fold_partition).collect()
    return reduce(op, per_partition, zeroValue)
def aggregate(self, zeroValue, seqOp, combOp):
    """
    Aggregate the elements of each partition, and then the results for
    all the partitions, using the given combine functions and a neutral
    "zero value."

    The functions C{op(t1, t2)} are allowed to modify C{t1} and return
    it as the result value to avoid object allocation; however, they
    should not modify C{t2}.

    The first function (seqOp) can return a different result type, U,
    than the type of this RDD. Thus, we need one operation for merging a
    T into a U and one operation for merging two U.

    >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
    >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
    >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
    (10, 4)
    >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
    (0, 0)
    """
    def aggregate_partition(elements):
        running = zeroValue
        for element in elements:
            running = seqOp(running, element)
        yield running

    partials = self.mapPartitions(aggregate_partition)
    return partials.fold(zeroValue, combOp)
def max(self):
    """
    Return the largest item in this RDD, found by reducing with the
    builtin max.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
    43.0
    """
    largest = self.reduce(max)
    return largest
821
def min(self):
    """
    Return the smallest item in this RDD, found by reducing with the
    builtin min.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
    1.0
    """
    smallest = self.reduce(min)
    return smallest
830
def sum(self):
    """
    Add up the elements in this RDD.

    >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
    6.0
    """
    # Sum each partition locally, then add the partial sums together.
    partial_sums = self.mapPartitions(lambda x: [sum(x)])
    return partial_sums.reduce(operator.add)
839
def count(self):
    """
    Return the number of elements in this RDD.

    >>> sc.parallelize([2, 3, 4]).count()
    3
    """
    # Count each partition locally, then add the counts together.
    partial_counts = self.mapPartitions(lambda i: [sum(1 for _ in i)])
    return partial_counts.sum()
848
def stats(self):
    """
    Return a L{StatCounter} object that captures the mean, variance
    and count of the RDD's elements in one operation.
    """
    def merge(left_counter, right_counter):
        return left_counter.mergeStats(right_counter)

    # One StatCounter per partition, merged pairwise.
    per_partition = self.mapPartitions(lambda i: [StatCounter(i)])
    return per_partition.reduce(merge)
def histogram(self, buckets):
    """
    Compute a histogram using the provided buckets. The buckets
    are all open to the right except for the last which is closed.
    e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
    which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
    and 50 we would have a histogram of 1,0,1.

    If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
    this can be switched from an O(log n) insertion to O(1) per
    element (where n = # buckets).

    Buckets must be sorted, must not contain any duplicates, and must
    have at least two elements.

    If `buckets` is a number, it will generate buckets which are
    evenly spaced between the minimum and maximum of the RDD. For
    example, if the min value is 0 and the max is 100, given buckets
    as 2, the resulting buckets will be [0,50) [50,100]. buckets must
    be at least 1. If the RDD contains infinity, an exception is
    raised; None and NaN elements are ignored. If the elements in the
    RDD do not vary (max == min), a single bucket is always returned.

    It will return a tuple of buckets and histogram.

    Raises ValueError for an invalid bucket specification, an empty
    RDD or infinite values, and TypeError for a `buckets` argument of
    the wrong type or non-numeric elements when buckets are generated.

    >>> rdd = sc.parallelize(range(51))
    >>> rdd.histogram(2)
    ([0, 25, 50], [25, 26])
    >>> rdd.histogram([0, 5, 25, 50])
    ([0, 5, 25, 50], [5, 20, 26])
    >>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets
    ([0, 15, 30, 45, 60], [15, 15, 15, 6])
    >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
    >>> rdd.histogram(("a", "b", "c"))
    (('a', 'b', 'c'), [2, 2])
    """

    if isinstance(buckets, (int, long)):
        # A bucket count was given: derive evenly spaced boundaries from
        # the RDD's own minimum and maximum.
        if buckets < 1:
            raise ValueError("number of buckets must be >= 1")

        # filter out non-comparable elements
        def comparable(x):
            if x is None:
                return False
            if type(x) is float and isnan(x):
                return False
            return True

        filtered = self.filter(comparable)

        # faster than stats()
        def minmax(a, b):
            return min(a[0], b[0]), max(a[1], b[1])
        try:
            minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
        except TypeError as e:
            # A TypeError whose message contains " empty " is taken to
            # mean the RDD had no elements to reduce.
            if " empty " in str(e):
                raise ValueError("can not generate buckets from empty RDD")
            raise

        if minv == maxv or buckets == 1:
            return [minv, maxv], [filtered.count()]

        try:
            inc = (maxv - minv) / buckets
        except TypeError:
            raise TypeError("Can not generate buckets with non-number in RDD")

        if isinf(inc):
            raise ValueError("Can not generate buckets with infinite value")

        # keep them as integer if possible
        if inc * buckets != maxv - minv:
            inc = (maxv - minv) * 1.0 / buckets

        buckets = [i * inc + minv for i in range(buckets)]
        buckets.append(maxv)  # fix accumulated error
        even = True

    elif isinstance(buckets, (list, tuple)):
        # Explicit boundaries were given: validate them and detect
        # whether they are evenly spaced.
        if len(buckets) < 2:
            raise ValueError("buckets should have more than one value")

        if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
            raise ValueError("can not have None or NaN in buckets")

        if sorted(buckets) != list(buckets):
            raise ValueError("buckets should be sorted")

        if len(set(buckets)) != len(buckets):
            raise ValueError("buckets should not contain duplicated values")

        minv = buckets[0]
        maxv = buckets[-1]
        even = False
        inc = None
        try:
            steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
        except TypeError:
            pass  # objects in buckets do not support '-'
        else:
            if max(steps) - min(steps) < 1e-10:  # handle precision errors
                even = True
                inc = (maxv - minv) / (len(buckets) - 1)

    else:
        raise TypeError("buckets should be a list or tuple or number(int or long)")

    def histogram(iterator):
        # One counter per boundary; the extra last slot only receives
        # values equal to maxv and is merged into the final bucket below.
        counters = [0] * len(buckets)
        for i in iterator:
            if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
                continue
            # Evenly spaced buckets use direct arithmetic; otherwise a
            # binary search over the boundaries is used.
            t = (int((i - minv) / inc) if even
                 else bisect.bisect_right(buckets, i) - 1)
            counters[t] += 1
        # add last two together
        last = counters.pop()
        counters[-1] += last
        return [counters]

    def mergeCounters(a, b):
        return [i + j for i, j in zip(a, b)]

    return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
def mean(self):
    """
    Return the mean of the elements of this RDD.

    The value is taken from the summary object returned by C{self.stats()}.

    >>> sc.parallelize([1, 2, 3]).mean()
    2.0
    """
    summary = self.stats()
    return summary.mean()
994
def variance(self):
    """
    Return the variance of the elements of this RDD.

    The value is taken from the summary object returned by C{self.stats()}.

    >>> sc.parallelize([1, 2, 3]).variance()
    0.666...
    """
    summary = self.stats()
    return summary.variance()
1003
def stdev(self):
    """
    Return the standard deviation of the elements of this RDD.

    The value is taken from the summary object returned by C{self.stats()}.

    >>> sc.parallelize([1, 2, 3]).stdev()
    0.816...
    """
    summary = self.stats()
    return summary.stdev()
1012
def sampleStdev(self):
    """
    Return the sample standard deviation of the elements of this RDD
    (the estimate that divides by N-1 instead of N to correct for bias).

    The value is taken from the summary object returned by C{self.stats()}.

    >>> sc.parallelize([1, 2, 3]).sampleStdev()
    1.0
    """
    summary = self.stats()
    return summary.sampleStdev()
1023
def sampleVariance(self):
    """
    Return the sample variance of the elements of this RDD (the estimate
    that divides by N-1 instead of N to correct for bias).

    The value is taken from the summary object returned by C{self.stats()}.

    >>> sc.parallelize([1, 2, 3]).sampleVariance()
    1.0
    """
    summary = self.stats()
    return summary.sampleVariance()
1033
def countByValue(self):
    """
    Count how often each distinct value occurs in this RDD and return the
    result as a dictionary mapping value to count.

    >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
    [(1, 2), (2, 3)]
    """
    def tallyPartition(elements):
        # One tally map per partition, emitted as a single item.
        tally = defaultdict(int)
        for element in elements:
            tally[element] += 1
        yield tally

    def combineTallies(into, extra):
        # Fold the counts of `extra` into `into` and hand `into` back.
        for value, count in extra.iteritems():
            into[value] += count
        return into

    per_partition = self.mapPartitions(tallyPartition)
    return per_partition.reduce(combineTallies)
def top(self, num):
    """
    Return the C{num} largest elements of this RDD.

    Note: the returned list is sorted in descending order.
    >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
    [12]
    >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
    [6, 5]
    """
    def keepLargest(elements):
        # Min-heap of the largest elements seen so far; once it has grown
        # to `num` entries each new element is pushed and the smallest popped.
        heap = []
        for element in elements:
            if len(heap) >= num:
                heapq.heappushpop(heap, element)
            else:
                heapq.heappush(heap, element)
        yield heap

    def combine(first, second):
        # Re-run the same selection over the concatenated candidates.
        return next(keepLargest(first + second))

    candidates = self.mapPartitions(keepLargest).reduce(combine)
    return sorted(candidates, reverse=True)
def takeOrdered(self, num, key=None):
    """
    Return C{num} elements of this RDD in ascending order, or in the order
    induced by the optional C{key} function.

    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
    [1, 2, 3, 4, 5, 6]
    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
    [10, 9, 7, 6, 5, 4]
    """

    def selectFrom(elements, key_=None):
        # Feed every element (paired with its key when one is given) into a
        # MaxHeapQ of capacity `num` and emit what the heap retained.
        heap = MaxHeapQ(num)
        if key_ is None:
            for element in elements:
                heap.insert(element)
        else:
            for element in elements:
                heap.insert((key_(element), element))
        yield heap.getElements()

    def selectKeyed(elements):
        return selectFrom(elements, key)

    def combine(first, second):
        # Partition results are already keyed, so no key function here.
        return next(selectFrom(first + second))

    survivors = self.mapPartitions(selectKeyed).reduce(combine)
    if key is not None:
        # Drop the computed keys again before the final sort.
        survivors = [pair[1] for pair in survivors]
    return sorted(survivors, key=key)
def take(self, num):
    """
    Take the first num elements of the RDD.

    It works by first scanning one partition, and using the results from
    that partition to estimate the number of additional partitions needed
    to satisfy the limit.

    Translated from the Scala implementation in RDD#take().

    >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
    [2, 3]
    >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
    [2, 3, 4, 5, 6]
    >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
    [91, 92, 93]
    """
    items = []
    totalParts = self._jrdd.partitions().size()
    partsScanned = 0

    while len(items) < num and partsScanned < totalParts:
        # The number of partitions to try in this iteration.
        # It is ok for this number to be greater than totalParts because
        # the partition range built below is capped at totalParts.
        numPartsToTry = 1
        if partsScanned > 0:
            # If we didn't find any rows after the previous iteration,
            # quadruple and retry. Otherwise, interpolate the number of
            # partitions we need to try, but overestimate it by 50%.
            if len(items) == 0:
                numPartsToTry = partsScanned * 4
            else:
                numPartsToTry = int(1.5 * num * partsScanned / len(items))

        left = num - len(items)

        def takeUpToNumLeft(iterator):
            # Finish with an explicit return when the partition runs dry.
            # Letting StopIteration escape from next() inside a generator
            # is turned into a RuntimeError by PEP 479.
            iterator = iter(iterator)
            taken = 0
            while taken < left:
                try:
                    element = next(iterator)
                except StopIteration:
                    return
                yield element
                taken += 1

        p = range(
            partsScanned, min(partsScanned + numPartsToTry, totalParts))
        res = self.context.runJob(self, takeUpToNumLeft, p, True)

        items += res
        partsScanned += numPartsToTry

    # A round may return more than was still missing; trim to `num`.
    return items[:num]
def first(self):
    """
    Return the first element of this RDD, obtained through C{take(1)}.

    >>> sc.parallelize([2, 3, 4]).first()
    2
    """
    head = self.take(1)
    return head[0]
1168
def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
    """
    Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
    system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
    converted for output using either user specified converters or, by default,
    L{org.apache.spark.api.python.JavaToWritableConverter}.

    @param conf: Hadoop job configuration, passed in as a dict
    @param keyConverter: (None by default)
    @param valueConverter: (None by default)
    """
    java_conf = self.ctx._dictToJavaMap(conf)
    pickled = self._toPickleSerialization()
    is_batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
    python_rdd = self.ctx._jvm.PythonRDD
    # NOTE(review): the trailing True presumably selects the new (mapreduce)
    # API on the JVM side -- confirm against PythonRDD.saveAsHadoopDataset.
    python_rdd.saveAsHadoopDataset(pickled._jrdd, is_batched, java_conf,
                                   keyConverter, valueConverter, True)
1185
def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
                           keyConverter=None, valueConverter=None, conf=None):
    """
    Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
    system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
    will be inferred if not specified. Keys and values are converted for output using either
    user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
    C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
    of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.

    @param path: path to Hadoop file
    @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
           (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
    @param keyClass: fully qualified classname of key Writable class
           (e.g. "org.apache.hadoop.io.IntWritable", None by default)
    @param valueClass: fully qualified classname of value Writable class
           (e.g. "org.apache.hadoop.io.Text", None by default)
    @param keyConverter: (None by default)
    @param valueConverter: (None by default)
    @param conf: Hadoop job configuration, passed in as a dict (None by default)
    """
    java_conf = self.ctx._dictToJavaMap(conf)
    pickled = self._toPickleSerialization()
    is_batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
    python_rdd = self.ctx._jvm.PythonRDD
    python_rdd.saveAsNewAPIHadoopFile(pickled._jrdd, is_batched, path,
                                      outputFormatClass,
                                      keyClass, valueClass,
                                      keyConverter, valueConverter, java_conf)
1214
def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
    """
    Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
    system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
    converted for output using either user specified converters or, by default,
    L{org.apache.spark.api.python.JavaToWritableConverter}.

    @param conf: Hadoop job configuration, passed in as a dict
    @param keyConverter: (None by default)
    @param valueConverter: (None by default)
    """
    java_conf = self.ctx._dictToJavaMap(conf)
    pickled = self._toPickleSerialization()
    is_batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
    python_rdd = self.ctx._jvm.PythonRDD
    # NOTE(review): the trailing False presumably selects the old (mapred)
    # API on the JVM side -- confirm against PythonRDD.saveAsHadoopDataset.
    python_rdd.saveAsHadoopDataset(pickled._jrdd, is_batched, java_conf,
                                   keyConverter, valueConverter, False)
1231
def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
                     keyConverter=None, valueConverter=None, conf=None,
                     compressionCodecClass=None):
    """
    Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
    system, using the old Hadoop OutputFormat API (mapred package). Key and value types
    will be inferred if not specified. Keys and values are converted for output using either
    user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
    C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
    of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.

    @param path: path to Hadoop file
    @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
           (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
    @param keyClass: fully qualified classname of key Writable class
           (e.g. "org.apache.hadoop.io.IntWritable", None by default)
    @param valueClass: fully qualified classname of value Writable class
           (e.g. "org.apache.hadoop.io.Text", None by default)
    @param keyConverter: (None by default)
    @param valueConverter: (None by default)
    @param conf: (None by default)
    @param compressionCodecClass: (None by default)
    """
    java_conf = self.ctx._dictToJavaMap(conf)
    pickled = self._toPickleSerialization()
    is_batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
    python_rdd = self.ctx._jvm.PythonRDD
    python_rdd.saveAsHadoopFile(pickled._jrdd, is_batched, path,
                                outputFormatClass,
                                keyClass, valueClass,
                                keyConverter, valueConverter,
                                java_conf, compressionCodecClass)
1263
def saveAsSequenceFile(self, path, compressionCodecClass=None):
    """
    Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
    system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
    RDD's key and value types. The mechanism is as follows:
        1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
        2. Keys and values of this Java RDD are converted to Writables and written out.

    @param path: path to sequence file
    @param compressionCodecClass: (None by default)
    """
    pickled = self._toPickleSerialization()
    is_batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
    python_rdd = self.ctx._jvm.PythonRDD
    python_rdd.saveAsSequenceFile(pickled._jrdd, is_batched,
                                  path, compressionCodecClass)
1279
def saveAsPickleFile(self, path, batchSize=10):
    """
    Save this RDD as a SequenceFile of serialized objects. Elements are
    serialized with L{pyspark.serializers.PickleSerializer} in batches of
    C{batchSize} (10 by default).

    >>> tmpFile = NamedTemporaryFile(delete=True)
    >>> tmpFile.close()
    >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
    >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
    [1, 2, 'rdd', 'spark']
    """
    batched_pickle = BatchedSerializer(PickleSerializer(), batchSize)
    reserialized = self._reserialize(batched_pickle)
    reserialized._jrdd.saveAsObjectFile(path)
1294
def saveAsTextFile(self, path):
    """
    Write this RDD out as a text file, one string representation per element.

    >>> tempFile = NamedTemporaryFile(delete=True)
    >>> tempFile.close()
    >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
    >>> from fileinput import input
    >>> from glob import glob
    >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
    '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'

    Empty lines are tolerated when saving to text files.

    >>> tempFile2 = NamedTemporaryFile(delete=True)
    >>> tempFile2.close()
    >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
    >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
    '\\n\\n\\nbar\\nfoo\\n'
    """
    def toEncodedText(split, elements):
        # Non-string elements go through unicode(); unicode text is then
        # encoded as UTF-8, while byte strings pass through untouched.
        for element in elements:
            text = element if isinstance(element, basestring) else unicode(element)
            yield text.encode("utf-8") if isinstance(text, unicode) else text

    encoded = self.mapPartitionsWithIndex(toEncodedText)
    encoded._bypass_serializer = True
    as_strings = encoded._jrdd.map(self.ctx._jvm.BytesToString())
    as_strings.saveAsTextFile(path)

# Pair functions
def collectAsMap(self):
    """
    Collect the key-value pairs of this RDD to the master and return them
    as a dictionary.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
    >>> m[1]
    2
    >>> m[3]
    4
    """
    pairs = self.collect()
    return dict(pairs)
1339
def keys(self):
    """
    Return an RDD holding the key of each (key, value) tuple.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
    >>> m.collect()
    [1, 3]
    """
    def keyOf(pair):
        # Explicit two-way unpacking: still fails on anything but a pair.
        key, _ = pair
        return key

    return self.map(keyOf)
1349
def values(self):
    """
    Return an RDD holding the value of each (key, value) tuple.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
    >>> m.collect()
    [2, 4]
    """
    def valueOf(pair):
        # Explicit two-way unpacking: still fails on anything but a pair.
        _, value = pair
        return value

    return self.map(valueOf)
1359
def reduceByKey(self, func, numPartitions=None):
    """
    Merge the values for each key using an associative reduce function.

    This will also perform the merging locally on each mapper before
    sending results to a reducer, similarly to a "combiner" in MapReduce.

    Output will be hash-partitioned with C{numPartitions} partitions, or
    the default parallelism level if C{numPartitions} is not specified.

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [('a', 2), ('b', 1)]
    """
    def asCombiner(value):
        # The first value seen for a key is used as its combiner unchanged.
        return value

    # `func` serves both to merge a value into a combiner and to merge
    # two combiners.
    return self.combineByKey(asCombiner, func, func, numPartitions)
1376
def reduceByKeyLocally(self, func):
    """
    Merge the values for each key using an associative reduce function, but
    return the results immediately to the master as a dictionary.

    Values are first merged within each partition; the per-partition
    dictionaries are then merged with the same function.

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKeyLocally(add).items())
    [('a', 2), ('b', 1)]
    """
    def foldPartition(pairs):
        # One dictionary per partition, emitted as a single item.
        merged = {}
        for key, value in pairs:
            if key in merged:
                merged[key] = func(merged[key], value)
            else:
                merged[key] = value
        yield merged

    def foldMaps(into, extra):
        # Merge `extra` into `into` in place and hand `into` back.
        for key, value in extra.iteritems():
            if key in into:
                into[key] = func(into[key], value)
            else:
                into[key] = value
        return into

    per_partition = self.mapPartitions(foldPartition)
    return per_partition.reduce(foldMaps)
def countByKey(self):
    """
    Count the elements for each key and return the counts to the master
    as a dictionary.

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.countByKey().items())
    [('a', 2), ('b', 1)]
    """
    def keyOf(pair):
        return pair[0]

    only_keys = self.map(keyOf)
    return only_keys.countByValue()
1412
def join(self, other, numPartitions=None):
    """
    Return an RDD containing all pairs of elements with matching keys in
    C{self} and C{other}.

    Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
    (k, v1) is in C{self} and (k, v2) is in C{other}.

    Performs a hash join across the cluster.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]
    """
    # The work is delegated to pyspark.join.python_join.
    joined = python_join(self, other, numPartitions)
    return joined
1429
def leftOuterJoin(self, other, numPartitions=None):
    """
    Perform a left outer join of C{self} and C{other}.

    For each element (k, v) in C{self}, the resulting RDD will either
    contain all pairs (k, (v, w)) for w in C{other}, or the pair
    (k, (v, None)) if no elements in other have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(x.leftOuterJoin(y).collect())
    [('a', (1, 2)), ('b', (4, None))]
    """
    # The work is delegated to pyspark.join.python_left_outer_join.
    joined = python_left_outer_join(self, other, numPartitions)
    return joined
1446
def rightOuterJoin(self, other, numPartitions=None):
    """
    Perform a right outer join of C{self} and C{other}.

    For each element (k, w) in C{other}, the resulting RDD will either
    contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
    if no elements in C{self} have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(y.rightOuterJoin(x).collect())
    [('a', (2, 1)), ('b', (None, 4))]
    """
    # The work is delegated to pyspark.join.python_right_outer_join.
    joined = python_right_outer_join(self, other, numPartitions)
    return joined
# TODO: add option to control map-side combining
# portable_hash is used as the default, because the builtin hash of None
# differs across machines.
def partitionBy(self, numPartitions, partitionFunc=portable_hash):
    """
    Return a copy of the RDD partitioned using the specified partitioner.

    Each (key, value) pair is assigned to bucket
    C{partitionFunc(key) % numPartitions}. When C{numPartitions} is None,
    C{self._defaultReducePartitions()} is used instead.

    >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
    >>> sets = pairs.partitionBy(2).glom().collect()
    >>> set(sets[0]).intersection(set(sets[1]))
    set([])
    """
    if numPartitions is None:
        numPartitions = self._defaultReducePartitions()

    # Transferring O(n) objects to Java is too expensive.
    # Instead, we'll form the hash buckets in Python,
    # transferring O(numPartitions) objects to Java.
    # Each object is a (splitNumber, [objects]) pair.
    # In order to avoid too huge objects, the objects are
    # grouped into chunks.
    outputSerializer = self.ctx._unbatched_serializer

    # Flush threshold: half of the configured Python worker memory.
    limit = (_parse_memory(self.ctx._conf.get(
        "spark.python.worker.memory", "512m")) / 2)

    def add_shuffle_key(split, iterator):
        # Emits alternating items: a packed bucket number, followed by the
        # serialized list of pairs that belong to that bucket.

        buckets = defaultdict(list)
        # c counts pairs since the last flush; batch is the flush threshold.
        c, batch = 0, min(10 * numPartitions, 1000)

        for (k, v) in iterator:
            buckets[partitionFunc(k) % numPartitions].append((k, v))
            c += 1

            # check used memory and avg size of chunk of objects
            if (c % 1000 == 0 and get_used_memory() > limit
                    or c > batch):
                n, size = len(buckets), 0
                # Note: this loop variable rebinds the `split` parameter.
                for split in buckets.keys():
                    yield pack_long(split)
                    d = outputSerializer.dumps(buckets[split])
                    # Drop the bucket once it has been serialized.
                    del buckets[split]
                    yield d
                    size += len(d)

                # Average serialized chunk size, shifted down by 20 bits.
                avg = (size / n) >> 20
                # let 1M < avg < 10M
                if avg < 1:
                    batch *= 1.5
                elif avg > 10:
                    batch = max(batch / 1.5, 1)
                c = 0

        # Emit whatever is still buffered once the input is exhausted.
        for (split, items) in buckets.iteritems():
            yield pack_long(split)
            yield outputSerializer.dumps(items)

    keyed = self.mapPartitionsWithIndex(add_shuffle_key)
    # The generator above already yields serialized data.
    keyed._bypass_serializer = True
    with _JavaStackTrace(self.context) as st:
        pairRDD = self.ctx._jvm.PairwiseRDD(
            keyed._jrdd.rdd()).asJavaPairRDD()
        partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                      id(partitionFunc))
    jrdd = pairRDD.partitionBy(partitioner).values()
    rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
    # This is required so that id(partitionFunc) remains unique,
    # even if partitionFunc is a lambda:
    rdd._partitionFunc = partitionFunc
    return rdd

# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                 numPartitions=None):
    """
    Generic function to combine the elements for each key using a custom
    set of aggregation functions.

    Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
    type" C. Note that V and C can be different -- for example, one might
    group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

    Users provide three functions:

        - C{createCombiner}, which turns a V into a C (e.g., creates
          a one-element list)
        - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
          a list)
        - C{mergeCombiners}, to combine two C's into a single one.

    In addition, users can control the partitioning of the output RDD.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> def f(x): return x
    >>> def add(a, b): return a + str(b)
    >>> sorted(x.combineByKey(str, add, add).collect())
    [('a', '11'), ('b', '1')]
    """
    if numPartitions is None:
        numPartitions = self._defaultReducePartitions()

    ser = self.ctx.serializer
    spill_setting = self.ctx._conf.get("spark.shuffle.spill", 'True')
    may_spill = spill_setting.lower() == 'true'
    worker_memory = _parse_memory(
        self.ctx._conf.get("spark.python.worker.memory", "512m"))
    aggregator = Aggregator(createCombiner, mergeValue, mergeCombiners)

    def newMerger(budget):
        # ExternalMerger when spilling is enabled, otherwise an
        # InMemoryMerger.
        if may_spill:
            return ExternalMerger(aggregator, budget, ser)
        return InMemoryMerger(aggregator)

    def combineLocally(iterator):
        # Map side: merge raw values; uses 90% of the memory setting.
        merger = newMerger(worker_memory * 0.9)
        merger.mergeValues(iterator)
        return merger.iteritems()

    def _mergeCombiners(iterator):
        # Reduce side: merge the combiners that arrive after the shuffle.
        merger = newMerger(worker_memory)
        merger.mergeCombiners(iterator)
        return merger.iteritems()

    pre_combined = self.mapPartitions(combineLocally)
    repartitioned = pre_combined.partitionBy(numPartitions)
    return repartitioned.mapPartitions(_mergeCombiners)
def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
    """
    Aggregate the values of each key, using given combine functions and a neutral
    "zero value". This function can return a different result type, U, than the type
    of the values in this RDD, V. Thus, we need one operation for merging a V into
    a U and one operation for merging two U's. The former operation is used for merging
    values within a partition, and the latter is used for merging values between
    partitions. To avoid memory allocation, both of these functions are
    allowed to modify and return their first argument instead of creating a new U.
    """
    def seedCombiner(value):
        # Start every key from a fresh deep copy of the zero value, so
        # functions that mutate their first argument never share it.
        return seqFunc(copy.deepcopy(zeroValue), value)

    return self.combineByKey(seedCombiner, seqFunc, combFunc, numPartitions)
def foldByKey(self, zeroValue, func, numPartitions=None):
    """
    Merge the values for each key using an associative function "func"
    and a neutral "zeroValue" which may be added to the result an
    arbitrary number of times, and must not change the result
    (e.g., 0 for addition, or 1 for multiplication.).

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> from operator import add
    >>> rdd.foldByKey(0, add).collect()
    [('a', 2), ('b', 1)]
    """
    def seedCombiner(value):
        # Start every key from a fresh deep copy of the zero value.
        return func(copy.deepcopy(zeroValue), value)

    return self.combineByKey(seedCombiner, func, func, numPartitions)

# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
    """
    Group the values for each key in the RDD into a single sequence.
    Hash-partitions the resulting RDD into numPartitions partitions.

    Note: If you are grouping in order to perform an aggregation (such as a
    sum or average) over each key, using reduceByKey will provide much
    better performance.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
    [('a', [1, 1]), ('b', [1])]
    """

    def startGroup(value):
        return [value]

    def addToGroup(group, value):
        # Appends in place and returns the same list.
        group.append(value)
        return group

    def joinGroups(into, extra):
        # Extends in place and returns the same list.
        into.extend(extra)
        return into

    def wrap(group):
        return ResultIterable(group)

    grouped = self.combineByKey(startGroup, addToGroup, joinGroups,
                                numPartitions)
    return grouped.mapValues(wrap)

# TODO: add tests
def flatMapValues(self, f):
    """
    Pass each value in the key-value pair RDD through a flatMap function
    without changing the keys; this also retains the original RDD's
    partitioning.

    >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    >>> def f(x): return x
    >>> x.flatMapValues(f).collect()
    [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
    """
    def expand(pair):
        # Pair the unchanged key with every item that f yields for the value.
        key, value = pair
        return ((key, item) for item in f(value))

    return self.flatMap(expand, preservesPartitioning=True)
1666
def mapValues(self, f):
    """
    Pass each value in the key-value pair RDD through a map function
    without changing the keys; this also retains the original RDD's
    partitioning.

    >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    >>> def f(x): return len(x)
    >>> x.mapValues(f).collect()
    [('a', 3), ('b', 1)]
    """
    def transform(pair):
        # Keep the key, replace the value with f(value).
        key, value = pair
        return (key, f(value))

    return self.map(transform, preservesPartitioning=True)
1680
def groupWith(self, other, *others):
    """
    Alias for cogroup but with support for multiple RDDs.

    >>> w = sc.parallelize([("a", 5), ("b", 6)])
    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> z = sc.parallelize([("b", 42)])
    >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \
            sorted(list(w.groupWith(x, y, z).collect())))
    [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

    """
    # This RDD comes first, followed by the others in argument order.
    rdds = (self, other) + others
    return python_cogroup(rdds, numPartitions=None)

# TODO: add variant with custom partitioner
def cogroup(self, other, numPartitions=None):
    """
    For each key k in C{self} or C{other}, return a resulting RDD that
    contains a tuple with the list of values for that key in C{self} as
    well as C{other}.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
    [('a', ([1], [2])), ('b', ([4], []))]
    """
    rdds = (self, other)
    return python_cogroup(rdds, numPartitions)
1709
def sampleByKey(self, withReplacement, fractions, seed=None):
    """
    Return a subset of this RDD sampled by key (via stratified sampling).
    Create a sample of this RDD using variable sampling rates for
    different keys as specified by fractions, a key to sampling rate map.

    >>> fractions = {"a": 0.2, "b": 0.1}
    >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
    >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
    >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
    True
    >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
    True
    >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
    True
    """
    # Every sampling rate must be non-negative.
    for rate in fractions.values():
        assert rate >= 0.0, "Negative fraction value: %s" % rate

    sampler = RDDStratifiedSampler(withReplacement, fractions, seed)
    return self.mapPartitionsWithIndex(sampler.func, True)
1730
def subtractByKey(self, other, numPartitions=None):
    """
    Return each (key, value) pair in C{self} that has no pair with matching
    key in C{other}.

    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtractByKey(y).collect())
    [('b', 4), ('b', 5)]
    """
    def onlyInSelf(pair):
        # vals[0] holds the values from self, vals[1] those from other.
        key, vals = pair
        return len(vals[0]) > 0 and len(vals[1]) == 0

    def expand(pair):
        # Turn the grouped values from self back into (key, value) pairs.
        key, vals = pair
        return [(key, val) for val in vals[0]]

    grouped = self.cogroup(other, numPartitions)
    return grouped.filter(onlyInSelf).flatMap(expand)
def subtract(self, other, numPartitions=None):
    """
    Return each value in C{self} that is not contained in C{other}.

    Both RDDs are keyed by their elements and the work is delegated to
    L{subtractByKey}; C{numPartitions} is forwarded to that call.

    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtract(y).collect())
    [('a', 1), ('b', 4), ('b', 5)]
    """
    # note: here 'True' is just a placeholder
    rdd = other.map(lambda x: (x, True))
    keyed = self.map(lambda x: (x, True))
    # Forward numPartitions; it was previously accepted but ignored.
    remaining = keyed.subtractByKey(rdd, numPartitions)
    return remaining.map(lambda tpl: tpl[0])
1758
def keyBy(self, f):
    """
    Turn every element x of this RDD into the tuple (f(x), x).

    >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
    >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
    >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
    [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
    """
    def attachKey(element):
        return (f(element), element)

    return self.map(attachKey)
1769
def repartition(self, numPartitions):
    """
    Return a new RDD that has exactly numPartitions partitions.

    Can increase or decrease the level of parallelism in this RDD.
    Internally, this uses a shuffle to redistribute data.
    If you are decreasing the number of partitions in this RDD, consider
    using `coalesce`, which can avoid performing a shuffle.

    >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
    >>> sorted(rdd.glom().collect())
    [[1], [2, 3], [4, 5], [6, 7]]
    >>> len(rdd.repartition(2).glom().collect())
    2
    >>> len(rdd.repartition(10).glom().collect())
    10
    """
    # The new RDD keeps this RDD's context and deserializer.
    repartitioned = self._jrdd.repartition(numPartitions)
    return RDD(repartitioned, self.ctx, self._jrdd_deserializer)
1789
def coalesce(self, numPartitions, shuffle=False):
    """
    Return a new RDD that is reduced into `numPartitions` partitions.

    @param numPartitions: target number of partitions
    @param shuffle: forwarded to the underlying Java RDD's C{coalesce}
           (False by default)

    >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
    [[1], [2, 3], [4, 5]]
    >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
    [[1, 2, 3, 4, 5]]
    """
    # Pass `shuffle` through; it was previously accepted but ignored.
    jrdd = self._jrdd.coalesce(numPartitions, shuffle)
    return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1801
def zip(self, other):
    """
    Zips this RDD with another one, returning key-value pairs with the
    first element in each RDD, second element in each RDD, etc. Assumes
    that the two RDDs have the same number of partitions and the same
    number of elements in each partition (e.g. one was made through
    a map on the other).

    >>> x = sc.parallelize(range(0,5))
    >>> y = sc.parallelize(range(1000, 1005))
    >>> x.zip(y).collect()
    [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
    """
    if self.getNumPartitions() != other.getNumPartitions():
        raise ValueError("Can only zip with RDD which has the same number of partitions")

    def batchSizeOf(ser):
        # Serializers that are not batched count as batch size 0.
        return ser.batchSize if isinstance(ser, BatchedSerializer) else 0

    def rebatch(rdd, batchSize):
        # Re-serialize `rdd` in batches of `batchSize`, wrapping its
        # underlying (unbatched) serializer.
        inner = rdd._jrdd_deserializer
        if isinstance(inner, BatchedSerializer):
            inner = inner.serializer
        return rdd._reserialize(BatchedSerializer(inner, batchSize))

    mine = batchSizeOf(self._jrdd_deserializer)
    theirs = batchSizeOf(other._jrdd_deserializer)
    # Bring the side with the smaller batch size up to the larger one.
    if mine > theirs:
        other = rebatch(other, mine)
    elif mine < theirs:
        self = rebatch(self, theirs)

    # There will be an Exception in JVM if there are different number
    # of items in each partitions.
    zipped = self._jrdd.zip(other._jrdd)
    pair_deserializer = PairDeserializer(self._jrdd_deserializer,
                                         other._jrdd_deserializer)
    return RDD(zipped, self.ctx, pair_deserializer)
def zipWithIndex(self):
    """
    Pair every element of this RDD with its position in the RDD.

    Positions are assigned partition by partition, and in iteration
    order within a partition: the first item of the first partition
    gets index 0 and the last item of the last partition gets the
    largest index.

    When this RDD has more than one partition a spark job is run
    first to count the items of each partition.

    @return: an RDD of (element, index) pairs

    >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
    [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
    """
    offsets = [0]
    if self.getNumPartitions() > 1:
        counts = self.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
        # offsets[k] is the index of the first item of partition k, so
        # the size of the last partition is never needed.
        for count in counts[:-1]:
            offsets.append(offsets[-1] + count)

    def func(k, it):
        index = offsets[k]
        for item in it:
            yield item, index
            index += 1

    return self.mapPartitionsWithIndex(func)
def zipWithUniqueId(self):
    """
    Pair every element of this RDD with a generated unique Long id.

    With n partitions, the items of the kth partition receive the ids
    k, n+k, 2*n+k, ... in iteration order. The ids may therefore have
    gaps, but unlike L{zipWithIndex} no spark job has to be run to
    compute them.

    @return: an RDD of (element, id) pairs

    >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
    [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
    """
    stride = self.getNumPartitions()

    def func(k, it):
        # Start at the partition index and step by the partition count.
        unique_id = k
        for item in it:
            yield item, unique_id
            unique_id += stride

    return self.mapPartitionsWithIndex(func)
def name(self):
    """
    Return the name of this RDD.

    @return: the name reported by the underlying Java RDD, encoded as
        UTF-8, or C{None} when that name is missing or empty
    """
    java_name = self._jrdd.name()
    return java_name.encode('utf-8') if java_name else None
1900
def setName(self, name):
    """
    Assign a name to this RDD.

    @param name: the name to give to the underlying Java RDD
    @return: this RDD, so that calls can be chained

    >>> rdd1 = sc.parallelize([1,2])
    >>> rdd1.setName('RDD1').name()
    'RDD1'
    """
    self._jrdd.setName(name)
    # Hand back the RDD itself so that callers can chain on the result.
    return self
1911
def toDebugString(self):
    """
    A description of this RDD and its recursive dependencies for debugging.

    @return: the description produced by the underlying Java RDD,
        encoded as UTF-8, or C{None} when that description is missing
        or empty
    """
    description = self._jrdd.toDebugString()
    return description.encode('utf-8') if description else None
1920
def getStorageLevel(self):
    """
    Get the RDD's current storage level.

    The level is read from the underlying Java RDD and copied, flag by
    flag, into a Python L{StorageLevel}.

    @return: a L{StorageLevel} built from the Java level's useDisk,
        useMemory, useOffHeap, deserialized and replication values

    >>> rdd1 = sc.parallelize([1,2])
    >>> rdd1.getStorageLevel()
    StorageLevel(False, False, False, False, 1)
    >>> print(rdd1.getStorageLevel())
    Serialized 1x Replicated
    """
    jlevel = self._jrdd.getStorageLevel()
    return StorageLevel(jlevel.useDisk(),
                        jlevel.useMemory(),
                        jlevel.useOffHeap(),
                        jlevel.deserialized(),
                        jlevel.replication())
1938
1939 - def _defaultReducePartitions(self):
1940 """ 1941 Returns the default number of partitions to use during reduce tasks (e.g., groupBy). 1942 If spark.default.parallelism is set, then we'll use the value from SparkContext 1943 defaultParallelism, otherwise we'll use the number of partitions in this RDD. 1944 1945 This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce 1946 the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will 1947 be inherent. 1948 """ 1949 if self.ctx._conf.contains("spark.default.parallelism"): 1950 return self.ctx.defaultParallelism 1951 else: 1952 return self.getNumPartitions()
1953
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs.  This could be an expensive operation, since those
# hashes aren't retained.


class PipelinedRDD(RDD):

    """
    An RDD defined by a Python function applied to a parent RDD, where
    consecutive pipelinable transformations are composed into a single
    function.

    Pipelined maps:

    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """

    def __init__(self, prev, func, preservesPartitioning=False):
        """
        Create the RDD obtained by applying C{func} to C{prev}.

        @param prev: the parent RDD
        @param func: a function called as func(split, iterator)
        @param preservesPartitioning: flag handed on to the JVM PythonRDD;
            when C{prev} is pipelined into this RDD it only stays true
            if C{prev} preserves partitioning as well
        """
        fuse_with_prev = isinstance(prev, PipelinedRDD) and prev._is_pipelinable()
        if fuse_with_prev:
            upstream_func = prev.func

            def pipeline_func(split, iterator):
                return func(split, upstream_func(split, iterator))

            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            # Maintain the pipeline: keep reading from the same Java RDD
            # (and with the same deserializer) as the parent does.
            self._prev_jrdd = prev._prev_jrdd
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        else:
            # This transformation is the first in its stage.
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        self.ctx = prev.ctx
        self.prev = prev
        self.is_cached = False
        self.is_checkpointed = False
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        """
        The Java RDD backing this RDD; built on first access and then
        reused from C{self._jrdd_val}.
        """
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            self._jrdd_deserializer = NoOpSerializer()
        ctx = self.ctx
        command = (self.func, self._prev_jrdd_deserializer,
                   self._jrdd_deserializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        # NOTE(review): the broadcast list is read and cleared only after
        # the command has been pickled; presumably pickling is what fills
        # ctx._pickled_broadcast_vars - keep this order.
        gateway_client = ctx._gateway._gateway_client
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in ctx._pickled_broadcast_vars],
            gateway_client)
        ctx._pickled_broadcast_vars.clear()
        env = MapConverter().convert(ctx.environment, gateway_client)
        includes = ListConverter().convert(ctx._python_includes, gateway_client)
        python_rdd = ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
                                        bytearray(pickled_command),
                                        env, includes, self.preservesPartitioning,
                                        ctx.pythonExec,
                                        broadcast_vars, ctx._javaAccumulator)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        """
        Tell whether a later transformation may be composed with this one,
        i.e. this RDD is neither cached nor checkpointed.
        """
        return not self.is_cached and not self.is_checkpointed
2032
def _test():
    """
    Run the doctests of this module against a local SparkContext.

    The context is bound to the name C{sc} in the doctest globals and
    is stopped once the run is over, whether or not the run raised.
    The process exits with status -1 if any doctest failed.
    """
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    try:
        failure_count, _ = doctest.testmod(
            globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Always release the context, even if the doctest run itself raises.
        globs['sc'].stop()
    if failure_count:
        # sys.exit instead of the `exit` helper, which only exists when the
        # site module has been loaded.
        sys.exit(-1)


if __name__ == "__main__":
    _test()