
Your ETL Pipeline

Now that your data is transformed, we need to load it into your Redshift cluster.

Luigi Config

As in the Wikipedia example, we need to set some personalized parameters in the Luigi config.

The mortar-etl-redshift project has a directory called luigiscripts, which contains Luigi pipelines that can be run on Mortar.

The luigiscripts directory contains client.cfg.template, a Luigi configuration file. Each time you run Luigi, Mortar will expand the variables in this file (e.g. ${MORTAR_EMAIL}) to their actual values and store the result in luigiscripts/client.cfg. You should not check luigiscripts/client.cfg into source control, as it will be generated on each new run.
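
To make the expansion step concrete, here is a minimal Python sketch of how ${...} placeholders in a template could be filled in. It uses the standard library's string.Template and is only an illustration, not Mortar's actual implementation. The section name, keys, and values in the fragment are made up for the example.

```python
from string import Template

# Hypothetical fragment of a client.cfg.template; the section and key
# names are illustrative, not copied from the real project.
TEMPLATE_TEXT = """\
[example]
email: ${MORTAR_EMAIL}
host: ${HOST}
port: ${PORT}
"""


def expand_template(text, values):
    """Replace every ${NAME} placeholder in text with values[NAME].

    Raises KeyError if a placeholder has no value, which surfaces a
    missing configuration parameter early.
    """
    return Template(text).substitute(values)


if __name__ == "__main__":
    expanded = expand_template(
        TEMPLATE_TEXT,
        {
            "MORTAR_EMAIL": "me@example.com",
            "HOST": "my-endpoint.redshift.amazonaws.com",
            "PORT": "5439",
        },
    )
    print(expanded)
```

Because the expanded file is rebuilt from the template and the current values each time, it is safe to treat it as a disposable build artifact.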

Before running the ETL pipeline we need to set Secure Configuration Parameters for the values referenced in client.cfg.template. To do that, fill in the missing values below (omitting the < and >) and run:

    mortar config:set HOST=<my-endpoint.redshift.amazonaws.com>
    mortar config:set PORT=5439
    mortar config:set DATABASE=<my-database-name>
    mortar config:set USERNAME=<my-master-username>
    mortar config:set PASSWORD=<my-master-username-password>

Luigi Script

Now open up luigiscripts/my-redshift.py. This Luigi workflow implements a basic Mortar ETL pipeline that runs the extract and transform scripts and then copies their output to Redshift.

The first thing you need to do is update it with the schema of your data. For help determining valid names and data types, see Amazon Redshift Basic Elements. Make sure that the format of your data at the end of the transform step matches the schema you define here.

# REPLACE WITH YOUR REDSHIFT COLUMNS
REDSHIFT_COLUMNS = [
        ('language_code', 'text'),
        ('wiki_type', 'text'),
        ('article_title', 'varchar(max)'),
        ('day', 'int'),
        ('hour', 'int'),
        ('pageviews', 'int'),
        ('PRIMARY KEY', '(article_title, day, hour)')]
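
The sketch below shows one way a column list like this could be turned into a CREATE TABLE statement. It also suggests why a ('PRIMARY KEY', ...) entry can sit in the same list: each pair is simply joined as "name type". The helper functions and the table name are hypothetical, and the statement that Luigi and Mortar actually generate may differ.

```python
# Example column list; the last pair is a table constraint, not a column.
REDSHIFT_COLUMNS = [
    ("language_code", "text"),
    ("wiki_type", "text"),
    ("article_title", "varchar(max)"),
    ("day", "int"),
    ("hour", "int"),
    ("pageviews", "int"),
    ("PRIMARY KEY", "(article_title, day, hour)"),
]


def check_primary_key(columns):
    """Return primary-key column names that are not defined as columns."""
    defined = {name for name, _ in columns if name != "PRIMARY KEY"}
    missing = []
    for name, spec in columns:
        if name == "PRIMARY KEY":
            keys = [k.strip() for k in spec.strip("() ").split(",")]
            missing.extend(k for k in keys if k not in defined)
    return missing


def build_create_table(table, columns):
    """Join each (name, type) pair as 'name type' inside CREATE TABLE."""
    body = ",\n  ".join("{} {}".format(name, spec) for name, spec in columns)
    return "CREATE TABLE {} (\n  {}\n);".format(table, body)


if __name__ == "__main__":
    print(check_primary_key(REDSHIFT_COLUMNS))
    print(build_create_table("pageviews", REDSHIFT_COLUMNS))
```

Running a check like check_primary_key before the pipeline starts catches a key column whose name does not exactly match a column in the list, which would otherwise only fail once the table is created.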

For more complex ETL pipelines, you may need to make some additional changes to your Luigi workflow.

Modify Parameters

The template extract and transform scripts only take parameters for the input and output paths they should use. If your scripts require additional parameters, add them to the parameters method of the appropriate ETLPigscriptTask.
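
As a rough illustration, the sketch below adds one extra entry to a parameters method. The ETLPigscriptTask defined here is only a stand-in so the snippet runs without Luigi or Mortar installed, and FilteredTransformTask, min_pageviews, and MIN_PAGEVIEWS are hypothetical names. In the real script the value would usually be declared as a Luigi parameter on the task rather than as a constructor argument.

```python
class ETLPigscriptTask(object):
    """Stand-in for the project's base class so this sketch runs alone.

    The real class comes from the Mortar project template and does much more.
    """

    def __init__(self, input_base_path, output_base_path):
        self.input_base_path = input_base_path
        self.output_base_path = output_base_path

    def parameters(self):
        return {}


class FilteredTransformTask(ETLPigscriptTask):
    """Hypothetical task whose Pig script expects one extra parameter."""

    def __init__(self, input_base_path, output_base_path, min_pageviews=10):
        super(FilteredTransformTask, self).__init__(
            input_base_path, output_base_path)
        self.min_pageviews = min_pageviews

    def parameters(self):
        # Each key is meant to match a parameter name used in the Pig
        # script, for example $MIN_PAGEVIEWS.
        return {
            "INPUT_PATH": self.input_base_path,
            "OUTPUT_PATH": self.output_base_path,
            "MIN_PAGEVIEWS": self.min_pageviews,  # the added parameter
        }


if __name__ == "__main__":
    task = FilteredTransformTask("s3://bucket/input", "s3://bucket/output")
    print(task.parameters())
```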


Adding Tasks to Your Luigi Script

Pig Script Tasks

In a complex ETL pipeline you may have multiple extract or transform steps, either to work with multiple data sources or to break up complex logic. To add a step that runs a Mortar script, add a task similar to the TransformDataTask.

class TransformDataTask(ETLPigscriptTask):
    """
    Task that runs the data transformation script pigscripts/02-transform-data.pig.
    """

    def script_output(self):
        return [S3Target(create_full_path(self.output_base_path, 'transform'))]

    def parameters(self):
        return { 'OUTPUT_PATH': self.output_base_path,
                 'INPUT_PATH': self.input_base_path,
                 'REDSHIFT_PARALLELIZATION': self.number_of_files()
                }

    def script(self):
        return '02-transform-data.pig'

    def requires(self):
        return ExtractDataTask(
                        self.cluster_size,
                        input_base_path=self.input_base_path,
                        output_base_path=self.output_base_path)

To customize the task, make the following changes:

  • Modify the script_output function to write to an S3 location specific to your task.
  • Modify the parameters function to pass any parameters your script expects.
  • Modify the script function to return the Pig script you want to run.
  • Modify the requires function to reference the task that must complete before your new task can run. If your task has no dependencies on other tasks, just delete the function.

You will also need to modify the requires method of a downstream task (i.e., an existing task that must wait for your new task to complete before it can run) so that it includes your new task.
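
The rewiring described above can be sketched with plain Python classes. The Task base class here is a minimal stand-in that only models requires(), CleanDataTask is a hypothetical new step, and run_order is a simplified illustration of dependency resolution, not Luigi's scheduler. For simplicity, requires() returns a list in this sketch.

```python
class Task(object):
    """Minimal stand-in for a Luigi task: only models requires()."""

    def requires(self):
        return []  # no upstream dependencies by default


class ExtractDataTask(Task):
    pass


class CleanDataTask(Task):
    """Hypothetical new step inserted between extract and transform."""

    def requires(self):
        return [ExtractDataTask()]


class TransformDataTask(Task):
    def requires(self):
        # Previously this would have pointed at ExtractDataTask; it now
        # points at the new task so the new step runs first.
        return [CleanDataTask()]


def run_order(task, seen=None):
    """Return task class names in the order they must run."""
    seen = [] if seen is None else seen
    for upstream in task.requires():
        run_order(upstream, seen)
    name = type(task).__name__
    if name not in seen:
        seen.append(name)
    return seen


if __name__ == "__main__":
    print(run_order(TransformDataTask()))
```

If the downstream task's requires method is left unchanged, nothing in the dependency chain refers to the new task, so it never runs.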

For more information, see the Pig Script tasks help page.

Other Luigi Tasks

There are a number of other ready-to-use Luigi tasks you can drop into your pipeline. Some examples include:

You can also use Basic Luigi Python Tasks to run any custom logic you need from Python.