Spark provides three main locations to configure the system:
- Environment variables for launching Spark workers, which can
be set either in your driver program or in the conf/spark-env.sh script.
- Java system properties, which control internal configuration parameters and can be set either
programmatically (by calling
System.setProperty before creating a
SparkContext) or through the
SPARK_JAVA_OPTS environment variable in spark-env.sh.
- Logging configuration, which is done through log4j.properties.
Spark determines how to initialize the JVM on worker nodes, or even on the local node when you run
by running the
conf/spark-env.sh script in the directory where it is installed. This script does not exist by default
in the Git repository, but you can create it by copying
conf/spark-env.sh.template. Make sure that you make
the copy executable.
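
As a rough sketch of the copy step described above, the shell function below creates spark-env.sh from the template and marks it executable. The function name create_spark_env and its single argument (the directory where Spark is installed) are illustrative assumptions, not part of Spark.

```shell
# Illustrative helper (not part of Spark): create conf/spark-env.sh from the template.
# $1 is assumed to be the directory where Spark is installed.
create_spark_env() {
  conf_dir="$1/conf"
  # Leave an existing spark-env.sh alone so local edits are not overwritten.
  if [ ! -f "$conf_dir/spark-env.sh" ]; then
    cp "$conf_dir/spark-env.sh.template" "$conf_dir/spark-env.sh" || return 1
  fi
  # The text above asks for the copy to be executable.
  chmod +x "$conf_dir/spark-env.sh"
}
```

It could be called as create_spark_env /path/to/spark, where the path is a placeholder for your own installation directory.
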
In spark-env.sh, you must set at least the following two variables:
- SCALA_HOME, to point to your Scala installation.
- MESOS_NATIVE_LIBRARY, if you are running on a Mesos cluster.
In addition, there are four other variables that control execution. These can be set either in spark-env.sh
or in each job’s driver program, because they will automatically be propagated to workers from the driver.
For a multi-user environment, we recommend setting them in the driver program instead of spark-env.sh, so
that different user jobs can use different amounts of memory, JVM options, etc.
- SPARK_MEM, to set the amount of memory used per node (this should be in the same format as the JVM’s -Xmx option, e.g. 1g).
- SPARK_JAVA_OPTS, to add JVM options. This includes any system properties that you’d like to pass with -D.
- SPARK_CLASSPATH, to add elements to Spark’s classpath.
- SPARK_LIBRARY_PATH, to add search directories for native libraries.
Note that if you do set these in
spark-env.sh, they will override the values set by user programs, which
is undesirable; you can choose to have
spark-env.sh set them only if the user program hasn’t, as follows:

    if [ -z "$SPARK_MEM" ] ; then
      SPARK_MEM="1g"
    fi

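
The same guard can be applied to all four variables listed above. The fragment below is a sketch of what that might look like in spark-env.sh; every value and path in it is a placeholder chosen for illustration, not a recommended setting.

```shell
# Sketch of a spark-env.sh fragment. Each variable is assigned only when the
# user program has not already set it. All values below are placeholders.
if [ -z "$SPARK_MEM" ] ; then
  SPARK_MEM="1g"
fi
if [ -z "$SPARK_JAVA_OPTS" ] ; then
  SPARK_JAVA_OPTS="-Dspark.cores.max=5"
fi
if [ -z "$SPARK_CLASSPATH" ] ; then
  SPARK_CLASSPATH="/path/to/extra/classes"   # hypothetical path
fi
if [ -z "$SPARK_LIBRARY_PATH" ] ; then
  SPARK_LIBRARY_PATH="/path/to/native/libs"  # hypothetical path
fi
```

With this arrangement a value set by a user program is kept, and the defaults apply only to jobs that set nothing.
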
To set a system property for configuring Spark, you need to either pass it with a -D flag to the JVM (for example
java -Dspark.cores.max=5 MyProgram) or call
System.setProperty in your code before creating your Spark context, as follows:

    System.setProperty("spark.cores.max", "5")
    val sc = new SparkContext(...)

Most of the configurable system properties control internal settings that have reasonable default values. However, there are at least four properties that you will commonly want to control:

| Property Name | Default | Meaning |
|---|---|---|
| | | Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using |
| | | If you use Kryo serialization, set this class to register your custom classes with Kryo. You need to set it to a class that extends |
| spark.local.dir | /tmp | Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories. |
| spark.cores.max | (infinite) | When running on a standalone deploy cluster or a Mesos cluster in "coarse-grained" sharing mode, how many CPU cores to request at most. The default will use all available cores. |

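
Because SPARK_JAVA_OPTS can carry system properties, spark.local.dir and spark.cores.max could also be passed that way rather than in code. A minimal sketch, assuming two hypothetical scratch directories and a cap of five cores:

```shell
# Hypothetical scratch directories; spark.local.dir accepts a comma-separated list.
SCRATCH_DIRS="/mnt/disk1/spark,/mnt/disk2/spark"
# Append to any JVM options that are already set instead of replacing them.
SPARK_JAVA_OPTS="${SPARK_JAVA_OPTS:-} -Dspark.local.dir=$SCRATCH_DIRS -Dspark.cores.max=5"
```

Appending rather than overwriting keeps any options that were set earlier in the script or by the user program.
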
Apart from these, the following properties are also available, and may be useful in some situations:

| Property Name | Default | Meaning |
|---|---|---|
| spark.mesos.coarse | false | If set to "true", runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use for the whole duration of the Spark job. |
| | | Default number of tasks to use for distributed shuffle operations ( |
| spark.storage.memoryFraction | 0.66 | Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase it if you configure your own old generation size. |
| spark.shuffle.compress | true | Whether to compress map output files. Generally a good idea. |
| spark.broadcast.compress | true | Whether to compress broadcast variables before sending them. Generally a good idea. |
| | | Whether to compress serialized RDD partitions (e.g. for |
| spark.reducer.maxMbInFlight | 48 | Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since each output requires us to create a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory. |
| spark.closure.serializer | spark.JavaSerializer | Serializer class to use for closures. Generally Java is fine unless your distributed functions (e.g. map functions) reference large objects in the driver program. |
| spark.kryoserializer.buffer.mb | 32 | Maximum object size to allow within Kryo (the library needs to create a buffer at least as large as the largest single object you'll serialize). Increase this if you get a "buffer limit exceeded" exception inside Kryo. Note that there will be one buffer per core on each worker. |
| spark.broadcast.factory | spark.broadcast.HttpBroadcastFactory | Which broadcast implementation to use. |
| spark.locality.wait | 3000 | Number of milliseconds to wait to launch a data-local task before giving up and launching it in a non-data-local location. You should increase this if your tasks are long and you are seeing poor data locality, but the default generally works well. |
| spark.akka.threads | 4 | Number of actor threads to use for communication. Can be useful to increase on large clusters when the master has a lot of CPU cores. |
| spark.master.host | (local hostname) | Hostname or IP address for the master to listen on. |
| spark.master.port | (random) | Port for the master to listen on. |

Spark uses log4j for logging. You can configure it by adding a log4j.properties
file in the
conf directory. One way to start is to copy the existing
log4j.properties.template located there.
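
The copy can be done by hand. As a sketch, the helper below copies log4j.properties.template to log4j.properties (the target file name is inferred from the template's name) without overwriting an existing file. The function name and its argument are illustrative assumptions, not part of Spark.

```shell
# Illustrative helper (not part of Spark): start a log4j configuration from the template.
# $1 is assumed to be the directory where Spark is installed.
init_log4j_config() {
  conf_dir="$1/conf"
  # Keep an existing configuration so local changes are not lost.
  if [ -f "$conf_dir/log4j.properties" ]; then
    echo "log4j.properties already exists; leaving it unchanged"
    return 0
  fi
  cp "$conf_dir/log4j.properties.template" "$conf_dir/log4j.properties"
}
```

After the copy, edit conf/log4j.properties to adjust logging to your needs.
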