Package pyspark :: Module rdd
[frames] | no frames]

Source Code for Module pyspark.rdd

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one or more 
   3  # contributor license agreements.  See the NOTICE file distributed with 
   4  # this work for additional information regarding copyright ownership. 
   5  # The ASF licenses this file to You under the Apache License, Version 2.0 
   6  # (the "License"); you may not use this file except in compliance with 
   7  # the License.  You may obtain a copy of the License at 
   8  # 
   9  #    http://www.apache.org/licenses/LICENSE-2.0 
  10  # 
  11  # Unless required by applicable law or agreed to in writing, software 
  12  # distributed under the License is distributed on an "AS IS" BASIS, 
  13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  14  # See the License for the specific language governing permissions and 
  15  # limitations under the License. 
  16  # 
  17   
  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from collections import namedtuple 
  22  from itertools import chain, ifilter, imap 
  23  import operator 
  24  import os 
  25  import sys 
  26  import shlex 
  27  import traceback 
  28  from subprocess import Popen, PIPE 
  29  from tempfile import NamedTemporaryFile 
  30  from threading import Thread 
  31  import warnings 
  32  import heapq 
  33  from random import Random 
  34   
  35  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  36      BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long 
  37  from pyspark.join import python_join, python_left_outer_join, \ 
  38      python_right_outer_join, python_cogroup 
  39  from pyspark.statcounter import StatCounter 
  40  from pyspark.rddsampler import RDDSampler 
  41  from pyspark.storagelevel import StorageLevel 
  42  from pyspark.resultiterable import ResultIterable 
  43   
  44  from py4j.java_collections import ListConverter, MapConverter 
  45   
  46  __all__ = ["RDD"] 
