Redshift Tasks

Redshift Tasks load data into Amazon's Redshift, the popular data warehouse as-a-service.

Configuration

To access Redshift with Mortar, you first need to set some Secure Configuration Parameters. Fill in the missing values below (omitting the < and >) and run:

    mortar config:set HOST=<my-endpoint.redshift.amazonaws.com>
    mortar config:set PORT=5439
    mortar config:set DATABASE=<my-database-name>
    mortar config:set USERNAME=<my-master-username>
    mortar config:set PASSWORD=<my-master-username-password>

(You can get the details about your Redshift cluster from the AWS Redshift Console.)
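
The redshift_credentials method in the example below reads these values from a [redshift] section of the Luigi client configuration. Exactly how Mortar wires your Secure Configuration Parameters into that file depends on your project setup, but as a rough, hypothetical sketch, the section would look something like this (the AWS key entries are your own AWS credentials, which are not set by the commands above):

    [redshift]
    host: my-endpoint.redshift.amazonaws.com
    port: 5439
    database: my-database-name
    username: my-master-username
    password: my-master-username-password
    aws_access_key_id: <my-aws-access-key-id>
    aws_secret_access_key: <my-aws-secret-access-key>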

S3CopyToTable

Copies data from S3 to a table in Redshift.

Example Usage

Note that this Task is part of the luigi.contrib.redshift submodule, which must be imported at the start of the script:

import luigi
from luigi import configuration
from luigi.contrib import redshift


class CopyToRedshiftTask(redshift.S3CopyToTable):

    # The Redshift table where the data will be written.
    table_name = luigi.Parameter()

    # S3 location where a 'token' file will be written when the Task completes.
    output_base_path = luigi.Parameter()

    def requires(self):
        # Upstream Task(s) that must complete before this Task runs.
        return [OtherTask()]

    # The schema of the Redshift table where the data will be written.
    # The final tuple adds a composite PRIMARY KEY constraint to the table.
    columns = [
        ('column1_name', 'text'),
        ('column2_name', 'int'),
        ('PRIMARY KEY', '(column1_name, column2_name)')]

    def redshift_credentials(self):
        """
        Returns a dictionary with the necessary fields 
        for connecting to Redshift.
        """
        config = configuration.get_config()
        section = 'redshift'
        return {
            'host' : config.get(section, 'host'),
            'port' : config.get(section, 'port'),
            'database' : config.get(section, 'database'),
            'username' : config.get(section, 'username'),
            'password' : config.get(section, 'password'),
            'aws_access_key_id' : config.get(section, 'aws_access_key_id'),
            'aws_secret_access_key' : config.get(section, 'aws_secret_access_key')
        }

    def s3_load_path(self):
        """
        The S3 path to the files that will be loaded into Redshift.
        """
        return 's3://your-bucket/path-to-files'

    """
    Property methods for connecting to Redshift.
    """

    @property
    def aws_access_key_id(self):
        return self.redshift_credentials()['aws_access_key_id']

    @property
    def aws_secret_access_key(self):
        return self.redshift_credentials()['aws_secret_access_key']

    @property
    def database(self):
        return self.redshift_credentials()['database']

    @property
    def user(self):
        return self.redshift_credentials()['username']

    @property
    def password(self):
        return self.redshift_credentials()['password']

    @property
    def host(self):
        # Luigi accepts the host in 'host:port' form, so combine the two here.
        creds = self.redshift_credentials()
        return creds['host'] + ':' + creds['port']

    @property
    def table(self):
        return self.table_name

    @property
    def copy_options(self):
        # Extra options appended to the Redshift COPY command;
        # GZIP indicates the source files in S3 are gzip-compressed.
        return 'GZIP'

The Redshift-specific elements here are:

  • table_name: This is the name of the Redshift table that will receive the data from S3.
  • columns: Defines the schema of the Redshift table.
  • redshift_credentials and the @property methods: Read your Redshift connection credentials from the Luigi configuration and make them available to the Task.

output_base_path: Because this Task writes to a Redshift table rather than generating output data files, this S3 location is used to store a 'token' file indicating that the Task has completed.

s3_load_path: Points to the location in S3 from which the data will be copied into Redshift.

requires: A method defined for every Luigi Task that declares its dependencies; these can be other Luigi Tasks, data in a specified location, or nothing.
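
To see how these pieces fit together, here is a minimal, hypothetical sketch of launching the Task with Luigi's local scheduler from the bottom of the same script; the table name and S3 path are placeholders, and in a Mortar pipeline the script is normally started for you:

if __name__ == '__main__':
    # luigi is already imported at the top of the script.
    # Run CopyToRedshiftTask (and anything returned by requires) locally.
    # Luigi exposes Parameters as command-line flags, with underscores
    # replaced by dashes.
    luigi.run([
        'CopyToRedshiftTask',
        '--table-name', 'pageview_counts',
        '--output-base-path', 's3://your-bucket/token-files',
        '--local-scheduler',
    ])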

Example in Context

The Redshift data app includes an example pipeline that builds an Amazon Redshift data warehouse from Wikipedia pageview data stored in S3. You can see how the S3CopyToTable Task works in the context of that pipeline in the source code here.