Dave Hrycyszyn

2014-05-25 10:00:00 +0100

Streaming Twitter into Spark

I mentioned during the introduction to this series that Spark is more than a faster Hadoop. Besides batch processing, it can also operate on very large streams of incoming data, potentially combining and transforming multiple streams in real time. While it’s doing this, it can keep running calculations over the information it has seen in the stream so far, or save the transformed stream(s) into Resilient Distributed Datasets (RDDs) or the Hadoop Distributed File System (HDFS).
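
To make that concrete, here’s a minimal sketch of the idea - the host, port, and HDFS path are placeholders, and this isn’t part of the project we’ll build below. It keeps a running, windowed word count over an incoming text stream and writes each batch of results out to HDFS:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // brings the pair-DStream operations into scope

object RunningCounts extends App {
  // Batches are cut every 10 seconds
  val ssc   = new StreamingContext("local[2]", "running-counts", Seconds(10))
  val lines = ssc.socketTextStream("localhost", 9999)   // placeholder text source

  // A running calculation: word counts over a sliding one-minute window
  val counts = lines
    .flatMap(_.split("\\s+"))
    .map(word => (word, 1))
    .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60))

  // Persist each batch of results to HDFS (placeholder path)
  counts.saveAsTextFiles("hdfs://localhost:9000/tmp/word-counts")

  ssc.start()
  ssc.awaitTermination()
}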

Spark Streaming can operate on a wide variety of streaming data sources in many formats. Built into Spark is the capability to monitor (there’s a brief sketch of both just after this list):

  • any HDFS-compatible filesystem
  • raw TCP sockets
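
Here’s roughly what those two built-in sources look like in code - a sketch only, with made-up paths, host, and port:

import org.apache.spark.streaming.{Seconds, StreamingContext}

object BuiltInSources extends App {
  val ssc = new StreamingContext("local[2]", "built-in-sources", Seconds(10))

  // New files appearing in a directory on an HDFS-compatible filesystem
  val files = ssc.textFileStream("hdfs://localhost:9000/incoming")

  // Lines of text arriving on a raw TCP socket
  val lines = ssc.socketTextStream("localhost", 9999)

  files.union(lines).count().print()   // how many records arrived in each batch

  ssc.start()
  ssc.awaitTermination()
}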

You can also pull in additional libraries to access these data sources:

  • The Flume data transport framework
  • The Ganglia grid monitoring system
  • The Kafka distributed messaging system
  • The MQTT “internet of things” pubsub protocol
  • Twitter’s streaming API
  • ZeroMQ message queues

If the system you need to integrate isn’t on this list, you can roll your own integration: use one of Java’s existing libraries to ingest data in whatever format you want, then translate it into something Spark already understands. The Java/Scala ecosystem offers a huge selection of rock-solid libraries for pretty much every conceivable integration case.
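
One low-tech way to do that translation - purely a sketch, with a made-up port, and stdin standing in for whatever client library you’d actually use - is to republish your data as plain text lines on a TCP socket, then point Spark’s built-in socketTextStream at it:

import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

object FeedBridge extends App {
  val server = new ServerSocket(9999)   // Spark connects to this port via ssc.socketTextStream
  val client = server.accept()
  val out    = new PrintWriter(client.getOutputStream, true)

  // Stand-in ingestion loop: here we just forward stdin, but this is where you'd
  // call into whatever Java library speaks your source system's native format
  Source.stdin.getLines().foreach(out.println)

  out.close()
  client.close()
  server.close()
}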

Let’s try out Spark Streaming. Twitter is a readily available data source, so we’ll use that - but keep in mind that the same techniques work on any incoming data stream, or on multiple streams at once.

Note: if you don’t yet have the tools from the Development Quickstart installed, go there now and install Java, SBT, Conscript, and Giter8.

Generate a new sbt project:

g8 chrislewis/basic-project
name [Basic Project]: Spark Streaming Example
organization [com.example]: com.constructiveproof
version [0.1.0-SNAPSHOT]: 1.0

Template applied in ./spark-streaming-example

Ok, the project is now generated. Let’s build it.

cd spark-streaming-example/
sbt

Compile it as a sanity check, just to make sure everything’s working.

compile

Adding dependencies

We’re going to use a few different libraries in our application:

  • Typesafe Config to manage our application configuration
  • Spark, version 0.9.1
  • Spark’s Twitter integration, version 0.9.1

Open up build.sbt and add the dependencies:

libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.2.1",
  "org.apache.spark" %% "spark-core" % "0.9.1",
  "org.apache.spark" %% "spark-streaming-twitter" % "0.9.1",
  "org.scalatest" %% "scalatest" % "2.1.RC1" % "test",
  "org.scalacheck" %% "scalacheck" % "1.11.3" % "test"
)

Twitter App Setup

To integrate anything against Twitter’s data stream, you’ll need to register a new Twitter App with Twitter itself (which means you’ll need a Twitter account). Here’s how.

First, go to Twitter’s App Management page.

  • hit the “Create New App” button.
  • for a description, say something like: “A very simple Spark streaming example which uses Twitter as a data source”
  • add your website’s URL; use your GitHub account’s URL, or fake it if you don’t have one

Agree to the Terms and Conditions, and click the “Create your Twitter application” button to create the app.

Next, click on the “API Keys” tab.

Click on “Create my access token”. It may take a second or two for your token to be created, so refresh the page if it doesn’t show up the first time.

That’s it for Twitter-side setup.

Using Typesafe Config to configure the app

We’ll use the Typesafe Config library to hold our Twitter configuration. Add a new directory src/main/resources, and drop a file called application.conf into it with the following contents:

twitter4j {
  oauth {
    apiKey            = ${?TWITTER_API_KEY}
    apiSecret         = ${?TWITTER_API_SECRET}
    accessToken       = ${?TWITTER_ACCESS_TOKEN}
    accessTokenSecret = ${?TWITTER_ACCESS_TOKEN_SECRET}
  }
}

Keys and values in this file become available to our application. For example, the key twitter4j.oauth.apiKey will hold the value of the API key.
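
To illustrate, here’s one way to read those keys and hand them to twitter4j, which picks up its OAuth credentials from system properties - a sketch only; the object name is made up and isn’t part of the generated project:

import com.typesafe.config.ConfigFactory

object TwitterConfig {
  def setup(): Unit = {
    // Loads src/main/resources/application.conf, resolving ${?...} references from the environment
    val conf = ConfigFactory.load()

    System.setProperty("twitter4j.oauth.consumerKey",       conf.getString("twitter4j.oauth.apiKey"))
    System.setProperty("twitter4j.oauth.consumerSecret",    conf.getString("twitter4j.oauth.apiSecret"))
    System.setProperty("twitter4j.oauth.accessToken",       conf.getString("twitter4j.oauth.accessToken"))
    System.setProperty("twitter4j.oauth.accessTokenSecret", conf.getString("twitter4j.oauth.accessTokenSecret"))
  }
}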

Because of the special ${?TWITTER_API_KEY} format, Typesafe Config will use your shell’s environment to populate the values of the secrets when the application starts. You’ll need to export these in your terminal, or put them in your ~/.bashrc file (Linux) or ~/.bash_profile (Mac).

# exports for Spark streaming example app
export TWITTER_API_KEY="XXXXXXXX"
export TWITTER_API_SECRET="XXXXXXXX"
export TWITTER_ACCESS_TOKEN="XXXXXXXX"
export TWITTER_ACCESS_TOKEN_SECRET="XXXXXXXX"

Replace the XXXXs with the real values from your Twitter app; they’re listed on the API Keys tab. Remember to run source ~/.bash_profile (Mac) or source ~/.bashrc (Linux) to load these exports into your shell.
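
With the keys exported, everything is in place to open the stream. As a rough preview of where this is going - a sketch, not the finished application, and the object names are placeholders - the Twitter side of things boils down to this:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object StreamingExample extends App {
  TwitterConfig.setup()   // the hypothetical helper sketched earlier: config values -> system properties

  val ssc    = new StreamingContext("local[2]", "spark-streaming-example", Seconds(5))
  val tweets = TwitterUtils.createStream(ssc, None)   // None: let twitter4j read the system properties

  tweets.map(_.getText).print()   // print a sample of incoming tweet text for each batch

  ssc.start()
  ssc.awaitTermination()
}

Run it from sbt with run, in the same shell where you exported the keys, and it should start printing batches of tweet text after a few seconds.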