
Run the Example Pipeline

Now that we've introduced Luigi, let's run an example Luigi pipeline to build recommendations and store them in DynamoDB, MongoDB, or a DBMS.

Luigi Script

The mortar-recsys project has a directory called luigiscripts, which contains Luigi pipelines that can be run on Mortar.

MongoDB

The luigiscripts directory contains a Luigi script called mongo-luigi.py that runs the example recommendation pipeline (now broken into four scripts, for reasons we'll discuss later). The mongo-luigi.py script reads the Last.fm data from your MongoDB database and writes the output recommendations to your S3 bucket. It then copies those recommendations to MongoDB, verifies the results, and shuts down the clusters used.

Here's how to run this example:

First, you'll need to prep the example by loading the Last.fm data into your MongoDB instance. To do that, set your MongoDB connection string, database, and target collection, and then run a Pig script we've provided to load the example data into a collection in your MongoDB database:

    mortar config:set CONN=mongodb://<username>:<password>@<host>:<port>
    mortar config:set DB=<databasename>
    mortar config:set COLLECTION=lastfm_plays
    mortar run pigscripts/mongo/load_lastfm_data_to_mongo.pig --clustersize 10 

Next, copy luigiscripts/mongo.client.cfg.template to luigiscripts/client.cfg.template. Each time you run Luigi, Mortar will expand the variables in this file (e.g. ${MORTAR_EMAIL}) to their actual values and store the result in luigiscripts/client.cfg. You should not check luigiscripts/client.cfg into source control, as it will be generated on each new run. You can read more about Luigi configuration here.

To run the script, type at the command prompt:

mortar luigi luigiscripts/mongo-luigi.py \
    --output-base-path "s3://<your-s3-bucket>/mongo-lastfm" \
    --mongodb-output-collection-name "mortar_luigi_demo"

When you run the command, you should see the following:

Taking code snapshot... done
Sending code snapshot to Mortar... done
Requesting job execution... done
job_id: some_job_id

Job status can be viewed on the web at:

 https://app.mortardata.com/jobs/pipeline_job_detail?job_id=some_job_id

This tells you that your job has started successfully and gives you the URL to monitor its progress. If you open that URL, you should see your Luigi job running, and should see logs start streaming into the Logs console.

To get a better idea of what's actually happening, see Luigi Tasks below.


DBMS

The luigiscripts directory contains a Luigi script called dbms-luigi.py that runs the example recommendation pipeline (now broken into four scripts, for reasons we'll discuss later). The dbms-luigi.py script reads its input data from S3 and writes the output recommendations to your S3 bucket. It then stores those recommendations in your database, verifies the results, and shuts down the clusters used.

The script is currently written to store to a PostgreSQL database, though it can be adapted to use MySQL fairly easily.

Here's how to run this example. First, you'll need to set your target database connection information using Secure Configuration Parameters. From the root of your Mortar project, run the following, filling in each value accordingly:

    mortar config:set DBNAME=<target_database_name>
    mortar config:set USER=<target_database_user>
    mortar config:set PASSWORD=<target_database_password>
    mortar config:set HOST=<target_database_host>
    mortar config:set PORT=<target_database_port>

To run the script, type at the command prompt:

mortar luigi luigiscripts/dbms-luigi.py \
    --output-base-path "s3://<your-bucket>/dbms-lastfm" \
    --data-store-path "s3://<your-bucket>/lastfm-data" \
    --table-name-prefix "<table-name-prefix>"

When you run the command, you should see the following:

Taking code snapshot... done
Sending code snapshot to Mortar... done
Requesting job execution... done
job_id: some_job_id

Job status can be viewed on the web at:

 https://app.mortardata.com/jobs/pipeline_job_detail?job_id=some_job_id

This tells you that your job has started successfully and gives you the URL to monitor its progress. If you open that URL, you should see your Luigi job running, and should see logs start streaming into the Logs console.

To get a better idea of what's actually happening, see Luigi Tasks below.


DynamoDB

The luigiscripts directory contains a Luigi script called retail-luigi.py that runs the retail example (now broken into three scripts, for reasons we'll discuss later) and writes the output data to your S3 bucket. It then creates DynamoDB tables in your AWS account for both the user-item and item-item recommendations, writes the results from S3 to the DynamoDB tables, ramps down the write throughput on the tables, verifies the results, and shuts down the clusters used.

We find DynamoDB a convenient and scalable way to write results quickly and retrieve them efficiently, which is why we’ve included it in our example. Ultimately the data should end up in the data store that works for your business.

The luigiscripts directory also contains client.cfg.template, a Luigi configuration file. Each time you run Luigi, Mortar will expand the variables in this file (e.g. ${MORTAR_EMAIL}) to their actual values and store the result in luigiscripts/client.cfg. You should not check luigiscripts/client.cfg into source control, as it will be generated on each new run. You can read more about Luigi configuration here.

Running the Example

Check Your DynamoDB IAM Policy

If you are using AWS IAM keys, you will need to add an additional policy to your IAM user to be able to create tables and write data to DynamoDB.

{
    "Statement": [
        {
            "Effect":"Allow",
            "Action":"dynamodb:*",
            "Resource":"*"
        }
    ]
}

For more information on IAM policies, view our help docs or check out Amazon's JSON IAM policy format.

Run the Luigi Pipeline

To run the script, type at the command prompt:

mortar luigi luigiscripts/retail-luigi.py \
    --input-base-path "s3://mortar-example-data/retail-example" \
    --output-base-path "s3://<your-s3-bucket>/retail" \
    --dynamodb-table-name "<dynamo-table-name>"

Fill in <your-s3-bucket> and <dynamo-table-name>. For <your-s3-bucket>, use a bucket in your AWS account where you can store data. This bucket will store intermediate recommendations and will be used by Luigi to record which tasks have completed successfully. For <dynamo-table-name>, you can use any table name that doesn't currently exist in your AWS account.

When you run the command, you should see the following:

Taking code snapshot... done
Sending code snapshot to Mortar... done
Requesting job execution... done
job_id: some_job_id

Job status can be viewed on the web at:

 https://app.mortardata.com/jobs/pipeline_job_detail?job_id=some_job_id

This tells you that your job has started successfully and gives you the URL to monitor its progress. If you open that URL, you should see your Luigi job running, and should see logs start streaming into the Logs console.

Pipeline Job Details

While your job is proceeding, we’ll talk about what’s actually happening.

Luigi Tasks

Luigi scripts are broken up into tasks, each of which inherits from the class luigi.Task.
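
For reference, a task is just a Python class with a few methods Luigi knows how to call. Here's a minimal sketch (the class name, file paths, and logic are illustrative, not taken from the example scripts):

    import luigi

    class CountWords(luigi.Task):
        # Parameters become constructor arguments and show up in the log lines below.
        input_path = luigi.Parameter()
        output_path = luigi.Parameter()

        def output(self):
            # Luigi considers the task complete once this target exists.
            return luigi.LocalTarget(self.output_path)

        def run(self):
            with open(self.input_path) as f:
                count = len(f.read().split())
            with self.output().open('w') as out:
                out.write(str(count))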

When you first start seeing log output, it should look something like this:

2014-02-28 13:51:23,028 [48365:MainThread] (worker.py:172)  DEBUG - Checking if ShutdownClusters(input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table) is complete

2014-02-28 13:51:23,324 [48365:MainThread] (s3.py:82)  DEBUG - Path s3://mortar-example-output-data/my-retail-example/ShutdownClusters does not exist

2014-02-28 13:51:23,324 [48365:MainThread] (worker.py:221)  INFO  - Scheduled ShutdownClusters(input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table)

2014-02-28 13:51:23,325 [48365:MainThread] (worker.py:172)  DEBUG - Checking if SanityTestUITable(min_total_results=100, non_null_fields=(), result_length=5, failure_threshold=2, input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table) is complete

What's going on here is that Luigi is asked to run ShutdownClusters, the last task in the dependency graph. Luigi checks that it hasn't already run and schedules it. When scheduling ShutdownClusters, Luigi finds that the task depends on SanityTestUITable, so it checks the status of that task next, and so on back up the graph.
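
That backwards walk is driven by each task's requires() method. A rough sketch of how the chain is declared (parameters trimmed for brevity; real tasks take the parameters shown in the logs):

    import luigi

    class SanityTestUITable(luigi.Task):
        output_base_path = luigi.Parameter()
        # ... verification logic ...

    class ShutdownClusters(luigi.Task):
        output_base_path = luigi.Parameter()

        def requires(self):
            # Luigi will not run this task until SanityTestUITable reports complete.
            return SanityTestUITable(output_base_path=self.output_base_path)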

MongoDB

The tasks described below are from the S3-to-DynamoDB example described above. If you ran the mongo-luigi.py example instead, you should still read the sections on MortarProjectPigscriptTask and ShutdownClusters, but you can skip any of the tasks that mention DynamoDB.


DBMS

The tasks described below are from the S3-to-DynamoDB example described above. If you ran the dbms-luigi.py example instead, you should still read the sections on MortarProjectPigscriptTask and ShutdownClusters, but you can skip any of the tasks that mention DynamoDB. CreateDynamoDBTable and SanityTestDynamoDBTable are also worth reading, as the database pipeline contains similar tasks that run against a SQL database rather than DynamoDB.


The major categories of tasks used in Mortar are:

  • MortarProjectPigscriptTask: runs a Pig script on the cloud using Mortar
  • CreateDynamoDBTable: creates a DynamoDB table
  • UpdateDynamoDBThroughput: changes the reads and writes on a DynamoDB table
  • SanityTestDynamoDBTable: verifies that a populated DynamoDB table has entries that match specified ids

Each task writes a token to an S3 location, indicating that the task has completed successfully. If task A requires the completion of task B, task A checks for the presence of task B's token before it runs.
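
In code, the token is simply the task's output() target: an S3 path whose existence tells Luigi the task is complete. A sketch of the pattern, using the S3Target class from stock Luigi (Mortar's Luigi fork may expose it under a slightly different module path):

    import luigi
    from luigi.s3 import S3Target  # luigi.contrib.s3 in newer Luigi releases

    class TokenizedTask(luigi.Task):
        output_base_path = luigi.Parameter()

        def output(self):
            # The presence of this S3 object is the "task finished" token.
            return S3Target('%s/%s' % (self.output_base_path, self.__class__.__name__))

        def run(self):
            # ... do the real work, then write the token ...
            with self.output().open('w') as token:
                token.write('done')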

MortarProjectPigscriptTask

This type of task is used for running Pig scripts with Mortar, just as they can be run from the command line using the mortar client.

A MortarProjectPigscriptTask must define:

  • script(): the name of the Pig script to be run
  • token_path(): where to write the tokens indicating task progression
  • script_output(): where the output data for the script is to be written

It is also very common to want to override the defaults for:

  • cluster_size: number of nodes used to run the job
  • parameters(): dictionary of parameters for the job

In the example retail luigi script, the cluster_size is set to zero because this is a very small data set, so the entire script can run in Local Mode (which is free, and fast for small jobs) without launching a cluster.
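
Putting those pieces together, a Pig script task looks roughly like the sketch below. The method names follow the list above; the script name, paths, and parameter keys are placeholders, and the import paths assume the mortar.luigi package layout used by the example project:

    import luigi
    from luigi.s3 import S3Target  # luigi.contrib.s3 in newer Luigi releases
    from mortar.luigi import mortartask

    class GenerateSignals(mortartask.MortarProjectPigscriptTask):
        input_base_path = luigi.Parameter()
        output_base_path = luigi.Parameter()

        # Number of cluster nodes; 0 runs the job in free, fast Local Mode.
        cluster_size = luigi.IntParameter(default=0)

        def script(self):
            # Name of the Pig script to run (placeholder).
            return '01-generate-signals'

        def token_path(self):
            # Where to write the tokens that track task progression.
            return self.output_base_path

        def script_output(self):
            # Where the Pig script writes its output data (placeholder path).
            return [S3Target('%s/user_signals' % self.output_base_path)]

        def parameters(self):
            # Dictionary of Pig parameters passed to the script.
            return {'INPUT_PATH': self.input_base_path,
                    'OUTPUT_PATH': self.output_base_path}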

The output of the Pigscript task looks something like this:

2014-02-28 13:51:25,386 [48365:MainThread] (worker.py:233)  INFO  - [pid 48365] Running   GenerateSignals(run_on_single_use_cluster=False, use_spot_instances=False, git_ref=master, notify_on_job_finish=False, job_polling_interval=5, num_polling_retries=3, pig_version=None, input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/cmiller-retail-output4, cluster_size=2)

2014-02-28 13:51:25,801 [48365:MainThread] (s3.py:82)  DEBUG - Path s3://mortar-example-output-data/cmiller-retail-output4/GenerateSignals-Running does not exist

2014-02-28 13:51:26,734 [48365:MainThread] (mortartask.py:173)  INFO  - Submitted new job to mortar with job_id [5310daaed718826d09ce02e9]

2014-02-28 13:53:20,163 [48365:MainThread] (mortartask.py:201)  INFO  - Mortar job_id [5310daaed718826d09ce02e9] switched to status_code [validating_script], description: Validating Script

2014-02-28 13:54:06,553 [48365:MainThread] (mortartask.py:201)  INFO  - Mortar job_id [5310daaed718826d09ce02e9] switched to status_code [starting_cluster], description: Starting Cluster

2014-02-28 13:59:16,314 [48365:MainThread] (mortartask.py:201)  INFO  - Mortar job_id [5310daaed718826d09ce02e9] switched to status_code [running], description: Running

2014-02-28 13:59:16,314 [48365:MainThread] (mortartask.py:206)  INFO  - Mortar job_id [5310daaed718826d09ce02e9] progress: [0%]

2014-02-28 13:59:57,529 [48365:MainThread] (mortartask.py:206)  INFO  - Mortar job_id [5310daaed718826d09ce02e9] progress: [50%]

2014-02-28 14:00:03,135 [48365:MainThread] (mortartask.py:206)  INFO  - Mortar job_id [5310daaed718826d09ce02e9] progress: [100%]

2014-02-28 14:00:08,250 [48365:MainThread] (mortartask.py:201)  INFO  - Mortar job_id [5310daaed718826d09ce02e9] switched to status_code [success], description: Success

The task verifies that the job is not already running, then asks Mortar to run the Pig script. The output from the Mortar job is then displayed on screen, from cluster startup (if needed) through progress updates to the final status. This should look familiar from running Mortar jobs directly on the command line.

The task will complete on a successful run, and throw an error if the job does not complete successfully.

CreateDynamoDBTable

This task creates a DynamoDB table with the specified HashKey, RangeKey, throughput, and indexes. This is not something you’ll need to change yourself unless you want a table with a different schema. The details of the DynamoDB tables created for the standard recommendation pipeline will be discussed later.

A CreateDynamoDBTable task must define:

  • read_throughput: initial read throughput of the table
  • write_throughput: initial write throughput of the table
  • hash_key: primary HashKey for the table
  • hash_key_type: type of the primary HashKey for the table
  • table_name(): name of the table to be created

It is also common to define:

  • range_key: primary RangeKey for the table
  • range_key_type: type of the primary RangeKey for the table
  • indexes: additional indexes for the table

DynamoDB only allows indexes to be created at the same time the table is created, so it’s important to get them correct.
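
As a concrete sketch, a table-creation task for the item-item recommendations might look like the following. The key names are illustrative, the type constants come from boto (which the library is built on), and the import path assumes the mortar.luigi package layout used by the example project:

    import luigi
    from boto.dynamodb2.types import NUMBER, STRING
    from mortar.luigi import dynamodb

    class CreateIITable(dynamodb.CreateDynamoDBTable):
        dynamodb_table_name = luigi.Parameter()

        # Low reads, high writes: the table is about to be bulk-loaded.
        read_throughput = luigi.IntParameter(default=1)
        write_throughput = luigi.IntParameter(default=10)

        # Item-item recommendations: hash on the source item, range on rank.
        hash_key = 'from_id'
        hash_key_type = STRING
        range_key = 'rank'
        range_key_type = NUMBER

        def table_name(self):
            return '%s-II' % self.dynamodb_table_name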

The output of a CreateDynamoDBTable task looks like this:

2014-02-28 14:13:40,277 [48365:MainThread] (worker.py:233)  INFO  - [pid 48365] Running   CreateIITable(indexes=None, input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table, read_throughput=1, write_throughput=10)

2014-02-28 14:13:40,663 [48365:MainThread] (dynamodb.py:61)  INFO  - Created new dynamodb table my-dynamodb-table-II with schema [<boto.dynamodb2.fields.HashKey object at 0x101d039d0>, <boto.dynamodb2.fields.RangeKey object at 0x101d116d0>]

2014-02-28 14:13:40,691 [48365:MainThread] (dynamodb.py:109)  DEBUG - Table my-dynamodb-table-II is in status CREATING

2014-02-28 14:13:50,755 [48365:MainThread] (dynamodb.py:109)  DEBUG - Table my-dynamodb-table-II is in status CREATING

2014-02-28 14:14:00,808 [48365:MainThread] (dynamodb.py:109)  DEBUG - Table my-dynamodb-table-II is in status CREATING

2014-02-28 14:14:10,855 [48365:MainThread] (dynamodb.py:109)  DEBUG - Table my-dynamodb-table-II is in status CREATING

2014-02-28 14:14:20,909 [48365:MainThread] (dynamodb.py:106)  INFO  - Table my-dynamodb-table-II is ACTIVE with throughput {'read': 1, 'write': 10}

2014-02-28 14:14:21,050 [48365:MainThread] (worker.py:247)  INFO  - [pid 48365] Done      CreateIITable(indexes=None, input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table, read_throughput=1, write_throughput=10)

After checking its preconditions, the task sends a request to DynamoDB to create a new table based on the specifications. It then waits for table creation to finish; while creation is in progress, the table's status will be "CREATING."

The task will complete on successful table creation, and throw an error if a table is not created (most commonly because the name already exists).

UpdateDynamoDBThroughput

This task updates the read and write throughput of a DynamoDB table. Write throughput needs to be high while loading data into a table but not afterward, so it is most cost-effective to lower it immediately after the data write completes.

An UpdateDynamoDBThroughput task must define:

  • read_throughput: target read throughput of the table
  • write_throughput: target write throughput of the table
  • table_name: name of the table being changed
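
A sketch of how a ramp-down task is typically wired in, with requires() pointing at whichever task loads the table. WriteIITable below is a hypothetical stand-in for that load task (in the retail example the load is done by a Pig script task), and the import path carries the same caveat as the previous sketch:

    import luigi
    from mortar.luigi import dynamodb

    class WriteIITable(luigi.Task):
        # Hypothetical stand-in for the task that bulk-loads the table.
        dynamodb_table_name = luigi.Parameter()

    class UpdateIITableThroughput(dynamodb.UpdateDynamoDBThroughput):
        dynamodb_table_name = luigi.Parameter()

        # Drop write capacity back down once the bulk load is finished.
        read_throughput = luigi.IntParameter(default=1)
        write_throughput = luigi.IntParameter(default=1)

        def table_name(self):
            return '%s-II' % self.dynamodb_table_name

        def requires(self):
            # Only ramp down after the table has been populated.
            return [WriteIITable(dynamodb_table_name=self.dynamodb_table_name)]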

The output of an UpdateDynamoDBThroughput task looks like this:

2014-02-28 14:24:43,043 [48365:MainThread] (worker.py:233)  INFO  - [pid 48365] Running   UpdateIITableThroughput(read_throughput=1, write_throughput=1, input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table)

2014-02-28 14:24:43,220 [48365:MainThread] (dynamodb.py:91)  INFO  - Round 0: Updating table to throughput {'read': 1, 'write': 1}

2014-02-28 14:24:43,897 [48365:MainThread] (dynamodb.py:109)  DEBUG - Table my-dynamodb-table-II is in status UPDATING

2014-02-28 14:25:04,068 [48365:MainThread] (dynamodb.py:109)  DEBUG - Table my-dynamodb-table-II is in status UPDATING

2014-02-28 14:25:24,491 [48365:MainThread] (dynamodb.py:109)  DEBUG - Table my-dynamodb-table-II is in status UPDATING

2014-02-28 14:25:29,520 [48365:MainThread] (dynamodb.py:106)  INFO  - Table my-dynamodb-table-II is ACTIVE with throughput {'read': 1, 'write': 1}

2014-02-28 14:25:29,670 [48365:MainThread] (worker.py:247)  INFO  - [pid 48365] Done      UpdateIITableThroughput(read_throughput=1, write_throughput=1, input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table)

This task checks its preconditions and, if they are satisfied, sends a request to DynamoDB to change the throughput of the specified table. It then waits for the table to reach its target throughput, during which time the table status is "UPDATING."

SanityTestDynamoDBTable

When running a production pipeline, it can be helpful to get an early sense of whether it is functioning properly. This task does a quick check that a DynamoDB table is populated and contains ids that are expected to be there. It does not substitute for thorough QA, but it often catches errors before they reach production.

A SanityTestDynamoDBTable task must define:

  • hash_key: primary HashKey for the table
  • table_name(): name of the table to check
  • ids(): list of ids to verify

It is also common to override:

  • min_total_results: the minimum number of entries required to be in the table
  • non_null_fields: fields that each checked entry must have defined
  • result_length: the number of entries each checked id is required to have
  • failure_threshold: how many ids are allowed to fail the check
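
A sketch of a sanity-test task using those fields; the hash key and the ids are placeholders, so substitute ids you know should be present in your own table (import path caveat as above):

    import luigi
    from mortar.luigi import dynamodb

    class SanityTestIITable(dynamodb.SanityTestDynamoDBTable):
        dynamodb_table_name = luigi.Parameter()

        # Key to query by when spot-checking the table (placeholder).
        hash_key = 'from_id'

        def table_name(self):
            return '%s-II' % self.dynamodb_table_name

        def ids(self):
            # Placeholder ids: items that should definitely have recommendations.
            return ['item-0001', 'item-0002', 'item-0003']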

The output of a SanityTestDynamoDBTable task looks like this:

2014-02-28 14:26:00,984 [48365:MainThread] (worker.py:233)  INFO  - [pid 48365] Running   SanityTestIITable(min_total_results=100, non_null_fields=(), result_length=5, failure_threshold=2, input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table)

2014-02-28 14:26:01,638 [48365:MainThread] (worker.py:247)  INFO  - [pid 48365] Done      SanityTestIITable(min_total_results=100, non_null_fields=(), result_length=5, failure_threshold=2, input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/my-retail-example, dynamodb_table_name=my-dynamodb-table)

The task checks its preconditions, and if satisfied will perform queries against the DynamoDB table, verifying that the required data is present. The task completes if it doesn’t hit any error thresholds; otherwise it throws an exception.

ShutdownClusters

This aptly named task shuts down all active idle clusters for the user.

The output of a ShutdownClusters task looks like this:

2014-02-28 14:26:02,279 [48365:MainThread] (worker.py:233)  INFO  - [pid 48365] Running   ShutdownClusters(input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/cmiller-retail-output4, dynamodb_table_name=cmiller-test4)

2014-02-28 14:26:03,098 [48365:MainThread] (mortartask.py:260)  INFO  - Stopping idle cluster 5310db4de4b059df9a4c55cc

2014-02-28 14:26:03,350 [48365:MainThread] (worker.py:247)  INFO  - [pid 48365] Done      ShutdownClusters(input_base_path=s3://mortar-example-data/retail-example, output_base_path=s3://mortar-example-output-data/cmiller-retail-output4, dynamodb_table_name=cmiller-test4)

This cleanup task checks its preconditions, and if satisfied shuts down all active clusters for the user that are not currently running jobs.

Complete

Finally, at the end of the run the Luigi process produces output that looks like this:

2014-02-28 14:26:03,350 [48365:MainThread] (worker.py:283)  DEBUG - Asking scheduler for work...

2014-02-28 14:26:03,350 [48365:MainThread] (worker.py:267)  INFO  - Done

2014-02-28 14:26:03,350 [48365:MainThread] (worker.py:268)  INFO  - There are no more tasks to run at this time

2014-02-28 14:26:03,380 [48365:Thread-1] (worker.py:102)  INFO  - Worker was stopped. Shutting down Keep-Alive thread

All of the tasks have been completed, and the process exits.

At this point you can go look at the results in DynamoDB. When you are ready to start adapting the pipeline to your data, go on to Splitting Up Your Scripts.