Dave Hrycyszyn

2014-05-13 10:00:00 +0100

Big Data in Spark for absolute beginners

Spark is a Big Data framework which allows you to run batch jobs, query data interactively, and process incoming information as it streams into your system.

Spark runs on top of normal Hadoop infrastructure - if you already have a Hadoop Distributed File System (HDFS) cluster set up, you can run it on top of that, and run your Spark jobs on it without modifying or disturbing anything you’re already doing. Like Hadoop, it can do batch processing, although it’s typically quite a bit faster than Hadoop due to aggressive caching of data in RAM.

Spark has several additional tricks, though.

Hadoop workloads typically centre around batch jobs on large amounts of data, and Hadoop is not usually used for interactive querying. Spark, in contrast, has the ability to chainsaw through large amounts of data, store pared-down views of the data in structures called Resilient Distributed Datasets (RDDs), and do interactive queries with sub-second response times.

An RDD is a queryable collection of information stored across multiple machines in your cluster. You can think of an RDD as a persisted Scala collection, ready for manipulation with Scala’s (awesome) collection functions.

You can ingest data with Spark, clean it up, join up data from multiple sources, boil it down to the smallest data set you need, and save the resultant data set as a new RDD for speedy querying. Spark distributes the RDD across your HDFS cluster and takes care of all low-level details for you.
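
To make that concrete, here is a minimal sketch of what such a pipeline looks like in Scala. It is not part of the project we're about to build, and the file path and CSV layout ("shopId,productId,price") are made up; don't worry about the details yet, since we build a real SparkContext step by step below.

// A minimal sketch of boiling raw data down to a small, cached RDD.
// The HDFS path and the CSV layout are hypothetical.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // brings in reduceByKey and friends

val sc = new SparkContext("local", "RDD Sketch")

val revenueByShop = sc.textFile("hdfs:///data/sales.csv")
  .map(_.split(","))                              // split each CSV line into fields
  .filter(_.length == 3)                          // drop malformed lines
  .map(fields => (fields(0), fields(2).toDouble)) // (shopId, price) pairs
  .reduceByKey(_ + _)                             // total revenue per shop
  .cache()                                        // keep the result in memory

// Subsequent interactive queries run against the cached, distributed RDD.
val bigSpenders = revenueByShop.filter(_._2 > 10000.0).count()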

Spark can also process very high volume streams of incoming information in real time. So, you could attach to the Twitter firehose, the Amazon product-purchase firehose, a data stream coming from multiple point of sale systems, or any other source of rapid-fire information. You can then process the stream as it arrives, saving results to the Spark cluster as a new or existing RDD.
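
For a rough idea of what that looks like, here's a sketch of a streaming word count. It isn't part of the project we're about to build, and it needs the separate spark-streaming artifact ("org.apache.spark" %% "spark-streaming" % "0.9.1") added to your dependencies; the hostname and port are just placeholders.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair operations on DStreams

object StreamingSketch {
  def main(args: Array[String]) {
    // Process text arriving on a local socket, in one-second batches.
    val ssc = new StreamingContext("local[2]", "Streaming Sketch", Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Count the words in each one-second batch and print the counts.
    val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}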

All of these use cases (batch, query, and stream processing) can use exactly the same algorithms, data structures, and RDDs. You might do an initial run of batch processing on stored data, then switch to stream processing for new data as it arrives, and then do interactive querying of the RDDs you’ve created. Having only one stack for all three purposes greatly simplifies both the necessary skillsets and the infrastructure you need to set up.

Spark is very easy to get started with. The simplest way to begin is by setting it up to run on a single machine. Spark works natively in both Java and Scala.

Let’s try it out by setting up a new Spark project in the Scala language. Note: in order to run the following example code, you need to have Java installed, but that’s about it. Everything else will be downloaded to your machine automatically if you follow the tutorial.

Install the tools and generate a new project

If you haven’t built an application in Scala before, it’s fairly easy to install the prerequisites: Java, SBT, Giter8, and Conscript. Get those onto your machine, and then come back here and follow along.

Let’s generate a new sbt project skeleton, so that we can try running a Spark job in it.

g8 chrislewis/basic-project

That will generate a very basic sbt project skeleton for you to work from. The g8 utility will ask you a few questions about your project; answer them somewhat like this (substitute your domain name for mine!):

name [Basic Project]: Spark Simple App
organization [com.example]: com.constructiveproof # put your domain here
version [0.1.0-SNAPSHOT]: 1.0

Now we’ve got a basic application set up to build with sbt.

As a test, build it. First, change into the project directory and run sbt. All the project dependencies will be downloaded, and you’ll end up at an sbt shell.

cd spark-simple-app
sbt

If you don’t yet have sbt installed, install it for your platform, then re-run the command.

Compile the application, then run it:

compile
run

It should build and run, with output that looks something like this:

$ sbt
[info] Loading project definition from /Users/dave/Desktop/spark-simple-app/project
[info] Updating {file:/Users/dave/Desktop/spark-simple-app/project/}spark-simple-app-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Spark Simple App (in build file:/Users/dave/Desktop/spark-simple-app/)
> compile
[info] Updating {file:/Users/dave/Desktop/spark-simple-app/}spark-simple-app...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 1 Scala source to /Users/dave/Desktop/spark-simple-app/target/scala-2.10/classes...
[success] Total time: 1 s, completed 22-May-2014 10:11:36
> run
[info] Running com.constructiveproof.sparksimpleapp.App
Hello com.constructiveproof.Spark Simple App!
[success] Total time: 0 s, completed 22-May-2014 10:11:38
>

If it doesn’t, you’ve got problems with your basic setup; fix those before proceeding.

Now it’s time to make some additions to the build file.

An sbt project usually contains a file called build.sbt. This defines all the project’s dependencies, where libraries are downloaded from, what version of Scala the project uses, and many other settings. Open it up and take a look.

Add the Spark 0.9.1 dependency to it, and also add the Akka resolver (both are necessary) in order to get Spark downloaded onto your machine.

name := "Spark Simple App"

organization := "com.constructiveproof"

version := "1.0"

scalaVersion := "2.10.3"

crossScalaVersions := Seq("2.10.3", "2.11.0-M8")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "0.9.1", // add this
  "org.scalatest" %% "scalatest" % "2.1.RC1" % "test",
  "org.scalacheck" %% "scalacheck" % "1.11.3" % "test"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/" // and add this

initialCommands := "import com.constructiveproof.sparksimpleapp._"

The build file details the jar files that your project depends on. This line adds the Spark jar:

"org.apache.spark" %% "spark-core" % "0.9.1", // add this

The Spark jar files depend on another library, called Akka. You add this resolver so that sbt knows where to download the Akka jars from:

resolvers += "Akka Repository" at "http://repo.akka.io/releases/" // and add this

In order to pick up the dependencies, you can either type exit at the sbt prompt and run sbt again, or just type reload at the sbt prompt.

Now let’s turn our attention to the Spark app itself. We’ll adapt the Simple App example from the Spark 0.9.1 documentation to get something running quickly.

By default, the generated project template has placed an object called App in the file src/main/scala/App.scala. Delete that (or rename it to SimpleApp), and enter the following code in src/main/scala/SimpleApp.scala:

// src/main/scala/SimpleApp.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val textFile = "/etc/hosts" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App")
    val logData = sc.textFile(textFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

This is a really simple Spark program. Let’s take it a few lines at a time to see what it does.

First, we pointed our app at some example data. It’s a simple text file:

val textFile = "/etc/hosts" // Should be some file on your system

Assuming you’re on a *nix system, you’ll have a file at /etc/hosts, so that should work. If you’re on Windows, pick any text file you’ve got kicking around. If you want to do some speed testing, I’d suggest downloading one of the many publicly available large datasets on the internet. My system rips through the 1.8GB subset of the Million Song Dataset in about 2 seconds.

Next, we set up what’s called a SparkContext:

val sc = new SparkContext("local", "Simple App")

This tells our SimpleApp where Spark lives, and gives our Spark job a name. The name serves as a job identifier, and shows up in the Spark web interface if you’re running a cluster. Right now, since we’re telling Spark to run locally, there’s no need to have a cluster set up. We’ll get this running on a distributed Spark cluster in the next post.
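
As an aside, the master string also accepts a thread count if you want a bit more parallelism while staying on one machine; the 4 below is purely illustrative:

// Run Spark locally with 4 worker threads instead of 1.
val sc = new SparkContext("local[4]", "Simple App")

Back in SimpleApp, the next line loads the data: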

val logData = sc.textFile(textFile, 2).cache()

This loads up some data into the SparkContext sc. We tell it to cache() the result, although in the case of /etc/hosts this is not going to make a noticeable difference. If we were ingesting a huge file on a cluster, though, it’d be ingested by Spark and cached as an RDD spread across the machines in our cluster, so you’d see a major performance boost the next time you used this data source.
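
If you're curious about the effect of cache() on a bigger file, you could tack a couple more actions onto SimpleApp; the second and later passes over a cached RDD read from memory instead of re-scanning the file. Something like this (illustrative only, the 80-character threshold is arbitrary):

// Further actions against the same cached RDD re-use the in-memory data.
val totalLines = logData.count()
val longLines = logData.filter(_.length > 80).count()
println("Total lines: %s, long lines: %s".format(totalLines, longLines))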

val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

Now this is truly interesting, at least to me. Our logData value is an RDD[String], with one element per line of the file. We can operate on every line using Spark’s RDD collection methods. These have most of the methods of the regular Scala collections, but they’re modified to operate on RDDs spread across your Spark cluster. When you use them to query data, it feels very much as though you’re working with regular Scala collections (Lists, Maps, and so on). In fact, you’re using a special set of collections which transparently run map-reduce style computations, locally or across a multi-machine Akka cluster. That’s basically what Spark is.
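
To make the parallel concrete, here's the same filter-and-count written against an ordinary Scala List and against our RDD (the List contents are made up):

// Plain Scala collection: runs in a single JVM, entirely in local memory.
val localLines = List("alpha", "beta", "gamma")
val localAs = localLines.filter(line => line.contains("a")).size

// Spark RDD: the same shape of code, but the filter and count are distributed
// across whatever cluster (or local threads) the SparkContext is attached to.
val distributedAs = logData.filter(line => line.contains("a")).count()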

That’s pretty much it. You can compile, package, and run your job with this command in the sbt console:

~; compile; package; run

That tilde at the front of the command will ensure that your code is recompiled, re-packaged, and re-run every time you modify and save the SimpleApp.scala file.

My output looks like this:

> ~; compile; package; run
[info] Compiling 1 Scala source to /Users/dave/workspace/constructiveproof/code-examples/spark-simple-app/target/scala-2.10/classes...
[success] Total time: 1 s, completed 22-May-2014 10:37:44
[info] Packaging /Users/dave/workspace/constructiveproof/code-examples/spark-simple-app/target/scala-2.10/spark-simple-app_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed 22-May-2014 10:37:44
[info] Running SimpleApp
<snip>
Lines with a: 7, Lines with b: 4
14/05/22 10:37:46 INFO network.ConnectionManager: Selector thread was interrupted!
[success] Total time: 2 s, completed 22-May-2014 10:37:46
1. Waiting for source changes... (press enter to interrupt)

Whoa, what was all that stuff at the <snip>?

When you told sbt to run your application, it fired up a single-node local Spark system, which uses an Akka cluster to achieve parallelism, then ran your job.

The result is right at the end there:

Lines with a: 7, Lines with b: 4

As you can see, Spark is remarkably easy to set up and play with in local mode.

Running locally is a great way to try Spark. There’s not much infrastructure to set up, and you can easily get a feel for loading up your data and using Spark’s collections to manipulate it.

In real life, though, you’ll want to run your jobs on a cluster. In the next blog post, we’ll modify this simple app to run on a Spark cluster, to get the benefits of distributed processing.