Package pyspark :: Module rdd
[frames] | no frames]

Source Code for Module pyspark.rdd

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one or more 
   3  # contributor license agreements.  See the NOTICE file distributed with 
   4  # this work for additional information regarding copyright ownership. 
   5  # The ASF licenses this file to You under the Apache License, Version 2.0 
   6  # (the "License"); you may not use this file except in compliance with 
   7  # the License.  You may obtain a copy of the License at 
   8  # 
   9  #    http://www.apache.org/licenses/LICENSE-2.0 
  10  # 
  11  # Unless required by applicable law or agreed to in writing, software 
  12  # distributed under the License is distributed on an "AS IS" BASIS, 
  13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  14  # See the License for the specific language governing permissions and 
  15  # limitations under the License. 
  16  # 
  17   
  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from collections import namedtuple 
  22  from itertools import chain, ifilter, imap 
  23  import operator 
  24  import os 
  25  import sys 
  26  import shlex 
  27  import traceback 
  28  from subprocess import Popen, PIPE 
  29  from tempfile import NamedTemporaryFile 
  30  from threading import Thread 
  31  import warnings 
  32  import heapq 
  33  from random import Random 
  34   
  35  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  36      BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long 
  37  from pyspark.join import python_join, python_left_outer_join, \ 
  38      python_right_outer_join, python_cogroup 
  39  from pyspark.statcounter import StatCounter 
  40  from pyspark.rddsampler import RDDSampler 
  41  from pyspark.storagelevel import StorageLevel 
  42  from pyspark.resultiterable import ResultIterable 
  43   
  44  from py4j.java_collections import ListConverter, MapConverter 
  45   
  46  __all__ = ["RDD"] 
# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
# hash for string
def portable_hash(x):
    """
    This function returns consistent hash code for builtin types, especially
    for None and tuple with None.

    The algorithm is similar to that one used by CPython 2.7

    C{None} hashes to 0, tuples are hashed element by element (recursively)
    into a 32-bit value, and every other object falls back to the builtin
    C{hash}.

    >>> portable_hash(None)
    0
    >>> portable_hash((None, 1))
    219750521
    """
    if x is None:
        return 0
    if isinstance(x, tuple):
        h = 0x345678
        for i in x:
            h ^= portable_hash(i)
            h *= 1000003
            # keep the running value within 32 bits so it is never negative
            h &= 0xffffffff
        h ^= len(x)
        # h is non-negative here, so no remapping of -1 is needed
        return h
    return hash(x)
76
def _extract_concise_traceback():
    """
    Summarise the current call stack as a single call site.

    Returns a C{Callsite} namedtuple with the fields C{function}, C{file}
    and C{linenum}, or C{None} when the extracted stack is empty.

    The directory of the innermost frame (this function's own file) is used
    as the prefix that identifies "Spark" frames.  The result names the
    function of the first frame under that prefix together with the file and
    line of the frame that called it; if the very first frame already
    matches, that frame alone is described.
    """
    stack = traceback.extract_stack()
    callsite = namedtuple("Callsite", "function file linenum")
    if len(stack) == 0:
        return None
    innermost_file, _, _, _ = stack[-1]
    sparkpath = os.path.dirname(innermost_file)
    # default to the innermost frame; it always matches its own directory
    first_spark_frame = len(stack) - 1
    for depth, frame in enumerate(stack):
        frame_file, _, _, _ = frame
        if frame_file.startswith(sparkpath):
            first_spark_frame = depth
            break
    if first_spark_frame == 0:
        path, lineno, func_name, _ = stack[0]
        return callsite(function=func_name, file=path, linenum=lineno)
    _, _, spark_func, _ = stack[first_spark_frame]
    user_file, user_line, _, _ = stack[first_spark_frame - 1]
    return callsite(function=spark_func, file=user_file, linenum=user_line)
# Nesting depth of active _JavaStackTrace contexts; the call site is only
# set when entering the outermost one and cleared when leaving it.
_spark_stack_depth = 0


