Lately I've been playing around with Spark for data processing. It provides some really amazing features like MLLib and Spark SQL and there's no better way to learn something that to use it. I've attended a couple of meet ups about Spark and its related tools including the famous ampcamp put on by the developers of spark and, although I'm not an expert, I thought it would be good to consolidate my knowledge and teach others.
I always like to best understand the problem that is being solved when I approach new tools. To throw some buzzwords around, you're looking to perform some data science on some big data. More simply, you've got a ton of information that you want to understand more about and doing it on your computer would just be too slow.
Some great examples of data problems that are solved well by a tool like Apache Spark include:
- Analyzing Log Data
- In order to hunt down a bug happening on a production server(s)
- Massive Natural Language Processing
- Finally some use for all that twitter data you've been downloading...
- Large Scale Recommendation Systems or General Machine Learning Tasks
- Recommending products to users or trying to find related groups
While this is far from an exhaustive list, it gives a starting point to the problem we're solving: we've got a ton of information and we want to (actionable) extract information from it.
In this tutorial I'll be using Spark 1.1.1. I'll also assume that you're on some sort of Unix system.
It's important to note that I'll be focusing on PySpark and you can check out the (documentation) for the code examples.
Go ahead and download Spark 1.1.1 right here.
You can download other versions of Apache Spark on the site.
Once we've got Spark downloaded, let's get started. You're going to have to build Spark yourself once you've downloaded it but as the readme will show you, it's pretty straight forward. Just run
./sbt/sbt assembly in the root download folder directory. It may take a while to build (took me 15+ minutes from scratch) but just be patient. I'd grab a cup of coffee.
Once it's all built, you should be ready to go. Let's dive right into the
PySpark shell with
./bin/pyspark on the command line.
You should see some log information then boom you'll be in the Spark Shell.
Welcome to \_\_\_\_ \_\_ / \_\_/\_\_ \_\_\_ \_\_\_\_\_/ /\_\_ \_\ \/ \_ \/ \_ `/ \_\_/ '\_/ /\_\_ / .\_\_/\\_,\_/\_/ /\_/\\_\ version 1.1.1 /\_/ Using Python version 2.7.6 (default, Sep 9 2014 15:04:36) SparkContext available as sc. >>>
On a quick note while I love the regular python shell, I really like the IPython Shell much better. It's got tons of handy tools built right in and using IPython with PySpark is super easy. Just close out of that Python Shell and set the IPYTHON environmental variable to 1. Run this in your bash shell.
export IPYTHON=1 ./bin/pyspark
Now we're loaded up into a python shell and we can finally get to work! Now the python shell we're in is the same python shell that you have on your machine. We can import libraries, do math, same 'ol stuff.
import numpy as np 1 + 1 # 2 x = 5 x # 5
There is however, one difference, the
sc variable or SparkContext. You can see this is what was made available when we started up the shell previously.
type(sc) # pyspark.context.SparkContext
This spark context is the source for all of our handy Spark features.
Let's go ahead and get started analyzing some data so we can better understand the SparkContext. Let's start small to understand the concepts. I'll be using the Rotten Tomatoes Dataset from Kaggle. Go ahead and download it and put it in the same Spark download folder on your machine.
Now, let's go ahead and open it on up and get things started. We load in data with the
data = sc.textFile('../rotten\_tomatoes\_train.tsv') type(data) # pyspark.rdd.RDD
Our dataset is now loaded into spark as an RDD or Resilient Distributed Dataset. This is the fundamental abstraction in Spark and basically it is a representation of a dataset that is distributed through the cluster. Obviously at this point we don't have a cluster running as we are just on our local machine but the same concepts apply.
One of the most important concepts concerning RDDs is that they are immutable. What you're doing is applying a series of
transformations to data (stored in the RDD) then finally performing an
action in order to get an answer.
Just as a note, our columns are tab separated and arranged as follows:
PhraseId SentenceId Phrase Sentiment
First let's get the count, how many items(lines) do we have in our data? This is an example of an
data.count() # ...log statements # 156061
There we get an answer because we're requesting an
action as opposed to a
transformation. Let's perform one of those now.
negative\_reviews = data.filter(lambda line: '0' == line.split('\t')) print negative\_reviews # PythonRDD at RDD at PythonRDD.scala:43 print type(negative\_reviews) # <class 'pyspark.rdd.PipelinedRDD'>
Notice how when we go to print it, it prints out that it is an RDD and that the type is a
PipelinedRDD not a list of values as we might expect. That's because we haven't performed an
action yet, we've only performed a
As a side note, we could write the above code this way as well.
negative\_reviews = data \ .map(lambda line: line.split('\t')) \ .filter(lambda line: line == '0')
You'll notice that when we run
transformation commands, nothing gets printed out. That's because Spark commands are lazily evaluated. We set up a
transformation from RDD to RDD to prepare to run an
action to get back results.
Here are some examples of other
negative\_reviews.count() # 7072 negative\_reviews.first() # [u'102', u'3', u'would have a hard time sitting through this one', u'0']
Now that we've got a better idea about how Spark works and executes commands through
actions let's write ourselves a little MapReduce job to figure out what words show up most commonly in the negative reviews.
word\_counts = negative\_reviews \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word,1)) \ .reduceByKey(lambda a,b: a+b)
What we're doing above is getting the word counts. We do that by tokenizing the review text in a
flatMap which basically just makes it into one big giant list of words. Then we map each word to 1 and
reduceByKey which parallelizes our reduce operation to each key in our data set.
The above code is equivalent to:
word\_counts = negative\_reviews \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word,1)) \ .groupByKey() \ .map(lambda val: (val, len(val))) # basically the value and the generator
Now let's get the most common word we can find in the reviews.
def most\_common(comp1, comp2): word1, count1 = comp1 word2, count2 = comp2 if count1 > count2: return (word1, count1) else: return (word2, count2) most\_common\_word = word\_counts.reduce(most\_common) print most\_common\_word # (u',', 3722)
Knowing that it is a comma isn't too useful, let's try it without punctuation, which means rewriting our map function.
import string def clean\_words(word): if word not in string.punctuation: return (word, 1) else: return (word, 0) word\_counts = negative\_reviews \ .flatMap(lambda line: line.split()) \ .map(clean\_words) \ .reduceByKey(lambda a,b: a+b) most\_common\_word = word\_counts.reduce(most\_common) print most\_common\_word # (u'the', 3070)
Now that's not particularly useful either so let's try getting just the top 5. We'll keep using the word_counts that excludes punctuation.
word\_counts.sortBy(lambda x: x\*-1).take(5) # (u'the', 3070), (u'a', 2572), (u'and', 2507), (u'of', 2236), (u'to', 1880)]
Note how we had to multiply it by negative one in order to sort it least to greatest.
Finally now that we've performed some analysis. We might want to store those
word_counts for some later use. Speaking of which, it'd also be valuable to store the
negative_reviews too. It's helpful to imagine this being a much larger dataset as this is a trivial example but let's go ahead and save it.
Note that negative reviews is saved as a Hadoop text file while word counts is saved as a sequence file. Because these datasets are typically large, we have to save them as these distributed data files. Here's a list of some of the ways you can save these files.
data.saveAsHadoopDataset() data.saveAsNewAPIHadoopFile() data.saveAsTextFile() data.saveAsHadoopFile() data.saveAsPickleFile() data.saveAsNewAPIHadoopDataset() data.saveAsSequenceFile()
Those are the basics for how to analyze data in Spark. In some later posts I'll be going over how to analyze larger sets of data. Going over how to run MapReduce jobs on Spark, using SparkSQL and more.