The Example ETL Pipeline

Now that you have your Redshift cluster running, it's time to run the ETL pipeline that will build the example data warehouse.

The Example Data

The mortar-etl-redshift example looks at a small 0.5 GB sample of Wikipedia pageview data for a single month. This data is stored at: s3://mortar-example-data/wikipedia/pagecounts-2011-07-aa.

Download that file and take a look at it. You will see that the data looks like this:

aa.d Special:Gadgets 5 KH1,VO1,^J1,^K2

This rather cryptic format encodes Wikipedia pageviews. Each line has four whitespace-separated fields:

  • Wikipedia code - includes information on the language and type (wikibooks, wikinews, ...) of the article that was read.
  • Article Title
  • Total number of views that month
  • Number of views for each hour of each day-of-the-month, encoded in a custom wiki format.

The complete documentation is here.
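
To make the hourly encoding concrete, here is a minimal Python sketch of one way to decode that fourth field. The rules it assumes are inferred from the sample row above and from the decoded rows shown in the Transform section below: each comma-separated entry is a day character (counting up from 'A' for day 1), an hour character (counting up from 'A' for hour 0), and a view count.

# Hourly-pageviews decoding sketch (assumed format, inferred from the sample
# row above and the decoded table in the Transform section below).
def decode_hourly_pageviews(encoded):
    """Yield (day, hour, pageviews) tuples from a string like 'KH1,VO1,^J1,^K2'."""
    for entry in encoded.split(','):
        day = ord(entry[0]) - ord('A') + 1   # 'K' -> 11, '^' -> 30
        hour = ord(entry[1]) - ord('A')      # 'H' -> 7,  'O' -> 14
        pageviews = int(entry[2:])           # remaining characters are the count
        yield (day, hour, pageviews)

print(list(decode_hourly_pageviews('KH1,VO1,^J1,^K2')))
# [(11, 7, 1), (22, 14, 1), (30, 9, 1), (30, 10, 2)]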

Let's look at the ETL pipeline that will build a data warehouse for this data. We'll cover how to write your own Pig code in a later tutorial, but for now we are just going to work through the different Pig scripts, explaining what they do. All of these scripts can be found in the project you forked from mortar-etl-redshift.


Extract

Open pigscripts/01-wiki-extract-data.pig in your code editor.

For this example our raw data is already in S3 and in a format that we can easily transform. The only thing we're going to do is drop the monthly_pageviews field since it will be easy to calculate from the more granular hourly pageview data.

-- Load the raw pagecount data, splitting each line on spaces.
raw = load '$INPUT_PATH'
     using PigStorage(' ')
        as (
            wiki_code:chararray,
            article:chararray,
            monthly_pageviews:int,
            encoded_hourly_pageviews:chararray
        );

-- Drop monthly_pageviews; it can be recomputed from the hourly data.
data = foreach raw generate wiki_code, article, encoded_hourly_pageviews;

-- Remove any existing output and store the extracted data.
rmf $OUTPUT_PATH/extract;
store data into '$OUTPUT_PATH/extract' using PigStorage('\t');

We included this "extract" step for completeness of the example, but we could have skipped it and just used our raw data already in S3. In general, you will only need the extract step if you are extracting your data from a data source other than S3. However, if new data is being written to your source S3 location while your ETL pipeline is running, it can still be useful to capture an exact snapshot of the data you processed.

Pig has a handy feature called "illustrate" that will help you visualize how your script works. Try it out by illustrating the extract script. In your terminal, run:

mortar local:illustrate pigscripts/01-wiki-extract-data.pig -f params/wiki.params

The Mortar ETL pipeline requires a number of parameters to run. These parameters will be covered in a later tutorial. For now, just note that all commands require a path to a parameter file.

The first time you run a mortar local command, it will take a minute or two to set up your environment. This happens only once: Mortar downloads all of the dependencies you need to run a Pig job into a local sandbox for your project. This lets you run everything on your own machine quickly and without having to launch a Hadoop cluster.

After the command finishes you should see output similar to:

[Screenshot: Illustrate Results]

Here you can see what the data looks like and that it's loading correctly. We will use illustrate throughout the tutorials to help visualize how data is being loaded and processed.

Transform

Looking at the schema of the data, there are two fields that need to be transformed: wiki_code and encoded_hourly_pageviews.

For wiki_code, we're going to separate the language from the wiki type. For encoded_hourly_pageviews, we're going to expand each encoded day/hour entry into its own row, with separate day, hour, and pageviews columns in a sensible, easy-to-understand format.

Here's what the above example should look like after being transformed:

language_code   wiki_type   article_title     day   hour   pageviews 
aa              d           Special:Gadgets   11    7      1
aa              d           Special:Gadgets   22    14     1
aa              d           Special:Gadgets   30    9      1
aa              d           Special:Gadgets   30    10     2

The transformation code is in pigscripts/02-wiki-transform-data.pig. Open it in your code editor and take a look at the code that does this transformation.

-- Load data from the extract step.
raw =  load '$OUTPUT_PATH/extract'
      using PigStorage()
         as (
              wiki_code:chararray,
              article:chararray,
              encoded_hourly_pageviews:chararray
          );

-- use jython UDFs to decode wiki_code and encoded_hourly_pageviews
decoded = foreach raw generate 
            wiki_code,
            flatten(wikipedia.decode_wikicode(wiki_code))
                as (language, wiki_type),
            article,
            flatten(wikipedia.decode_pageviews(encoded_hourly_pageviews))
                as (day, hour, pageviews);

Here, we're running two Jython user-defined functions (UDFs), decode_wikicode and decode_pageviews, to translate the data into a more usable format. You can see the code for these UDFs by opening udfs/jython/wikipedia.py in your text editor. Check out Writing Jython UDFs to learn more about writing your own UDFs in Jython.
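
As a rough sketch (an illustration, not the project's actual wikipedia.py code), UDFs like these might look something like the Python below: decode_wikicode returns a (language, wiki_type) tuple, which flatten() expands into two columns, while decode_pageviews returns a list of tuples, which Pig treats as a bag and flatten() expands into one row per (day, hour) entry. The real UDFs also declare Pig output schemas (typically via the @outputSchema decorator) and may handle edge cases differently.

# Hypothetical sketch only -- see udfs/jython/wikipedia.py for the real code.
def decode_wikicode(wiki_code):
    """Split a wiki code like 'aa.d' into ('aa', 'd')."""
    parts = wiki_code.split('.')
    language = parts[0]
    # Assumption: codes without a '.' suffix get an empty wiki_type.
    wiki_type = parts[1] if len(parts) > 1 else ''
    return (language, wiki_type)

def decode_pageviews(encoded_hourly_pageviews):
    """Return a list (a Pig bag) of (day, hour, pageviews) tuples."""
    return [(ord(e[0]) - ord('A') + 1,   # day character, 'A' = day 1
             ord(e[1]) - ord('A'),       # hour character, 'A' = hour 0
             int(e[2:]))                 # view count
            for e in encoded_hourly_pageviews.split(',')]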

Now that our data has been transformed we need to prepare it to be loaded into Redshift. That code looks like this:

-- split data into multiple output files to speed
-- up loading into Redshift
grouped_data =    group decoded
                     by (day, hour)
               parallel $REDSHIFT_PARALLELIZATION;
reduced =  foreach grouped_data
          generate flatten(decoded);

-- Use gzip compression
set output.compression.enabled true;
set output.compression.codec org.apache.hadoop.io.compress.GzipCodec;

-- remove any existing data
rmf $OUTPUT_PATH/transform;
store reduced into '$OUTPUT_PATH/transform' using PigStorage('\t');

To speed up Redshift's data loading process, we ask Pig and Hadoop to split data into multiple output files. The parameter $REDSHIFT_PARALLELIZATION determines how many output files Hadoop will generate. The Mortar ETL pipeline will automatically calculate and set this parameter for you based on the size of your Hadoop cluster, but you can override it if you like.

It's important to group on a field (or fields) that produces a number of groups a couple of times greater than your cluster size, with each group holding an approximately equal share of your data. This prevents performance problems like Reducer Skew.

In this example, we group by the (day, hour) combination. We expect that while some (day, hour) combinations will be larger than others, there won't be many that are orders of magnitude greater than the rest. If we had grouped on a field like language, we would have only a small number of groups and a very skewed division of data, as some languages have many orders of magnitude more data than others (for example, English versus Esperanto).

Before storing the data, we also enable gzip compression so that it will load into Redshift faster.

To see how this script works, do a local illustrate on it:

mortar local:illustrate pigscripts/02-wiki-transform-data.pig -f params/wiki-transform.params

Load

The Load step isn't going to be performed by a Pig script. Instead, we're going to use a Redshift-specific task in Luigi to copy the data to Redshift, which we'll cover in the next article.