class _JavaStackTrace(object):
    """
    Context manager that publishes the Python call site to the Java
    SparkContext (via C{sc._jsc.setCallSite}) for the duration of a block.

    The call-site string is computed once, at construction time.  Nested
    uses share the module-level C{_spark_stack_depth} counter, so only the
    outermost context sets and clears the call site.
    """

    def __init__(self, sc):
        tb = _extract_concise_traceback()
        if tb is not None:
            self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
        else:
            self._traceback = "Error! Could not extract traceback info"
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1
        # return the manager so that "with ... as st" binds a usable object
        return self

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)
124
class MaxHeapQ(object):
    """
    An implementation of MaxHeap.

    The heap holds at most C{maxsize} elements.  Once it is full, a new
    value replaces the current maximum only if it is smaller, so the heap
    ends up containing the C{maxsize} smallest values inserted.  A heap
    created with C{maxsize <= 0} stays empty.

    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
        # we start from q[1], this makes calculating children as trivial as 2 * k
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        # Move q[k] up while it is larger than its parent.  Floor division
        # keeps the parent index an int even under true division.
        while (k > 1) and (self.q[k // 2] < self.q[k]):
            self._swap(k, k // 2)
            k = k // 2

    def _swap(self, i, j):
        self.q[i], self.q[j] = self.q[j], self.q[i]

    def _sink(self, k):
        # Move q[k] down until it is larger than both of its children.
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
            # pick the larger of the two children (if there are two)
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if self.q[k] > self.q[j]:
                break
            self._swap(k, j)
            k = j

    def size(self):
        # q[0] is a placeholder and does not count as an element
        return len(self.q) - 1

    def insert(self, value):
        if self.maxsize <= 0:
            # nothing can be retained; avoids indexing the empty heap's root
            return
        if self.size() < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        # elements in heap order (not sorted), without the placeholder
        return self.q[1:]

    def _replaceRoot(self, value):
        # only displace the current maximum when the new value is smaller
        if self.q[1] > value:
            self.q[1] = value
            self._sink(1)
191
192 -class RDD(object):
193 """ 194 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 195 Represents an immutable, partitioned collection of elements that can be 196 operated on in parallel. 197 """ 198
199 - def __init__(self, jrdd, ctx, jrdd_deserializer):
200 self._jrdd = jrdd 201 self.is_cached = False 202 self.is_checkpointed = False 203 self.ctx = ctx 204 self._jrdd_deserializer = jrdd_deserializer 205 self._id = jrdd.id()
206
def id(self):
    """
    A unique ID for this RDD (within its SparkContext).

    Returns the value read from the Java RDD when this object was
    constructed; no call into Java is made here.
    """
    return self._id
212
def __repr__(self):
    """
    Return the string representation produced by the underlying Java RDD.
    """
    return self._jrdd.toString()
@property
def context(self):
    """
    The L{SparkContext} that this RDD was created on.

    Read-only alias for the C{ctx} attribute.
    """
    return self.ctx
222
def cache(self):
    """
    Persist this RDD with the default storage level (C{MEMORY_ONLY}).

    Marks this wrapper as cached, delegates to the underlying Java RDD and
    returns C{self} so that calls can be chained.
    """
    self.is_cached = True
    self._jrdd.cache()
    return self
230
def persist(self, storageLevel):
    """
    Set this RDD's storage level to persist its values across operations after the first time
    it is computed. This can only be used to assign a new storage level if the RDD does not
    have a storage level set yet.

    C{storageLevel} is converted to its Java counterpart by the context
    before being applied.  Returns C{self} so that calls can be chained.
    """
    self.is_cached = True
    javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
    self._jrdd.persist(javaStorageLevel)
    return self
241
def unpersist(self):
    """
    Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

    Clears the C{is_cached} flag and returns C{self} so that calls can be
    chained.
    """
    self.is_cached = False
    self._jrdd.unpersist()
    return self
249
def checkpoint(self):
    """
    Mark this RDD for checkpointing. It will be saved to a file inside the
    checkpoint directory set with L{SparkContext.setCheckpointDir()} and
    all references to its parent RDDs will be removed. This function must
    be called before any job has been executed on this RDD. It is strongly
    recommended that this RDD is persisted in memory, otherwise saving it
    on a file will require recomputation.

    Returns C{None}.
    """
    # local flag only records that checkpointing was requested;
    # isCheckpointed() asks the Java RDD for the actual state
    self.is_checkpointed = True
    self._jrdd.rdd().checkpoint()
261
def isCheckpointed(self):
    """
    Return whether this RDD has been checkpointed or not

    The answer comes from the underlying Java RDD, not from the local
    C{is_checkpointed} flag.
    """
    return self._jrdd.rdd().isCheckpointed()
267
def getCheckpointFile(self):
    """
    Return the name of the file this RDD was checkpointed to, or C{None}
    when the Java RDD reports no checkpoint file.
    """
    maybe_path = self._jrdd.rdd().getCheckpointFile()
    # the Java side hands back an option-like object: unwrap it if defined
    return maybe_path.get() if maybe_path.isDefined() else None
277
def map(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each element of this RDD.

    C{f} is applied lazily, element by element, inside each partition.
    C{preservesPartitioning} is forwarded unchanged to the pipelined RDD.

    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> sorted(rdd.map(lambda x: (x, 1)).collect())
    [('a', 1), ('b', 1), ('c', 1)]
    """
    def apply_to_partition(split, iterator):
        # the partition index is not needed for a plain element-wise map
        return imap(f, iterator)

    return PipelinedRDD(self, apply_to_partition, preservesPartitioning)
288
def flatMap(self, f, preservesPartitioning=False):
    """
    Return a new RDD by first applying a function to all elements of this
    RDD, and then flattening the results.

    C{f} must return an iterable for every element; the iterables are
    chained lazily within each partition.

    >>> rdd = sc.parallelize([2, 3, 4])
    >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
    [1, 1, 1, 2, 2, 3]
    >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
    [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
    """
    def flatten_partition(s, iterator):
        per_element = imap(f, iterator)
        return chain.from_iterable(per_element)

    return self.mapPartitionsWithIndex(flatten_partition, preservesPartitioning)
302
def mapPartitions(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD.

    C{f} receives an iterator over the elements of one partition and must
    return an iterable of output elements.  C{preservesPartitioning} is
    forwarded to L{mapPartitionsWithIndex}.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
    """
    def func(s, iterator):
        # drop the partition index, which f does not expect
        return f(iterator)
    # pass the flag through; it used to be accepted but silently ignored
    return self.mapPartitionsWithIndex(func, preservesPartitioning)
314
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD,
    while tracking the index of the original partition.

    C{f} is called as C{f(splitIndex, iterator)} and must return an
    iterable of output elements.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6
    """
    return PipelinedRDD(self, f, preservesPartitioning)
326
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
    """
    Deprecated: use mapPartitionsWithIndex instead.

    Return a new RDD by applying a function to each partition of this RDD,
    while tracking the index of the original partition.

    Emits a C{DeprecationWarning} attributed to the caller and then
    delegates to L{mapPartitionsWithIndex}.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithSplit(f).sum()
    6
    """
    # stacklevel=2 makes the warning point at the calling code
    warnings.warn("mapPartitionsWithSplit is deprecated; "
        "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
    return self.mapPartitionsWithIndex(f, preservesPartitioning)
342
def filter(self, f):
    """
    Return a new RDD containing only the elements that satisfy a predicate.

    The predicate C{f} is evaluated lazily, partition by partition.

    >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]
    """
    def keep_matching(iterator):
        return ifilter(f, iterator)

    return self.mapPartitions(keep_matching)
353
def distinct(self):
    """
    Return a new RDD containing the distinct elements in this RDD.

    Every element is turned into a C{(element, None)} pair, duplicates are
    collapsed with L{reduceByKey}, and the keys are extracted again.

    >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
    [1, 2, 3]
    """
    # index into the pair instead of using a tuple-parameter lambda, which
    # is Python-2-only syntax
    return self.map(lambda x: (x, None)) \
               .reduceByKey(lambda x, _: x) \
               .map(lambda pair: pair[0])
364
def sample(self, withReplacement, fraction, seed=None):
    """
    Return a sampled subset of this RDD (relies on numpy and falls back
    on default random generator if numpy is unavailable).

    C{fraction} must be non-negative.  The resulting RDD is created with
    C{preservesPartitioning=True}.

    >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
    [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
    """
    # NOTE(review): this check is an assert, so it disappears under "python -O"
    assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
    return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed=None):
    """
    Return a fixed-size sampled subset of this RDD (currently requires numpy).

    Returns a list of at most C{num} elements; when sampling without
    replacement and C{num} exceeds the size of the RDD, the result is
    capped at the RDD size.  Raises C{ValueError} if C{num} is negative.

    >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
    [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
    """
    # validate before counting so bad input does not trigger a job
    if num < 0:
        raise ValueError("Sample size cannot be negative: %s" % num)
    if num == 0:
        return list()

    multiplier = 3.0
    initialCount = self.count()
    if initialCount == 0:
        return list()

    maxSelected = min(initialCount, sys.maxint - 1)

    if num > initialCount and not withReplacement:
        # cannot return more distinct elements than the RDD holds
        total = maxSelected
        fraction = multiplier * (maxSelected + 1) / initialCount
    else:
        fraction = multiplier * (num + 1) / initialCount
        total = num

    samples = self.sample(withReplacement, fraction, seed).collect()

    # If the first sample didn't turn out large enough, keep trying to take samples;
    # this shouldn't happen often because we use a big multiplier for their initial size.
    # See: scala/spark/RDD.scala
    rand = Random(seed)
    while len(samples) < total:
        samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect()

    sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint))
    sampler.shuffle(samples)
    return samples[0:total]
