The mortar-etl-redshift example looks at a small 0.5GB sample of user interactions from Wikipedia for a particular month. This data is stored at:
Download that file and take a look at it. You will see that the data looks like this:
aa.d Special:Gadgets 5 KH1,VO1,^J1,^K2
This rather cryptic schema encodes Wikipedia pageviews. There are four fields, separated by whitespace. The schema is as follows:
The complete documentation is here.
Let's look at the ETL pipeline that will build a data warehouse for this data. We'll cover how to write your own Pig code in a later tutorial, but for now we are just going to work through the different Pig scripts, explaining what they do. All of these scripts can be found in the project you forked from mortar-etl-redshift.
Open pigscripts/01-wiki-extract-data.pig in your code editor.
For this example our raw data is already in S3 and in a format that we can easily transform. The only thing we're going to do is drop the
monthly_pageviews field since it will be easy to calculate from the more granular hourly pageview data.
raw = load '$INPUT_PATH' using PigStorage(' ') as (
    wiki_code:chararray,
    article:chararray,
    monthly_pageviews:int,
    encoded_hourly_pageviews:chararray
);

data = foreach raw generate
    wiki_code,
    article,
    encoded_hourly_pageviews;

rmf $OUTPUT_PATH/extract;
store data into '$OUTPUT_PATH/extract' using PigStorage('\t');
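For readers more comfortable in Python, here is a minimal sketch of what the extract step does to each record, assuming the whitespace-separated four-field format shown above (this is an illustration, not part of the project):

```python
def extract_line(line):
    """Parse one raw pageview record and drop monthly_pageviews,
    mirroring the 'data = foreach raw generate ...' statement above."""
    wiki_code, article, monthly_pageviews, encoded_hourly_pageviews = line.split(" ")
    # Keep only the fields the transform step needs.
    return (wiki_code, article, encoded_hourly_pageviews)

record = extract_line("aa.d Special:Gadgets 5 KH1,VO1,^J1,^K2")
# record == ("aa.d", "Special:Gadgets", "KH1,VO1,^J1,^K2")
```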
We included this "extract" step for completeness of the example, but we could have skipped it and used our raw data already in S3. In general, you will only need the extract step if you are extracting your data from a source other than S3. That said, if new data is being written to your source S3 location while your ETL pipeline is running, it can still be useful to capture an exact snapshot of the data you processed.
Pig has a handy feature called "illustrate" that will help you visualize how your script works. Try it out by illustrating the extract script. In your terminal, run:
mortar local:illustrate pigscripts/01-wiki-extract-data.pig -f params/wiki.params
The Mortar ETL pipeline requires a number of parameters to run. These parameters will be covered in a later tutorial. For now, just note that all commands require a path to a parameter file.
The first time you run a mortar local command, it will take a minute or two to set up your environment: Mortar downloads all of the dependencies you need to run a Pig job into a local sandbox for your project. This happens on the first run only. It lets you run everything on your own machine quickly, without having to launch a Hadoop cluster.
After the command finishes you should see output similar to:
Here you can see what the data looks like and that it's loading correctly. We will use illustrate throughout the tutorials to help visualize how data is being loaded and processed.
Looking at the schema of the data, there are two fields that need to be transformed. For wiki_code, we're going to separate the language from the wiki type. For encoded_hourly_pageviews, we're going to expand each encoded day/hour combination into separate rows and columns with a sensible, easy-to-understand format.
Here's what the above example should look like after being transformed:
language_code  wiki_type  article_title    day  hour  pageviews
aa             d          Special:Gadgets  11   7     1
aa             d          Special:Gadgets  22   14    1
aa             d          Special:Gadgets  30   9     1
aa             d          Special:Gadgets  30   10    2
The transformation code is in
pigscripts/02-wiki-transform-data.pig. Open it in your code editor and take a look at the code that does this transformation.
-- Load data from the extract step.
raw = load '$OUTPUT_PATH/extract' using PigStorage() as (
    wiki_code:chararray,
    article:chararray,
    encoded_hourly_pageviews:chararray
);

-- Use Jython UDFs to decode wiki_code and encoded_hourly_pageviews.
decoded = foreach raw generate
    wiki_code,
    flatten(wikipedia.decode_wikicode(wiki_code)) as (language, wiki_type),
    article,
    flatten(wikipedia.decode_pageviews(encoded_hourly_pageviews)) as (day, hour, pageviews);
Here, we're running two Jython user-defined functions (UDFs), decode_wikicode and decode_pageviews, to translate the data into a more usable format. You can see the code for these UDFs by opening up
udfs/jython/wikipedia.py in your text editor. Check out Writing Jython UDFs to learn more about writing your own UDFs in Jython.
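As a rough sketch of the decoding these UDFs perform, here are hypothetical Python versions inferred from the sample record and its transformed output above; the project's actual implementation lives in udfs/jython/wikipedia.py and may differ:

```python
# Hypothetical sketches of wikipedia.decode_wikicode and
# wikipedia.decode_pageviews, inferred from the sample data in this
# tutorial -- not the project's exact code.

def decode_wikicode(wiki_code):
    """Split a code like 'aa.d' into its language ('aa') and wiki type ('d')."""
    language, _, wiki_type = wiki_code.partition(".")
    return (language, wiki_type)

def decode_pageviews(encoded):
    """Decode entries like 'KH1': the first character encodes the day
    (A=1, ..., Z=26, then punctuation up to '^'=30), the second encodes
    the hour (A=0, ..., X=23), and the rest is the pageview count."""
    rows = []
    for entry in encoded.split(","):
        day = ord(entry[0]) - ord("A") + 1   # 'K' -> 11, '^' -> 30
        hour = ord(entry[1]) - ord("A")      # 'H' -> 7,  'K' -> 10
        pageviews = int(entry[2:])
        rows.append((day, hour, pageviews))
    return rows
```

Running decode_pageviews on the sample value "KH1,VO1,^J1,^K2" yields the four (day, hour, pageviews) rows shown in the transformed output above.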
Now that our data has been transformed we need to prepare it to be loaded into Redshift. That code looks like this:
-- Split data into multiple output files to speed up loading into Redshift.
grouped_data = group decoded by (day, hour) parallel $REDSHIFT_PARALLELIZATION;
reduced = foreach grouped_data generate flatten(decoded);

-- Use gzip compression.
set output.compression.enabled true;
set output.compression.codec org.apache.hadoop.io.compress.GzipCodec;

-- Remove any existing data.
rmf $OUTPUT_PATH/transform;
store reduced into '$OUTPUT_PATH/transform' using PigStorage('\t');
To speed up Redshift's data loading process, we ask Pig and Hadoop to split data into multiple output files. The parameter
$REDSHIFT_PARALLELIZATION determines how many output files Hadoop will generate. The Mortar ETL pipeline will automatically calculate and set this parameter for you based on the size of your Hadoop cluster, but you can override it if you like.
It's important to group on a field (or fields) that yields a number of groups at least a few times greater than your cluster size, with each group holding an approximately equal share of your data. This prevents performance problems like Reducer Skew.
In this example, we group by the (day, hour) combination. We expect that while some (day, hour) combinations will be larger than others, there won't be many that are orders of magnitude greater than the rest. If we had grouped on a field like language, we would have only a small number of groups and a very skewed division of data, as some languages have many orders of magnitude more data than others (for example, English versus Esperanto).
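A quick way to see why the grouping key matters is to measure what share of the data lands in the single largest group. This small sketch (not part of the project; the field names simply mirror the tutorial's schema, and the toy data is made up) compares the two choices:

```python
from collections import Counter

def largest_group_share(records, key):
    """Fraction of all records that fall into the biggest group.
    A value near 1.0 means one reducer would do almost all the work."""
    sizes = Counter(key(r) for r in records)
    return max(sizes.values()) / len(records)

# Toy data: 120 English records spread over 5 days x 24 hours,
# plus a single Esperanto record.
records = [
    {"language": "en", "day": d, "hour": h}
    for d in range(1, 6) for h in range(24)
] + [{"language": "eo", "day": 1, "hour": 0}]

by_language = largest_group_share(records, key=lambda r: r["language"])
by_day_hour = largest_group_share(records, key=lambda r: (r["day"], r["hour"]))
# by_language is ~0.99 (badly skewed); by_day_hour is ~0.02 (well balanced).
```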
Before storing the data we're also going to ensure that the data is gzip-compressed so that it will load into Redshift faster.
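Conceptually, the two compression settings above make Hadoop write the transform output as gzip-compressed, tab-separated files. A minimal Python sketch of that output format (the file name here is illustrative, not one the pipeline produces):

```python
import gzip

# Rows shaped like the transformed schema shown earlier:
# (language_code, wiki_type, article_title, day, hour, pageviews).
rows = [
    ("aa", "d", "Special:Gadgets", 11, 7, 1),
    ("aa", "d", "Special:Gadgets", 22, 14, 1),
]

# Write gzip-compressed, tab-separated lines, as PigStorage('\t') plus the
# GzipCodec would. Redshift's COPY command can ingest such files directly.
with gzip.open("part-00000.gz", "wt") as f:
    for row in rows:
        f.write("\t".join(str(field) for field in row) + "\n")
```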
To see how this script works, do a local illustrate on it:
mortar local:illustrate pigscripts/02-wiki-transform-data.pig -f params/wiki-transform.params
The Load step isn't going to be performed by a Pig script. Instead we're going to use a Redshift-specific task in Luigi to copy the data to Redshift, which we'll cover in the next article.