47 48 49 -def _extract_concise_traceback():
50 """ 51 This function returns the traceback info for a callsite, returns a dict 52 with function name, file name and line number 53 """ 54 tb = traceback.extract_stack() 55 callsite = namedtuple("Callsite", "function file linenum") 56 if len(tb) == 0: 57 return None 58 file, line, module, what = tb[len(tb) - 1] 59 sparkpath = os.path.dirname(file) 60 first_spark_frame = len(tb) - 1 61 for i in range(0, len(tb)): 62 file, line, fun, what = tb[i] 63 if file.startswith(sparkpath): 64 first_spark_frame = i 65 break 66 if first_spark_frame == 0: 67 file, line, fun, what = tb[0] 68 return callsite(function=fun, file=file, linenum=line) 69 sfile, sline, sfun, swhat = tb[first_spark_frame] 70 ufile, uline, ufun, uwhat = tb[first_spark_frame-1] 71 return callsite(function=sfun, file=ufile, linenum=uline)
72 73 _spark_stack_depth = 0
74 75 -class _JavaStackTrace(object):
76 - def __init__(self, sc):
77 tb = _extract_concise_traceback() 78 if tb is not None: 79 self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum) 80 else: 81 self._traceback = "Error! Could not extract traceback info" 82 self._context = sc
83
84 - def __enter__(self):
85 global _spark_stack_depth 86 if _spark_stack_depth == 0: 87 self._context._jsc.setCallSite(self._traceback) 88 _spark_stack_depth += 1
89
90 - def __exit__(self, type, value, tb):
91 global _spark_stack_depth 92 _spark_stack_depth -= 1 93 if _spark_stack_depth == 0: 94 self._context._jsc.setCallSite(None)
95
96 -class MaxHeapQ(object):
97 """ 98 An implementation of MaxHeap. 99 >>> import pyspark.rdd 100 >>> heap = pyspark.rdd.MaxHeapQ(5) 101 >>> [heap.insert(i) for i in range(10)] 102 [None, None, None, None, None, None, None, None, None, None] 103 >>> sorted(heap.getElements()) 104 [0, 1, 2, 3, 4] 105 >>> heap = pyspark.rdd.MaxHeapQ(5) 106 >>> [heap.insert(i) for i in range(9, -1, -1)] 107 [None, None, None, None, None, None, None, None, None, None] 108 >>> sorted(heap.getElements()) 109 [0, 1, 2, 3, 4] 110 >>> heap = pyspark.rdd.MaxHeapQ(1) 111 >>> [heap.insert(i) for i in range(9, -1, -1)] 112 [None, None, None, None, None, None, None, None, None, None] 113 >>> heap.getElements() 114 [0] 115 """ 116
117 - def __init__(self, maxsize):
118 # we start from q[1], this makes calculating children as trivial as 2 * k 119 self.q = [0] 120 self.maxsize = maxsize
121
122 - def _swim(self, k):
123 while (k > 1) and (self.q[k/2] < self.q[k]): 124 self._swap(k, k/2) 125 k = k/2
126
127 - def _swap(self, i, j):
128 t = self.q[i] 129 self.q[i] = self.q[j] 130 self.q[j] = t
131
132 - def _sink(self, k):
133 N = self.size() 134 while 2 * k <= N: 135 j = 2 * k 136 # Here we test if both children are greater than parent 137 # if not swap with larger one. 138 if j < N and self.q[j] < self.q[j + 1]: 139 j = j + 1 140 if(self.q[k] > self.q[j]): 141 break 142 self._swap(k, j) 143 k = j
144
145 - def size(self):
146 return len(self.q) - 1
147
148 - def insert(self, value):
149 if (self.size()) < self.maxsize: 150 self.q.append(value) 151 self._swim(self.size()) 152 else: 153 self._replaceRoot(value)
154
155 - def getElements(self):
156 return self.q[1:]
157
158 - def _replaceRoot(self, value):
159 if(self.q[1] > value): 160 self.q[1] = value 161 self._sink(1)
162
163 -class RDD(object):
164 """ 165 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 166 Represents an immutable, partitioned collection of elements that can be 167 operated on in parallel. 168 """ 169
170 - def __init__(self, jrdd, ctx, jrdd_deserializer):
171 self._jrdd = jrdd 172 self.is_cached = False 173 self.is_checkpointed = False 174 self.ctx = ctx 175 self._jrdd_deserializer = jrdd_deserializer 176 self._id = jrdd.id()
177
178 - def id(self):
179 """ 180 A unique ID for this RDD (within its SparkContext). 181 """ 182 return self._id
183
184 - def __repr__(self):
185 return self._jrdd.toString()
186 187 @property
188 - def context(self):
189 """ 190 The L{SparkContext} that this RDD was created on. 191 """ 192 return self.ctx
193
194 - def cache(self):
195 """ 196 Persist this RDD with the default storage level (C{MEMORY_ONLY}). 197 """ 198 self.is_cached = True 199 self._jrdd.cache() 200 return self
201
202 - def persist(self, storageLevel):
203 """ 204 Set this RDD's storage level to persist its values across operations after the first time 205 it is computed. This can only be used to assign a new storage level if the RDD does not 206 have a storage level set yet. 207 """ 208 self.is_cached = True 209 javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) 210 self._jrdd.persist(javaStorageLevel) 211 return self
212
213 - def unpersist(self):
214 """ 215 Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. 216 """ 217 self.is_cached = False 218 self._jrdd.unpersist() 219 return self
220
221 - def checkpoint(self):
222 """ 223 Mark this RDD for checkpointing. It will be saved to a file inside the 224 checkpoint directory set with L{SparkContext.setCheckpointDir()} and 225 all references to its parent RDDs will be removed. This function must 226 be called before any job has been executed on this RDD. It is strongly 227 recommended that this RDD is persisted in memory, otherwise saving it 228 on a file will require recomputation. 229 """ 230 self.is_checkpointed = True 231 self._jrdd.rdd().checkpoint()
232
233 - def isCheckpointed(self):
234 """ 235 Return whether this RDD has been checkpointed or not 236 """ 237 return self._jrdd.rdd().isCheckpointed()
238
239 - def getCheckpointFile(self):
240 """ 241 Gets the name of the file to which this RDD was checkpointed 242 """ 243 checkpointFile = self._jrdd.rdd().getCheckpointFile() 244 if checkpointFile.isDefined(): 245 return checkpointFile.get() 246 else: 247 return None
248
249 - def map(self, f, preservesPartitioning=False):
250 """ 251 Return a new RDD by applying a function to each element of this RDD. 252 253 >>> rdd = sc.parallelize(["b", "a", "c"]) 254 >>> sorted(rdd.map(lambda x: (x, 1)).collect()) 255 [('a', 1), ('b', 1), ('c', 1)] 256 """ 257 def func(split, iterator): return imap(f, iterator) 258 return PipelinedRDD(self, func, preservesPartitioning)
259
260 - def flatMap(self, f, preservesPartitioning=False):
261 """ 262 Return a new RDD by first applying a function to all elements of this 263 RDD, and then flattening the results. 264 265 >>> rdd = sc.parallelize([2, 3, 4]) 266 >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) 267 [1, 1, 1, 2, 2, 3] 268 >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) 269 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] 270 """ 271 def func(s, iterator): return chain.from_iterable(imap(f, iterator)) 272 return self.mapPartitionsWithIndex(func, preservesPartitioning)
273
274 - def mapPartitions(self, f, preservesPartitioning=False):
275 """ 276 Return a new RDD by applying a function to each partition of this RDD. 277 278 >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 279 >>> def f(iterator): yield sum(iterator) 280 >>> rdd.mapPartitions(f).collect() 281 [3, 7] 282 """ 283 def func(s, iterator): return f(iterator) 284 return self.mapPartitionsWithIndex(func)
285
286 - def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
287 """ 288 Return a new RDD by applying a function to each partition of this RDD, 289 while tracking the index of the original partition. 290 291 >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 292 >>> def f(splitIndex, iterator): yield splitIndex 293 >>> rdd.mapPartitionsWithIndex(f).sum() 294 6 295 """ 296 return PipelinedRDD(self, f, preservesPartitioning)
297
298 - def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
299 """ 300 Deprecated: use mapPartitionsWithIndex instead. 301 302 Return a new RDD by applying a function to each partition of this RDD, 303 while tracking the index of the original partition. 304 305 >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 306 >>> def f(splitIndex, iterator): yield splitIndex 307 >>> rdd.mapPartitionsWithSplit(f).sum() 308 6 309 """ 310 warnings.warn("mapPartitionsWithSplit is deprecated; " 311 "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) 312 return self.mapPartitionsWithIndex(f, preservesPartitioning)
313
314 - def filter(self, f):
315 """ 316 Return a new RDD containing only the elements that satisfy a predicate. 317 318 >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 319 >>> rdd.filter(lambda x: x % 2 == 0).collect() 320 [2, 4] 321 """ 322 def func(iterator): return ifilter(f, iterator) 323 return self.mapPartitions(func)
324
325 - def distinct(self):
326 """ 327 Return a new RDD containing the distinct elements in this RDD. 328 329 >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) 330 [1, 2, 3] 331 """ 332 return self.map(lambda x: (x, None)) \ 333 .reduceByKey(lambda x, _: x) \ 334 .map(lambda (x, _): x)
335
336 - def sample(self, withReplacement, fraction, seed=None):
337 """ 338 Return a sampled subset of this RDD (relies on numpy and falls back 339 on default random generator if numpy is unavailable). 340 341 >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP 342 [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] 343 """ 344 assert fraction >= 0.0, "Invalid fraction value: %s" % fraction 345 return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
346 347 # this is ported from scala/spark/RDD.scala
348 - def takeSample(self, withReplacement, num, seed=None):
349 """ 350 Return a fixed-size sampled subset of this RDD (currently requires numpy). 351 352 >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP 353 [4, 2, 1, 8, 2, 7, 0, 4, 1, 4] 354 """ 355 356 fraction = 0.0 357 total = 0 358 multiplier = 3.0 359 initialCount = self.count() 360 maxSelected = 0 361 362 if (num < 0): 363 raise ValueError 364 365 if (initialCount == 0): 366 return list() 367 368 if initialCount > sys.maxint - 1: 369 maxSelected = sys.maxint - 1 370 else: 371 maxSelected = initialCount 372 373 if num > initialCount and not withReplacement: 374 total = maxSelected 375 fraction = multiplier * (maxSelected + 1) / initialCount 376 else: 377 fraction = multiplier * (num + 1) / initialCount 378 total = num 379 380 samples = self.sample(withReplacement, fraction, seed).collect() 381 382 # If the first sample didn't turn out large enough, keep trying to take samples; 383 # this shouldn't happen often because we use a big multiplier for their initial size. 384 # See: scala/spark/RDD.scala 385 rand = Random(seed) 386 while len(samples) < total: 387 samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect() 388 389 sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint)) 390 sampler.shuffle(samples) 391 return samples[0:total]
392
393 - def union(self, other):
394 """ 395 Return the union of this RDD and another one. 396 397 >>> rdd = sc.parallelize([1, 1, 2, 3]) 398 >>> rdd.union(rdd).collect() 399 [1, 1, 2, 3, 1, 1, 2, 3] 400 """ 401 if self._jrdd_deserializer == other._jrdd_deserializer: 402 rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, 403 self._jrdd_deserializer) 404 return rdd 405 else: 406 # These RDDs contain data in different serialized formats, so we 407 # must normalize them to the default serializer. 408 self_copy = self._reserialize() 409 other_copy = other._reserialize() 410 return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, 411 self.ctx.serializer)
412
413 - def intersection(self, other):
414 """ 415 Return the intersection of this RDD and another one. The output will not 416 contain any duplicate elements, even if the input RDDs did. 417 418 Note that this method performs a shuffle internally. 419 420 >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) 421 >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) 422 >>> rdd1.intersection(rdd2).collect() 423 [1, 2, 3] 424 """ 425 return self.map(lambda v: (v, None)) \ 426 .cogroup(other.map(lambda v: (v, None))) \ 427 .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ 428 .keys()
429
430 - def _reserialize(self):
431 if self._jrdd_deserializer == self.ctx.serializer: 432 return self 433 else: 434 return self.map(lambda x: x, preservesPartitioning=True)
435
436 - def __add__(self, other):
437 """ 438 Return the union of this RDD and another one. 439 440 >>> rdd = sc.parallelize([1, 1, 2, 3]) 441 >>> (rdd + rdd).collect() 442 [1, 1, 2, 3, 1, 1, 2, 3] 443 """ 444 if not isinstance(other, RDD): 445 raise TypeError 446 return self.union(other)
447
448 - def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x):
449 """ 450 Sorts this RDD, which is assumed to consist of (key, value) pairs. 451 452 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 453 >>> sc.parallelize(tmp).sortByKey(True, 2).collect() 454 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 455 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] 456 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) 457 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() 458 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] 459 """ 460 if numPartitions is None: 461 numPartitions = self.ctx.defaultParallelism 462 463 bounds = list() 464 465 # first compute the boundary of each part via sampling: we want to partition 466 # the key-space into bins such that the bins have roughly the same 467 # number of (key, value) pairs falling into them 468 if numPartitions > 1: 469 rddSize = self.count() 470 maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner 471 fraction = min(maxSampleSize / max(rddSize, 1), 1.0) 472 473 samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() 474 samples = sorted(samples, reverse=(not ascending), key=keyfunc) 475 476 # we have numPartitions many parts but one of the them has 477 # an implicit boundary 478 for i in range(0, numPartitions - 1): 479 index = (len(samples) - 1) * (i + 1) / numPartitions 480 bounds.append(samples[index]) 481 482 def rangePartitionFunc(k): 483 p = 0 484 while p < len(bounds) and keyfunc(k) > bounds[p]: 485 p += 1 486 if ascending: 487 return p 488 else: 489 return numPartitions-1-p
490 491 def mapFunc(iterator): 492 yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
493 494 return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc) 495 .mapPartitions(mapFunc,preservesPartitioning=True) 496 .flatMap(lambda x: x, preservesPartitioning=True)) 497
498 - def glom(self):
499 """ 500 Return an RDD created by coalescing all elements within each partition 501 into a list. 502 503 >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 504 >>> sorted(rdd.glom().collect()) 505 [[1, 2], [3, 4]] 506 """ 507 def func(iterator): yield list(iterator) 508 return self.mapPartitions(func)
509
510 - def cartesian(self, other):
511 """ 512 Return the Cartesian product of this RDD and another one, that is, the 513 RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and 514 C{b} is in C{other}. 515 516 >>> rdd = sc.parallelize([1, 2]) 517 >>> sorted(rdd.cartesian(rdd).collect()) 518 [(1, 1), (1, 2), (2, 1), (2, 2)] 519 """ 520 # Due to batching, we can't use the Java cartesian method. 521 deserializer = CartesianDeserializer(self._jrdd_deserializer, 522 other._jrdd_deserializer) 523 return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
524
525 - def groupBy(self, f, numPartitions=None):
526 """ 527 Return an RDD of grouped items. 528 529 >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) 530 >>> result = rdd.groupBy(lambda x: x % 2).collect() 531 >>> sorted([(x, sorted(y)) for (x, y) in result]) 532 [(0, [2, 8]), (1, [1, 1, 3, 5])] 533 """ 534 return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
535
536 - def pipe(self, command, env={}):
537 """ 538 Return an RDD created by piping elements to a forked external process. 539 540 >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() 541 ['1', '2', '', '3'] 542 """ 543 def func(iterator): 544 pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 545 def pipe_objs(out): 546 for obj in iterator: 547 out.write(str(obj).rstrip('\n') + '\n') 548 out.close()
549 Thread(target=pipe_objs, args=[pipe.stdin]).start() 550 return (x.rstrip('\n') for x in iter(pipe.stdout.readline, '')) 551 return self.mapPartitions(func) 552
553 - def foreach(self, f):
554 """ 555 Applies a function to all elements of this RDD. 556 557 >>> def f(x): print x 558 >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) 559 """ 560 def processPartition(iterator): 561 for x in iterator: 562 f(x) 563 yield None
564 self.mapPartitions(processPartition).collect() # Force evaluation 565
566 - def foreachPartition(self, f):
567 """ 568 Applies a function to each partition of this RDD. 569 570 >>> def f(iterator): 571 ... for x in iterator: 572 ... print x 573 ... yield None 574 >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) 575 """ 576 self.mapPartitions(f).collect() # Force evaluation
577
578 - def collect(self):
579 """ 580 Return a list that contains all of the elements in this RDD. 581 """ 582 with _JavaStackTrace(self.context) as st: 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava))
585
586 - def _collect_iterator_through_file(self, iterator):
587 # Transferring lots of data through Py4J can be slow because 588 # socket.readline() is inefficient. Instead, we'll dump the data to a 589 # file and read it back. 590 tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) 591 tempFile.close() 592 self.ctx._writeToFile(iterator, tempFile.name) 593 # Read the data into Python and deserialize it: 594 with open(tempFile.name, 'rb') as tempFile: 595 for item in self._jrdd_deserializer.load_stream(tempFile): 596 yield item 597 os.unlink(tempFile.name)
598
599 - def reduce(self, f):
600 """ 601 Reduces the elements of this RDD using the specified commutative and 602 associative binary operator. Currently reduces partitions locally. 603 604 >>> from operator import add 605 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 606 15 607 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 608 10 609 """ 610 def func(iterator): 611 acc = None 612 for obj in iterator: 613 if acc is None: 614 acc = obj 615 else: 616 acc = f(obj, acc) 617 if acc is not None: 618 yield acc
619 vals = self.mapPartitions(func).collect() 620 return reduce(f, vals) 621
622 - def fold(self, zeroValue, op):
623 """ 624 Aggregate the elements of each partition, and then the results for all 625 the partitions, using a given associative function and a neutral "zero 626 value." 627 628 The function C{op(t1, t2)} is allowed to modify C{t1} and return it 629 as its result value to avoid object allocation; however, it should not 630 modify C{t2}. 631 632 >>> from operator import add 633 >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) 634 15 635 """ 636 def func(iterator): 637 acc = zeroValue 638 for obj in iterator: 639 acc = op(obj, acc) 640 yield acc
641 vals = self.mapPartitions(func).collect() 642 return reduce(op, vals, zeroValue) 643
644 - def aggregate(self, zeroValue, seqOp, combOp):
645 """ 646 Aggregate the elements of each partition, and then the results for all 647 the partitions, using a given combine functions and a neutral "zero 648 value." 649 650 The functions C{op(t1, t2)} is allowed to modify C{t1} and return it 651 as its result value to avoid object allocation; however, it should not 652 modify C{t2}. 653 654 The first function (seqOp) can return a different result type, U, than 655 the type of this RDD. Thus, we need one operation for merging a T into an U 656 and one operation for merging two U 657 658 >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) 659 >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) 660 >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) 661 (10, 4) 662 >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp) 663 (0, 0) 664 """ 665 def func(iterator): 666 acc = zeroValue 667 for obj in iterator: 668 acc = seqOp(acc, obj) 669 yield acc
670 671 return self.mapPartitions(func).fold(zeroValue, combOp) 672 673
674 - def max(self):
675 """ 676 Find the maximum item in this RDD. 677 678 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max() 679 43.0 680 """ 681 return self.reduce(max)
682
683 - def min(self):
684 """ 685 Find the maximum item in this RDD. 686 687 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min() 688 1.0 689 """ 690 return self.reduce(min)
691
692 - def sum(self):
693 """ 694 Add up the elements in this RDD. 695 696 >>> sc.parallelize([1.0, 2.0, 3.0]).sum() 697 6.0 698 """ 699 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
700
701 - def count(self):
702 """ 703 Return the number of elements in this RDD. 704 705 >>> sc.parallelize([2, 3, 4]).count() 706 3 707 """ 708 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
709
710 - def stats(self):
711 """ 712 Return a L{StatCounter} object that captures the mean, variance 713 and count of the RDD's elements in one operation. 714 """ 715 def redFunc(left_counter, right_counter): 716 return left_counter.mergeStats(right_counter)
717 718 return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) 719
720 - def mean(self):
721 """ 722 Compute the mean of this RDD's elements. 723 724 >>> sc.parallelize([1, 2, 3]).mean() 725 2.0 726 """ 727 return self.stats().mean()
728
729 - def variance(self):
730 """ 731 Compute the variance of this RDD's elements. 732 733 >>> sc.parallelize([1, 2, 3]).variance() 734 0.666... 735 """ 736 return self.stats().variance()
737
738 - def stdev(self):
739 """ 740 Compute the standard deviation of this RDD's elements. 741 742 >>> sc.parallelize([1, 2, 3]).stdev() 743 0.816... 744 """ 745 return self.stats().stdev()
746
747 - def sampleStdev(self):
748 """ 749 Compute the sample standard deviation of this RDD's elements (which corrects for bias in 750 estimating the standard deviation by dividing by N-1 instead of N). 751 752 >>> sc.parallelize([1, 2, 3]).sampleStdev() 753 1.0 754 """ 755 return self.stats().sampleStdev()
756
757 - def sampleVariance(self):
758 """ 759 Compute the sample variance of this RDD's elements (which corrects for bias in 760 estimating the variance by dividing by N-1 instead of N). 761 762 >>> sc.parallelize([1, 2, 3]).sampleVariance() 763 1.0 764 """ 765 return self.stats().sampleVariance()
766
767 - def countByValue(self):
768 """ 769 Return the count of each unique value in this RDD as a dictionary of 770 (value, count) pairs. 771 772 >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) 773 [(1, 2), (2, 3)] 774 """ 775 def countPartition(iterator): 776 counts = defaultdict(int) 777 for obj in iterator: 778 counts[obj] += 1 779 yield counts
780 def mergeMaps(m1, m2): 781 for (k, v) in m2.iteritems(): 782 m1[k] += v 783 return m1 784 return self.mapPartitions(countPartition).reduce(mergeMaps) 785
786 - def top(self, num):
787 """ 788 Get the top N elements from a RDD. 789 790 Note: It returns the list sorted in descending order. 791 >>> sc.parallelize([10, 4, 2, 12, 3]).top(1) 792 [12] 793 >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2) 794 [6, 5] 795 """ 796 def topIterator(iterator): 797 q = [] 798 for k in iterator: 799 if len(q) < num: 800 heapq.heappush(q, k) 801 else: 802 heapq.heappushpop(q, k) 803 yield q
804 805 def merge(a, b): 806 return next(topIterator(a + b)) 807 808 return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True) 809
810 - def takeOrdered(self, num, key=None):
811 """ 812 Get the N elements from a RDD ordered in ascending order or as specified 813 by the optional key function. 814 815 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) 816 [1, 2, 3, 4, 5, 6] 817 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) 818 [10, 9, 7, 6, 5, 4] 819 """ 820 821 def topNKeyedElems(iterator, key_=None): 822 q = MaxHeapQ(num) 823 for k in iterator: 824 if key_ != None: 825 k = (key_(k), k) 826 q.insert(k) 827 yield q.getElements()
828 829 def unKey(x, key_=None): 830 if key_ != None: 831 x = [i[1] for i in x] 832 return x 833 834 def merge(a, b): 835 return next(topNKeyedElems(a + b)) 836 result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge) 837 return sorted(unKey(result, key), key=key) 838 839
840 - def take(self, num):
841 """ 842 Take the first num elements of the RDD. 843 844 This currently scans the partitions *one by one*, so it will be slow if 845 a lot of partitions are required. In that case, use L{collect} to get 846 the whole RDD instead. 847 848 >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) 849 [2, 3] 850 >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) 851 [2, 3, 4, 5, 6] 852 """ 853 def takeUpToNum(iterator): 854 taken = 0 855 while taken < num: 856 yield next(iterator) 857 taken += 1
858 # Take only up to num elements from each partition we try 859 mapped = self.mapPartitions(takeUpToNum) 860 items = [] 861 # TODO(shivaram): Similar to the scala implementation, update the take 862 # method to scan multiple splits based on an estimate of how many elements 863 # we have per-split. 864 with _JavaStackTrace(self.context) as st: 865 for partition in range(mapped._jrdd.splits().size()): 866 partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) 867 partitionsToTake[0] = partition 868 iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() 869 items.extend(mapped._collect_iterator_through_file(iterator)) 870 if len(items) >= num: 871 break 872 return items[:num] 873
874 - def first(self):
875 """ 876 Return the first element in this RDD. 877 878 >>> sc.parallelize([2, 3, 4]).first() 879 2 880 """ 881 return self.take(1)[0]
882
883 - def saveAsTextFile(self, path):
884 """ 885 Save this RDD as a text file, using string representations of elements. 886 887 >>> tempFile = NamedTemporaryFile(delete=True) 888 >>> tempFile.close() 889 >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) 890 >>> from fileinput import input 891 >>> from glob import glob 892 >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) 893 '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' 894 895 Empty lines are tolerated when saving to text files. 896 897 >>> tempFile2 = NamedTemporaryFile(delete=True) 898 >>> tempFile2.close() 899 >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) 900 >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) 901 '\\n\\n\\nbar\\nfoo\\n' 902 """ 903 def func(split, iterator): 904 for x in iterator: 905 if not isinstance(x, basestring): 906 x = unicode(x) 907 yield x.encode("utf-8")
908 keyed = PipelinedRDD(self, func) 909 keyed._bypass_serializer = True 910 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) 911 912 # Pair functions 913
914 - def collectAsMap(self):
915 """ 916 Return the key-value pairs in this RDD to the master as a dictionary. 917 918 >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() 919 >>> m[1] 920 2 921 >>> m[3] 922 4 923 """ 924 return dict(self.collect())
925
926 - def keys(self):
927 """ 928 Return an RDD with the keys of each tuple. 929 >>> m = sc.parallelize([(1, 2), (3, 4)]).keys() 930 >>> m.collect() 931 [1, 3] 932 """ 933 return self.map(lambda (k, v): k)
934
935 - def values(self):
936 """ 937 Return an RDD with the values of each tuple. 938 >>> m = sc.parallelize([(1, 2), (3, 4)]).values() 939 >>> m.collect() 940 [2, 4] 941 """ 942 return self.map(lambda (k, v): v)
943
944 - def reduceByKey(self, func, numPartitions=None):
945 """ 946 Merge the values for each key using an associative reduce function. 947 948 This will also perform the merging locally on each mapper before 949 sending results to a reducer, similarly to a "combiner" in MapReduce. 950 951 Output will be hash-partitioned with C{numPartitions} partitions, or 952 the default parallelism level if C{numPartitions} is not specified. 953 954 >>> from operator import add 955 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 956 >>> sorted(rdd.reduceByKey(add).collect()) 957 [('a', 2), ('b', 1)] 958 """ 959 return self.combineByKey(lambda x: x, func, func, numPartitions)
960
961 - def reduceByKeyLocally(self, func):
962 """ 963 Merge the values for each key using an associative reduce function, but 964 return the results immediately to the master as a dictionary. 965 966 This will also perform the merging locally on each mapper before 967 sending results to a reducer, similarly to a "combiner" in MapReduce. 968 969 >>> from operator import add 970 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 971 >>> sorted(rdd.reduceByKeyLocally(add).items()) 972 [('a', 2), ('b', 1)] 973 """ 974 def reducePartition(iterator): 975 m = {} 976 for (k, v) in iterator: 977 m[k] = v if k not in m else func(m[k], v) 978 yield m
979 def mergeMaps(m1, m2): 980 for (k, v) in m2.iteritems(): 981 m1[k] = v if k not in m1 else func(m1[k], v) 982 return m1 983 return self.mapPartitions(reducePartition).reduce(mergeMaps) 984
985 - def countByKey(self):
986 """ 987 Count the number of elements for each key, and return the result to the 988 master as a dictionary. 989 990 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 991 >>> sorted(rdd.countByKey().items()) 992 [('a', 2), ('b', 1)] 993 """ 994 return self.map(lambda x: x[0]).countByValue()
995
996 - def join(self, other, numPartitions=None):
997 """ 998 Return an RDD containing all pairs of elements with matching keys in 999 C{self} and C{other}. 1000 1001 Each pair of elements will be returned as a (k, (v1, v2)) tuple, where 1002 (k, v1) is in C{self} and (k, v2) is in C{other}. 1003 1004 Performs a hash join across the cluster. 1005 1006 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1007 >>> y = sc.parallelize([("a", 2), ("a", 3)]) 1008 >>> sorted(x.join(y).collect()) 1009 [('a', (1, 2)), ('a', (1, 3))] 1010 """ 1011 return python_join(self, other, numPartitions)
1012
1013 - def leftOuterJoin(self, other, numPartitions=None):
1014 """ 1015 Perform a left outer join of C{self} and C{other}. 1016 1017 For each element (k, v) in C{self}, the resulting RDD will either 1018 contain all pairs (k, (v, w)) for w in C{other}, or the pair 1019 (k, (v, None)) if no elements in other have key k. 1020 1021 Hash-partitions the resulting RDD into the given number of partitions. 1022 1023 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1024 >>> y = sc.parallelize([("a", 2)]) 1025 >>> sorted(x.leftOuterJoin(y).collect()) 1026 [('a', (1, 2)), ('b', (4, None))] 1027 """ 1028 return python_left_outer_join(self, other, numPartitions)
1029
1030 - def rightOuterJoin(self, other, numPartitions=None):
1031 """ 1032 Perform a right outer join of C{self} and C{other}. 1033 1034 For each element (k, w) in C{other}, the resulting RDD will either 1035 contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) 1036 if no elements in C{self} have key k. 1037 1038 Hash-partitions the resulting RDD into the given number of partitions. 1039 1040 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1041 >>> y = sc.parallelize([("a", 2)]) 1042 >>> sorted(y.rightOuterJoin(x).collect()) 1043 [('a', (2, 1)), ('b', (None, 4))] 1044 """ 1045 return python_right_outer_join(self, other, numPartitions)
1046 1047 # TODO: add option to control map-side combining
1048 - def partitionBy(self, numPartitions, partitionFunc=hash):
1049 """ 1050 Return a copy of the RDD partitioned using the specified partitioner. 1051 1052 >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 1053 >>> sets = pairs.partitionBy(2).glom().collect() 1054 >>> set(sets[0]).intersection(set(sets[1])) 1055 set([]) 1056 """ 1057 if numPartitions is None: 1058 numPartitions = self.ctx.defaultParallelism 1059 # Transferring O(n) objects to Java is too expensive. Instead, we'll 1060 # form the hash buckets in Python, transferring O(numPartitions) objects 1061 # to Java. Each object is a (splitNumber, [objects]) pair. 1062 outputSerializer = self.ctx._unbatched_serializer 1063 def add_shuffle_key(split, iterator): 1064 1065 buckets = defaultdict(list) 1066 1067 for (k, v) in iterator: 1068 buckets[partitionFunc(k) % numPartitions].append((k, v)) 1069 for (split, items) in buckets.iteritems(): 1070 yield pack_long(split) 1071 yield outputSerializer.dumps(items)
1072 keyed = PipelinedRDD(self, add_shuffle_key) 1073 keyed._bypass_serializer = True 1074 with _JavaStackTrace(self.context) as st: 1075 pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() 1076 partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 1077 id(partitionFunc)) 1078 jrdd = pairRDD.partitionBy(partitioner).values() 1079 rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 1080 # This is required so that id(partitionFunc) remains unique, even if 1081 # partitionFunc is a lambda: 1082 rdd._partitionFunc = partitionFunc 1083 return rdd 1084 1085 # TODO: add control over map-side aggregation
1086 - def combineByKey(self, createCombiner, mergeValue, mergeCombiners, 1087 numPartitions=None):
1088 """ 1089 Generic function to combine the elements for each key using a custom 1090 set of aggregation functions. 1091 1092 Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 1093 type" C. Note that V and C can be different -- for example, one might 1094 group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). 1095 1096 Users provide three functions: 1097 1098 - C{createCombiner}, which turns a V into a C (e.g., creates 1099 a one-element list) 1100 - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 1101 a list) 1102 - C{mergeCombiners}, to combine two C's into a single one. 1103 1104 In addition, users can control the partitioning of the output RDD. 1105 1106 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1107 >>> def f(x): return x 1108 >>> def add(a, b): return a + str(b) 1109 >>> sorted(x.combineByKey(str, add, add).collect()) 1110 [('a', '11'), ('b', '1')] 1111 """ 1112 if numPartitions is None: 1113 numPartitions = self.ctx.defaultParallelism 1114 def combineLocally(iterator): 1115 combiners = {} 1116 for x in iterator: 1117 (k, v) = x 1118 if k not in combiners: 1119 combiners[k] = createCombiner(v) 1120 else: 1121 combiners[k] = mergeValue(combiners[k], v) 1122 return combiners.iteritems()
1123 locally_combined = self.mapPartitions(combineLocally) 1124 shuffled = locally_combined.partitionBy(numPartitions) 1125 def _mergeCombiners(iterator): 1126 combiners = {} 1127 for (k, v) in iterator: 1128 if not k in combiners: 1129 combiners[k] = v 1130 else: 1131 combiners[k] = mergeCombiners(combiners[k], v) 1132 return combiners.iteritems() 1133 return shuffled.mapPartitions(_mergeCombiners) 1134
1135 - def foldByKey(self, zeroValue, func, numPartitions=None):
1136 """ 1137 Merge the values for each key using an associative function "func" and a neutral "zeroValue" 1138 which may be added to the result an arbitrary number of times, and must not change 1139 the result (e.g., 0 for addition, or 1 for multiplication.). 1140 1141 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1142 >>> from operator import add 1143 >>> rdd.foldByKey(0, add).collect() 1144 [('a', 2), ('b', 1)] 1145 """ 1146 return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
1147 1148 1149 # TODO: support variant with custom partitioner
1150 - def groupByKey(self, numPartitions=None):
1151 """ 1152 Group the values for each key in the RDD into a single sequence. 1153 Hash-partitions the resulting RDD with into numPartitions partitions. 1154 1155 Note: If you are grouping in order to perform an aggregation (such as a 1156 sum or average) over each key, using reduceByKey will provide much better 1157 performance. 1158 1159 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1160 >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) 1161 [('a', [1, 1]), ('b', [1])] 1162 """ 1163 1164 def createCombiner(x): 1165 return [x]
1166 1167 def mergeValue(xs, x): 1168 xs.append(x) 1169 return xs 1170 1171 def mergeCombiners(a, b): 1172 return a + b 1173 1174 return self.combineByKey(createCombiner, mergeValue, mergeCombiners, 1175 numPartitions).mapValues(lambda x: ResultIterable(x)) 1176 1177 # TODO: add tests
1178 - def flatMapValues(self, f):
1179 """ 1180 Pass each value in the key-value pair RDD through a flatMap function 1181 without changing the keys; this also retains the original RDD's 1182 partitioning. 1183 1184 >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) 1185 >>> def f(x): return x 1186 >>> x.flatMapValues(f).collect() 1187 [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 1188 """ 1189 flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) 1190 return self.flatMap(flat_map_fn, preservesPartitioning=True)
1191
1192 - def mapValues(self, f):
1193 """ 1194 Pass each value in the key-value pair RDD through a map function 1195 without changing the keys; this also retains the original RDD's 1196 partitioning. 1197 1198 >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) 1199 >>> def f(x): return len(x) 1200 >>> x.mapValues(f).collect() 1201 [('a', 3), ('b', 1)] 1202 """ 1203 map_values_fn = lambda (k, v): (k, f(v)) 1204 return self.map(map_values_fn, preservesPartitioning=True)
1205 1206 # TODO: support varargs cogroup of several RDDs.
1207 - def groupWith(self, other):
1208 """ 1209 Alias for cogroup. 1210 """ 1211 return self.cogroup(other)
1212 1213 # TODO: add variant with custom parittioner
1214 - def cogroup(self, other, numPartitions=None):
1215 """ 1216 For each key k in C{self} or C{other}, return a resulting RDD that 1217 contains a tuple with the list of values for that key in C{self} as well 1218 as C{other}. 1219 1220 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1221 >>> y = sc.parallelize([("a", 2)]) 1222 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) 1223 [('a', ([1], [2])), ('b', ([4], []))] 1224 """ 1225 return python_cogroup(self, other, numPartitions)
1226
1227 - def subtractByKey(self, other, numPartitions=None):
1228 """ 1229 Return each (key, value) pair in C{self} that has no pair with matching key 1230 in C{other}. 1231 1232 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 1233 >>> y = sc.parallelize([("a", 3), ("c", None)]) 1234 >>> sorted(x.subtractByKey(y).collect()) 1235 [('b', 4), ('b', 5)] 1236 """ 1237 filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0 1238 map_func = lambda (key, vals): [(key, val) for val in vals[0]] 1239 return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
1240
1241 - def subtract(self, other, numPartitions=None):
1242 """ 1243 Return each value in C{self} that is not contained in C{other}. 1244 1245 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 1246 >>> y = sc.parallelize([("a", 3), ("c", None)]) 1247 >>> sorted(x.subtract(y).collect()) 1248 [('a', 1), ('b', 4), ('b', 5)] 1249 """ 1250 rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder 1251 return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder
1252
1253 - def keyBy(self, f):
1254 """ 1255 Creates tuples of the elements in this RDD by applying C{f}. 1256 1257 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 1258 >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 1259 >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) 1260 [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] 1261 """ 1262 return self.map(lambda x: (f(x), x))
1263
1264 - def repartition(self, numPartitions):
1265 """ 1266 Return a new RDD that has exactly numPartitions partitions. 1267 1268 Can increase or decrease the level of parallelism in this RDD. Internally, this uses 1269 a shuffle to redistribute data. 1270 If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 1271 which can avoid performing a shuffle. 1272 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 1273 >>> sorted(rdd.glom().collect()) 1274 [[1], [2, 3], [4, 5], [6, 7]] 1275 >>> len(rdd.repartition(2).glom().collect()) 1276 2 1277 >>> len(rdd.repartition(10).glom().collect()) 1278 10 1279 """ 1280 jrdd = self._jrdd.repartition(numPartitions) 1281 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1282
1283 - def coalesce(self, numPartitions, shuffle=False):
1284 """ 1285 Return a new RDD that is reduced into `numPartitions` partitions. 1286 >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 1287 [[1], [2, 3], [4, 5]] 1288 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 1289 [[1, 2, 3, 4, 5]] 1290 """ 1291 jrdd = self._jrdd.coalesce(numPartitions) 1292 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1293
1294 - def zip(self, other):
1295 """ 1296 Zips this RDD with another one, returning key-value pairs with the first element in each RDD 1297 second element in each RDD, etc. Assumes that the two RDDs have the same number of 1298 partitions and the same number of elements in each partition (e.g. one was made through 1299 a map on the other). 1300 1301 >>> x = sc.parallelize(range(0,5)) 1302 >>> y = sc.parallelize(range(1000, 1005)) 1303 >>> x.zip(y).collect() 1304 [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 1305 """ 1306 pairRDD = self._jrdd.zip(other._jrdd) 1307 deserializer = PairDeserializer(self._jrdd_deserializer, 1308 other._jrdd_deserializer) 1309 return RDD(pairRDD, self.ctx, deserializer)
1310
1311 - def name(self):
1312 """ 1313 Return the name of this RDD. 1314 """ 1315 name_ = self._jrdd.name() 1316 if not name_: 1317 return None 1318 return name_.encode('utf-8')
1319
1320 - def setName(self, name):
1321 """ 1322 Assign a name to this RDD. 1323 >>> rdd1 = sc.parallelize([1,2]) 1324 >>> rdd1.setName('RDD1') 1325 >>> rdd1.name() 1326 'RDD1' 1327 """ 1328 self._jrdd.setName(name)
1329
1330 - def toDebugString(self):
1331 """ 1332 A description of this RDD and its recursive dependencies for debugging. 1333 """ 1334 debug_string = self._jrdd.toDebugString() 1335 if not debug_string: 1336 return None 1337 return debug_string.encode('utf-8')
1338
1339 - def getStorageLevel(self):
1340 """ 1341 Get the RDD's current storage level. 1342 >>> rdd1 = sc.parallelize([1,2]) 1343 >>> rdd1.getStorageLevel() 1344 StorageLevel(False, False, False, False, 1) 1345 """ 1346 java_storage_level = self._jrdd.getStorageLevel() 1347 storage_level = StorageLevel(java_storage_level.useDisk(), 1348 java_storage_level.useMemory(), 1349 java_storage_level.useOffHeap(), 1350 java_storage_level.deserialized(), 1351 java_storage_level.replication()) 1352 return storage_level
1353
1354 # TODO: `lookup` is disabled because we can't make direct comparisons based 1355 # on the key; we need to compare the hash of the key to the hash of the 1356 # keys in the pairs. This could be an expensive operation, since those 1357 # hashes aren't retained. 1358 1359 -class PipelinedRDD(RDD):
1360 """ 1361 Pipelined maps: 1362 >>> rdd = sc.parallelize([1, 2, 3, 4]) 1363 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 1364 [4, 8, 12, 16] 1365 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 1366 [4, 8, 12, 16] 1367 1368 Pipelined reduces: 1369 >>> from operator import add 1370 >>> rdd.map(lambda x: 2 * x).reduce(add) 1371 20 1372 >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 1373 20 1374 """
1375 - def __init__(self, prev, func, preservesPartitioning=False):
1376 if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 1377 # This transformation is the first in its stage: 1378 self.func = func 1379 self.preservesPartitioning = preservesPartitioning 1380 self._prev_jrdd = prev._jrdd 1381 self._prev_jrdd_deserializer = prev._jrdd_deserializer 1382 else: 1383 prev_func = prev.func 1384 def pipeline_func(split, iterator): 1385 return func(split, prev_func(split, iterator))
1386 self.func = pipeline_func 1387 self.preservesPartitioning = \ 1388 prev.preservesPartitioning and preservesPartitioning 1389 self._prev_jrdd = prev._prev_jrdd # maintain the pipeline 1390 self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 1391 self.is_cached = False 1392 self.is_checkpointed = False 1393 self.ctx = prev.ctx 1394 self.prev = prev 1395 self._jrdd_val = None 1396 self._jrdd_deserializer = self.ctx.serializer 1397 self._bypass_serializer = False
1398 1399 @property
1400 - def _jrdd(self):
1401 if self._jrdd_val: 1402 return self._jrdd_val 1403 if self._bypass_serializer: 1404 serializer = NoOpSerializer() 1405 else: 1406 serializer = self.ctx.serializer 1407 command = (self.func, self._prev_jrdd_deserializer, serializer) 1408 pickled_command = CloudPickleSerializer().dumps(command) 1409 broadcast_vars = ListConverter().convert( 1410 [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], 1411 self.ctx._gateway._gateway_client) 1412 self.ctx._pickled_broadcast_vars.clear() 1413 class_tag = self._prev_jrdd.classTag() 1414 env = MapConverter().convert(self.ctx.environment, 1415 self.ctx._gateway._gateway_client) 1416 includes = ListConverter().convert(self.ctx._python_includes, 1417 self.ctx._gateway._gateway_client) 1418 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 1419 bytearray(pickled_command), env, includes, self.preservesPartitioning, 1420 self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, 1421 class_tag) 1422 self._jrdd_val = python_rdd.asJavaRDD() 1423 return self._jrdd_val
1424
1425 - def _is_pipelinable(self):
1426 return not (self.is_cached or self.is_checkpointed)
1427
1428 1429 -def _test():
1430 import doctest 1431 from pyspark.context import SparkContext 1432 globs = globals().copy() 1433 # The small batch size here ensures that we see multiple batches, 1434 # even in these small test examples: 1435 globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 1436 (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS) 1437 globs['sc'].stop() 1438 if failure_count: 1439 exit(-1)
1440 1441 1442 if __name__ == "__main__": 1443 _test() 1444