421
def union(self, other):
    """
    Return the union of this RDD and another one.

    Duplicates are kept.  When both RDDs use the same deserializer the Java
    RDDs are unioned directly; otherwise both sides are first re-serialized
    with the context's default serializer.

    >>> rdd = sc.parallelize([1, 1, 2, 3])
    >>> rdd.union(rdd).collect()
    [1, 1, 2, 3, 1, 1, 2, 3]
    """
    if self._jrdd_deserializer == other._jrdd_deserializer:
        same_format = self._jrdd.union(other._jrdd)
        return RDD(same_format, self.ctx, self._jrdd_deserializer)

    # These RDDs contain data in different serialized formats, so we
    # must normalize them to the default serializer.
    left = self._reserialize()
    right = other._reserialize()
    normalized = left._jrdd.union(right._jrdd)
    return RDD(normalized, self.ctx, self.ctx.serializer)
441
def intersection(self, other):
    """
    Return the intersection of this RDD and another one. The output will not
    contain any duplicate elements, even if the input RDDs did.

    Note that this method performs a shuffle internally.

    >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
    >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
    >>> rdd1.intersection(rdd2).collect()
    [1, 2, 3]
    """
    # key both sides by element so that cogroup lines up equal values
    keyed_self = self.map(lambda v: (v, None))
    keyed_other = other.map(lambda v: (v, None))
    grouped = keyed_self.cogroup(keyed_other)
    # keep only keys that have at least one entry on each side
    in_both = grouped.filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0))
    return in_both.keys()
458
459 - def _reserialize(self):
460 if self._jrdd_deserializer == self.ctx.serializer: 461 return self 462 else: 463 return self.map(lambda x: x, preservesPartitioning=True)
464
465 - def __add__(self, other):
466 """ 467 Return the union of this RDD and another one. 468 469 >>> rdd = sc.parallelize([1, 1, 2, 3]) 470 >>> (rdd + rdd).collect() 471 [1, 1, 2, 3, 1, 1, 2, 3] 472 """ 473 if not isinstance(other, RDD): 474 raise TypeError 475 return self.union(other)
476
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """
    Sorts this RDD, which is assumed to consist of (key, value) pairs.

    Keys are compared through C{keyfunc}.  The data is range-partitioned
    into C{numPartitions} partitions (the context's default parallelism
    when omitted) using boundaries estimated from a sample of the keys, and
    each partition is then sorted locally.

    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
    >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
    >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
    [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
    """
    if numPartitions is None:
        numPartitions = self.ctx.defaultParallelism

    bounds = list()

    # first compute the boundary of each part via sampling: we want to partition
    # the key-space into bins such that the bins have roughly the same
    # number of (key, value) pairs falling into them
    if numPartitions > 1:
        rddSize = self.count()
        maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
        fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

        samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
        # Boundaries live in keyfunc-space (what rangePartitionFunc compares
        # against) and are always ascending; descending output is obtained
        # by mirroring the partition index below.
        sampled_keys = sorted(keyfunc(k) for k in samples)

        # we have numPartitions many parts but one of the them has
        # an implicit boundary.  With no samples (e.g. an empty RDD) there
        # is nothing to index, so everything goes to a single partition.
        if sampled_keys:
            for i in range(0, numPartitions - 1):
                index = (len(sampled_keys) - 1) * (i + 1) // numPartitions
                bounds.append(sampled_keys[index])

    def rangePartitionFunc(k):
        key = keyfunc(k)
        p = 0
        while p < len(bounds) and key > bounds[p]:
            p += 1
        if ascending:
            return p
        else:
            return numPartitions - 1 - p

    def mapFunc(iterator):
        yield sorted(iterator, reverse=(not ascending), key=lambda kv: keyfunc(kv[0]))

    return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                .mapPartitions(mapFunc, preservesPartitioning=True)
                .flatMap(lambda x: x, preservesPartitioning=True))
def glom(self):
    """
    Return an RDD created by coalescing all elements within each partition
    into a list.

    Each partition of the result holds exactly one element: the list of
    that partition's original elements.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> sorted(rdd.glom().collect())
    [[1, 2], [3, 4]]
    """
    def partition_as_list(iterator):
        yield list(iterator)

    return self.mapPartitions(partition_as_list)
538
def cartesian(self, other):
    """
    Return the Cartesian product of this RDD and another one, that is, the
    RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
    C{b} is in C{other}.

    >>> rdd = sc.parallelize([1, 2])
    >>> sorted(rdd.cartesian(rdd).collect())
    [(1, 1), (1, 2), (2, 1), (2, 2)]
    """
    # Due to batching, we can't use the Java cartesian method.
    # NOTE(review): the Java cartesian IS called below; presumably the point
    # is that its output cannot be decoded with a plain deserializer, hence
    # the CartesianDeserializer built from both sides -- confirm.
    deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                         other._jrdd_deserializer)
    return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
553
def groupBy(self, f, numPartitions=None):
    """
    Return an RDD of grouped items.

    Each element C{x} is keyed by C{f(x)} and the pairs are then grouped
    with C{groupByKey(numPartitions)}.

    >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
    >>> result = rdd.groupBy(lambda x: x % 2).collect()
    >>> sorted([(x, sorted(y)) for (x, y) in result])
    [(0, [2, 8]), (1, [1, 1, 3, 5])]
    """
    return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
