
Source Code for Module pyspark.rdd

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler

from py4j.java_collections import ListConverter, MapConverter


__all__ = ["RDD"]

def _extract_concise_traceback():
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return "I'm lost!"
    # HACK: This function is in a file called 'rdd.py' in the top level of
    # everything PySpark.  Just trim off the directory name and assume
    # everything in that tree is PySpark guts.
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return "%s at %s:%d" % (fun, file, line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return "%s at %s:%d" % (sfun, ufile, uline)

_spark_stack_depth = 0

class _JavaStackTrace(object):
    def __init__(self, sc):
        self._traceback = _extract_concise_traceback()
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)

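# Usage sketch (illustrative): _JavaStackTrace is a context manager, so
# driver-side actions wrap their Py4J calls in a ``with`` block to report a
# concise Python call site to the JVM, e.g. as RDD.collect() does below:
#
#     with _JavaStackTrace(self.context) as st:
#         bytesInJava = self._jrdd.collect().iterator()
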
class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed.  This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing.  It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed.  This function must
        be called before any job has been executed on this RDD.  It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        to a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None

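    # Illustrative sketch (assumes an active SparkContext ``sc`` and the
    # StorageLevel constants from pyspark.storagelevel): persist with an
    # explicit storage level, then check the bookkeeping flag.
    #
    #     >>> from pyspark.storagelevel import StorageLevel
    #     >>> rdd = sc.parallelize(range(100))
    #     >>> _ = rdd.persist(StorageLevel.MEMORY_AND_DISK)
    #     >>> rdd.is_cached
    #     True
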
    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        # Forward preservesPartitioning so callers' partitioning hints are honored.
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use mapPartitionsWithIndex instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
                      "use mapPartitionsWithIndex instead",
                      DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on the default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    # this is ported from scala/spark/RDD.scala
    def takeSample(self, withReplacement, num, seed):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """
        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if num < 0:
            raise ValueError("num must be non-negative")

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the first sample didn't turn out large enough, keep trying to
        # take samples; this shouldn't happen often because we use a big
        # multiplier for the initial size.  See: scala/spark/RDD.scala
        while len(samples) < total:
            if seed > sys.maxint - 2:
                seed = -1
            seed += 1
            samples = self.sample(withReplacement, fraction, seed).collect()

        sampler = RDDSampler(withReplacement, fraction, seed + 1)
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self
        else:
            return self.map(lambda x: x, preservesPartitioning=True)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError("can only add another RDD to an RDD")
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()

        # first compute the boundary of each part via sampling: we want to
        # partition the key-space into bins such that the bins have roughly
        # the same number of (key, value) pairs falling into them
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)

            # we have numPartitions many parts but one of them has
            # an implicit boundary
            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                    .mapPartitions(mapFunc, preservesPartitioning=True)
                    .flatMap(lambda x: x, preservesPartitioning=True))

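    # Illustrative sketch (assumes an active SparkContext ``sc``): passing
    # ascending=False sorts the keys in descending order.
    #
    #     >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    #     >>> sc.parallelize(tmp).sortByKey(False).collect()
    #     [('d', 4), ('b', 2), ('a', 1), ('2', 5), ('1', 3)]
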
    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        # Due to batching, we can't use the Java cartesian method.
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
        ['1', '2', '3']
        """
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in pipe.stdout)
        return self.mapPartitions(func)

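    # Illustrative sketch (assumes an active SparkContext ``sc`` and a POSIX
    # ``grep`` on the workers' PATH): each element is written to the child
    # process's stdin as one line, and each line of its stdout becomes an
    # output element.
    #
    #     >>> sc.parallelize(['apple', 'banana', 'cherry']).pipe('grep an').collect()
    #     ['banana']
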
    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()  # Force evaluation

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))

    def _collect_iterator_through_file(self, iterator):
        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient.  Instead, we'll dump the data to a
        # file and read it back.
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)
        # Read the data into Python and deserialize it:
        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

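    # Illustrative sketch (assumes an active SparkContext ``sc``): because the
    # zero value is applied once per partition and once more when merging the
    # partition results, it should be the identity for ``op`` (0 for +, 1 for *).
    #
    #     >>> from operator import mul
    #     >>> sc.parallelize([1, 2, 3, 4]).fold(1, mul)
    #     24
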
    # TODO: aggregate

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

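    # Illustrative sketch (assumes an active SparkContext ``sc``): the returned
    # StatCounter exposes count(), mean(), sum(), variance() and stdev(), so a
    # single pass over the data answers several questions at once.
    #
    #     >>> s = sc.parallelize([1.0, 2.0, 3.0, 4.0]).stats()
    #     >>> s.count(), s.mean(), s.sum()
    #     (4, 2.5, 10.0)
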
    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required.  In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1
        # Take only up to num elements from each partition we try
        mapped = self.mapPartitions(takeUpToNum)
        items = []
        # TODO(shivaram): Similar to the scala implementation, update the take
        # method to scan multiple splits based on an estimate of how many
        # elements we have per-split.
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    # TODO: add option to control map-side combining
    def partitionBy(self, numPartitions, partitionFunc=hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        # Transferring O(n) objects to Java is too expensive.  Instead, we'll
        # form the hash buckets in Python, transferring O(numPartitions) objects
        # to Java.  Each object is a (splitNumber, [objects]) pair.
        outputSerializer = self.ctx._unbatched_serializer

        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)
            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
            jrdd = pairRDD.partitionBy(partitioner).values()
            rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
        # This is required so that id(partitionFunc) remains unique, even if
        # partitionFunc is a lambda:
        rdd._partitionFunc = partitionFunc
        return rdd

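    # Illustrative sketch (assumes an active SparkContext ``sc``): any callable
    # from key to int can replace the default ``hash``, e.g. bucketing string
    # keys by their first character.
    #
    #     >>> pairs = sc.parallelize([("apple", 1), ("avocado", 2), ("banana", 3)])
    #     >>> parts = pairs.partitionBy(2, partitionFunc=lambda k: ord(k[0])).glom().collect()
    #     >>> sorted(len(p) for p in parts)
    #     [1, 2]
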
    # TODO: add control over map-side aggregation
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C.  Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)

        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)

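    # Illustrative sketch (assumes an active SparkContext ``sc``): the classic
    # per-key average, carrying (sum, count) pairs as the combined type C.
    #
    #     >>> data = sc.parallelize([("a", 1.0), ("a", 3.0), ("b", 4.0)])
    #     >>> sumCount = data.combineByKey(lambda v: (v, 1),
    #     ...                              lambda c, v: (c[0] + v, c[1] + 1),
    #     ...                              lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]))
    #     >>> sorted(sumCount.mapValues(lambda (s, n): s / n).collect())
    #     [('a', 2.0), ('b', 4.0)]
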
    # TODO: support variant with custom partitioner
    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD with numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """
        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions)

    # TODO: add tests
    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

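    # Illustrative sketch (assumes an active SparkContext ``sc``):
    #
    #     >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    #     >>> x.flatMapValues(lambda value: value).collect()
    #     [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
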
    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)

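    # Illustrative sketch (assumes an active SparkContext ``sc``):
    #
    #     >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    #     >>> x.mapValues(len).collect()
    #     [('a', 3), ('b', 1)]
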
    # TODO: support varargs cogroup of several RDDs.
    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)

    # TODO: add variant with custom partitioner
    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.cogroup(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in C{self} that has no pair with matching
        key in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)

    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        # note: here 'True' is just a placeholder so that each element becomes
        # a key and subtractByKey can do the work
        rdd = other.map(lambda x: (x, True))
        return self.map(lambda x: (x, True)) \
                   .subtractByKey(rdd, numPartitions) \
                   .map(lambda tpl: tpl[0])

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying C{f}.

        >>> x = sc.parallelize(range(0, 3)).keyBy(lambda x: x * x)
        >>> y = sc.parallelize(zip(range(0, 5), range(0, 5)))
        >>> sorted(x.cogroup(y).collect())
        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
        """
        return self.map(lambda x: (f(x), x))

    # TODO: `lookup` is disabled because we can't make direct comparisons based
    # on the key; we need to compare the hash of the key to the hash of the
    # keys in the pairs.  This could be an expensive operation, since those
    # hashes aren't retained.


class PipelinedRDD(RDD):
    """
    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """

    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # This transformation is the first in its stage:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func

            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd  # maintain the pipeline
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            bytearray(pickled_command), env, includes, self.preservesPartitioning,
            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
            class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)

def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()