Mortar has joined Datadog, the leading SaaS-based monitoring service for cloud applications. Read more about what this means here.

Building Your Pipeline

Importing Luigi and Friends

A Luigi script is written in Python. The first thing you need to do is import the packages that your script will need to run. For this example we'll supply the import section, which imports not only Luigi itself but some additional modules that we'll use later in this script.

ACTION: Copy this code into the top of your new Luigi script:

import luigi
from luigi import configuration
from luigi.s3 import S3Target, S3PathTask

from mortar.luigi import mortartask

How Dependencies Work

One of the core elements of any Luigi Task is its dependencies. Task B depends on Task A if it requires Task A's output to exist. In cases where there is no output generated by a Task, we store output tokens to indicate a Task has completed.

In luigi, we express Tasks as classes, and define their dependencies with a requires method:

class MyTask(luigi.Task):

    The requires method is how you build your dependency graph in 
    Luigi. Luigi will not run this Task until all Tasks in the 
    requires list are complete.

    def requires(self):
        return [S3PathTask('s3://path-we-require-goes-here')]

S3PathTask is an external Luigi Task that verifies the existence of a file in S3. For the above example, we're telling Luigi to only run MyTask when the required S3PathTask is complete.

Running a Pig Script

Now's let's update to run a Pigscript on Mortar. Here's what a Pigscript Task looks like in Luigi:

class MyPigScriptTask(mortartask.MortarProjectPigscriptTask):

    # Cluster size to use for running this Pig script.
    # Defaults to Local Mode (no cluster), 
    # but larger cluster values can be passed in at runtime
    cluster_size = luigi.IntParameter(default=0)

    # S3 path to the script's output directory 
    # (will be passed in as a parameter at runtime)
    output_path = luigi.Parameter()

    # S3 path for Luigi to store tokens indicating 
    # the Task has completed
    def token_path(self):
        return self.output_path

    def script(self):
        Name of the Pigscript to run 
        (omit the .pig from the file name).
        return 'my-script-name-goes-here'

    def requires(self):
        Dependency of this Task -- in this case Luigi uses 
        S3PathTask to check for the existence of input data
        at a certain S3 path before running this Task.
        return [S3PathTask('s3://input-path-where-data-lives')]

    def parameters(self):
        Any parameters that you want to pass to your Pig 
        script can go here. In this case we want the Pig job 
        to write output to the same path Luigi is using, 
        which will make it easier for subsequent Luigi tasks 
        to find the output from the Pig job.
        return {
            'OUTPUT_PATH': self.output_path

    def script_output(self):
        S3 target where the Pig job's output will be stored. 
        This directory will be cleared of partial data 
        if the script fails.
        return [S3Target(self.output_path)]

Let's start working from that template.

ACTION: Paste the MyPigScriptTask template above into your script.

For the Luigi script we're writing, we're going to run the google_books_words Pigscript as the first Task.

ACTION: Fill in the google_books_words script name in the script method.

The sole dependency of our Task will be the existence of input data on S3. Note that the google_books_words script takes input data from s3://mortar-example-data/ngrams/books/20120701/eng-all/1gram/googlebooks-eng-all-1gram-20120701-q.gz. Let's update the requires method to require data from that S3 Path.

ACTION: Fill in the correct S3 path in the requires method.

Now's let's name our Task something more descriptive. We'll use the name WordRank, which will make it easier to connect up this Task to the subsequent Task.

ACTION: Change the class name from MyPigScriptTask to WordRank.

Congratulations! You've built a Luigi Task that will run a Pigscript. Now let's add one more Task and then run our Luigi script.

Running a Sanity Test on Output Data

For our second Task, we're going to make sure that the output from our google_books_words Pigscript is sensible.

Here we will use a little Python code to ensure that all the words in our output file indeed begin with "q", as should be the case.

ACTION: Append the following Task to the end of your script:

class SanityTest(luigi.Task):

    output_path = luigi.Parameter()

    def requires(self):
        This Task takes the output of the WordRank Task as its input, 
        so we list WordRank as a dependency.
        return [WordRank(output_path=self.output_path)]

    def output(self):
        We want this Task to write its tokens to a unique location, 
        defined by the name of the class.
        return [S3Target('%s/%s' % (self.output_path, self.__class__.__name__))]

    def run(self):
        This Python code checks that each word in the pig script 
        output file begins with 'q' and returns an exception if not.
        file = S3Target('%s/%s/part-r-00000' % (self.output_path, 'dictionary'))
        for line in'r'):
            if line[0] != 'q':
                raise Exception("Word: %s didn't start with q" % word)

Take a look at the requires method on the SanityTest Task. By including WordRank there, we're telling Luigi that SanityTest depends the output of the WordRank Task. Now when we run this script, Luigi will automatically run WordRank first, and then run SanityTest afterward.

Notice also how we pass the output_path parameter through to WordRank in the requires method. All parameters without explicit default values must be passed through to the required Task.

Telling Luigi What to Run

You've now linked two Tasks together to create a data pipeline! Granted it's a pretty simple one, but the great thing about Luigi is that you can keep adding Tasks and dependencies to make your pipeline as complex as you need. And as you saw in the sanity test Task, a Task can be anything you can write in Python.

Before we have completely finished this pipeline, though, we need to add a bit of Python to tell Luigi which Task to run.

ACTION: Add the following code to the end of the script:

if __name__ == "__main__":

This snippet tells Luigi to look at the last Task in the pipeline, to evaluate whether that Task has unfulfilled dependencies, and then move up the dependency chain until it finds a Task whose dependencies are fulfilled.

At this point you should have a ready-to-run Luigi script. If you have any trouble building the script, look at for hints.