564
def pipe(self, command, env=None):
    """
    Return an RDD created by piping elements to a forked external process.

    C{command} is split with C{shlex.split} and run once per partition.
    Each element is written to the process's stdin as one line (its C{str}
    form with trailing newlines stripped); each line of the process's
    stdout, without its trailing newline, becomes an output element.
    C{env} is the environment mapping for the child process; when omitted
    an empty environment is used, as before.

    >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
    ['1', '2', '', '3']
    """
    # avoid a mutable default argument; the effective default is unchanged
    if env is None:
        env = {}

    def func(iterator):
        pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

        def pipe_objs(out):
            for obj in iterator:
                out.write(str(obj).rstrip('\n') + '\n')
            out.close()
        # feed stdin from a separate thread so that reading stdout below
        # cannot deadlock against a full pipe
        Thread(target=pipe_objs, args=[pipe.stdin]).start()
        return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
    return self.mapPartitions(func)
def foreach(self, f):
    """
    Applies a function to all elements of this RDD.

    C{f} is called for its side effects only; its return values are
    discarded and this method returns C{None}.

    >>> def f(x): print x
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
    """
    def run_on_partition(iterator):
        for element in iterator:
            f(element)
        # emit a single placeholder so the partition is a valid generator
        yield None

    applied = self.mapPartitions(run_on_partition)
    applied.collect()  # Force evaluation
def foreachPartition(self, f):
    """
    Applies a function to each partition of this RDD.

    C{f} receives an iterator over one partition and is called for its
    side effects.  It may be a generator (as in the example), return any
    iterable, or return a non-iterable value such as C{None}, in which
    case the partition contributes nothing to the discarded result.

    >>> def f(iterator):
    ...      for x in iterator:
    ...           print x
    ...      yield None
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
    """
    def func(iterator):
        result = f(iterator)
        try:
            return iter(result)
        except TypeError:
            # f returned a non-iterable (typically None): nothing to emit
            return iter([])
    self.mapPartitions(func).collect()  # Force evaluation
606
def collect(self):
    """
    Return a list that contains all of the elements in this RDD.

    The Java-side collect runs with the Python call site registered; the
    resulting bytes are then transferred through a temporary file and
    deserialized.
    """
    with _JavaStackTrace(self.context):
        java_iter = self._jrdd.collect().iterator()
    deserialized = self._collect_iterator_through_file(java_iter)
    return list(deserialized)
614
def _collect_iterator_through_file(self, iterator):
    """
    Yield the deserialized items of a Java-side iterator.

    The iterator's contents are written to a temporary file inside the
    context's temp directory and read back with this RDD's deserializer.
    The temporary file is removed when the generator finishes, fails or is
    closed early.
    """
    # Transferring lots of data through Py4J can be slow because
    # socket.readline() is inefficient.  Instead, we'll dump the data to a
    # file and read it back.
    tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
    tempFile.close()
    try:
        self.ctx._writeToFile(iterator, tempFile.name)
        # Read the data into Python and deserialize it:
        with open(tempFile.name, 'rb') as stream:
            for item in self._jrdd_deserializer.load_stream(stream):
                yield item
    finally:
        # always clean up, even if writing or deserializing raised
        os.unlink(tempFile.name)
627
def reduce(self, f):
    """
    Reduces the elements of this RDD using the specified commutative and
    associative binary operator. Currently reduces partitions locally.

    Each non-empty partition is reduced to a single value on the workers;
    the per-partition values are then combined on the driver.  Reducing an
    RDD without elements raises C{TypeError}, as the builtin C{reduce} does
    for an empty sequence.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
    15
    >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
    10
    """
    # bound locally so the driver-side call never depends on the builtin name
    from functools import reduce as _reduce

    def func(iterator):
        iterator = iter(iterator)
        # Seed the accumulator with the first element rather than using
        # None as a "nothing seen yet" marker, so that None elements and
        # None results are handled like any other value.
        try:
            acc = next(iterator)
        except StopIteration:
            return  # empty partition contributes nothing
        for obj in iterator:
            acc = f(obj, acc)
        yield acc
    vals = self.mapPartitions(func).collect()
    return _reduce(f, vals)
