Luigi is a Python-based framework for expressing data pipelines. Everything in Luigi is in Python. Instead of XML configuration or similar external data files, the dependency graph is specified entirely within simple Python classes. This makes it easy to build up large dependency graphs of tasks, where the dependencies can involve date algebra or recursive references to other versions of the same task.
However, your data pipeline can also trigger work that isn't written in Python, such as running Mortar jobs or moving files on S3. In fact, you can run anything you like from within a Luigi pipeline.
There are two important concepts to understand in Luigi: Targets and Tasks.
Broadly speaking, the Target class corresponds to a file on disk, a file on S3, or some kind of checkpoint, such as an entry in a database.
The only method that Targets have to implement is the exists method, which returns True if and only if the Target exists.
```python
import luigi


class MyTarget(luigi.Target):
    def exists(self):
        """
        Does this Target exist?
        """
        return True
```
In practice, implementing Target subclasses is rarely needed. You will mostly use the LocalTarget, S3Target, or FTPTarget classes that are available out of the box. These map directly to a file on the local drive, on S3, or on an FTP server, respectively.
The Task class is where work gets done in Luigi. It has a very simple interface, with only three methods you need to implement: requires, output, and run.
```python
import luigi
# In current Luigi releases, S3 support lives in luigi.contrib.s3.
from luigi.contrib.s3 import S3Target


class MyExampleTask(luigi.Task):
    # Example parameter for our task: a
    # date for which a report should be run
    report_date = luigi.DateParameter()

    def requires(self):
        """
        Which other Tasks need to be complete before
        this Task can start? Luigi will use this to
        compute the task dependency graph.
        """
        # MyUpstreamTask is assumed to be another Task defined in your pipeline.
        return [MyUpstreamTask(self.report_date)]

    def output(self):
        """
        When this Task is complete, where will it produce output?
        Luigi will check whether this output (specified as a Target)
        exists to determine whether the Task needs to run at all.
        """
        return S3Target('s3://my-output-bucket/my-example-tasks-output')

    def run(self):
        """
        How do I run this Task?
        Luigi will call this method if the Task needs to be run.
        """
        # We can do anything we want in here, from calling python
        # methods to running shell scripts to calling APIs
```
The requires method is used to specify dependencies on other Task objects, which might even be of the same class. In this example, MyExampleTask requires that MyUpstreamTask has already been completed. It also passes a Parameter, report_date, to MyUpstreamTask, indicating that the work should be done for that particular date. In general, you can have as many upstream dependencies as you like, or you can indicate zero dependencies by returning an empty list (return []), which you'll often do for the Task that kicks off a pipeline.
The output method returns one or more Target objects that point at the output from your Task. When you run a pipeline, Luigi first checks whether the Task's output Targets already exist; if they don't, it schedules the Task to run. Otherwise, Luigi assumes that the Task is complete and does not rerun it.
The run method contains the actual code that is run. Anything you want can go into this method. In practice, you'll often use pre-baked Task subclasses (like MortarProjectPigscriptTask) where the run method is already implemented for you.
Luigi has built-in checkpointing support. Any time you run a pipeline, Luigi will only run the work that has not already been completed. Luigi determines whether a Task is complete by checking whether its expected output already exists.
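The core checkpointing idea can be modeled in a few lines of plain Python. This is a deliberately simplified toy, not Luigi's scheduler: ToyTask and run_pipeline are hypothetical, and a set of finished task names stands in for "the output already exists".

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    name: str
    deps: list = field(default_factory=list)


def run_pipeline(task, done, log):
    """Depth-first: finish dependencies first, skip anything already done."""
    if task.name in done:      # stands in for "output already exists"
        return
    for dep in task.deps:
        run_pipeline(dep, done, log)
    log.append(task.name)      # stands in for calling run()
    done.add(task.name)        # stands in for run() writing its output


extract = ToyTask("extract")
transform = ToyTask("transform", [extract])
report = ToyTask("report", [transform])

done = set()
first, second = [], []
run_pipeline(report, done, first)
run_pipeline(report, done, second)
print(first)   # ['extract', 'transform', 'report']
print(second)  # []
```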
In practice, this means you can (and should!) rerun your Luigi pipeline as often as you like. It will only do the work that needs to be done and no more. If a Task fails during one run, the next run picks up at that Task and tries again, without redoing the Tasks that already finished. Luigi also ensures that duplicate Tasks don't run simultaneously, protecting you from duplicate output.
This concept of being able to confidently rerun your process is known as idempotence, and it makes Luigi ideal for large, complex pipelines where failures will happen. Operating such a pipeline is easy: you restart it with the same parameters and it heals itself.
The easiest way to get started with Luigi is to build and run a sample pipeline. We'll do that next.
(*) Thanks to the official Luigi documentation for some of the text and examples used here.