
Source Code for Module pyspark.context

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
from collections import namedtuple

from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
    PairDeserializer, CompressedSerializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD

from py4j.java_collections import ListConverter


# These are special default configs for PySpark, they will overwrite
# the default ones for Spark if they are not configured by user.
DEFAULT_CONFIGS = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.serializer.objectStreamReset": 100,
    "spark.rdd.compress": True,
}


class SparkContext(object):
    """
    Main entry point for Spark functionality. A SparkContext represents the
    connection to a Spark cluster, and can be used to create L{RDD}s and
    broadcast variables on that cluster.
    """

    _gateway = None
    _jvm = None
    _writeToFile = None
    _next_accum_id = 0
    _active_spark_context = None
    _lock = Lock()
    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
    _default_batch_size_for_serialized_input = 10
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
                 gateway=None):
        """
        Create a new SparkContext. At least the master and app name should be set,
        either through the named parameters here or through C{conf}.

        @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
        @param appName: A name for your job, to display on the cluster web UI.
        @param sparkHome: Location where Spark is installed on cluster nodes.
        @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH. These can be paths on the local file
               system or HDFS, HTTP, HTTPS, or FTP URLs.
        @param environment: A dictionary of environment variables to set on
               worker nodes.
        @param batchSize: The number of Python objects represented as a single
               Java object. Set 1 to disable batching or -1 to use an
               unlimited batch size.
        @param serializer: The serializer for RDDs.
        @param conf: A L{SparkConf} object setting Spark properties.
        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
               will be instantiated.


        >>> from pyspark.context import SparkContext
        >>> sc = SparkContext('local', 'test')

        >>> sc2 = SparkContext('local', 'test2')  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
            ...
        ValueError:...
        """
        if rdd._extract_concise_traceback() is not None:
            self._callsite = rdd._extract_concise_traceback()
        else:
            tempNamedTuple = namedtuple("Callsite", "function file linenum")
            self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
        SparkContext._ensure_initialized(self, gateway=gateway)
        try:
            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                          conf)
        except:
            # If an error occurs, clean up in order to allow future SparkContext creation:
            self.stop()
            raise

    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                 conf):
        self.environment = environment or {}
        self._conf = conf or SparkConf(_jvm=self._jvm)
        self._batchSize = batchSize  # -1 represents an unlimited batch size
        self._unbatched_serializer = serializer
        if batchSize == 1:
            self.serializer = self._unbatched_serializer
        else:
            self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                batchSize)

        # Set any parameters passed directly to us on the conf
        if master:
            self._conf.setMaster(master)
        if appName:
            self._conf.setAppName(appName)
        if sparkHome:
            self._conf.setSparkHome(sparkHome)
        if environment:
            for key, value in environment.iteritems():
                self._conf.setExecutorEnv(key, value)
        for key, value in DEFAULT_CONFIGS.items():
            self._conf.setIfMissing(key, value)

        # Check that we have at least the required parameters
        if not self._conf.contains("spark.master"):
            raise Exception("A master URL must be set in your configuration")
        if not self._conf.contains("spark.app.name"):
            raise Exception("An application name must be set in your configuration")

        # Read back our properties from the conf in case we loaded some of them from
        # the classpath or an external config file
        self.master = self._conf.get("spark.master")
        self.appName = self._conf.get("spark.app.name")
        self.sparkHome = self._conf.get("spark.home", None)
        for (k, v) in self._conf.getAll():
            if k.startswith("spark.executorEnv."):
                varName = k[len("spark.executorEnv."):]
                self.environment[varName] = v

        # Create the Java SparkContext through Py4J
        self._jsc = self._initialize_context(self._conf._jconf)

        # Create a single Accumulator in Java that we'll send all our updates through;
        # they will be passed back to us through a TCP server
        self._accumulatorServer = accumulators._start_update_server()
        (host, port) = self._accumulatorServer.server_address
        self._javaAccumulator = self._jsc.accumulator(
            self._jvm.java.util.ArrayList(),
            self._jvm.PythonAccumulatorParam(host, port))

        self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

        # Broadcast's __reduce__ method stores Broadcast instances here.
        # This allows other code to determine which Broadcast instances have
        # been pickled, so it can determine which Java broadcast objects to
        # send.
        self._pickled_broadcast_vars = set()

        SparkFiles._sc = self
        root_dir = SparkFiles.getRootDirectory()
        sys.path.append(root_dir)

        # Deploy any code dependencies specified in the constructor
        self._python_includes = list()
        for path in (pyFiles or []):
            self.addPyFile(path)

        # Deploy code dependencies set by spark-submit; these will already have been added
        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
        for path in self._conf.get("spark.submit.pyFiles", "").split(","):
            if path != "":
                (dirname, filename) = os.path.split(path)
                self._python_includes.append(filename)
                sys.path.append(path)
                if dirname not in sys.path:
                    sys.path.append(dirname)

        # Create a temporary directory inside spark.local.dir:
        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
        self._temp_dir = \
            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

    def _initialize_context(self, jconf):
        """
        Initialize the Java SparkContext in a separate method to allow
        subclass-specific initialization.
        """
        return self._jvm.JavaSparkContext(jconf)

    @classmethod
    def _ensure_initialized(cls, instance=None, gateway=None):
        """
        Checks whether a SparkContext is initialized or not.
        Throws an error if a different SparkContext is already running.
        """
        with SparkContext._lock:
            if not SparkContext._gateway:
                SparkContext._gateway = gateway or launch_gateway()
                SparkContext._jvm = SparkContext._gateway.jvm
                SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

            if instance:
                if (SparkContext._active_spark_context and
                        SparkContext._active_spark_context != instance):
                    currentMaster = SparkContext._active_spark_context.master
                    currentAppName = SparkContext._active_spark_context.appName
                    callsite = SparkContext._active_spark_context._callsite

                    # Raise error if there is already a running Spark context
                    raise ValueError(
                        "Cannot run multiple SparkContexts at once; "
                        "existing SparkContext(app=%s, master=%s)"
                        " created by %s at %s:%s "
                        % (currentAppName, currentMaster,
                           callsite.function, callsite.file, callsite.linenum))
                else:
                    SparkContext._active_spark_context = instance

    @classmethod
    def setSystemProperty(cls, key, value):
        """
        Set a Java system property, such as spark.executor.memory. This
        must be invoked before instantiating SparkContext.
        """
        SparkContext._ensure_initialized()
        SparkContext._jvm.java.lang.System.setProperty(key, value)
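    # A minimal usage sketch (the property value and app name are illustrative;
    # call this before the SparkContext is created, since the JVM reads system
    # properties at startup):
    #
    #   SparkContext.setSystemProperty("spark.executor.memory", "2g")
    #   sc = SparkContext("local", "memory-example")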

    @property
    def version(self):
        """
        The version of Spark on which this application is running.
        """
        return self._jsc.version()

    @property
    def defaultParallelism(self):
        """
        Default level of parallelism to use when not given by user (e.g. for
        reduce tasks)
        """
        return self._jsc.sc().defaultParallelism()

    @property
    def defaultMinPartitions(self):
        """
        Default min number of partitions for Hadoop RDDs when not given by user
        """
        return self._jsc.sc().defaultMinPartitions()

    def stop(self):
        """
        Shut down the SparkContext.
        """
        if getattr(self, "_jsc", None):
            self._jsc.stop()
            self._jsc = None
        if getattr(self, "_accumulatorServer", None):
            self._accumulatorServer.shutdown()
            self._accumulatorServer = None
        with SparkContext._lock:
            SparkContext._active_spark_context = None

    def parallelize(self, c, numSlices=None):
        """
        Distribute a local Python collection to form an RDD.

        >>> sc.parallelize(range(5), 5).glom().collect()
        [[0], [1], [2], [3], [4]]
        """
        numSlices = numSlices or self.defaultParallelism
        # Calling the Java parallelize() method with an ArrayList is too slow,
        # because it sends O(n) Py4J commands.  As an alternative, serialized
        # objects are written to a file and loaded through textFile().
        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
        # Make sure we distribute data evenly if it's smaller than self.batchSize
        if "__len__" not in dir(c):
            c = list(c)    # Make it a list so we can compute its length
        batchSize = min(len(c) // numSlices, self._batchSize)
        if batchSize > 1:
            serializer = BatchedSerializer(self._unbatched_serializer,
                                           batchSize)
        else:
            serializer = self._unbatched_serializer
        serializer.dump_stream(c, tempFile)
        tempFile.close()
        readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
        jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
        return RDD(jrdd, self, serializer)

    def pickleFile(self, name, minPartitions=None):
        """
        Load an RDD previously saved using L{RDD.saveAsPickleFile} method.

        >>> tmpFile = NamedTemporaryFile(delete=True)
        >>> tmpFile.close()
        >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
        >>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """
        minPartitions = minPartitions or self.defaultMinPartitions
        return RDD(self._jsc.objectFile(name, minPartitions), self,
                   BatchedSerializer(PickleSerializer()))

    def textFile(self, name, minPartitions=None):
        """
        Read a text file from HDFS, a local file system (available on all
        nodes), or any Hadoop-supported file system URI, and return it as an
        RDD of Strings.

        >>> path = os.path.join(tempdir, "sample-text.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("Hello world!")
        >>> textFile = sc.textFile(path)
        >>> textFile.collect()
        [u'Hello world!']
        """
        minPartitions = minPartitions or min(self.defaultParallelism, 2)
        return RDD(self._jsc.textFile(name, minPartitions), self,
                   UTF8Deserializer())

    def wholeTextFiles(self, path, minPartitions=None):
        """
        Read a directory of text files from HDFS, a local file system
        (available on all nodes), or any Hadoop-supported file system
        URI. Each file is read as a single record and returned in a
        key-value pair, where the key is the path of each file and the
        value is the content of each file.

        For example, if you have the following files::

          hdfs://a-hdfs-path/part-00000
          hdfs://a-hdfs-path/part-00001
          ...
          hdfs://a-hdfs-path/part-nnnnn

        Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")},
        then C{rdd} contains::

          (a-hdfs-path/part-00000, its content)
          (a-hdfs-path/part-00001, its content)
          ...
          (a-hdfs-path/part-nnnnn, its content)

        NOTE: Small files are preferred, as each file will be loaded
        fully in memory.

        >>> dirPath = os.path.join(tempdir, "files")
        >>> os.mkdir(dirPath)
        >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
        ...    file1.write("1")
        >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
        ...    file2.write("2")
        >>> textFiles = sc.wholeTextFiles(dirPath)
        >>> sorted(textFiles.collect())
        [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
        """
        minPartitions = minPartitions or self.defaultMinPartitions
        return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

    def _dictToJavaMap(self, d):
        jm = self._jvm.java.util.HashMap()
        if not d:
            d = {}
        for k, v in d.iteritems():
            jm[k] = v
        return jm

    def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
                     valueConverter=None, minSplits=None, batchSize=None):
        """
        Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is as follows:
            1. A Java RDD is created from the SequenceFile or other InputFormat, and the key
               and value Writable classes
            2. Serialization is attempted via Pyrolite pickling
            3. If this fails, the fallback is to call 'toString' on each key and value
            4. C{PickleSerializer} is used to deserialize pickled objects on the Python side

        @param path: path to sequence file
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter:
        @param valueConverter:
        @param minSplits: minimum splits in dataset
               (default min(2, sc.defaultParallelism))
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        minSplits = minSplits or min(self.defaultParallelism, 2)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
                                                keyConverter, valueConverter, minSplits, batchSize)
        return RDD(jrdd, self, ser)
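    # A minimal usage sketch (the path and Writable classes below are
    # illustrative; any SequenceFile with matching key/value types works):
    #
    #   pairs = sc.sequenceFile("hdfs://namenode/data/pairs.seq",
    #                           keyClass="org.apache.hadoop.io.Text",
    #                           valueClass="org.apache.hadoop.io.IntWritable")
    #   pairs.take(5)  # list of (key, value) tuples deserialized via Pyrolite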

    def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
                         valueConverter=None, conf=None, batchSize=None):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is the same as for sc.sequenceFile.

        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
        Configuration in Java.

        @param path: path to Hadoop file
        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                                    valueClass, keyConverter, valueConverter,
                                                    jconf, batchSize)
        return RDD(jrdd, self, ser)
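    # A minimal usage sketch (the path and the extra Hadoop property are
    # illustrative; the classnames match the examples in the docstring above):
    #
    #   lines = sc.newAPIHadoopFile(
    #       "hdfs://namenode/data/input",
    #       "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    #       "org.apache.hadoop.io.LongWritable",
    #       "org.apache.hadoop.io.Text",
    #       conf={"mapreduce.input.fileinputformat.split.maxsize": "67108864"})
    #   lines.values().take(5)  # text lines, keyed by byte offset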

    def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                        valueConverter=None, conf=None, batchSize=None):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
        Hadoop configuration, which is passed in as a Python dict.
        This will be converted into a Configuration in Java.
        The mechanism is the same as for sc.sequenceFile.

        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
                                                   valueClass, keyConverter, valueConverter,
                                                   jconf, batchSize)
        return RDD(jrdd, self, ser)

    def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
                   valueConverter=None, conf=None, batchSize=None):
        """
        Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is the same as for sc.sequenceFile.

        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
        Configuration in Java.

        @param path: path to Hadoop file
        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapred.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                              valueClass, keyConverter, valueConverter,
                                              jconf, batchSize)
        return RDD(jrdd, self, ser)

    def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                  valueConverter=None, conf=None, batchSize=None):
        """
        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
        Hadoop configuration, which is passed in as a Python dict.
        This will be converted into a Configuration in Java.
        The mechanism is the same as for sc.sequenceFile.

        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapred.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
                                             valueClass, keyConverter, valueConverter,
                                             jconf, batchSize)
        return RDD(jrdd, self, ser)

    def _checkpointFile(self, name, input_deserializer):
        jrdd = self._jsc.checkpointFile(name)
        return RDD(jrdd, self, input_deserializer)

    def union(self, rdds):
        """
        Build the union of a list of RDDs.

        This supports unions() of RDDs with different serialized formats,
        although this forces them to be reserialized using the default
        serializer:

        >>> path = os.path.join(tempdir, "union-text.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("Hello")
        >>> textFile = sc.textFile(path)
        >>> textFile.collect()
        [u'Hello']
        >>> parallelized = sc.parallelize(["World!"])
        >>> sorted(sc.union([textFile, parallelized]).collect())
        [u'Hello', 'World!']
        """
        first_jrdd_deserializer = rdds[0]._jrdd_deserializer
        if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds):
            rdds = [x._reserialize() for x in rdds]
        first = rdds[0]._jrdd
        rest = [x._jrdd for x in rdds[1:]]
        rest = ListConverter().convert(rest, self._gateway._gateway_client)
        return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)

    def broadcast(self, value):
        """
        Broadcast a read-only variable to the cluster, returning a
        L{Broadcast<pyspark.broadcast.Broadcast>}
        object for reading it in distributed functions. The variable will
        be sent to each node only once.
        """
        ser = CompressedSerializer(PickleSerializer())
        # Passing large objects through Py4J is very slow and uses a lot of memory,
        # so the value is written to a temporary file and read back from the JVM.
        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
        ser.dump_stream([value], tempFile)
        tempFile.close()
        jbroadcast = self._jvm.PythonRDD.readBroadcastFromFile(self._jsc, tempFile.name)
        return Broadcast(jbroadcast.id(), None, jbroadcast,
                         self._pickled_broadcast_vars, tempFile.name)
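    # A minimal usage sketch (assumes an active SparkContext `sc`; the lookup
    # table and RDD contents are illustrative):
    #
    #   lookup = sc.broadcast({"a": 1, "b": 2})
    #   sc.parallelize(["a", "b", "a"]).map(lambda k: lookup.value[k]).collect()
    #   # -> [1, 2, 1]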

    def accumulator(self, value, accum_param=None):
        """
        Create an L{Accumulator} with the given initial value, using a given
        L{AccumulatorParam} helper object to define how to add values of the
        data type if provided. Default AccumulatorParams are used for integers
        and floating-point numbers if you do not provide one. For other types,
        a custom AccumulatorParam can be used.
        """
        if accum_param is None:
            if isinstance(value, int):
                accum_param = accumulators.INT_ACCUMULATOR_PARAM
            elif isinstance(value, float):
                accum_param = accumulators.FLOAT_ACCUMULATOR_PARAM
            elif isinstance(value, complex):
                accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
            else:
                raise Exception("No default accumulator param for type %s" % type(value))
        SparkContext._next_accum_id += 1
        return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)
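    # A minimal usage sketch (assumes an active SparkContext `sc`; the values
    # are illustrative):
    #
    #   counter = sc.accumulator(0)
    #   sc.parallelize([1, 2, 3, 4]).foreach(lambda x: counter.add(x))
    #   counter.value  # -> 10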

    def addFile(self, path):
        """
        Add a file to be downloaded with this Spark job on every node.
        The C{path} passed can be either a local file, a file in HDFS
        (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
        FTP URI.

        To access the file in Spark jobs, use
        L{SparkFiles.get(path)<pyspark.files.SparkFiles.get>} to find its
        download location.

        >>> from pyspark import SparkFiles
        >>> path = os.path.join(tempdir, "test.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("100")
        >>> sc.addFile(path)
        >>> def func(iterator):
        ...    with open(SparkFiles.get("test.txt")) as testFile:
        ...        fileVal = int(testFile.readline())
        ...        return [x * fileVal for x in iterator]
        >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
        [100, 200, 300, 400]
        """
        self._jsc.sc().addFile(path)

    def clearFiles(self):
        """
        Clear the job's list of files added by L{addFile} or L{addPyFile} so
        that they do not get downloaded to any new nodes.
        """
        # TODO: remove added .py or .zip files from the PYTHONPATH?
        self._jsc.sc().clearFiles()

    def addPyFile(self, path):
        """
        Add a .py or .zip dependency for all tasks to be executed on this
        SparkContext in the future.  The C{path} passed can be either a local
        file, a file in HDFS (or other Hadoop-supported filesystems), or an
        HTTP, HTTPS or FTP URI.
        """
        self.addFile(path)
        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix

        if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
            self._python_includes.append(filename)
            # for tests in local mode
            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))
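    # A minimal usage sketch (the archive name and the `helpers` module are
    # hypothetical, for illustration only):
    #
    #   sc.addPyFile("/path/to/helpers.zip")   # ships the archive to every worker
    #   def use_helper(x):
    #       import helpers                     # importable on workers once shipped
    #       return helpers.transform(x)
    #   sc.parallelize(range(3)).map(use_helper).collect()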

    def setCheckpointDir(self, dirName):
        """
        Set the directory under which RDDs are going to be checkpointed. The
        directory must be an HDFS path if running on a cluster.
        """
        self._jsc.sc().setCheckpointDir(dirName)
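    # A minimal usage sketch (the checkpoint location is illustrative; a local
    # path is only appropriate when running in local mode):
    #
    #   sc.setCheckpointDir("hdfs://namenode:8020/user/spark/checkpoints")
    #   rdd = sc.parallelize(range(100)).map(lambda x: x * x)
    #   rdd.checkpoint()   # materialized under the checkpoint dir on the next action
    #   rdd.count()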

    def _getJavaStorageLevel(self, storageLevel):
        """
        Returns a Java StorageLevel based on a pyspark.StorageLevel.
        """
        if not isinstance(storageLevel, StorageLevel):
            raise Exception("storageLevel must be of type pyspark.StorageLevel")

        newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
        return newStorageLevel(storageLevel.useDisk,
                               storageLevel.useMemory,
                               storageLevel.useOffHeap,
                               storageLevel.deserialized,
                               storageLevel.replication)

    def setJobGroup(self, groupId, description, interruptOnCancel=False):
        """
        Assigns a group ID to all the jobs started by this thread until the group ID is set to a
        different value or cleared.

        Often, a unit of execution in an application consists of multiple Spark actions or jobs.
        Application programmers can use this method to group all those jobs together and give a
        group description. Once set, the Spark web UI will associate such jobs with this group.

        The application can use L{SparkContext.cancelJobGroup} to cancel all
        running jobs in this group.

        >>> import thread, threading
        >>> from time import sleep
        >>> result = "Not Set"
        >>> lock = threading.Lock()
        >>> def map_func(x):
        ...     sleep(100)
        ...     raise Exception("Task should have been cancelled")
        >>> def start_job(x):
        ...     global result
        ...     try:
        ...         sc.setJobGroup("job_to_cancel", "some description")
        ...         result = sc.parallelize(range(x)).map(map_func).collect()
        ...     except Exception as e:
        ...         result = "Cancelled"
        ...     lock.release()
        >>> def stop_job():
        ...     sleep(5)
        ...     sc.cancelJobGroup("job_to_cancel")
        >>> suppress = lock.acquire()
        >>> suppress = thread.start_new_thread(start_job, (10,))
        >>> suppress = thread.start_new_thread(stop_job, tuple())
        >>> suppress = lock.acquire()
        >>> print result
        Cancelled

        If interruptOnCancel is set to true for the job group, then job cancellation will result
        in Thread.interrupt() being called on the job's executor threads. This is useful to help
        ensure that the tasks are actually stopped in a timely manner, but is off by default due
        to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
        """
        self._jsc.setJobGroup(groupId, description, interruptOnCancel)

    def setLocalProperty(self, key, value):
        """
        Set a local property that affects jobs submitted from this thread, such as the
        Spark fair scheduler pool.
        """
        self._jsc.setLocalProperty(key, value)

    def getLocalProperty(self, key):
        """
        Get a local property set in this thread, or None if it is missing. See
        L{setLocalProperty}.
        """
        return self._jsc.getLocalProperty(key)
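    # A minimal usage sketch: route jobs submitted from this thread to a fair
    # scheduler pool (the pool name is illustrative and must exist in the
    # scheduler configuration):
    #
    #   sc.setLocalProperty("spark.scheduler.pool", "production")
    #   sc.getLocalProperty("spark.scheduler.pool")  # -> u'production'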

    def sparkUser(self):
        """
        Get SPARK_USER for the user who is running this SparkContext.
        """
        return self._jsc.sc().sparkUser()

    def cancelJobGroup(self, groupId):
        """
        Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}
        for more information.
        """
        self._jsc.sc().cancelJobGroup(groupId)

    def cancelAllJobs(self):
        """
        Cancel all jobs that have been scheduled or are running.
        """
        self._jsc.sc().cancelAllJobs()

    def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
        """
        Executes the given partitionFunc on the specified set of partitions,
        returning the result as an array of elements.

        If 'partitions' is not specified, this will run over all partitions.

        >>> myRDD = sc.parallelize(range(6), 3)
        >>> sc.runJob(myRDD, lambda part: [x * x for x in part])
        [0, 1, 4, 9, 16, 25]

        >>> myRDD = sc.parallelize(range(6), 3)
        >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
        [0, 1, 16, 25]
        """
        if partitions is None:
            partitions = range(rdd._jrdd.partitions().size())
        javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

        # Implementation note: This is implemented as a mapPartitions followed
        # by runJob() in order to avoid having to pass a Python lambda into
        # SparkContext#runJob.
        mappedRDD = rdd.mapPartitions(partitionFunc)
        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
        return list(mappedRDD._collect_iterator_through_file(it))


def _test():
    import atexit
    import doctest
    import tempfile
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    globs['tempdir'] = tempfile.mkdtemp()
    atexit.register(lambda: shutil.rmtree(globs['tempdir']))
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()