Dave Hrycyszyn

2014-05-22 10:00:00 +0100

Running jobs on the Spark cluster

OK, so now you’ve got a Spark cluster running. How do we take our earlier example and run it over the cluster?

If you don’t have it handy, check out the SimpleApp example from https://github.com/futurechimp/spark-simple-app, using the following terminal command:

git clone https://github.com/futurechimp/spark-simple-app.git -b simple

This will check out the simple version of the Spark app, which uses local mode for its Spark context.
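
If you want a reminder of where we’re starting from, the simple branch’s SimpleApp looks roughly like this (a sketch reconstructed from the snippets in this post, not necessarily the exact file contents):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val textFile = "/etc/hosts" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App") // local mode: runs in-process, no cluster needed

    val logData = sc.textFile(textFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}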

There are basically two things to change in order to run this on a cluster. Open up src/main/scala/SimpleApp.scala. We need to change the setup of the SparkContext first. Currently, it’s set up to run in local mode:

val sc = new SparkContext("local", "Simple App")

The SparkContext configuration needs to be changed so that it runs in cluster-aware mode, like this:

val sc = new SparkContext("spark://yourhostname:7077",       // changed: to spark cluster url
    "Simple App",                                            // stays the same
    "/path/to/spark-0.9.1-bin-hadoop1/",                     // new: path to $SPARK_HOME
    List("target/scala-2.10/spark-simple-app_2.10-1.0.jar")) // new: a list of spark job jars

Let’s break it down. Instead of running the SparkContext in local mode, we want to point our application at the Spark URL of our newly-minted cluster. The easiest way to find this is in Spark’s web UI (currently running on http://localhost:8080). Unless you’ve changed the default settings, it’ll be in the form spark://yourhostname:7077.

The app name, Simple App, stays the same.

Next, you need to pass in the full path to your Spark cluster installation. Change the code to match whatever you’ve got in $SPARK_HOME and you’ll be fine. If you’re running the cluster on multiple machines, make sure that this path is the same on all of them.

Lastly, provide a Scala List of jar files which will be available to this SparkContext. They need to be available on the machine running the driver program (SimpleApp, in our case); Spark will distribute these jars to the worker nodes in the cluster when the job runs.
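
If your job depends on libraries that aren’t bundled into your application jar, you can add their jars to the same list; the extra path below is purely hypothetical:

List("target/scala-2.10/spark-simple-app_2.10-1.0.jar",
     "lib/some-extra-dependency.jar") // hypothetical: an additional jar your job needs on the cluster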

That’s it for the SparkContext changes.

You’ll also need to add sc.stop() at the end of SimpleApp’s main method, so that we disconnect from the cluster cleanly.

The final code looks like this:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val textFile = "/etc/hosts" // Should be some file on your system
    val sc = new SparkContext("spark://yourhostname:7077", // CHANGE to the hostname in your Spark web UI.
        "Simple App",
        "/path/to/spark-0.9.1-bin-hadoop1/", // CHANGE to reflect the correct path on your installation
        List("target/scala-2.10/spark-simple-app_2.10-1.0.jar")) 

    val logData = sc.textFile(textFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    sc.stop()
  }
}

Let’s package it up for use across the cluster.

At your sbt console, do the following:

package

This compiles SimpleApp and packages it up as a jar. Assuming your application is named Spark Simple App, with version 1.0 in build.sbt, the jar will appear at the path we’ve defined in List("target/scala-2.10/spark-simple-app_2.10-1.0.jar"). If your build.sbt is different, adjust the jar path in the SparkContext to match sbt’s output.
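
For reference, a build.sbt that produces a jar at that path might look roughly like this; the Scala and Spark version numbers are assumptions based on the 0.9.1 / 2.10 setup used in this post:

name := "Spark Simple App"

version := "1.0"

scalaVersion := "2.10.4" // assumed: any 2.10.x should do

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1" // assumed: should match your cluster version

sbt normalises the name to spark-simple-app, which is where the spark-simple-app_2.10-1.0.jar filename comes from.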

Now it’s time to run the job. At the sbt prompt:

run

If all is well, a whole pile of logging text will scroll by, and about 100 lines from the end you’ll see your job output:

Lines with a: 7, Lines with b: 4

Bonus: config cleanup

All of those strings inside your Spark job are making SimpleApp look like PHP. Let’s clean that up using Typesafe Config.

Typesafe Config is an easy way to add configuration files to a Scala or Java application. It lets you load human-readable configuration files and read their values in your application. You’ll need to grab a new jar file and build it into your application in order to use it.

First, add the Typesafe Config dependency to build.sbt:

 "com.typesafe" % "config" % "1.2.1", // add this into libraryDependencies

Type reload at the sbt command prompt, or restart sbt, and you’ll pick up the new dependency. Now we can create our config file.

Add this as a new file at src/main/resources/application.conf:

simple-app {
    sparkurl = ${?SPARK_URL}
    sparkhome = ${?SPARK_HOME}
}

This is a config file format called HOCON, the Human-Optimized Config Object Notation.

The ${?SPARK_URL} and ${?SPARK_HOME} portions of the file tell your sbt application to use the corresponding environment variables. You’ve already got your $SPARK_HOME set, but you’ll need to set your $SPARK_URL as an environment variable for this config to work. Run this on a bash command line (feel free to add it to your .bash_profile on Mac, or your .bashrc file on Linux, if you want to persist it):

export SPARK_URL="spark://yourhostname:7077" # modify to match the Spark URL of your cluster
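
If you’d rather the app fall back to something sensible when an environment variable isn’t set, HOCON lets you give a key a default value and then override it with the ${?...} substitution only when the variable exists. A sketch, with a local-mode fallback chosen purely as an example:

simple-app {
    sparkurl = "local"                             # fallback: run in local mode
    sparkurl = ${?SPARK_URL}                       # overrides the fallback if SPARK_URL is set
    sparkhome = "/path/to/spark-0.9.1-bin-hadoop1" # fallback: adjust to your installation
    sparkhome = ${?SPARK_HOME}                     # overrides the fallback if SPARK_HOME is set
}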

Next, we need a Settings class to load up the settings. Add the following to the new file src/main/scala/Settings.scala:

import com.typesafe.config.Config

class Settings(config: Config) {

  // non-lazy fields, we want all exceptions at construct time
  val sparkurl = config.getString("simple-app.sparkurl")
  val sparkhome = config.getString("simple-app.sparkhome")
}
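
Before wiring this into the job, you can check that the environment variables are actually being picked up by pasting a few lines into the sbt console; a quick sketch:

import com.typesafe.config.ConfigFactory

val settings = new Settings(ConfigFactory.load())
println(settings.sparkurl)  // should print your spark:// URL
println(settings.sparkhome) // should print your Spark installation path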

Now all we need to do is use the new settings in our application. Load the config, and remove the strings hardcoded into the SparkContext, so that SimpleApp.scala looks like this:

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkContext


object SimpleApp {
  def main(args: Array[String]) {

    val config = new Settings(ConfigFactory.load()) // load the config

    val textFile = "/etc/hosts"
    val sc = new SparkContext(config.sparkurl, // this is much cleaner!
        "Simple App",
        config.sparkhome, // this is also much cleaner
        List("target/scala-2.10/spark-simple-app_2.10-1.0.jar"))
    
    val logData = sc.textFile(textFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

    // In real life, you might want to use the Loan Pattern to ensure that
    // this SparkContext detaches from the cluster without fail, but...
    sc.stop() // this is necessary in order to cleanly detach from the cluster
  }
}
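
If you do want to go the Loan Pattern route, a helper might look roughly like this. It isn’t part of the example repository; it just wraps the same SparkContext constructor used above and guarantees the stop() call:

import org.apache.spark.SparkContext

object SparkLoan {
  // Lends a SparkContext to the given block and always stops it afterwards,
  // even if the block throws, so the driver detaches from the cluster cleanly.
  def withSparkContext[T](master: String, appName: String,
                          sparkHome: String, jars: Seq[String])(block: SparkContext => T): T = {
    val sc = new SparkContext(master, appName, sparkHome, jars)
    try block(sc) finally sc.stop()
  }
}

With that in place, the body of main becomes a single withSparkContext(config.sparkurl, "Simple App", config.sparkhome, List("target/scala-2.10/spark-simple-app_2.10-1.0.jar")) { sc => ... } call.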

Once again, if you have $SPARK_HOME and $SPARK_URL set, and your cluster is running, you should be able to run sbt, and then do:

package
run

You’ll see a whole lot of output as your Spark cluster spools up, the job will run, and things will spool down once again when sc.stop() runs.

If your code isn’t working, a finished version is cloneable from

git clone https://github.com/futurechimp/spark-simple-app.git -b config