
Load and Transform Data

In this tutorial, we will build a classification model using a dataset of roughly 19,000 newsgroup postings. (Thanks to the scikit-learn project, on whose example this tutorial is based.) The classifier will predict which category (out of 20 newsgroups) a posting belongs to. The newsgroup dataset exists in numerous formats, but for the purposes of this tutorial the postings have been converted to JSON records and collected in a single input file that is hosted on S3.

In order to go from a set of raw newsgroup postings to a predictive model, our classifier will have to perform several transformations and actions on the dataset. We'll use a Spark script to:

  • Split the full dataset into a training dataset and a smaller dataset for validating the model.
  • Extract the relevant portions of the newsgroup postings for training and validating a predictive model.
  • Tokenize the text, breaking each posting into a bag of words rather than one long string, lowercase all the terms, remove punctuation and stopwords (extremely common words of little predictive value), and stem the terms (e.g., “having” becomes “have”).
  • Apply the hashing trick to the postings, turning each bag of words into a vector that represents the words that appear in the posting and how often they appear.
  • Train a machine learning model on the hashed features and category labels in the training dataset.
  • Run the hashed features of the testing dataset through the model to test its accuracy.
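To make the hashing trick concrete before we get to Spark's implementation, here is a toy sketch of the idea in plain Python (this is not Spark's HashingTF, and NUM_FEATURES is an arbitrary vector size chosen for illustration):

```python
# A toy illustration of the hashing trick: each token is hashed to a
# bucket index, and the vector counts occurrences per bucket.
NUM_FEATURES = 16

def hash_vector(tokens, num_features=NUM_FEATURES):
    """Map a bag of words to a fixed-length vector of term counts."""
    vec = [0] * num_features
    for token in tokens:
        # Collisions (two words sharing a bucket) are possible, but
        # become rare when num_features is large.
        index = hash(token) % num_features
        vec[index] += 1
    return vec

# "free" appears twice, so its bucket accumulates a count of 2
vec = hash_vector(["free", "agent", "free"])
```

The payoff is a fixed-length numeric vector regardless of vocabulary size, which is exactly what a machine learning model needs as input.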

We're going to build our Spark script step by step.

Import Section

First we need to import some utilities and algorithms that we will use in this script. Don't worry about what these do just yet—we'll get to that.

ACTION: Paste the following lines of code at the top of your script:

import string
import json 

from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes

Create an NLP Function

As you may have gleaned from the import section above, we are going to use NLTK for natural language processing. We will run the text of each newsgroup message through a Python function that “tokenizes” the text (breaking long strings into single terms), lowercases all the tokens, removes punctuation and stopwords (common words with little predictive value), and stems the words (e.g., “catching” and “catches” both become “catch”).

A detailed NLTK walkthrough is beyond the scope of this tutorial (you can read much more about NLTK here), so we’ve provided a complete function below that will make text processing and cleaning much easier.

ACTION: Paste the Python code below into your Spark script:

# Module-level global variables for the `tokenize` function below
PUNCTUATION = set(string.punctuation)
STOPWORDS = set(stopwords.words('english'))
STEMMER = PorterStemmer()

# Function to break text into "tokens", lowercase them, remove punctuation and stopwords, and stem them
def tokenize(text):
    tokens = word_tokenize(text)
    lowercased = [t.lower() for t in tokens]
    no_punctuation = []
    for word in lowercased:
        punct_removed = ''.join([letter for letter in word if letter not in PUNCTUATION])
        no_punctuation.append(punct_removed)
    no_stopwords = [w for w in no_punctuation if w not in STOPWORDS]
    stemmed = [STEMMER.stem(w) for w in no_stopwords]
    return [w for w in stemmed if w]

Initialize a Spark Context

In order to build RDDs and run your script, you will need to initialize a SparkContext, which is how your program will create RDDs and connect to a Spark cluster.

ACTION: Initialize a SparkContext as "sc" by pasting the following lines into your script:

# Initialize a SparkContext
sc = SparkContext()

Load Data into Spark

Our input data is a text file in S3 containing several thousand newsgroup postings as JSON records, one per line. Each record looks a bit like this:

{"text": "From: USERNAME@leland.Stanford.EDU (USER NAME)\nSubject: When does Fred McGriff of the Padres become a free agent?\nOrganization: DSG, Stanford University, CA 94305, USA\nLines: 1\n\n\n", "label_name": "", "label": 9}

Note that each of the 20 newsgroup categories has been assigned a numeric label between 0 and 19 for classification purposes (the posting above, for example, carries label 9).
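To see what one of these records looks like once decoded, here is the standard-library json module applied to a shortened version of the line above (the real records carry the full posting text):

```python
import json

# A shortened stand-in for one raw line of the input file
line = '{"text": "Subject: When does Fred McGriff become a free agent?", "label_name": "", "label": 9}'

# json.loads turns the JSON string into a Python dict with indexable fields
record = json.loads(line)
label, text = record['label'], record['text']
print(label)        # 9
print(text[:8])     # Subject:
```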

To make this file into a Spark RDD, we can use sc.textFile, which reads a text file and returns an RDD of strings. (Recall that we created the SparkContext object "sc" above.)

ACTION: Load the dataset from S3 as an RDD:

# Import full dataset of newsgroup posts as text file
data_raw = sc.textFile('s3://mortar-example-data/newsgroup/newsgroup_full')

Now we'll use Python's JSON module to decode the strings in our RDD so that we can access specific fields in the data. We'll use two new tactics here: Spark's map transformation, which applies a function to every element of an RDD and returns a new RDD of the results, and Python's lambda syntax, which lets us define that function inline.

In this case we want to map our existing RDD data_raw to a new RDD in which each string has been decoded using the JSON module.
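If map and lambda are new to you, the same pattern exists in plain Python; Spark's RDD.map behaves analogously, except that it is lazy and runs across the cluster:

```python
import json

# Two tiny JSON strings standing in for lines of the input file
lines = ['{"label": 9}', '{"label": 3}']

# Built-in map + lambda: apply json.loads to every element
records = list(map(lambda line: json.loads(line), lines))
print(records)  # [{'label': 9}, {'label': 3}]
```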

ACTION: Transform the RDD of strings we just created into a new, decoded RDD:

# Parse JSON entries in dataset
data = data_raw.map(lambda line: json.loads(line))

Now that our data has indexable fields, let's pull out the fields that we want and ignore the rest. The features that we'll feed into our model are the contents of each posting ("text"). And since this is a supervised learning task, we'll also provide the category labels ("label") to train the model.

ACTION: Transform our decoded RDD into a new RDD that includes only the fields "text" and "label":

# Extract relevant fields in dataset -- category label and text content
data_pared = data.map(lambda line: (line['label'], line['text']))

Optional: Test Your Script

At this point you might be curious to see what the transformed data looks like. Let's run the script as-is to see what's happening. We'll use the first() action to return the first element of our transformed RDD.

ACTION: Insert an action to print the first element of our RDD data_pared as output:

# Temporary print statement for testing partial script
print data_pared.first()

Save your script and run it on Mortar. You can run a Spark script with the "Run Job" button on the "Jobs" page of the Mortar Web app, but here we will use the command line interface to submit the job. Note that this action will launch a two-node cluster (providing enough computing power for this tutorial) in your account, which will shut down automatically after an hour of idling. Spark clusters run on r3.xlarge instance types in Mortar, at the current pass-through price of $0.44 per hour per node. We use r3.xlarge nodes for their superior memory-to-price ratio, which is optimal for Spark.

ACTION: Save your work, and then, from the root directory of your Mortar project, run the following command:

mortar spark sparkscripts/

(Note: You may be accustomed to running Pig and Luigi scripts locally using mortar local commands, which aren’t yet supported for Spark.)

Follow the URL provided to track the progress of your job and to retrieve the output (under the Output tab). While your job is running you can also access the Spark user interface (screenshot below) via the Details tab. If you don’t have a running cluster, it will take several minutes to start one up.

Spark UI

Once the job completes, you should see a tuple under the Output tab with a numerical category label and a string of text.

Spark Spot Check

Once you’re done with this spot check, you should delete the print statement to avoid extraneous output and to allow Spark to optimize the execution of your job, avoiding unnecessary intermediate computations.

TIP: If you want to check how your partially finished script is working at any point, you can insert a similar print <RDD-name>.first() statement in your script and run it, which will tell Spark to output the first record in the RDD so you can see what is happening.

Clean and Standardize Import Data

Now we’ll apply the NLP function we created above to transform the raw text of the newsgroup postings into something more useful for our machine learning model. As described above, the function tokenize splits each posting into individual terms, lowercases them for consistency, strips out punctuation and common “stopwords,” and finally stems the words.

ACTION: Pass the text of each newsgroup posting through the tokenize function defined above:

# Prepare text for analysis using our tokenize function to clean it up
data_cleaned = data_pared.map(lambda (label, text): (label, tokenize(text)))

Now the text is in a very clean, standardized form that will make it easy to train a high-quality classifier. That's just what we'll do in the next step of the tutorial.