from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap, product
import operator
import os
import shlex
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread

from pyspark import cloudpickle
from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
    read_from_pickle_file
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup

from py4j.java_collections import ListConverter, MapConverter


__all__ = ["RDD"]


class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._partitionFunc = None

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
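
        A small illustrative example (uses the shared doctest C{sc};
        C{is_cached} is the Python-side flag set by this method):

        >>> rdd = sc.parallelize([1, 2, 3]).cache()
        >>> rdd.is_cached
        True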
48 """
49 self.is_cached = True
50 self._jrdd.cache()
51 return self
52
54 """
55 Mark this RDD for checkpointing. It will be saved to a file inside the
56 checkpoint directory set with L{SparkContext.setCheckpointDir()} and
57 all references to its parent RDDs will be removed. This function must
58 be called before any job has been executed on this RDD. It is strongly
59 recommended that this RDD is persisted in memory, otherwise saving it
60 on a file will require recomputation.
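
        A minimal usage sketch (not run as a doctest; assumes a writable
        checkpoint directory path C{checkpointDir} is available):

            sc.setCheckpointDir(checkpointDir)
            rdd = sc.parallelize(range(100)).cache()
            rdd.checkpoint()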
61 """
62 self.is_checkpointed = True
63 self._jrdd.rdd().checkpoint()
64
66 """
67 Return whether this RDD has been checkpointed or not
68 """
69 return self._jrdd.rdd().isCheckpointed()
70
72 """
73 Gets the name of the file to which this RDD was checkpointed
74 """
75 checkpointFile = self._jrdd.rdd().getCheckpointFile()
76 if checkpointFile.isDefined():
77 return checkpointFile.get()
78 else:
79 return None
80
81
82
83 - def map(self, f, preservesPartitioning=False):
84 """
85 Return a new RDD containing the distinct elements in this RDD.
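
        A small illustrative example (using the shared doctest C{sc}):

        >>> rdd = sc.parallelize(["b", "a", "c"])
        >>> sorted(rdd.map(lambda x: (x, 1)).collect())
        [('a', 1), ('b', 1), ('c', 1)]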
86 """
87 def func(split, iterator): return imap(f, iterator)
88 return PipelinedRDD(self, func, preservesPartitioning)
89
90 - def flatMap(self, f, preservesPartitioning=False):
91 """
92 Return a new RDD by first applying a function to all elements of this
93 RDD, and then flattening the results.
94
95 >>> rdd = sc.parallelize([2, 3, 4])
96 >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
97 [1, 1, 1, 2, 2, 3]
98 >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
99 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
100 """
101 def func(s, iterator): return chain.from_iterable(imap(f, iterator))
102 return self.mapPartitionsWithSplit(func, preservesPartitioning)
103
105 """
106 Return a new RDD by applying a function to each partition of this RDD.
107
108 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
109 >>> def f(iterator): yield sum(iterator)
110 >>> rdd.mapPartitions(f).collect()
111 [3, 7]
112 """
113 def func(s, iterator): return f(iterator)
114 return self.mapPartitionsWithSplit(func)
115
117 """
118 Return a new RDD by applying a function to each partition of this RDD,
119 while tracking the index of the original partition.
120
121 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
122 >>> def f(splitIndex, iterator): yield splitIndex
123 >>> rdd.mapPartitionsWithSplit(f).sum()
124 6
125 """
126 return PipelinedRDD(self, f, preservesPartitioning)
127
129 """
130 Return a new RDD containing only the elements that satisfy a predicate.
131
132 >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
133 >>> rdd.filter(lambda x: x % 2 == 0).collect()
134 [2, 4]
135 """
136 def func(iterator): return ifilter(f, iterator)
137 return self.mapPartitions(func)
138
140 """
141 Return a new RDD containing the distinct elements in this RDD.
142
143 >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
144 [1, 2, 3]
145 """
146 return self.map(lambda x: (x, "")) \
147 .reduceByKey(lambda x, _: x) \
148 .map(lambda (x, _): x)
149
150
151
152
153
154
155
156
157
158
160 """
161 Return the union of this RDD and another one.
162
163 >>> rdd = sc.parallelize([1, 1, 2, 3])
164 >>> rdd.union(rdd).collect()
165 [1, 1, 2, 3, 1, 1, 2, 3]
166 """
167 return RDD(self._jrdd.union(other._jrdd), self.ctx)
168
170 """
171 Return the union of this RDD and another one.
172
173 >>> rdd = sc.parallelize([1, 1, 2, 3])
174 >>> (rdd + rdd).collect()
175 [1, 1, 2, 3, 1, 1, 2, 3]
176 """
177 if not isinstance(other, RDD):
178 raise TypeError
179 return self.union(other)
180
181
182
184 """
185 Return an RDD created by coalescing all elements within each partition
186 into a list.
187
188 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
189 >>> sorted(rdd.glom().collect())
190 [[1, 2], [3, 4]]
191 """
192 def func(iterator): yield list(iterator)
193 return self.mapPartitions(func)
194
196 """
197 Return the Cartesian product of this RDD and another one, that is, the
198 RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
199 C{b} is in C{other}.
200
201 >>> rdd = sc.parallelize([1, 2])
202 >>> sorted(rdd.cartesian(rdd).collect())
203 [(1, 1), (1, 2), (2, 1), (2, 2)]
204 """
205
206 java_cartesian = RDD(self._jrdd.cartesian(other._jrdd), self.ctx)
207 def unpack_batches(pair):
208 (x, y) = pair
209 if type(x) == Batch or type(y) == Batch:
210 xs = x.items if type(x) == Batch else [x]
211 ys = y.items if type(y) == Batch else [y]
212 for pair in product(xs, ys):
213 yield pair
214 else:
215 yield pair
216 return java_cartesian.flatMap(unpack_batches)
217
218 - def groupBy(self, f, numPartitions=None):
219 """
220 Return an RDD of grouped items.
221
222 >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
223 >>> result = rdd.groupBy(lambda x: x % 2).collect()
224 >>> sorted([(x, sorted(y)) for (x, y) in result])
225 [(0, [2, 8]), (1, [1, 1, 3, 5])]
226 """
227 return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
228
229 - def pipe(self, command, env={}):
230 """
231 Return an RDD created by piping elements to a forked external process.
232
233 >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
234 ['1', '2', '3']
235 """
236 def func(iterator):
237 pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
238 def pipe_objs(out):
239 for obj in iterator:
240 out.write(str(obj).rstrip('\n') + '\n')
241 out.close()
242 Thread(target=pipe_objs, args=[pipe.stdin]).start()
243 return (x.rstrip('\n') for x in pipe.stdout)
244 return self.mapPartitions(func)
245
247 """
248 Applies a function to all elements of this RDD.
249
250 >>> def f(x): print x
251 >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
252 """
253 self.map(f).collect()
254
256 """
257 Return a list that contains all of the elements in this RDD.
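
        A small illustrative example (using the shared doctest C{sc}):

        >>> sorted(sc.parallelize([1, 2, 3]).collect())
        [1, 2, 3]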
258 """
259 picklesInJava = self._jrdd.collect().iterator()
260 return list(self._collect_iterator_through_file(picklesInJava))
261
263
264
265
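        # Dump the data to a temporary file via the JVM and read it back with
        # the pickle-based deserializer; this avoids streaming a large number
        # of objects through the Py4J socket one at a time.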
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeIteratorToPickleFile(iterator, tempFile.name)

        with open(tempFile.name, 'rb') as tempFile:
            for item in read_from_pickle_file(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        items = []
        for partition in range(self._jrdd.splits().size()):
            iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
            # Each item in the iterator may be a single serialized object or a
            # batch of objects, so taking `num` items is enough to yield at
            # least `num` Python objects after deserialization.
            iterator = iterator.take(num)
            items.extend(self._collect_iterator_through_file(iterator))
            if len(items) >= num:
                break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(input(glob(tempFile.name + "/part-0000*")))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            return (str(x).encode("utf-8") for x in iterator)
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # The following methods operate on RDDs of key-value pairs:

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    def partitionBy(self, numPartitions, partitionFunc=hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
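        # Instead of shuffling Python objects through the JVM one at a time,
        # bucket the (key, value) pairs by target partition in Python, emit
        # each bucket as a pickled Batch keyed by its partition id, and let
        # the Java-side PythonPartitioner route the batches.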
        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)
            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield str(split)
                yield dump_pickle(Batch(items))
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
        partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                      id(partitionFunc))
        jrdd = pairRDD.partitionBy(partitioner).values()
        rdd = RDD(jrdd, self.ctx)
        # Keep a reference to the partition function so that id(partitionFunc),
        # which was passed to the Java partitioner above, stays valid for this
        # RDD's lifetime:
        rdd._partitionFunc = partitionFunc
        return rdd

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C. Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        def combineLocally(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)
        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)

    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD into numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """

        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions)

    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
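
        A small illustrative example (using the shared doctest C{sc}):

        >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
        >>> def f(x): return x
        >>> sorted(x.flatMapValues(f).collect())
        [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]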
641 """
642 flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
643 return self.flatMap(flat_map_fn, preservesPartitioning=True)
644
646 """
647 Pass each value in the key-value pair RDD through a map function
648 without changing the keys; this also retains the original RDD's
649 partitioning.
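
        A small illustrative example (using the shared doctest C{sc}):

        >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
        >>> def f(x): return len(x)
        >>> sorted(x.mapValues(f).collect())
        [('a', 3), ('b', 1)]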
650 """
651 map_values_fn = lambda (k, v): (k, f(v))
652 return self.map(map_values_fn, preservesPartitioning=True)
653
654
656 """
657 Alias for cogroup.
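
        A small illustrative example (same behavior as L{cogroup}):

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.groupWith(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]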
658 """
659 return self.cogroup(other)
660
661
662 - def cogroup(self, other, numPartitions=None):
663 """
664 For each key k in C{self} or C{other}, return a resulting RDD that
665 contains a tuple with the list of values for that key in C{self} as well
666 as C{other}.
667
668 >>> x = sc.parallelize([("a", 1), ("b", 4)])
669 >>> y = sc.parallelize([("a", 2)])
670 >>> sorted(x.cogroup(y).collect())
671 [('a', ([1], [2])), ('b', ([4], []))]
672 """
673 return python_cogroup(self, other, numPartitions)
674
682 """
683 Pipelined maps:
684 >>> rdd = sc.parallelize([1, 2, 3, 4])
685 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
686 [4, 8, 12, 16]
687 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
688 [4, 8, 12, 16]
689
690 Pipelined reduces:
691 >>> from operator import add
692 >>> rdd.map(lambda x: 2 * x).reduce(add)
693 20
694 >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
695 20
696 """
697 - def __init__(self, prev, func, preservesPartitioning=False):
698 if isinstance(prev, PipelinedRDD) and prev._is_pipelinable():
699 prev_func = prev.func
700 def pipeline_func(split, iterator):
701 return func(split, prev_func(split, iterator))
702 self.func = pipeline_func
703 self.preservesPartitioning = \
704 prev.preservesPartitioning and preservesPartitioning
705 self._prev_jrdd = prev._prev_jrdd
706 else:
707 self.func = func
708 self.preservesPartitioning = preservesPartitioning
709 self._prev_jrdd = prev._jrdd
710 self.is_cached = False
711 self.is_checkpointed = False
712 self.ctx = prev.ctx
713 self.prev = prev
714 self._jrdd_val = None
715 self._bypass_serializer = False
716
717 @property
719 if self._jrdd_val:
720 return self._jrdd_val
721 func = self.func
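        # When batch serialization is enabled, wrap the user's function so its
        # output is regrouped into Batch objects of the configured batch size
        # before being pickled and handed back to the JVM.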
        if not self._bypass_serializer and self.ctx.batchSize != 1:
            oldfunc = self.func
            batchSize = self.ctx.batchSize
            def batched_func(split, iterator):
                return batched(oldfunc(split, iterator), batchSize)
            func = batched_func
        cmds = [func, self._bypass_serializer]
        pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_manifest = self._prev_jrdd.classManifest()
        env = copy.copy(self.ctx.environment)
        env['PYTHONPATH'] = os.environ.get("PYTHONPATH", "")
        env = MapConverter().convert(env, self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
            broadcast_vars, self.ctx._javaAccumulator, class_manifest)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size ensures that the doctests exercise multiple
    # batches, even with these small example datasets:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()