How Luigi Works

Luigi is a Python-based framework for expressing data pipelines. Everything in Luigi is in Python. Instead of XML configuration or similar external data files, the dependency graph is specified entirely within simple Python classes. This makes it easy to build up large dependency graphs of tasks, where the dependencies can involve date algebra or recursive references to other versions of the same task.

Your pipeline is not limited to Python work, though. A Task can trigger anything you like, such as running Mortar jobs or moving files on S3.

Luigi Basics

There are two important concepts to understand in Luigi: Targets and Tasks.

Targets

Broadly speaking, a Target corresponds to a file on disk, a file on S3, or some other kind of checkpoint, such as an entry in a database.

The only method that Targets have to implement is the exists method, which returns True if and only if the Target exists.

import luigi

class MyTarget(luigi.Target):

    def exists(self):
        """
        Does this Target exist?
        """
        return True

In practice, you will rarely need to implement your own Target subclasses. You will mostly use the LocalTarget, S3Target, or FTPTarget classes that are available out of the box. These map directly to a file on the local drive, on S3, or on an FTP server, respectively.

Tasks

The Task class is where work gets done in Luigi. It has a very simple interface, with only three methods you need to implement: requires, output, and run.

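import luigi
# S3Target lives in luigi.contrib.s3 in current Luigi releases
# (older releases exposed it as luigi.s3).
from luigi.contrib.s3 import S3Target

# MyUpstreamTask is assumed to be another luigi.Task defined elsewhere.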

class MyExampleTask(luigi.Task):

    # Example parameter for our task: a 
    # date for which a report should be run
    report_date = luigi.DateParameter()

    def requires(self):
        """
        Which other Tasks need to be complete before
        this Task can start? Luigi will use this to 
        compute the task dependency graph.
        """
        return [MyUpstreamTask(self.report_date)]

    def output(self):
        """
        When this Task is complete, where will it produce output?
        Luigi will check whether this output (specified as a Target) 
        exists to determine whether the Task needs to run at all.
        """
        return S3Target('s3://my-output-bucket/my-example-tasks-output')

    def run(self):
        """
        How do I run this Task?
        Luigi will call this method if the Task needs to be run.
        """
        # We can do anything we want in here, from calling python
        # methods to running shell scripts to calling APIs

The requires method specifies dependencies on other Task objects, which might even be of the same class. In this example, MyExampleTask requires that MyUpstreamTask has already been completed. It also passes a Parameter, report_date, to MyUpstreamTask, indicating that the work should be done for that particular date. In general, you can have as many upstream dependencies as you like, or you can indicate zero dependencies by returning an empty list (return []), which you'll often do for the Task that kicks off a pipeline.

The output method returns one or more Target objects that point at the output from your Task. When you run a pipeline, Luigi first checks whether each Task's output Targets already exist. If they do not, Luigi schedules the Task to run. Otherwise, it assumes that the Task is complete and does not rerun it.

The run method contains the actual code that is run. Anything you want can go into this method. In practice, you'll often use pre-baked Task subclasses (like MortarProjectPigscriptTask) where the run method is already implemented for you.

Checkpointing and Failure Recovery

Luigi has built-in checkpointing support. Any time you run a pipeline, Luigi will only run the work that has not already been completed. Luigi determines whether a Task is complete by checking whether its expected output already exists.

In practice, this means you can (and should!) rerun your Luigi pipeline as often as you like. It will only do the work that needs to be done and no more. If an error happens on one execution, it will restart right at that point on the next one and try again. Luigi also ensures that duplicate Tasks don't run simultaneously, protecting you from duplicate output.

This concept of being able to confidently rerun your process is known as idempotence, and it makes Luigi ideal for large, complex pipelines where failures will happen. Operating such a pipeline is easy: you restart it with the same parameters and it heals itself.

Getting Started

The easiest way to get started with Luigi is to build and run a sample pipeline. We'll do that next.

(*) Thanks to the official Luigi documentation for some of the text and examples used here.