
DynamoDB Tasks

DynamoDB Tasks manage tables in Amazon's DynamoDB NoSQL database.

mortar-luigi

These Tasks are part of the mortar-luigi project, an open-source collection of extensions for using Mortar from within Luigi. It is installed automatically when you use the mortar local:luigi or mortar luigi commands.

Configuration

All DynamoDB Tasks require the following section to be added to your client.template.cfg file:

#
# AWS credentials to write your recommendations
# to DynamoDB.
#
[dynamodb]
aws_access_key_id: ${AWS_ACCESS_KEY_ID}
aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY}
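The ${...} placeholders are meant to be filled in with your credentials before Luigi reads the file. The sketch below is a rough illustration of what they mean. It assumes each ${NAME} is replaced by the environment variable of the same name, then parses the result with Python's standard configparser. The function render_config is hypothetical and is not part of Mortar or mortar-luigi.

```python
import configparser
import string

# The [dynamodb] section as it appears in client.template.cfg.
TEMPLATE = """\
#
# AWS credentials to write your recommendations
# to DynamoDB.
#
[dynamodb]
aws_access_key_id: ${AWS_ACCESS_KEY_ID}
aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY}
"""


def render_config(template_text, env):
    """Fill ${NAME} placeholders from `env`, then parse the result.

    Assumption (hypothetical helper): placeholders map to environment
    variables of the same name. Template.substitute raises KeyError for a
    missing variable, so unset credentials surface early.
    """
    rendered = string.Template(template_text).substitute(env)
    # interpolation=None: the rendered file needs no further expansion.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(rendered)
    return parser


cfg = render_config(
    TEMPLATE,
    {"AWS_ACCESS_KEY_ID": "AKIAEXAMPLE", "AWS_SECRET_ACCESS_KEY": "example-secret"},
)
print(cfg.get("dynamodb", "aws_access_key_id"))  # AKIAEXAMPLE
```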

CreateDynamoDBTable

Creates a table in DynamoDB.

Example Usage

import luigi
from boto.dynamodb2.types import STRING
from mortar.luigi import dynamodb

class CreateMyDynamoDBTable(dynamodb.CreateDynamoDBTable):

    # Initial read throughput of the table
    read_throughput = luigi.IntParameter(default=1)

    # Initial write throughput of the table
    write_throughput = luigi.IntParameter(default=1000)

    # Name of the primary hash key for this table
    hash_key = 'unique_id'

    # Type of the primary hash key (boto.dynamodb2.types)
    hash_key_type = STRING

    def output_token(self):
        # Where Luigi writes this task's "finished" token (placeholder path)
        return 'path-to-token-output'

    def table_name(self):
        return 'my-table-name'

    def requires(self):
        # MyPigscriptTask stands for your own upstream task
        return [MyPigscriptTask()]

The Dynamo-specific elements here are:

  • read_throughput: Scale this to the number of reads you expect against the table. It can be ramped up or down later.
  • write_throughput: If the next step writes data to the table, set this high, then turn it down once the data is written.
  • hash_key: This functions as both a primary key and a unique index on the table (in the absence of a range key).
  • hash_key_type: The type of hash_key, as a boto.dynamodb2.types constant such as STRING.

The table_name function returns the name of the table to create. DynamoDB table names must be unique within an AWS account and region.
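These settings correspond to fields of DynamoDB's CreateTable API request. The sketch below builds that request as a plain dictionary so the mapping is visible. It does not call AWS. The helper create_table_request is hypothetical and is not part of mortar-luigi, which builds the table through boto instead.

```python
# DynamoDB attribute type codes; boto.dynamodb2.types defines
# STRING = 'S', NUMBER = 'N' and BINARY = 'B'.
STRING, NUMBER, BINARY = "S", "N", "B"


def create_table_request(table_name, hash_key, hash_key_type,
                         read_throughput, write_throughput):
    """Build a CreateTable request body for a hash-key-only table (hypothetical helper)."""
    if hash_key_type not in (STRING, NUMBER, BINARY):
        raise ValueError("unsupported key type: %r" % (hash_key_type,))
    if read_throughput < 1 or write_throughput < 1:
        raise ValueError("throughput values must be at least 1")
    return {
        "TableName": table_name,
        # Only key attributes are declared up front; other attributes are free-form.
        "AttributeDefinitions": [
            {"AttributeName": hash_key, "AttributeType": hash_key_type},
        ],
        "KeySchema": [{"AttributeName": hash_key, "KeyType": "HASH"}],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": read_throughput,
            "WriteCapacityUnits": write_throughput,
        },
    }


# Values from CreateMyDynamoDBTable above.
request = create_table_request("my-table-name", "unique_id", STRING, 1, 1000)
print(request["KeySchema"])  # [{'AttributeName': 'unique_id', 'KeyType': 'HASH'}]
```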

output_token defines where Luigi writes the token that marks this task as finished.

requires indicates the task's dependencies, as in any Luigi task. These can be other Luigi tasks, data in a specified location, or nothing.
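The following toy model shows how requires and output_token work together. It is an illustration of the idea, not Luigi's scheduler. Tasks run in dependency order, and any task whose completion token already exists is skipped. The task graph mirrors the classes on this page. That WriteDataToDynamoDB depends on CreateMyDynamoDBTable is an assumption, and run_pipeline is a hypothetical helper. graphlib requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each task maps to what its requires() would return.
# WriteDataToDynamoDB -> CreateMyDynamoDBTable is an assumed dependency.
REQUIRES = {
    "MyPigscriptTask": set(),
    "CreateMyDynamoDBTable": {"MyPigscriptTask"},
    "WriteDataToDynamoDB": {"CreateMyDynamoDBTable"},
    "UpdateMyTableThroughput": {"WriteDataToDynamoDB"},
    "SanityTestMyDynamoDBTable": {"UpdateMyTableThroughput"},
}


def run_pipeline(requires, tokens, run):
    """Run each task after its dependencies, skipping already-finished ones.

    `tokens` stands in for the output_token locations: a task whose token
    exists counts as complete and is not run again.
    """
    ran = []
    for task in TopologicalSorter(requires).static_order():
        if task in tokens:
            continue
        run(task)
        tokens.add(task)  # record the "task finished" token
        ran.append(task)
    return ran


# Pretend the Pig script already finished on an earlier run.
done = {"MyPigscriptTask"}
print(run_pipeline(REQUIRES, done, run=lambda task: None))
```

Because the chain is linear, only the four tasks after MyPigscriptTask run, in order. A second call with the same token set runs nothing.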

More Options

import luigi
from boto.dynamodb2.types import NUMBER, STRING
from mortar.luigi import dynamodb

class CreateMyAllOptionsDynamoDBTable(dynamodb.CreateDynamoDBTable):

    # Initial read throughput of the table
    read_throughput = luigi.IntParameter(default=1)

    # Initial write throughput of the table
    write_throughput = luigi.IntParameter(default=1000)

    # Name of the primary hash key for this table
    hash_key = 'unique_id'

    # Type of the primary hash key (boto.dynamodb2.types)
    hash_key_type = STRING

    # Name of the primary range key for this table
    range_key = 'unique_range'

    # Type of the primary range key for this table (boto.dynamodb2.types)
    range_key_type = NUMBER

    # Secondary indexes of the table
    indexes = [{'name': 'MySecondaryIndex', 'range_key': 'range_value', 'data_type': STRING}]

    def output_token(self):
        # Where Luigi writes this task's "finished" token (placeholder path)
        return 'path-to-token-output'

    def table_name(self):
        return 'my-table-name'

    def requires(self):
        # MyPigscriptTask stands for your own upstream task
        return [MyPigscriptTask()]

Optional elements used here are:

  • range_key: If a range_key is specified, it and the hash_key together form a composite primary key, and each pair of values must be unique in the table.
  • range_key_type: The type of range_key, as a boto.dynamodb2.types constant. Required whenever range_key is used.
  • indexes: List of secondary indexes for the table. Each index is given as a dictionary with the index name, its range_key, and that key's data_type. Secondary indexes defined this way must be created at table creation time. For more on secondary indexes, see Amazon's documentation.
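In terms of DynamoDB's CreateTable API, a range key adds a RANGE element to the key schema. Each entry in indexes can be read as a local secondary index, which shares the table's hash key but uses a different range key. The sketch below assumes that interpretation and an ALL projection. key_schema is a hypothetical helper that only builds request fields and never contacts AWS.

```python
STRING, NUMBER, BINARY = "S", "N", "B"  # same codes as boto.dynamodb2.types


def key_schema(hash_key, hash_key_type, range_key=None, range_key_type=None,
               indexes=()):
    """Return (AttributeDefinitions, KeySchema, LocalSecondaryIndexes) fields."""
    if range_key is not None and range_key_type is None:
        raise ValueError("range_key_type must be given when range_key is set")
    if indexes and range_key is None:
        # Local secondary indexes need a table with both a hash and a range key.
        raise ValueError("secondary indexes here require a range_key")

    types = {hash_key: hash_key_type}  # attribute name -> type code
    schema = [{"AttributeName": hash_key, "KeyType": "HASH"}]
    if range_key is not None:
        types[range_key] = range_key_type
        schema.append({"AttributeName": range_key, "KeyType": "RANGE"})

    local_indexes = []
    for index in indexes:
        # Assumed shape: the table's hash key plus the index's own range key.
        types[index["range_key"]] = index["data_type"]
        local_indexes.append({
            "IndexName": index["name"],
            "KeySchema": [
                {"AttributeName": hash_key, "KeyType": "HASH"},
                {"AttributeName": index["range_key"], "KeyType": "RANGE"},
            ],
            "Projection": {"ProjectionType": "ALL"},  # assumption
        })

    definitions = [{"AttributeName": name, "AttributeType": kind}
                   for name, kind in types.items()]
    return definitions, schema, local_indexes


# Values from CreateMyAllOptionsDynamoDBTable above.
defs, schema, lsis = key_schema(
    "unique_id", STRING, "unique_range", NUMBER,
    [{"name": "MySecondaryIndex", "range_key": "range_value", "data_type": STRING}],
)
print([d["AttributeName"] for d in defs])  # ['unique_id', 'unique_range', 'range_value']
```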

Example in Context

Mortar's recommendation engine data app uses CreateDynamoDBTable Tasks to create tables for storing the recommender's output, in the form of item-item recommendations and user-item recommendations.

UpdateDynamoDBThroughput

DynamoDB capacity is set by each table's read throughput and write throughput. Both can be reduced by any amount immediately, but an increase can at most double the current value within a short time window. Raising throughput substantially is therefore a ramping process: double it, wait for the change to take effect, and double again until the target is reached.
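Under the doubling rule just described, you can compute how many updates a ramp takes. The sketch below lists the successive values for one throughput setting. ramp_schedule is a hypothetical helper, not mortar-luigi's code. The example values are the read change (1 to 10) and the write change (1000 to 1) between CreateMyDynamoDBTable and UpdateMyTableThroughput on this page.

```python
def ramp_schedule(current, target):
    """List the successive throughput values needed to go from current to target.

    A decrease is a single step; an increase at most doubles per step.
    """
    if current < 1 or target < 1:
        raise ValueError("throughput values must be at least 1")
    if target <= current:
        return [] if target == current else [target]
    steps = []
    value = current
    while value < target:
        value = min(value * 2, target)  # never overshoot the target
        steps.append(value)
    return steps


print(ramp_schedule(1, 10))    # [2, 4, 8, 10]
print(ramp_schedule(1000, 1))  # [1]
```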

Example Usage

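import luigi
from mortar.luigi import dynamodb

class UpdateMyTableThroughput(dynamodb.UpdateDynamoDBThroughput):

    # target read throughput of the dynamodb table
    read_throughput = luigi.IntParameter(default=10)

    # target write throughput of the dynamodb table
    write_throughput = luigi.IntParameter(default=1)

    def requires(self):
        # WriteDataToDynamoDB stands for your own task that loads the table
        return [WriteDataToDynamoDB()]

    def table_name(self):
        return 'my-table-name'

    def output_token(self):
        # Where Luigi writes this task's "finished" token (placeholder path)
        return 'path-to-token-output'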

These fields mirror those in CreateDynamoDBTable, except that read_throughput and write_throughput are the target values the table should end up at. The table_name function returns the name of the table to modify. The task raises an error if that table does not exist.

As with all Luigi tasks, requires describes the task's dependencies. output_token defines where Luigi writes the token that marks this task as finished.
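Applying a ramp means issuing one update at a time and waiting until the table reports ACTIVE again before sending the next one. In the sketch below, update and get_status are hypothetical hooks that stand in for real DynamoDB calls. FakeTable is an in-memory stand-in used only to exercise the loop. The step list assumes the doubling rule, with read throughput going from 1 to 10 and write throughput dropping from 1000 to 1 in the first update.

```python
import time


def apply_throughput_steps(steps, update, get_status, poll_seconds=5.0,
                           timeout_seconds=600.0):
    """Apply (read, write) updates one at a time, waiting for ACTIVE between them.

    update(read, write) issues one change; get_status() returns the table
    status string, or None if the table does not exist (hypothetical hooks).
    """
    for read, write in steps:
        if get_status() is None:
            raise LookupError("table does not exist")
        update(read, write)
        deadline = time.monotonic() + timeout_seconds
        # The table reports UPDATING until the change has taken effect.
        while get_status() != "ACTIVE":
            if time.monotonic() > deadline:
                raise TimeoutError("table did not return to ACTIVE")
            time.sleep(poll_seconds)


class FakeTable:
    """In-memory stand-in: each update leaves the table UPDATING for two polls."""

    def __init__(self):
        self.throughput = (1, 1000)
        self.pending_polls = 0

    def update(self, read, write):
        self.throughput = (read, write)
        self.pending_polls = 2

    def status(self):
        if self.pending_polls:
            self.pending_polls -= 1
            return "UPDATING"
        return "ACTIVE"


table = FakeTable()
steps = [(2, 1), (4, 1), (8, 1), (10, 1)]
apply_throughput_steps(steps, table.update, table.status, poll_seconds=0)
print(table.throughput)  # (10, 1)
```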

Example in Context

Mortar's recommendation engine data app uses the UpdateDynamoDBThroughput Task to ramp write throughput down and/or read throughput up once the tables have been written, making them ready for production use.

SanityTestDynamoDBTable

A sanity test is an automated smoke test of whether data was successfully written to the DynamoDB table. It looks up a handful of sentinel ids and checks the table against a minimum number of records to confirm quickly that the table has been populated.

Example Usage

import luigi
from mortar.luigi import dynamodb

class SanityTestMyDynamoDBTable(dynamodb.SanityTestDynamoDBTable):

    # primary hash key for the DynamoDB table
    hash_key = 'unique_id'

    # number of results required to be returned for each primary key
    result_length = luigi.IntParameter(default=5)

    # when testing specific ids, how many are allowed to fail
    failure_threshold = luigi.IntParameter(default=2)

    # number of entries required to be in the table
    min_total_results = luigi.IntParameter(default=100)

    def output_token(self):
        # Where Luigi writes this task's "finished" token (placeholder path)
        return 'path-to-token-output'

    def table_name(self):
        return 'my-table-name'

    # sentinel ids expected to be in the result data
    def ids(self):
        return ["hash_key_value1", "hash_key_value2", "hash_key_value3",
                "hash_key_value4", "hash_key_value5"]

    def requires(self):
        # Run the test after the throughput update task defined earlier
        return [UpdateMyTableThroughput()]

Elements specific to this task are:

  • hash_key: Hash key for the table. This is needed to select data from the table.
  • result_length: For each id tested, minimum acceptable number of rows to be returned.
  • failure_threshold: When testing the list of ids, how many ids may fail before the task raises an error.
  • min_total_results: Minimum number of rows required to be in the table.

The table_name function returns the name of the table to test. The ids function returns a list of sentinel ids to check. The test fails if more than failure_threshold of them are missing or return fewer than result_length rows.

output_token defines where Luigi writes the token that marks this task as finished.

requires indicates the task's dependencies, as in any Luigi task. These can be other Luigi tasks, data in a specified location, or nothing.
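The pass/fail rule can be written out directly. In this sketch, lookup and total_rows stand in for real DynamoDB queries, and sanity_check and SanityCheckError are hypothetical names. mortar-luigi's own implementation may differ in details, such as how it counts rows.

```python
class SanityCheckError(Exception):
    """Raised when the table does not look fully populated."""


def sanity_check(lookup, ids, result_length, failure_threshold,
                 total_rows, min_total_results):
    """Return the sentinel ids that failed, or raise SanityCheckError.

    lookup(id) returns the rows stored under that hash key (an empty list
    if the key is absent); total_rows is the table's row count.
    """
    if total_rows < min_total_results:
        raise SanityCheckError(
            "table has %d rows, expected at least %d" % (total_rows, min_total_results))
    # An id fails if it is missing or returns fewer than result_length rows.
    failed = [i for i in ids if len(lookup(i)) < result_length]
    if len(failed) > failure_threshold:
        raise SanityCheckError("%d sentinel ids failed: %s" % (len(failed), failed))
    return failed


# Sentinel ids and thresholds from SanityTestMyDynamoDBTable above.
ids = ["hash_key_value1", "hash_key_value2", "hash_key_value3",
       "hash_key_value4", "hash_key_value5"]
# Fake table: the first four ids have five rows each; the fifth is missing.
rows = {key: [{"rank": r} for r in range(5)] for key in ids[:4]}

failed = sanity_check(lambda key: rows.get(key, []), ids,
                      result_length=5, failure_threshold=2,
                      total_rows=150, min_total_results=100)
print(failed)  # ['hash_key_value5']
```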

Example in Context

The mortar-recsys project uses sanity tests to make sure that newly created DynamoDB tables contain a few items that are expected to show up in the output data.