
Source Code for Module pyspark.rdd

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel

from py4j.java_collections import ListConverter, MapConverter

__all__ = ["RDD"]


def _extract_concise_traceback():
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return "I'm lost!"
    # HACK: This function is in a file called 'rdd.py' in the top level of
    # everything PySpark.  Just trim off the directory name and assume
    # everything in that tree is PySpark guts.
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return "%s at %s:%d" % (fun, file, line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return "%s at %s:%d" % (sfun, ufile, uline)
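
# The string built above is used as the call site shown in the Spark UI.  As an
# illustrative sketch (assuming a hypothetical user script /home/alice/app.py
# that calls collect() on line 42), the result looks like:
#
#     'collect at /home/alice/app.py:42'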

_spark_stack_depth = 0


class _JavaStackTrace(object):
    def __init__(self, sc):
        self._traceback = _extract_concise_traceback()
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)
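
# _JavaStackTrace is used as a context manager around actions that call into
# the JVM (see collect(), take() and partitionBy() below), so the Python call
# site is reported rather than a Py4J frame:
#
#     with _JavaStackTrace(self.context) as st:
#         ...  # JVM calls made here pick up the Python call site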

class MaxHeapQ(object):
    """
    An implementation of MaxHeap.
    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
        # we start from q[1], this makes calculating children as trivial as 2 * k
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        while (k > 1) and (self.q[k/2] < self.q[k]):
            self._swap(k, k/2)
            k = k/2

    def _swap(self, i, j):
        t = self.q[i]
        self.q[i] = self.q[j]
        self.q[j] = t

    def _sink(self, k):
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
            # Here we test if both children are greater than parent
            # if not swap with larger one.
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if (self.q[k] > self.q[j]):
                break
            self._swap(k, j)
            k = j

    def size(self):
        return len(self.q) - 1

    def insert(self, value):
        if (self.size()) < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
        if (self.q[1] > value):
            self.q[1] = value
            self._sink(1)
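
# MaxHeapQ is a bounded max-heap: once maxsize elements are held, insert() only
# keeps a new value if it is smaller than the current maximum, so the heap
# always holds the maxsize smallest values seen.  takeOrdered() below relies on
# this.  An illustrative sketch in the style of the doctests above:
#
#     >>> q = MaxHeapQ(3)
#     >>> [q.insert(i) for i in [5, 1, 4, 2, 3]]
#     [None, None, None, None, None]
#     >>> sorted(q.getElements())
#     [1, 2, 3]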

class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        on a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithIndex(func)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use mapPartitionsWithIndex instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
                      "use mapPartitionsWithIndex instead",
                      DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    # this is ported from scala/spark/RDD.scala
    def takeSample(self, withReplacement, num, seed):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """

        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if (num < 0):
            raise ValueError

        if (initialCount == 0):
            return list()

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the first sample didn't turn out large enough, keep trying to take samples;
        # this shouldn't happen often because we use a big multiplier for the initial size.
        # See: scala/spark/RDD.scala
        while len(samples) < total:
            if seed > sys.maxint - 2:
                seed = -1
            seed += 1
            samples = self.sample(withReplacement, fraction, seed).collect()

        sampler = RDDSampler(withReplacement, fraction, seed + 1)
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self
        else:
            return self.map(lambda x: x, preservesPartitioning=True)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()

        # first compute the boundary of each part via sampling: we want to partition
        # the key-space into bins such that the bins have roughly the same
        # number of (key, value) pairs falling into them
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)

            # we have numPartitions many parts but one of them has
            # an implicit boundary
            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                    .mapPartitions(mapFunc, preservesPartitioning=True)
                    .flatMap(lambda x: x, preservesPartitioning=True))
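
    # A small worked example of the range partitioning above: with numPartitions=2
    # and sorted samples ['a', 'b', 'c', 'd'], a single boundary 'b' is chosen, so
    # keys <= 'b' land in partition 0 and the rest in partition 1 (mirrored when
    # ascending=False).  Each partition is then sorted locally by mapFunc, which is
    # what makes the flattened output globally ordered.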

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        # Due to batching, we can't use the Java cartesian method.
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
        ['1', '2', '3']
        """
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in pipe.stdout)
        return self.mapPartitions(func)

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()  # Force evaluation

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))

    def _collect_iterator_through_file(self, iterator):
        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient. Instead, we'll dump the data to a
        # file and read it back.
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)
        # Read the data into Python and deserialize it:
        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    # TODO: aggregate
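
    # Unlike reduce(), fold() applies zeroValue once per partition and once more in
    # the final reduce, so it must be a neutral element.  Illustrative sketch: with
    # rdd = sc.parallelize([1, 2, 3, 4], 2), rdd.fold(0, add) returns 10, while
    # rdd.fold(1, add) returns 10 + (2 + 1) * 1 = 13 rather than 11.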

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def top(self, num):
        """
        Get the top N elements from an RDD.

        Note: It returns the list sorted in descending order.

        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2)
        [6, 5]
        """
        def topIterator(iterator):
            q = []
            for k in iterator:
                if len(q) < num:
                    heapq.heappush(q, k)
                else:
                    heapq.heappushpop(q, k)
            yield q

        def merge(a, b):
            return next(topIterator(a + b))

        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
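
    # top() keeps at most `num` elements per partition in a min-heap via heapq, so
    # only num elements per partition are shipped back to the driver, e.g.:
    #
    #     >>> sc.parallelize(range(1000), 10).top(3)
    #     [999, 998, 997]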

    def takeOrdered(self, num, key=None):
        """
        Get the N elements from an RDD ordered in ascending order or as
        specified by the optional key function.

        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
        [10, 9, 7, 6, 5, 4]
        """

        def topNKeyedElems(iterator, key_=None):
            q = MaxHeapQ(num)
            for k in iterator:
                if key_ != None:
                    k = (key_(k), k)
                q.insert(k)
            yield q.getElements()

        def unKey(x, key_=None):
            if key_ != None:
                x = [i[1] for i in x]
            return x

        def merge(a, b):
            return next(topNKeyedElems(a + b))
        result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
        return sorted(unKey(result, key), key=key)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1
        # Take only up to num elements from each partition we try
        mapped = self.mapPartitions(takeUpToNum)
        items = []
        # TODO(shivaram): Similar to the scala implementation, update the take
        # method to scan multiple splits based on an estimate of how many elements
        # we have per-split.
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    # TODO: add option to control map-side combining
    def partitionBy(self, numPartitions, partitionFunc=hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        # Transferring O(n) objects to Java is too expensive. Instead, we'll
        # form the hash buckets in Python, transferring O(numPartitions) objects
        # to Java.  Each object is a (splitNumber, [objects]) pair.
        outputSerializer = self.ctx._unbatched_serializer
        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)
            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
        jrdd = pairRDD.partitionBy(partitioner).values()
        rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
        # This is required so that id(partitionFunc) remains unique, even if
        # partitionFunc is a lambda:
        rdd._partitionFunc = partitionFunc
        return rdd

    # TODO: add control over map-side aggregation
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C.  Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)
        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)
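
    # combineByKey() is the building block for most of the *ByKey aggregations
    # here (reduceByKey, foldByKey, groupByKey); groupByKey() below, for example,
    # passes combiners that build up Python lists.  An illustrative sketch of
    # that usage:
    #
    #     >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    #     >>> sorted(x.combineByKey(lambda v: [v],
    #     ...                       lambda xs, v: xs + [v],
    #     ...                       lambda a, b: a + b).collect())
    #     [('a', [1, 1]), ('b', [1])]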

    def foldByKey(self, zeroValue, func, numPartitions=None):
        """
        Merge the values for each key using an associative function "func"
        and a neutral "zeroValue" which may be added to the result an
        arbitrary number of times, and must not change the result
        (e.g., 0 for addition, or 1 for multiplication).

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> from operator import add
        >>> rdd.foldByKey(0, add).collect()
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)

    # TODO: support variant with custom partitioner
    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD into numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """

        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions)

    # TODO: add tests
    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)

    # TODO: support varargs cogroup of several RDDs.
    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)

    # TODO: add variant with custom partitioner
    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.cogroup(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in C{self} that has no pair with matching
        key in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)

    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        rdd = other.map(lambda x: (x, True))  # note: here 'True' is just a placeholder
        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying C{f}.

        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
        >>> sorted(x.cogroup(y).collect())
        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
        """
        return self.map(lambda x: (f(x), x))

    def repartition(self, numPartitions):
        """
        Return a new RDD that has exactly numPartitions partitions.

        Can increase or decrease the level of parallelism in this RDD.
        Internally, this uses a shuffle to redistribute data.
        If you are decreasing the number of partitions in this RDD, consider
        using `coalesce`, which can avoid performing a shuffle.

        >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
        >>> sorted(rdd.glom().collect())
        [[1], [2, 3], [4, 5], [6, 7]]
        >>> len(rdd.repartition(2).glom().collect())
        2
        >>> len(rdd.repartition(10).glom().collect())
        10
        """
        jrdd = self._jrdd.repartition(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD that is reduced into `numPartitions` partitions.

        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        """
        jrdd = self._jrdd.coalesce(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def zip(self, other):
        """
        Zips this RDD with another one, returning key-value pairs that match
        up the first element in each RDD, the second element in each RDD, and
        so on. Assumes that the two RDDs have the same number of partitions
        and the same number of elements in each partition (e.g. one was made
        through a map on the other).

        >>> x = sc.parallelize(range(0,5))
        >>> y = sc.parallelize(range(1000, 1005))
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
        """
        pairRDD = self._jrdd.zip(other._jrdd)
        deserializer = PairDeserializer(self._jrdd_deserializer,
                                        other._jrdd_deserializer)
        return RDD(pairRDD, self.ctx, deserializer)

    def name(self):
        """
        Return the name of this RDD.
        """
        name_ = self._jrdd.name()
        if not name_:
            return None
        return name_.encode('utf-8')

    def setName(self, name):
        """
        Assign a name to this RDD.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.setName('RDD1')
        >>> rdd1.name()
        'RDD1'
        """
        self._jrdd.setName(name)

    def toDebugString(self):
        """
        A description of this RDD and its recursive dependencies for debugging.
        """
        debug_string = self._jrdd.toDebugString()
        if not debug_string:
            return None
        return debug_string.encode('utf-8')

    def getStorageLevel(self):
        """
        Get the RDD's current storage level.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.getStorageLevel()
        StorageLevel(False, False, False, 1)
        """
        java_storage_level = self._jrdd.getStorageLevel()
        storage_level = StorageLevel(java_storage_level.useDisk(),
                                     java_storage_level.useMemory(),
                                     java_storage_level.deserialized(),
                                     java_storage_level.replication())
        return storage_level

    # TODO: `lookup` is disabled because we can't make direct comparisons based
    # on the key; we need to compare the hash of the key to the hash of the
    # keys in the pairs.  This could be an expensive operation, since those
    # hashes aren't retained.


class PipelinedRDD(RDD):
    """
    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # This transformation is the first in its stage:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd  # maintain the pipeline
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            bytearray(pickled_command), env, includes, self.preservesPartitioning,
            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
            class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val
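
    # This property is where pipelining pays off: the chained Python functions
    # accumulated in self.func are pickled once as (func, deserializer, serializer)
    # and handed to a single PythonRDD, so a chain like rdd.map(f).filter(g).map(h)
    # runs as one Python stage rather than one Python round-trip per transformation.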

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()