Launching Spark on YARN
Support for running on YARN (Hadoop NextGen) was added to Spark in version 0.6.0, and improved in 0.7.0 and 0.8.0.
Building a YARN-Enabled Assembly JAR
We need a consolidated Spark JAR (which bundles all the required dependencies) to run Spark jobs on a YARN cluster.
This can be built by setting the Hadoop version (SPARK_HADOOP_VERSION) and the SPARK_YARN environment variable, as follows:
SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
The assembled JAR will be something like this: ./assembly/target/scala-2.10/spark-assembly-0.9.2-hadoop2.0.5-alpha.jar
The build process now also supports new YARN versions (2.2.x). See below.
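The name of the assembled JAR depends on the Spark, Hadoop, and Scala versions used for the build. The following is a minimal sketch of that relationship. The helper name assembly_jar_path is hypothetical, and the naming pattern is inferred from the example paths on this page rather than taken from the build definition, so check the actual contents of assembly/target before relying on it.

```shell
# Hypothetical helper: predict where sbt/sbt assembly writes the Spark assembly JAR.
# The pattern is an assumption inferred from the example paths on this page.
assembly_jar_path() {
  spark_version="$1"            # e.g. 0.9.2
  hadoop_version="$2"           # value of SPARK_HADOOP_VERSION, e.g. 2.0.5-alpha
  scala_version="${3:-2.10}"    # Scala binary version, assumed 2.10 unless given
  printf './assembly/target/scala-%s/spark-assembly-%s-hadoop%s.jar\n' \
    "$scala_version" "$spark_version" "$hadoop_version"
}

assembly_jar_path 0.9.2 2.0.5-alpha
```

The result can be used to set SPARK_JAR in the launch commands, for example SPARK_JAR=$(assembly_jar_path 0.9.2 2.0.5-alpha).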
- Building a YARN-enabled assembly (see above).
- The assembled jar can be installed into HDFS or used locally.
- Your application code must be packaged into a separate JAR file.
If you want to test out the YARN deployment mode, you can use the current Spark examples. A spark-examples_2.10-0.9.2 file can be generated by running sbt/sbt assembly. NOTE: since the documentation you’re reading is for Spark version 0.9.2, we assume here that you have downloaded Spark 0.9.2 or checked it out of source control. If you are using a different version of Spark, the version numbers in the JAR generated by sbt/sbt assembly will be different.
Most of the configs are the same for Spark on YARN as for other deployment modes. See the Configuration page for more information on those. The following configs are specific to Spark on YARN.
SPARK_YARN_USER_ENV, to add environment variables to the Spark processes launched on YARN. This can be a comma-separated list of environment variables.
spark.yarn.applicationMaster.waitTries, property to set the number of times the ApplicationMaster waits for the Spark master, and also the number of tries it waits for the SparkContext to be initialized. Default is 10.
spark.yarn.submit.file.replication, the HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark JAR, the app JAR, and any distributed cache files/archives.
spark.yarn.preserve.staging.files, set to true to preserve the staged files (Spark JAR, app JAR, distributed cache files) at the end of the job rather than deleting them.
spark.yarn.scheduler.heartbeat.interval-ms, the interval in ms at which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
spark.yarn.max.worker.failures, the maximum number of worker failures before failing the application. Default is the number of workers requested times 2, with a minimum of 3.
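Two of the settings above involve a small amount of arithmetic or parsing, which can be sketched as follows. The function names are hypothetical. The sample value passed to split_user_env is illustrative, and the sketch assumes that SPARK_YARN_USER_ENV entries are NAME=value pairs whose values contain no commas.

```shell
# Default for spark.yarn.max.worker.failures as described above:
# twice the number of requested workers, but never less than 3.
default_max_worker_failures() {
  requested_workers="$1"
  doubled=$((requested_workers * 2))
  if [ "$doubled" -lt 3 ]; then
    echo 3
  else
    echo "$doubled"
  fi
}

# Split a SPARK_YARN_USER_ENV-style value into one entry per line.
# Assumption: entries are NAME=value pairs and values contain no commas.
split_user_env() {
  printf '%s\n' "$1" | tr ',' '\n'
}

default_max_worker_failures 3   # prints 6
default_max_worker_failures 1   # prints 3
split_user_env "JAVA_HOME=/jdk64,FOO=bar"
```

With 3 requested workers the default works out to 6 failures. With a single worker the minimum of 3 applies.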
Launching Spark on YARN
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client-side) configuration files for the Hadoop cluster. These files are used to connect to the cluster, write to the DFS, and submit jobs to the ResourceManager.
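A launch script can verify this requirement before submitting anything. The following is a minimal sketch. The function name check_yarn_conf_dir is hypothetical, and it only checks that one of the two variables names an existing directory, not that the directory contains valid configuration files.

```shell
# Return 0 if HADOOP_CONF_DIR or YARN_CONF_DIR names an existing directory;
# otherwise print a message to stderr and return 1.
check_yarn_conf_dir() {
  for dir in "${HADOOP_CONF_DIR:-}" "${YARN_CONF_DIR:-}"; do
    if [ -n "$dir" ] && [ -d "$dir" ]; then
      echo "Using Hadoop client configuration in $dir"
      return 0
    fi
  done
  echo "Set HADOOP_CONF_DIR or YARN_CONF_DIR to the Hadoop client configuration directory" >&2
  return 1
}
```

Calling check_yarn_conf_dir || exit 1 at the top of a launch script makes a missing configuration directory fail early, before the client tries to contact the cluster.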
There are two scheduler modes that can be used to launch a Spark application on YARN.
Launch a Spark application with the YARN Client in yarn-standalone mode.
The command to launch the YARN Client is as follows:
SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar <YOUR_APP_JAR_FILE> \
  --class <APP_MAIN_CLASS> \
  --args <APP_MAIN_ARGUMENTS> \
  --num-workers <NUMBER_OF_WORKER_MACHINES> \
  --master-class <ApplicationMaster_CLASS> \
  --master-memory <MEMORY_FOR_MASTER> \
  --worker-memory <MEMORY_PER_WORKER> \
  --worker-cores <CORES_PER_WORKER> \
  --name <application_name> \
  --queue <queue_name> \
  --addJars <any_local_files_used_in_SparkContext.addJar> \
  --files <files_for_distributed_cache> \
  --archives <archives_for_distributed_cache>
# Build the Spark assembly JAR and the Spark examples JAR
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

# Configure logging
$ cp conf/log4j.properties.template conf/log4j.properties

# Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example
$ SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.2-hadoop2.0.5-alpha.jar \
    ./bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar examples/target/scala-2.10/spark-examples-assembly-0.9.2.jar \
      --class org.apache.spark.examples.SparkPi \
      --args yarn-standalone \
      --num-workers 3 \
      --master-memory 4g \
      --worker-memory 2g \
      --worker-cores 1

# Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command)
# (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.)
$ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
Pi is roughly 3.13794
The above starts a YARN Client program, which starts the default Application Master. SparkPi is then run as a child thread of the Application Master. The YARN Client periodically polls the Application Master for status updates and displays them in the console. The client exits once your application has finished running.
With this mode, your application actually runs on the remote machine where the Application Master runs. Thus, applications that involve local interaction, e.g. spark-shell, will not work well.
Launch a Spark application in yarn-client mode.
With yarn-client mode, the application is launched locally, just like running an application or spark-shell in Local / Mesos / Standalone mode. The launch method is also similar: when you need to specify a master URL, use “yarn-client” instead. You also need to export the environment variables SPARK_JAR and SPARK_YARN_APP_JAR. If you are using spark-shell with secure HDFS, you also need to export SPARK_YARN_MODE=true.
Configuration in yarn-client mode:
To tune the number of workers, worker cores, worker memory, etc., export environment variables or add them to the Spark configuration file (./conf/spark-env.sh). The following options are available.
SPARK_YARN_APP_JAR, Path to your application’s JAR file (required)
SPARK_WORKER_INSTANCES, Number of workers to start (Default: 2)
SPARK_WORKER_CORES, Number of cores for the workers (Default: 1)
SPARK_WORKER_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
SPARK_MASTER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 512 MB)
SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: ‘default’)
SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.
SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.2-hadoop2.0.5-alpha.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples-assembly-0.9.2.jar \
./bin/run-example org.apache.spark.examples.SparkPi yarn-client

SPARK_YARN_MODE=true \
SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.2-hadoop2.0.5-alpha.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples-assembly-0.9.2.jar \
MASTER=yarn-client ./bin/spark-shell
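Instead of repeating these variables on every command line, they can be kept in the Spark configuration file mentioned above. The following fragment is a sketch of that approach. The JAR paths are the ones used in the examples on this page and are assumptions about your build. The other values are the documented defaults, written so that a value already set in the environment takes precedence.

```shell
# Sketch of a yarn-client configuration fragment; adjust the paths to your build.
export SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.2-hadoop2.0.5-alpha.jar
export SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples-assembly-0.9.2.jar   # required

# Keep any value already set in the environment; otherwise use the documented default.
export SPARK_WORKER_INSTANCES="${SPARK_WORKER_INSTANCES:-2}"
export SPARK_WORKER_CORES="${SPARK_WORKER_CORES:-1}"
export SPARK_WORKER_MEMORY="${SPARK_WORKER_MEMORY:-1G}"
export SPARK_MASTER_MEMORY="${SPARK_MASTER_MEMORY:-512M}"
export SPARK_YARN_APP_NAME="${SPARK_YARN_APP_NAME:-Spark}"
export SPARK_YARN_QUEUE="${SPARK_YARN_QUEUE:-default}"
```

With these exports in place, the SparkPi example above reduces to ./bin/run-example org.apache.spark.examples.SparkPi yarn-client.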
Building Spark for Hadoop/YARN 2.2.x
See Building Spark with Maven for instructions on how to build Spark using the Maven process.
- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the number of cores given via command-line arguments cannot be passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
- The local directories used for Spark will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
- The --files and --archives options support specifying file names with the # similar to Hadoop. For example, you can specify --files localtest.txt#appSees.txt. This uploads the file you have locally named localtest.txt into HDFS, but links to it by the name appSees.txt; your application should use the name appSees.txt to reference it when running on YARN.
- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
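The last two notes can be illustrated with a short sketch. Both function names are hypothetical. parse_dist_entry splits a --files or --archives entry at the # into the local file and the name the application sees. For an entry without a #, it assumes the link name is the file's base name. needs_add_jars applies the rule from the --addJars note by looking only at the URI scheme.

```shell
# Split a --files/--archives entry of the form <local_file>#<link_name>.
# Assumption: without a '#', the link name is the file's base name.
parse_dist_entry() {
  entry="$1"
  case "$entry" in
    *'#'*)
      local_path=${entry%%#*}   # text before the first '#'
      link_name=${entry#*#}     # text after the first '#'
      ;;
    *)
      local_path=$entry
      link_name=${entry##*/}    # strip any leading directories
      ;;
  esac
  printf '%s %s\n' "$local_path" "$link_name"
}

# Succeed (return 0) if a JAR given to SparkContext.addJar also needs --addJars.
# Only local files do; HDFS, HTTP, HTTPS, and FTP locations do not.
needs_add_jars() {
  case "$1" in
    hdfs://*|http://*|https://*|ftp://*) return 1 ;;
    *) return 0 ;;
  esac
}

parse_dist_entry 'localtest.txt#appSees.txt'   # prints: localtest.txt appSees.txt
```

In the example from the note, the application opens appSees.txt at run time, while localtest.txt is only the name of the file on the submitting machine.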