
DBMS Tasks

DBMS Tasks create and manage SQL database tables.

mortar-luigi

These Tasks are part of the mortar-luigi project, an open-source collection of extensions for using Mortar from within Luigi. It is installed automatically when you use the mortar local:luigi or mortar luigi commands.

Configuration

All DBMS Tasks require a connection info section in your client.cfg.template file.

To run a PostgreSQLTask the following must be defined in your client.cfg.template:

[postgres]
dbname: ${DBNAME}
user: ${USER}
password: ${PASSWORD}
host: ${HOST}
port: ${PORT}

To run a MySQLTask the following must be defined in your client.cfg.template:

[mysql]
dbname: ${DBNAME}
user: ${USER}
password: ${PASSWORD}
host: ${HOST}
port: ${PORT}

You'll also need to set the following Secure Configuration Parameters on your project:

    mortar config:set DBNAME=<target_database_name>
    mortar config:set USER=<target_database_user>
    mortar config:set PASSWORD=<target_database_password>
    mortar config:set HOST=<target_database_host>
    mortar config:set PORT=<target_database_port>
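
As a rough illustration of how the template and the configuration parameters fit together, the sketch below fills the `${...}` placeholders with Python's `string.Template` and reads the result with `configparser`. Mortar is expected to handle this substitution for you when a pipeline runs; the `render_client_cfg` helper and the sample values are hypothetical and only show the shape of the resulting `[postgres]` section.

```python
import configparser
from string import Template

# Same layout as the [postgres] section of client.cfg.template.
CLIENT_CFG_TEMPLATE = """\
[postgres]
dbname: ${DBNAME}
user: ${USER}
password: ${PASSWORD}
host: ${HOST}
port: ${PORT}
"""


def render_client_cfg(template_text, params):
    """Hypothetical helper: fill ${NAME} placeholders and parse the result."""
    rendered = Template(template_text).substitute(params)
    parser = configparser.ConfigParser()
    parser.read_string(rendered)
    return parser


# Sample values only; real values come from `mortar config:set`.
sample_params = {
    "DBNAME": "analytics",
    "USER": "etl_user",
    "PASSWORD": "not-a-real-password",
    "HOST": "db.example.com",
    "PORT": "5432",
}

config = render_client_cfg(CLIENT_CFG_TEMPLATE, sample_params)
postgres = dict(config["postgres"])
```

A placeholder with no matching parameter makes `substitute` raise a `KeyError`, which is a convenient way to spot a parameter that was never set.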

Create DBMS Table

Creates a table in a SQL database.

Example Usage

PostgreSQL

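from mortar.luigi import dbms

class CreateTable(dbms.CreatePostgresTable):

    # Where Luigi should put its task-finished token
    def output_token(self):
        return 'path-to-token-output'

    # Fields in the primary key; each must also appear in field_string
    def primary_key(self):
        return ['id', 'value']

    # Field definitions, as they would appear in a CREATE TABLE statement
    def field_string(self):
        return 'id varchar, value decimal, type int'

    def table_name(self):
        return 'my-table-name'

    # MyPigscriptTask is a placeholder for the upstream task in your pipeline
    def requires(self):
        return [MyPigscriptTask()]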

MySQL

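from mortar.luigi import dbms

class CreateTable(dbms.CreateMySQLTable):

    # Where Luigi should put its task-finished token
    def output_token(self):
        return 'path-to-token-output'

    # Fields in the primary key; each must also appear in field_string
    def primary_key(self):
        return ['id', 'value']

    # Field definitions, as they would appear in a CREATE TABLE statement
    def field_string(self):
        return 'id char, value decimal, type int'

    def table_name(self):
        return 'my-table-name'

    # MyPigscriptTask is a placeholder for the upstream task in your pipeline
    def requires(self):
        return [MyPigscriptTask()]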

The field_string function returns the string that defines the table fields. This is what would appear in a CREATE TABLE statement. The field types may vary by database type.

primary_key returns a list of the fields that should be included in the primary key. This can be a single field or multiple fields.

The table_name function returns the name of the table to be created.

output_token defines where Luigi should put its task finished token.

requires is a method that needs to be defined for every Luigi task, and it indicates dependencies for the task. This can either be other Luigi tasks, data in a specified location, or nothing.
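
To make the roles of field_string, primary_key, and table_name concrete, here is a minimal sketch of how the three values could combine into a CREATE TABLE statement. The `build_create_table_sql` helper is hypothetical and is not taken from mortar-luigi; the exact SQL the library generates may differ.

```python
def build_create_table_sql(table_name, field_string, primary_key):
    """Hypothetical helper: assemble a CREATE TABLE statement from the
    three values a create-table task supplies."""
    sql = "CREATE TABLE {0} ({1}".format(table_name, field_string)
    if primary_key:
        # A multi-field list becomes a composite primary key.
        sql += ", PRIMARY KEY ({0})".format(", ".join(primary_key))
    return sql + ")"


statement = build_create_table_sql(
    table_name="my_table_name",
    field_string="id varchar, value decimal, type int",
    primary_key=["id", "value"],
)
print(statement)
```

Every name in the primary key list has to match a field declared in the field string; otherwise the database rejects the statement.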

Example in Context

Mortar's recommendation engine data app uses a Luigi Task to create a Postgres table, and it can easily be modified to create a table in a MySQL database instead.

Extract Data from a DBMS Table

Export data from a database table to a file in S3.

Note: MySQL is currently supported, and PostgreSQL is coming soon.

Example Usage

MySQL

import luigi
from mortar.luigi import dbms

class ExtractMySQLData(dbms.ExtractFromMySQL):

    # Table to extract
    table = luigi.Parameter(default='mytable')

    # Columns to select, in comma-delimited list.
    columns = luigi.Parameter(default='mycol1, mycol2, mycol3')

    # Output location for the extracted data.
    output_path = luigi.Parameter(default='s3://my-bucket/my-folder/mytable.txt')

Required Parameters

The table parameter provides the name of the MySQL database table that holds the data.

The columns parameter provides a comma-delimited list of columns to extract from the table.

The output_path parameter provides the target destination (usually in S3) where data should be stored. Data will be saved in tab-delimited format.

Optional Parameters

The where parameter allows you to define a custom WHERE clause for the extraction SQL statement. By default it is set to None, so no where clause is used and all data is extracted.

The replace_null_with_blank parameter controls whether the string "NULL" should be replaced with an empty string. This defaults to True, as the MySQL command-line tool usually exports the string "NULL" whenever a field is NULL, and most data utilities prefer an empty string.

The raw parameter controls whether the MySQL command-line tool should escape control characters like newlines, tabs, and backslashes in your data. It is usually set to False, which causes MySQL to escape control characters.
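
The sketch below illustrates how the table, columns, and where parameters might translate into an extraction query, and what replacing "NULL" with a blank means for a tab-delimited row. Both helpers are hypothetical and are not the mortar-luigi implementation. In particular, the source does not say whether only whole fields are replaced; this sketch assumes they are.

```python
def build_extract_query(table, columns, where=None):
    """Hypothetical helper: SELECT the requested columns, optionally filtered."""
    query = "SELECT {0} FROM {1}".format(columns, table)
    if where is not None:
        # With where=None no WHERE clause is added and all rows are selected.
        query += " WHERE {0}".format(where)
    return query


def replace_null_with_blank(line):
    """Blank out fields that are exactly the string NULL in a tab-delimited line."""
    fields = line.rstrip("\n").split("\t")
    return "\t".join("" if field == "NULL" else field for field in fields)


all_rows = build_extract_query("mytable", "mycol1, mycol2, mycol3")
filtered = build_extract_query("mytable", "mycol1, mycol2, mycol3",
                               where="mycol1 > 100")
cleaned = replace_null_with_blank("42\tNULL\tNULLABLE\n")
```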

Using with mortar local:luigi

To use the ExtractFromMySQL task from mortar local:luigi, you'll need to ensure that you have the MySQL command-line tool installed on your system. This tool is installed automatically on Mortar's pipeline servers for use with mortar luigi.
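
One quick way to confirm that the command-line tool is available before a local run is to look it up on your PATH. This is a small sketch, assuming the executable is named `mysql`; the `mysql_client_path` helper is hypothetical.

```python
import shutil


def mysql_client_path(executable="mysql"):
    """Return the full path of the MySQL command-line client, or None."""
    return shutil.which(executable)


path = mysql_client_path()
if path is None:
    print("MySQL command-line tool not found on PATH")
else:
    print("Found MySQL command-line tool at", path)
```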

Example in Context

Mortar's Redshift Data Warehouse ETL app uses an ExtractFromMySQL Task to extract wiki data from MySQL. Unlike the example above, it instantiates an ExtractFromMySQL task directly in the parent Task's requires method without creating a custom subclass. Whether you create a custom subclass is up to you; either approach works.

SanityTestDBMSTable

A sanity test gives you an automated smoke test of whether data was successfully written to the database table. It uses sentinel ids and a threshold number of records to do a quick check on whether the table has been populated.

Example Usage

PostgreSQL

import luigi
from mortar.luigi import dbms

class SanityTestTable(dbms.SanityTestPostgresTable):

    def id_field(self):
        return 'id'

    # number of results required to be returned for each primary key
    result_length = luigi.IntParameter(default=5)

    # when testing specific ids, how many are allowed to fail
    failure_threshold = luigi.IntParameter(default=2)

    # number of entries required to be in the table
    min_total_results = luigi.IntParameter(default=100)

    def output_token(self):
        return 'path-to-token-output'

    def table_name(self):
        return 'my-table-name'

    # sentinel ids expected to be in the result data
    def ids(self):
        return ["id_value1", "id_value2", "id_value3",
                "id_value4", "id_value5"]

    def requires(self):
        return [WriteDBMSDataTask()]

MySQL

import luigi
from mortar.luigi import dbms

class SanityTestTable(dbms.SanityTestMySQLTable):

    def id_field(self):
        return 'id'

    # number of results required to be returned for each primary key
    result_length = luigi.IntParameter(default=5)

    # when testing specific ids, how many are allowed to fail
    failure_threshold = luigi.IntParameter(default=2)

    # number of entries required to be in the table
    min_total_results = luigi.IntParameter(default=100)

    def output_token(self):
        return 'path-to-token-output'

    def table_name(self):
        return 'my-table-name'

    # sentinel ids expected to be in the result data
    def ids(self):
        return ["id_value1", "id_value2", "id_value3",
                "id_value4", "id_value5"]

    def requires(self):
        return [WriteDBMSDataTask()]

Elements specific to this task are:

  • id_field(): The field that corresponds to the ids defined in ids(). This will be used to select data from the table.
  • result_length: For each id tested, the minimum acceptable number of rows to be returned.
  • failure_threshold: When testing the list of ids, how many ids can fail before an error is thrown.
  • min_total_results: The minimum number of rows required to be in the table.
  • ids(): Returns a list of the sentinel ids to be checked. No more than failure_threshold of these ids can fail to exist or return fewer than result_length rows.

The table_name function returns the name of the table to be tested.

output_token defines where Luigi should put its task finished token.

requires is a method that needs to be defined for every Luigi task, and it indicates dependencies for the task. This can either be other Luigi tasks, data in a specified location, or nothing.
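
The checks described above can be sketched in plain Python. This is a hypothetical re-implementation for illustration, not the mortar-luigi code: the `sanity_check` function is made up, and an in-memory SQLite database stands in for PostgreSQL or MySQL so that the example runs anywhere.

```python
import sqlite3


def sanity_check(conn, table, id_field, ids,
                 result_length, failure_threshold, min_total_results):
    """Hypothetical re-implementation of the checks described above."""
    cur = conn.cursor()

    # Check 1: the table must hold at least min_total_results rows.
    total = cur.execute("SELECT COUNT(*) FROM {0}".format(table)).fetchone()[0]
    if total < min_total_results:
        raise ValueError("only {0} rows in {1}".format(total, table))

    # Check 2: each sentinel id must return at least result_length rows.
    failed = []
    for sentinel in ids:
        query = "SELECT COUNT(*) FROM {0} WHERE {1} = ?".format(table, id_field)
        count = cur.execute(query, (sentinel,)).fetchone()[0]
        if count < result_length:
            failed.append(sentinel)

    # No more than failure_threshold sentinel ids may fail.
    if len(failed) > failure_threshold:
        raise ValueError("sentinel ids failed: {0}".format(failed))
    return failed


# In-memory SQLite stands in for PostgreSQL or MySQL in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE recs (id TEXT, value REAL)")
rows = [(sid, float(n)) for sid in ("id_value1", "id_value2") for n in range(5)]
conn.executemany("INSERT INTO recs VALUES (?, ?)", rows)

failed_ids = sanity_check(conn, "recs", "id",
                          ids=["id_value1", "id_value2", "id_value3"],
                          result_length=5, failure_threshold=2,
                          min_total_results=10)
```

Here one sentinel id is missing from the table, which is within the threshold of two, so the check passes and reports the missing id. Lowering failure_threshold to 0 would make the same call raise an error.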

Example in Context

Mortar's recommendation engine code includes a sanity check Task to verify that a newly created table contains the expected data. The template code is written for Postgres but can easily be applied to MySQL.