
Source Code for Module pyspark.rdd

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel

from py4j.java_collections import ListConverter, MapConverter

__all__ = ["RDD"]

def _extract_concise_traceback():
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return "I'm lost!"
    # HACK: This function is in a file called 'rdd.py' in the top level of
    # everything PySpark. Just trim off the directory name and assume
    # everything in that tree is PySpark guts.
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return "%s at %s:%d" % (fun, file, line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return "%s at %s:%d" % (sfun, ufile, uline)

_spark_stack_depth = 0


class _JavaStackTrace(object):
    def __init__(self, sc):
        self._traceback = _extract_concise_traceback()
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)


class MaxHeapQ(object):
    """
    An implementation of MaxHeap.

    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
        # We start from q[1]; this makes calculating a node's children as
        # trivial as 2 * k.
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        while (k > 1) and (self.q[k/2] < self.q[k]):
            self._swap(k, k/2)
            k = k/2

    def _swap(self, i, j):
        t = self.q[i]
        self.q[i] = self.q[j]
        self.q[j] = t

    def _sink(self, k):
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
            # Compare against the larger of the two children; stop if the
            # parent is already greater, otherwise swap with that child.
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if self.q[k] > self.q[j]:
                break
            self._swap(k, j)
            k = j

    def size(self):
        return len(self.q) - 1

    def insert(self, value):
        if self.size() < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
        if self.q[1] > value:
            self.q[1] = value
            self._sink(1)
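
# Illustrative note and sketch (not part of the original module source):
# takeOrdered() below keeps only the `num` smallest elements seen in each
# partition by inserting every element into a bounded MaxHeapQ; once the heap
# is full, a new value replaces the root only if it is smaller than the
# current maximum. For example:
#
#   >>> q = MaxHeapQ(3)
#   >>> for v in [9, 1, 7, 3, 5]:
#   ...     q.insert(v)
#   >>> sorted(q.getElements())
#   [1, 3, 5]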

class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self
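
    # Illustrative usage sketch (not part of the original module source);
    # assumes a live SparkContext `sc` as in the doctests in this file, and
    # that pyspark.storagelevel defines MEMORY_AND_DISK:
    #
    #   >>> from pyspark.storagelevel import StorageLevel
    #   >>> rdd = sc.parallelize([1, 2, 3])
    #   >>> rdd.persist(StorageLevel.MEMORY_AND_DISK) is rdd   # persist() returns self
    #   True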

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        on a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None
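
    # Illustrative checkpointing sketch (not part of the original module
    # source); assumes a live SparkContext `sc` and a hypothetical writable
    # checkpoint directory:
    #
    #   >>> sc.setCheckpointDir("/tmp/rdd-checkpoints")  # hypothetical path
    #   >>> rdd = sc.parallelize(range(100)).map(lambda x: x * x).cache()
    #   >>> rdd.checkpoint()     # must be called before any job has run
    #   >>> rdd.count()          # the first action also writes the checkpoint
    #   100
    #   >>> rdd.isCheckpointed()
    #   True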

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)
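
    # Illustrative usage sketch (not part of the original module source);
    # assumes a live SparkContext `sc`:
    #
    #   >>> sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()
    #   [2, 4, 6]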

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithIndex(func)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use mapPartitionsWithIndex instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
                      "use mapPartitionsWithIndex instead",
                      DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on the default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    # this is ported from scala/spark/RDD.scala
    def takeSample(self, withReplacement, num, seed):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """

        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if (num < 0):
            raise ValueError

        if (initialCount == 0):
            return list()

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the first sample didn't turn out large enough, keep trying to
        # take samples; this shouldn't happen often because we use a big
        # multiplier for the initial size.
        # See: scala/spark/RDD.scala
        while len(samples) < total:
            if seed > sys.maxint - 2:
                seed = -1
            seed += 1
            samples = self.sample(withReplacement, fraction, seed).collect()

        sampler = RDDSampler(withReplacement, fraction, seed + 1)
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self
        else:
            return self.map(lambda x: x, preservesPartitioning=True)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()

        # first compute the boundary of each part via sampling: we want to
        # partition the key-space into bins such that the bins have roughly
        # the same number of (key, value) pairs falling into them
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)

            # we have numPartitions many parts, but one of them has an
            # implicit boundary
            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                    .mapPartitions(mapFunc, preservesPartitioning=True)
                    .flatMap(lambda x: x, preservesPartitioning=True))
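
    # Illustrative usage sketch (not part of the original module source);
    # a descending sort, assuming a live SparkContext `sc`:
    #
    #   >>> sc.parallelize([('b', 1), ('a', 2), ('c', 3)]).sortByKey(ascending=False).collect()
    #   [('c', 3), ('b', 1), ('a', 2)]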

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        # Due to batching, we can't use the Java cartesian method.
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
        ['1', '2', '3']
        """
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in pipe.stdout)
        return self.mapPartitions(func)

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()  # Force evaluation

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))
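
    # Illustrative usage sketch (not part of the original module source);
    # assumes a live SparkContext `sc`:
    #
    #   >>> sc.parallelize([1, 2, 3], 2).collect()
    #   [1, 2, 3]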

    def _collect_iterator_through_file(self, iterator):
        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient. Instead, we'll dump the data to a
        # file and read it back.
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)
        # Read the data into Python and deserialize it:
        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    # TODO: aggregate

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
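
    # Illustrative usage sketch (not part of the original module source);
    # assumes a live SparkContext `sc`. The returned StatCounter aggregates
    # count, mean and variance in a single pass over the data:
    #
    #   >>> s = sc.parallelize([1, 2, 3, 4]).stats()
    #   >>> s.count()
    #   4
    #   >>> s.mean()
    #   2.5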

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def top(self, num):
        """
        Get the top N elements from an RDD.

        Note: It returns the list sorted in descending order.

        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2)
        [6, 5]
        """
        def topIterator(iterator):
            q = []
            for k in iterator:
                if len(q) < num:
                    heapq.heappush(q, k)
                else:
                    heapq.heappushpop(q, k)
            yield q

        def merge(a, b):
            return next(topIterator(a + b))

        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)

    def takeOrdered(self, num, key=None):
        """
        Get the N elements from an RDD ordered in ascending order or as
        specified by the optional key function.

        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
        [10, 9, 7, 6, 5, 4]
        """

        def topNKeyedElems(iterator, key_=None):
            q = MaxHeapQ(num)
            for k in iterator:
                if key_ is not None:
                    k = (key_(k), k)
                q.insert(k)
            yield q.getElements()

        def unKey(x, key_=None):
            if key_ is not None:
                x = [i[1] for i in x]
            return x

        def merge(a, b):
            return next(topNKeyedElems(a + b))
        result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
        return sorted(unKey(result, key), key=key)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1
        # Take only up to num elements from each partition we try
        mapped = self.mapPartitions(takeUpToNum)
        items = []
        # TODO(shivaram): Similar to the scala implementation, update the take
        # method to scan multiple splits based on an estimate of how many
        # elements we have per-split.
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in other have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
        if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    # TODO: add option to control map-side combining
    def partitionBy(self, numPartitions, partitionFunc=None):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        if partitionFunc is None:
            partitionFunc = lambda x: 0 if x is None else hash(x)
        # Transferring O(n) objects to Java is too expensive. Instead, we'll
        # form the hash buckets in Python, transferring O(numPartitions)
        # objects to Java. Each object is a (splitNumber, [objects]) pair.
        outputSerializer = self.ctx._unbatched_serializer

        def add_shuffle_key(split, iterator):

            buckets = defaultdict(list)

            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
            jrdd = pairRDD.partitionBy(partitioner).values()
            rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
        # This is required so that id(partitionFunc) remains unique, even if
        # partitionFunc is a lambda:
        rdd._partitionFunc = partitionFunc
        return rdd

    # TODO: add control over map-side aggregation
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C. Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)

        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)

    def foldByKey(self, zeroValue, func, numPartitions=None):
        """
        Merge the values for each key using an associative function "func" and
        a neutral "zeroValue" which may be added to the result an arbitrary
        number of times, and must not change the result (e.g., 0 for addition,
        or 1 for multiplication).

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> from operator import add
        >>> rdd.foldByKey(0, add).collect()
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)

    # TODO: support variant with custom partitioner
    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD into numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """

        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions)

    # TODO: add tests
    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)
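
    # Illustrative usage sketch (not part of the original module source);
    # assumes a live SparkContext `sc`:
    #
    #   >>> x = sc.parallelize([("a", ["x", "y"]), ("b", ["p"])])
    #   >>> x.flatMapValues(lambda v: v).collect()
    #   [('a', 'x'), ('a', 'y'), ('b', 'p')]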

    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)
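
    # Illustrative usage sketch (not part of the original module source);
    # assumes a live SparkContext `sc`:
    #
    #   >>> sc.parallelize([("a", 1), ("b", 2)]).mapValues(lambda v: v * 10).collect()
    #   [('a', 10), ('b', 20)]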

    # TODO: support varargs cogroup of several RDDs.
    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)
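
    # Illustrative usage sketch (not part of the original module source);
    # groupWith behaves exactly like cogroup, assuming a live SparkContext `sc`:
    #
    #   >>> x = sc.parallelize([("a", 1), ("b", 4)])
    #   >>> y = sc.parallelize([("a", 2)])
    #   >>> sorted(x.groupWith(y).collect())
    #   [('a', ([1], [2])), ('b', ([4], []))]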

    # TODO: add variant with custom partitioner
    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.cogroup(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in C{self} that has no pair with matching
        key in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)

    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        rdd = other.map(lambda x: (x, True))  # note: here 'True' is just a placeholder
        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])  # note: here 'True' is just a placeholder

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying C{f}.

        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
        >>> sorted(x.cogroup(y).collect())
        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
        """
        return self.map(lambda x: (f(x), x))

    def repartition(self, numPartitions):
        """
        Return a new RDD that has exactly numPartitions partitions.

        Can increase or decrease the level of parallelism in this RDD.
        Internally, this uses a shuffle to redistribute data.
        If you are decreasing the number of partitions in this RDD, consider
        using `coalesce`, which can avoid performing a shuffle.

        >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
        >>> sorted(rdd.glom().collect())
        [[1], [2, 3], [4, 5], [6, 7]]
        >>> len(rdd.repartition(2).glom().collect())
        2
        >>> len(rdd.repartition(10).glom().collect())
        10
        """
        jrdd = self._jrdd.repartition(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD that is reduced into `numPartitions` partitions.

        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        """
        jrdd = self._jrdd.coalesce(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def zip(self, other):
        """
        Zips this RDD with another one, returning key-value pairs made of the
        first element in each RDD, the second element in each RDD, and so on.
        Assumes that the two RDDs have the same number of partitions and the
        same number of elements in each partition (e.g. one was made through
        a map on the other).

        >>> x = sc.parallelize(range(0,5))
        >>> y = sc.parallelize(range(1000, 1005))
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
        """
        pairRDD = self._jrdd.zip(other._jrdd)
        deserializer = PairDeserializer(self._jrdd_deserializer,
                                        other._jrdd_deserializer)
        return RDD(pairRDD, self.ctx, deserializer)

    def name(self):
        """
        Return the name of this RDD.
        """
        name_ = self._jrdd.name()
        if not name_:
            return None
        return name_.encode('utf-8')

    def setName(self, name):
        """
        Assign a name to this RDD.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.setName('RDD1')
        >>> rdd1.name()
        'RDD1'
        """
        self._jrdd.setName(name)

    def toDebugString(self):
        """
        A description of this RDD and its recursive dependencies for debugging.
        """
        debug_string = self._jrdd.toDebugString()
        if not debug_string:
            return None
        return debug_string.encode('utf-8')

    def getStorageLevel(self):
        """
        Get the RDD's current storage level.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.getStorageLevel()
        StorageLevel(False, False, False, 1)
        """
        java_storage_level = self._jrdd.getStorageLevel()
        storage_level = StorageLevel(java_storage_level.useDisk(),
                                     java_storage_level.useMemory(),
                                     java_storage_level.deserialized(),
                                     java_storage_level.replication())
        return storage_level

    # TODO: `lookup` is disabled because we can't make direct comparisons based
    # on the key; we need to compare the hash of the key to the hash of the
    # keys in the pairs. This could be an expensive operation, since those
    # hashes aren't retained.


class PipelinedRDD(RDD):
    """
    Pipelined maps:

    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:

    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # This transformation is the first in its stage:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd  # maintain the pipeline
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            bytearray(pickled_command), env, includes, self.preservesPartitioning,
            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
            class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()