Run Your Pipeline

If you have run the example pipeline and updated the Pig scripts, there is very little left to do to run your own code in the Luigi pipeline. You can either copy the Luigi script into a new file or modify it in place.

Cluster Size

If the best cluster size for your data is not 2 (and it probably isn’t), set the cluster_size parameter to the size that has worked well for you. When the pipeline starts, it will look for a running cluster of at least this size and create one if none exists.

# cluster size to use
cluster_size = luigi.IntParameter(default=2)

Script Names

If you modified the provided scripts, you don’t need to change anything. If you created new scripts or changed their names, make sure that each script() declaration returns the right script name.

def script(self):
    """
    Name of the script to run.
    """
    return '01-generate-signals'
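
For example, if you created a new script named 01-generate-my-signals.pig (a hypothetical name used here for illustration), the declaration would return that filename without the .pig extension:

def script(self):
    """
    Name of the script to run.
    """
    # Hypothetical renamed script; return the filename without the .pig extension.
    return '01-generate-my-signals'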

Parameters

Whatever parameters you have defined in my_recommender.params need to be reflected in the parameters() methods of ItemItemRecs and UserItemRecs.

def parameters(self):
    return {'OUTPUT_PATH': self.output_base_path,
            'LOGISTIC_PARAM': 2.0,
            'MIN_LINK_WEIGHT': 1.0,
            'MAX_LINKS_PER_USER': 100,
            'BAYESIAN_PRIOR': 4.0,
            'NUM_RECS_PER_ITEM': 5,
            'default_parallel': self.default_parallel()}

def parameters(self):
    return {'OUTPUT_PATH': self.output_base_path,
            'NUM_RECS_PER_USER': 5,
            'ADD_DIVERSITY_FACTOR':False,
            'default_parallel': self.default_parallel()}

All of the standard parameters are already defined, so you only need to change the ones that differ for your data. Also note that default_parallel is computed from cluster_size; this optimization ensures the jobs use all of the reducers available on the cluster.
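
As a rough sketch of what that computation can look like (the reduce-slots-per-node constant and the exact formula below are illustrative assumptions, not necessarily what the example script uses):

# Sketch only: the constant and formula are illustrative assumptions.
REDUCE_SLOTS_PER_NODE = 3

def default_parallel(self):
    # Scale the number of reducers with the cluster size so that every
    # available reduce slot gets used.
    return self.cluster_size * REDUCE_SLOTS_PER_NODE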

Write Throughput

MongoDB / DBMS

The "Write Throughput" section only applies to users who are writing their recommendations to DynamoDB. If you are writing to MongoDB or to a SQL database, skip ahead to Sentinel Ids.

In the example script, when we create a new table we use a very low write throughput. Because the retail data set is so small, we don’t need to write out very much, so 10 is a reasonable number. Large data sets require a much higher number. The throughput only needs to be high while the data is being written; the pipeline lowers it again afterward. For details on DynamoDB throughput pricing, see Amazon’s pricing page.

class CreateUITable(dynamodb.CreateDynamoDBTable):

    # initial write throughput of the table
    write_throughput = luigi.IntParameter(10)

class CreateIITable(dynamodb.CreateDynamoDBTable):

    # initial write throughput of the table
    write_throughput = luigi.IntParameter(10)

A good value to try is 5000. You can adjust the write throughput from there if the process is going too slowly or finishes very quickly.

Note: make sure you make this change in the CreateTable tasks, and not in the UpdateTableThroughput tasks.
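
For example, to start with the suggested value of 5000, the defaults in the two CreateTable tasks would change to (only the default values differ from the snippets above):

class CreateUITable(dynamodb.CreateDynamoDBTable):

    # higher initial write throughput for a large data set
    write_throughput = luigi.IntParameter(5000)

class CreateIITable(dynamodb.CreateDynamoDBTable):

    # higher initial write throughput for a large data set
    write_throughput = luigi.IntParameter(5000)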

Sentinel Ids

Both SanityTestIITable and SanityTestUITable have lists of ids to be checked after the output tables are written. These ids are specific to the retail example and should be changed to reflect your data. If you haven't yet decided what ids to use, it’s fine to return an empty list.

class SanityTestIITable(dynamodb.SanityTestDynamoDBTable):

    # sentinel ids expected to be in the result data
    def ids(self):
        return ["the sixth sense", "48 hours", "friday the thirteenth", "the paper chase", "la femme nikita"]

class SanityTestUITable(dynamodb.SanityTestDynamoDBTable):

    # sentinel ids expected to be in the result data
    def ids(self):
        return ["90a9f83e789346fdb684f58212e355e0", "7c5ed8aacdb746f9b595bda2638de0dc", "bda100dcd4c24381bc24112d4ce46ecf",
                "f8462202b59e4e6ea93c09c98ecddb9c", "e65228e3b8364cb483361a81fe36e0d1"]

MongoDB

If your final results are being written to MongoDB, you need to change "dynamodb.SanityTestDynamoDBTable" to "mongodb.SanityTestMongoDBCollection" to use the MongoDB-specific sanity test task.
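
A sketch of what the item-item sanity test might look like after that change (the class name and ids are placeholders for your own):

class SanityTestIITable(mongodb.SanityTestMongoDBCollection):

    # sentinel ids expected to be in the result data
    def ids(self):
        return ["<an-item-id-from-your-data>", "<another-item-id>"]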


DBMS

If your final results are being written to a SQL database, you need to change "dynamodb.SanityTestDynamoDBTable" to "dbms.SanityTestPostgresTable" or "dbms.SanityTestMySQLTable" to use the database-specific sanity test task.
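
For example, if you are writing to PostgreSQL, the item-item sanity test might become (a sketch; the class name and ids are placeholders for your own):

class SanityTestIITable(dbms.SanityTestPostgresTable):

    # sentinel ids expected to be in the result data
    def ids(self):
        return ["<an-item-id-from-your-data>", "<another-item-id>"]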


Remove User-Item Recommendations if Needed

If you aren’t generating user-item recommendations, you’ll want to remove the tasks that pertain to them, which are:

  • UserItemRecs
  • CreateUITable
  • UpdateUITableThroughput
  • SanityTestUITable

Additionally, you’ll need to open up 04-write-results-to-dynamodb.pig and remove these lines:

user_item_recs = LOAD '$OUTPUT_PATH/user_item_recs'
    USING PigStorage()
    AS (from_id:chararray, to_id:chararray, weight:float, reason_item:chararray, user_reason_item_weight:float, item_reason_item_weight:float, rank:int);

-- STORE the user_item_recs into dynamo
STORE user_item_recs
INTO '$OUTPUT_PATH/unused-ui-table-data'
USING com.mortardata.pig.storage.DynamoDBStorage('$UI_TABLE', '$AWS_ACCESS_KEY_ID', '$AWS_SECRET_ACCESS_KEY');

MongoDB

If you copied mongo-luigi.py instead of retail-luigi.py, you only need to delete:

  • UserItemRecs
  • SanityTestUITable

You will also need to modify pigscripts/mongo/04-write-results-to-mongodb.pig and remove these lines:

    user_item_recs = LOAD '$OUTPUT_PATH/user_item_recs'
        USING PigStorage()
        AS (from_id:chararray, to_id:chararray, weight:float, reason_item:chararray,
            user_reason_item_weight:float, item_reason_item_weight:float, rank:int);

    store user_item_recs  into '$CONN/$DB.$UI_COLLECTION'
                         using com.mongodb.hadoop.pig.MongoInsertStorage('','');


DBMS

If you copied dbms-luigi.py instead of retail-luigi.py, you only need to delete:

  • UserItemRecs
  • CreateUITable
  • SanityTestUITable

You will also need to modify pigscripts/dbms/04-write-results-to-dbms.pig and remove these lines:

    user_item_recs = LOAD '$OUTPUT_PATH/user_item_recs'
        USING PigStorage()
        AS (from_id:chararray, to_id:chararray, weight:float, reason_item:chararray,
            user_reason_item_weight:float, item_reason_item_weight:float, rank:int);

    store user_item_recs  into 'hdfs:///unused-ignore'
       USING org.apache.pig.piggybank.storage.DBStorage('$DATABASE_DRIVER',
       'jdbc:$DATABASE_TYPE://$DATABASE_HOST/$DATABASE_NAME',
       '$DATABASE_USER',
       '$DATABASE_PASS',
       'INSERT INTO $UI_TABLE(from_id,to_id,weight,reason_item,user_reason_item_weight,item_reason_item_weight,rank) VALUES (?,?,?,?,?,?,?)');


Run Pipeline

To run the pipeline, make sure the arguments point to your S3 buckets:

mortar luigi luigiscripts/retail-luigi.py \
    --input-base-path "s3://<your-input-bucket>" \
    --output-base-path "s3://<your-output-bucket>" \
    --dynamodb-table-name "<dynamo-table-name>"

Once the pipeline has run with a given output location, it will not run again unless the data at that location is deleted or a different output location is provided. The pipeline is idempotent: once a task has completed successfully, running the pipeline again will not re-run that task. Changing the output location (for example, using a date parameter: s3://<your-output-bucket>/<date-today>) makes it behave like a new pipeline.
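
For example, the same command as above with a dated output path (the date placeholder is illustrative):

mortar luigi luigiscripts/retail-luigi.py \
    --input-base-path "s3://<your-input-bucket>" \
    --output-base-path "s3://<your-output-bucket>/<date-today>" \
    --dynamodb-table-name "<dynamo-table-name>"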

MongoDB

The equivalent command for running a MongoDB pipeline is:

mortar luigi luigiscripts/mongo-luigi.py \
    --output-base-path "s3://<your-output-bucket>/output" \
    --mongodb-output-collection-name "<your-output-collection-name>"


DBMS

The equivalent command for running a DBMS pipeline is:

mortar luigi luigiscripts/dbms-luigi.py \
    --output-base-path "s3://<your-output-bucket>/output" \
    --data-store-path "s3://<your-output-bucket>/data-store" \
    --table-name-prefix "<your-recsys-table-name-prefix>"


Also be aware that DynamoDB table names must be unique within your account, so trying to create a table that already exists will fail.

With a working pipeline, it's time to integrate results into your application.