def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given associative function and a neutral "zero
    value."

    The function C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should not
    modify C{t2}.

    In both the per-partition step and the final merge the running
    accumulator is passed as C{t1} and the next item as C{t2}.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15
    """
    # bound locally so the driver-side call never depends on the builtin name
    from functools import reduce as _reduce

    def func(iterator):
        acc = zeroValue
        for obj in iterator:
            # accumulator first, matching the documented contract and the
            # argument order used by the driver-side reduce below
            acc = op(acc, obj)
        yield acc
    vals = self.mapPartitions(func).collect()
    return _reduce(op, vals, zeroValue)
def aggregate(self, zeroValue, seqOp, combOp):
    """
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given combine functions and a neutral "zero
    value."

    The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should not
    modify C{t2}.

    The first function (seqOp) can return a different result type, U, than
    the type of this RDD. Thus, we need one operation for merging a T into an U
    and one operation for merging two U

    >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
    >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
    >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
    (10, 4)
    >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
    (0, 0)
    """
    def aggregate_partition(iterator):
        running = zeroValue
        for element in iterator:
            running = seqOp(running, element)
        yield running

    # one partial result per partition, merged with combOp via fold
    partials = self.mapPartitions(aggregate_partition)
    return partials.fold(zeroValue, combOp)
def max(self):
    """
    Find the maximum item in this RDD.

    Implemented as a L{reduce} with the builtin C{max}.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
    43.0
    """
    return self.reduce(max)
711
def min(self):
    """
    Find the minimum item in this RDD.

    Implemented as a L{reduce} with the builtin C{min}.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
    1.0
    """
    return self.reduce(min)
720
def sum(self):
    """
    Add up the elements in this RDD.

    Each partition is summed with the builtin C{sum}; the partial sums are
    then added together.

    >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
    6.0
    """
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
729
def count(self):
    """
    Return the number of elements in this RDD.

    Each partition is reduced to its element count and the counts are
    summed.

    >>> sc.parallelize([2, 3, 4]).count()
    3
    """
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
738
def stats(self):
    """
    Return a L{StatCounter} object that captures the mean, variance
    and count of the RDD's elements in one operation.

    One L{StatCounter} is built per partition and the counters are merged
    pairwise with C{mergeStats}.
    """
    def redFunc(left_counter, right_counter):
        return left_counter.mergeStats(right_counter)

    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
def mean(self):
    """
    Compute the mean of this RDD's elements.

    Runs a full L{stats} pass and returns its mean.

    >>> sc.parallelize([1, 2, 3]).mean()
    2.0
    """
    return self.stats().mean()
757
def variance(self):
    """
    Compute the variance of this RDD's elements.

    Runs a full L{stats} pass and returns its variance.

    >>> sc.parallelize([1, 2, 3]).variance()
    0.666...
    """
    return self.stats().variance()
766
def stdev(self):
    """
    Compute the standard deviation of this RDD's elements.

    Runs a full L{stats} pass and returns its standard deviation.

    >>> sc.parallelize([1, 2, 3]).stdev()
    0.816...
    """
    return self.stats().stdev()
775
def sampleStdev(self):
    """
    Compute the sample standard deviation of this RDD's elements (which corrects for bias in
    estimating the standard deviation by dividing by N-1 instead of N).

    Runs a full L{stats} pass and returns its sample standard deviation.

    >>> sc.parallelize([1, 2, 3]).sampleStdev()
    1.0
    """
    return self.stats().sampleStdev()
785
def sampleVariance(self):
    """
    Compute the sample variance of this RDD's elements (which corrects for bias in
    estimating the variance by dividing by N-1 instead of N).

    Runs a full L{stats} pass and returns its sample variance.

    >>> sc.parallelize([1, 2, 3]).sampleVariance()
    1.0
    """
    return self.stats().sampleVariance()
795
def countByValue(self):
    """
    Return the count of each unique value in this RDD as a dictionary of
    (value, count) pairs.

    Values are tallied per partition and the per-partition tallies are
    merged into one dictionary.

    >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
    [(1, 2), (2, 3)]
    """
    def countPartition(iterator):
        tally = defaultdict(int)
        for element in iterator:
            tally[element] += 1
        yield tally

    def mergeMaps(m1, m2):
        # fold the right-hand tally into the left-hand one, in place
        for value, occurrences in m2.iteritems():
            m1[value] += occurrences
        return m1

    per_partition = self.mapPartitions(countPartition)
    return per_partition.reduce(mergeMaps)
def top(self, num):
    """
    Get the top N elements from a RDD.

    Each partition keeps its C{num} largest elements in a heap; the heaps
    are merged pairwise and the final result is sorted.

    Note: It returns the list sorted in descending order.
    >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
    [12]
    >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
    [6, 5]
    """
    def topIterator(iterator):
        heap = []
        for candidate in iterator:
            if len(heap) >= num:
                # heap is full: push the candidate and drop the smallest
                heapq.heappushpop(heap, candidate)
            else:
                heapq.heappush(heap, candidate)
        yield heap

    def merge(a, b):
        return next(topIterator(a + b))

    largest = self.mapPartitions(topIterator).reduce(merge)
    return sorted(largest, reverse=True)
def takeOrdered(self, num, key=None):
    """
    Get the N elements from a RDD ordered in ascending order or as specified
    by the optional key function.

    Each partition keeps its C{num} smallest elements (by C{key} when one
    is given) in a bounded max-heap; the per-partition results are merged
    and the final list is sorted.  C{num == 0} yields an empty list without
    running a job.

    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
    [1, 2, 3, 4, 5, 6]
    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
    [10, 9, 7, 6, 5, 4]
    """
    if num == 0:
        # a zero-capacity heap cannot accept inserts; nothing to compute
        return []

    def topNKeyedElems(iterator, key_=None):
        q = MaxHeapQ(num)
        for k in iterator:
            if key_ is not None:
                # store (sort key, element) so the heap orders by the key
                k = (key_(k), k)
            q.insert(k)
        yield q.getElements()

    def unKey(x, key_=None):
        if key_ is not None:
            x = [i[1] for i in x]
        return x

    def merge(a, b):
        # inputs are already keyed, so no key function is applied again
        return next(topNKeyedElems(a + b))
    result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
    return sorted(unKey(result, key), key=key)
def take(self, num):
    """
    Take the first num elements of the RDD.

    This currently scans the partitions *one by one*, so it will be slow if
    a lot of partitions are required. In that case, use L{collect} to get
    the whole RDD instead.

    Returns a list with at most C{num} elements.

    >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
    [2, 3]
    >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
    [2, 3, 4, 5, 6]
    """
    def takeUpToNum(iterator):
        taken = 0
        while taken < num:
            # End the generator explicitly when the partition runs out; a
            # StopIteration escaping from next() inside a generator becomes
            # a RuntimeError under PEP 479.
            try:
                item = next(iterator)
            except StopIteration:
                return
            yield item
            taken += 1

    # Take only up to num elements from each partition we try
    mapped = self.mapPartitions(takeUpToNum)
    items = []
    # TODO(shivaram): Similar to the scala implementation, update the take
    # method to scan multiple splits based on an estimate of how many elements
    # we have per-split.
    with _JavaStackTrace(self.context):
        for partition in range(mapped._jrdd.splits().size()):
            partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
            partitionsToTake[0] = partition
            iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
            items.extend(mapped._collect_iterator_through_file(iterator))
            if len(items) >= num:
                break
    return items[:num]
def first(self):
    """
    Return the first element in this RDD.

    Implemented as C{take(1)[0]}, so an RDD without elements raises
    C{IndexError}.

    >>> sc.parallelize([2, 3, 4]).first()
    2
    """
    return self.take(1)[0]
911
def saveAsTextFile(self, path):
    """
    Save this RDD as a text file, using string representations of elements.

    Elements that are not already strings are converted with C{unicode};
    every element is then encoded as UTF-8 before being handed to the Java
    side.

    >>> tempFile = NamedTemporaryFile(delete=True)
    >>> tempFile.close()
    >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
    >>> from fileinput import input
    >>> from glob import glob
    >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
    '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'

    Empty lines are tolerated when saving to text files.

    >>> tempFile2 = NamedTemporaryFile(delete=True)
    >>> tempFile2.close()
    >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
    >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
    '\\n\\n\\nbar\\nfoo\\n'
    """
    def func(split, iterator):
        for x in iterator:
            if not isinstance(x, basestring):
                x = unicode(x)
            yield x.encode("utf-8")
    keyed = PipelinedRDD(self, func)
    # NOTE(review): presumably makes the pipelined RDD emit the raw UTF-8
    # bytes instead of serialized objects; it is set before _jrdd is first
    # accessed -- confirm against PipelinedRDD.
    keyed._bypass_serializer = True
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

# Pair functions
def collectAsMap(self):
    """
    Return the key-value pairs in this RDD to the master as a dictionary.

    Built with C{dict(self.collect())}, so when a key occurs more than once
    only one of its values is kept.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
    >>> m[1]
    2
    >>> m[3]
    4
    """
    return dict(self.collect())
954
def keys(self):
    """
    Return an RDD with the keys of each tuple.

    The key is taken as the first item of each element.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
    >>> m.collect()
    [1, 3]
    """
    # index into the pair instead of using a tuple-parameter lambda, which
    # is Python-2-only syntax
    return self.map(lambda pair: pair[0])
963
def values(self):
    """
    Return an RDD with the values of each tuple.

    The value is taken as the second item of each element.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
    >>> m.collect()
    [2, 4]
    """
    # index into the pair instead of using a tuple-parameter lambda, which
    # is Python-2-only syntax
    return self.map(lambda pair: pair[1])
972
def reduceByKey(self, func, numPartitions=None):
    """
    Merge the values for each key using an associative reduce function.

    This will also perform the merging locally on each mapper before
    sending results to a reducer, similarly to a "combiner" in MapReduce.

    Output will be hash-partitioned with C{numPartitions} partitions, or
    the default parallelism level if C{numPartitions} is not specified.

    Implemented with C{combineByKey}: the first value seen for a key is
    used as-is and C{func} serves as both the merge-value and the
    merge-combiners function.

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [('a', 2), ('b', 1)]
    """
    return self.combineByKey(lambda x: x, func, func, numPartitions)
989
def reduceByKeyLocally(self, func):
    """
    Merge the values for each key using an associative reduce function, but
    return the results immediately to the master as a dictionary.

    This will also perform the merging locally on each mapper before
    sending results to a reducer, similarly to a "combiner" in MapReduce.

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKeyLocally(add).items())
    [('a', 2), ('b', 1)]
    """
    def reducePartition(iterator):
        merged = {}
        for (k, v) in iterator:
            if k in merged:
                merged[k] = func(merged[k], v)
            else:
                merged[k] = v
        yield merged

    def mergeMaps(m1, m2):
        # fold the right-hand dictionary into the left-hand one, in place
        for (k, v) in m2.iteritems():
            if k in m1:
                m1[k] = func(m1[k], v)
            else:
                m1[k] = v
        return m1

    per_partition = self.mapPartitions(reducePartition)
    return per_partition.reduce(mergeMaps)
def countByKey(self):
    """
    Count how many elements carry each key and return the counts to the
    master as a dictionary.

    The key is taken to be the item at index 0 of every element; counting
    itself is delegated to C{countByValue} on the RDD of keys.

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.countByKey().items())
    [('a', 2), ('b', 1)]
    """
    def key_of(record):
        return record[0]

    keys_only = self.map(key_of)
    return keys_only.countByValue()
1024
def join(self, other, numPartitions=None):
    """
    Join this RDD with C{other} on matching keys.

    Every result element is a (k, (v1, v2)) tuple, where (k, v1) is in
    C{self} and (k, v2) is in C{other}.

    Performs a hash join across the cluster.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]
    """
    # The join is implemented by python_join (imported from pyspark.join);
    # this method only forwards its arguments.
    joined = python_join(self, other, numPartitions)
    return joined
1041
def leftOuterJoin(self, other, numPartitions=None):
    """
    Left outer join of C{self} with C{other}.

    For each element (k, v) in C{self}, the resulting RDD will either
    contain all pairs (k, (v, w)) for w in C{other}, or the pair
    (k, (v, None)) if no elements in other have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(x.leftOuterJoin(y).collect())
    [('a', (1, 2)), ('b', (4, None))]
    """
    # Forwarded unchanged to python_left_outer_join (from pyspark.join).
    joined = python_left_outer_join(self, other, numPartitions)
    return joined
1058
def rightOuterJoin(self, other, numPartitions=None):
    """
    Right outer join of C{self} with C{other}.

    For each element (k, w) in C{other}, the resulting RDD will either
    contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
    if no elements in C{self} have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(y.rightOuterJoin(x).collect())
    [('a', (2, 1)), ('b', (None, 4))]
    """
    # Forwarded unchanged to python_right_outer_join (from pyspark.join).
    joined = python_right_outer_join(self, other, numPartitions)
    return joined

# TODO: add option to control map-side combining
# portable_hash is used as the default because the builtin hash of None
# differs across machines.
def partitionBy(self, numPartitions, partitionFunc=portable_hash):
    """
    Return a copy of the RDD partitioned using the specified partitioner.

    Elements must be (key, value) pairs.  A pair is placed in the bucket
    C{partitionFunc(key) % numPartitions}.  When C{numPartitions} is None
    the context's C{defaultParallelism} is used.

    >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
    >>> sets = pairs.partitionBy(2).glom().collect()
    >>> set(sets[0]).intersection(set(sets[1]))
    set([])
    """
    if numPartitions is None:
        numPartitions = self.ctx.defaultParallelism

    # Transferring O(n) objects to Java is too expensive.  Instead, we'll
    # form the hash buckets in Python, transferring O(numPartitions) objects
    # to Java.  Each object is a (splitNumber, [objects]) pair.
    outputSerializer = self.ctx._unbatched_serializer

    def add_shuffle_key(split, iterator):
        # Groups one input partition's pairs by target bucket, then emits,
        # for every non-empty bucket, the packed bucket number followed by
        # the serialized list of pairs in that bucket.

        buckets = defaultdict(list)

        for (k, v) in iterator:
            buckets[partitionFunc(k) % numPartitions].append((k, v))
        # NOTE: the loop variable below rebinds the `split` parameter; the
        # incoming partition index is not used by this function.
        for (split, items) in buckets.iteritems():
            yield pack_long(split)
            yield outputSerializer.dumps(items)
    keyed = PipelinedRDD(self, add_shuffle_key)
    # add_shuffle_key already yields serialized output, so the pipelined
    # RDD must not serialize it a second time.
    keyed._bypass_serializer = True
    with _JavaStackTrace(self.context) as st:
        pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
        partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                      id(partitionFunc))
    jrdd = pairRDD.partitionBy(partitioner).values()
    rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
    # This is required so that id(partitionFunc) remains unique, even if
    # partitionFunc is a lambda:
    rdd._partitionFunc = partitionFunc
    return rdd

# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                 numPartitions=None):
    """
    Generic function to combine the elements for each key using a custom
    set of aggregation functions.

    Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
    type" C.  Note that V and C can be different -- for example, one might
    group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

    Users provide three functions:

        - C{createCombiner}, which turns a V into a C (e.g., creates
          a one-element list)
        - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
          a list)
        - C{mergeCombiners}, to combine two C's into a single one.

    In addition, users can control the partitioning of the output RDD.
    When C{numPartitions} is None the context's C{defaultParallelism} is
    used.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> def add(a, b): return a + str(b)
    >>> sorted(x.combineByKey(str, add, add).collect())
    [('a', '11'), ('b', '1')]
    """
    if numPartitions is None:
        numPartitions = self.ctx.defaultParallelism

    def combineLocally(iterator):
        # First pass, run on each input partition before the shuffle:
        # build one combiner per key from that partition's values.
        combiners = {}
        for (k, v) in iterator:
            if k not in combiners:
                combiners[k] = createCombiner(v)
            else:
                combiners[k] = mergeValue(combiners[k], v)
        # items() rather than iteritems(): the latter does not exist on
        # Python 3 dictionaries.
        return iter(combiners.items())

    locally_combined = self.mapPartitions(combineLocally)
    shuffled = locally_combined.partitionBy(numPartitions)

    def _mergeCombiners(iterator):
        # Second pass, run on each partition after the shuffle: merge the
        # per-partition combiners that belong to the same key.
        combiners = {}
        for (k, v) in iterator:
            if k not in combiners:
                combiners[k] = v
            else:
                combiners[k] = mergeCombiners(combiners[k], v)
        return iter(combiners.items())

    return shuffled.mapPartitions(_mergeCombiners)

def foldByKey(self, zeroValue, func, numPartitions=None):
    """
    Merge the values for each key using an associative function "func" and
    a neutral "zeroValue" which may be added to the result an arbitrary
    number of times, and must not change the result (e.g., 0 for addition,
    or 1 for multiplication.).

    A fresh deep copy of C{zeroValue} is used for every new key, so a
    C{func} that mutates its first argument cannot leak state from one key
    into another or into the caller's C{zeroValue} object.

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> from operator import add
    >>> sorted(rdd.foldByKey(0, add).collect())
    [('a', 2), ('b', 1)]
    """
    def createZero():
        # Without the copy, every key in a partition would share (and a
        # mutating func would modify) the very same zeroValue object.
        return copy.deepcopy(zeroValue)

    return self.combineByKey(lambda v: func(createZero(), v), func, func,
                             numPartitions)
1179 1180 1181 # TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
    """
    Collect all values that share a key into a single sequence.
    Hash-partitions the resulting RDD into numPartitions partitions.

    Note: If you are grouping in order to perform an aggregation (such as a
    sum or average) over each key, using reduceByKey will provide much better
    performance.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
    [('a', [1, 1]), ('b', [1])]
    """

    def start_group(value):
        # A key's group begins as a one-element list.
        return [value]

    def add_to_group(group, value):
        # Grows the existing list in place and hands it back.
        group.append(value)
        return group

    def join_groups(left, right):
        return left + right

    grouped = self.combineByKey(start_group, add_to_group, join_groups,
                                numPartitions)
    # Wrap each list of values in a ResultIterable.
    return grouped.mapValues(lambda values: ResultIterable(values))

# TODO: add tests
def flatMapValues(self, f):
    """
    Pass each value in the key-value pair RDD through a flatMap function
    without changing the keys; this also retains the original RDD's
    partitioning.

    C{f} receives a value and returns an iterable; one (key, item) pair is
    produced for every item of that iterable.

    >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    >>> def f(x): return x
    >>> x.flatMapValues(f).collect()
    [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
    """
    # Index the pair rather than unpacking it in the lambda signature:
    # tuple parameters were removed from the language by PEP 3113.
    flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
    return self.flatMap(flat_map_fn, preservesPartitioning=True)
1223
def mapValues(self, f):
    """
    Pass each value in the key-value pair RDD through a map function
    without changing the keys; this also retains the original RDD's
    partitioning.

    >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    >>> def f(x): return len(x)
    >>> x.mapValues(f).collect()
    [('a', 3), ('b', 1)]
    """
    # Index the pair rather than unpacking it in the lambda signature:
    # tuple parameters were removed from the language by PEP 3113.
    map_values_fn = lambda kv: (kv[0], f(kv[1]))
    return self.map(map_values_fn, preservesPartitioning=True)
1237 1238 # TODO: support varargs cogroup of several RDDs.
def groupWith(self, other):
    """
    Alias for cogroup: calls C{self.cogroup(other)} and returns its result.
    """
    cogrouped = self.cogroup(other)
    return cogrouped

# TODO: add variant with custom partitioner
def cogroup(self, other, numPartitions=None):
    """
    For each key k in C{self} or C{other}, return a resulting RDD that
    contains a tuple with the list of values for that key in C{self} as well
    as C{other}.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
    [('a', ([1], [2])), ('b', ([4], []))]
    """
    # Forwarded unchanged to python_cogroup (from pyspark.join).
    cogrouped = python_cogroup(self, other, numPartitions)
    return cogrouped
1258
def subtractByKey(self, other, numPartitions=None):
    """
    Return each (key, value) pair in C{self} that has no pair with matching key
    in C{other}.

    Implemented by cogrouping both RDDs, keeping only the keys that have
    values in C{self} and none in C{other}, and re-expanding those keys
    into their original (key, value) pairs.

    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtractByKey(y).collect())
    [('b', 4), ('b', 5)]
    """
    # Each cogrouped element is (key, (values_in_self, values_in_other)).
    # Plain functions with indexing replace the former tuple-parameter
    # lambdas, a syntax removed from the language by PEP 3113.
    def filter_func(pair):
        vals = pair[1]
        return len(vals[0]) > 0 and len(vals[1]) == 0

    def map_func(pair):
        key = pair[0]
        return [(key, val) for val in pair[1][0]]

    return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
1272
def subtract(self, other, numPartitions=None):
    """
    Return each value in C{self} that is not contained in C{other}.

    Both RDDs are turned into (element, True) pairs so that the work can be
    done by C{subtractByKey}; C{numPartitions} is forwarded to it.

    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtract(y).collect())
    [('a', 1), ('b', 4), ('b', 5)]
    """
    # note: here 'True' is just a placeholder value; only the keys matter
    rdd = other.map(lambda x: (x, True))
    keyed = self.map(lambda x: (x, True))
    # numPartitions used to be accepted but silently dropped here.
    return keyed.subtractByKey(rdd, numPartitions).map(lambda tpl: tpl[0])
1284
def keyBy(self, f):
    """
    Turn every element C{x} of this RDD into the tuple C{(f(x), x)}.

    >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
    >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
    >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
    [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
    """
    def attach_key(element):
        return (f(element), element)

    return self.map(attach_key)
1295
def repartition(self, numPartitions):
    """
    Return a new RDD that has exactly numPartitions partitions.

    Can increase or decrease the level of parallelism in this RDD. Internally, this uses
    a shuffle to redistribute data.
    If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
    which can avoid performing a shuffle.

    >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
    >>> sorted(rdd.glom().collect())
    [[1], [2, 3], [4, 5], [6, 7]]
    >>> len(rdd.repartition(2).glom().collect())
    2
    >>> len(rdd.repartition(10).glom().collect())
    10
    """
    # The repartitioning is done by the underlying Java RDD; the result is
    # wrapped with this RDD's context and deserializer.
    repartitioned = self._jrdd.repartition(numPartitions)
    return RDD(repartitioned, self.ctx, self._jrdd_deserializer)
1314
def coalesce(self, numPartitions, shuffle=False):
    """
    Return a new RDD that is reduced into `numPartitions` partitions.

    C{shuffle} is forwarded to the underlying Java RDD's C{coalesce}; it
    defaults to False.

    >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
    [[1], [2, 3], [4, 5]]
    >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
    [[1, 2, 3, 4, 5]]
    """
    # `shuffle` used to be accepted but never passed on, so callers could
    # not request a shuffle through this method.
    jrdd = self._jrdd.coalesce(numPartitions, shuffle)
    return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1325
def zip(self, other):
    """
    Zips this RDD with another one, returning key-value pairs made of the
    first element in each RDD, the second element in each RDD, etc. Assumes
    that the two RDDs have the same number of partitions and the same
    number of elements in each partition (e.g. one was made through
    a map on the other).

    >>> x = sc.parallelize(range(0,5))
    >>> y = sc.parallelize(range(1000, 1005))
    >>> x.zip(y).collect()
    [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
    """
    zipped_jrdd = self._jrdd.zip(other._jrdd)
    # The result is read back with a PairDeserializer built from both
    # sides' deserializers.
    pair_deserializer = PairDeserializer(self._jrdd_deserializer,
                                         other._jrdd_deserializer)
    return RDD(zipped_jrdd, self.ctx, pair_deserializer)
1342
def name(self):
    """
    Return the name of this RDD as a UTF-8 encoded string, or None when the
    underlying Java RDD reports no (or an empty) name.
    """
    java_name = self._jrdd.name()
    return java_name.encode('utf-8') if java_name else None
1351
def setName(self, name):
    """
    Assign a name to this RDD by setting it on the underlying Java RDD.

    >>> rdd1 = sc.parallelize([1,2])
    >>> rdd1.setName('RDD1')
    >>> rdd1.name()
    'RDD1'
    """
    java_rdd = self._jrdd
    java_rdd.setName(name)
1361
def toDebugString(self):
    """
    A description of this RDD and its recursive dependencies for debugging,
    as a UTF-8 encoded string; None when the underlying Java RDD returns
    nothing (or an empty string).
    """
    description = self._jrdd.toDebugString()
    return description.encode('utf-8') if description else None
1370
def getStorageLevel(self):
    """
    Get the RDD's current storage level.

    The Java-side storage level is read field by field and repackaged as a
    Python C{StorageLevel}.

    >>> rdd1 = sc.parallelize([1,2])
    >>> rdd1.getStorageLevel()
    StorageLevel(False, False, False, False, 1)
    """
    jlevel = self._jrdd.getStorageLevel()
    return StorageLevel(jlevel.useDisk(),
                        jlevel.useMemory(),
                        jlevel.useOffHeap(),
                        jlevel.deserialized(),
                        jlevel.replication())
1385
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs.  This could be an expensive operation, since those
# hashes aren't retained.

class PipelinedRDD(RDD):
    """
    An RDD defined by applying a Python function to the partitions of a
    previous RDD.  When the previous RDD is itself a pipelinable
    PipelinedRDD, the two functions are composed into one.

    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        # `func` is called as func(split, iterator); see pipeline_func below.
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # This transformation is the first in its stage:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            # Chain onto the previous PipelinedRDD: feed its output straight
            # into `func` and keep reading from the Java RDD it reads from.
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            # Partitioning is preserved only if every composed step
            # preserves it.
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd  # maintain the pipeline
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        # Filled in on first access of the _jrdd property.
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        # When True, _jrdd uses a NoOpSerializer for this RDD's output.
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        # Builds the Java-side PythonRDD for this pipeline on first access
        # and returns the cached value afterwards.
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        # The pickled command bundles the (possibly composed) function with
        # the deserializer for its input and the serializer for its output.
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        # The collected broadcast variables have been handed over above;
        # reset the set held by the context.
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            bytearray(pickled_command), env, includes, self.preservesPartitioning,
            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
            class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        # A cached or checkpointed RDD is not composed with later
        # transformations; they start from its _jrdd instead (see __init__).
        return not (self.is_cached or self.is_checkpointed)
1459


def _test():
    """
    Run this module's doctests against a local SparkContext bound to the
    name C{sc}, and exit with a non-zero status if any of them fail.
    """
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    try:
        (failure_count, test_count) = doctest.testmod(
            globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Stop the context even if the doctest run itself raises.
        globs['sc'].stop()
    if failure_count:
        # sys.exit rather than the builtin exit(): the latter is injected by
        # the `site` module for interactive use and may be absent.
        sys.exit(-1)


# Run the module's doctests when this file is executed as a script.
if __name__ == "__main__":
    _test()