
Add an R Script to Your Data Pipeline

This article will show you how to add an R script to your Mortar Project and run it in a data pipeline.

Overview

To run an R script in Mortar, you include it inside a data pipeline in your Mortar Project. Your data pipeline (and R script) can be run locally on your computer or in the cloud on a Mortar pipeline server.

We'll walk step-by-step through how to integrate your R script.

1. Create the rscripts directory

Mortar Projects have a standardized structure that keeps the various elements of a project (Pigscripts, Luigiscripts, R scripts) organized.

We recommend putting your R script into the rscripts directory underneath your Mortar Project. To create this directory and set it to sync to the cloud, run the following from the root of your Mortar Project:

# create the rscripts folder
cd my_mortar_project
mkdir -p rscripts

# ensure it remains in git
touch rscripts/.gitkeep
git add rscripts/.gitkeep

# ensure that rscripts syncs to the cloud
echo "rscripts" >> project.manifest

# commit
git commit -m "Add rscripts folder" rscripts/.gitkeep project.manifest

This will create a folder named rscripts and set it up to sync to the cloud.
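
At this point your project layout should include the new directory alongside the standard ones. A minimal sketch (only the directories relevant here are shown):

my_mortar_project/
    luigiscripts/
    pigscripts/
    rscripts/
    project.manifest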

2. Add Your R Script

Save your R script into the rscripts directory you just created. For example, we might create an R script named rscripts/regression.R that looks like:

# regression.R

# set the width for output
options(width=400)

# Install an example library
install.packages("rjson", repos="http://lib.stat.cmu.edu/R/CRAN/")
library(rjson)

# load up the input arguments
args <- commandArgs(TRUE)
print(args)
input_path <- args[1]
output_path <- args[2]

# Read input file
data <- readLines(input_path)
# Parse each line of JSON into a one-row data frame
jsonData <- lapply(data, function(x) as.data.frame(fromJSON(x)))

# ...run regression here...
#resultData <- ...

# write results back to file
write.table(resultData, file=output_path, sep=",")
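
The regression itself is elided above with a placeholder. As a minimal sketch, assuming the parsed rows contain numeric columns named x and y (hypothetical names; substitute your own), the missing step might look like:

# combine the per-line data frames into a single data frame
# (assumes every line parsed to the same columns)
df <- do.call(rbind, jsonData)

# fit an ordinary least-squares regression of y on x
# (x and y are hypothetical column names)
fit <- lm(y ~ x, data=df)

# collect the fitted coefficients as the result to write out
resultData <- data.frame(intercept=coef(fit)[1], slope=coef(fit)[2])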

You'll generally want to test your R script by running it manually before including it in your pipeline. Assuming you've installed R on your computer, you can test your script with Rscript. For our example, we'd run:

Rscript rscripts/regression.R /path/to/input.json /path/to/output.csv
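
Because the script parses its input with readLines and fromJSON, it expects newline-delimited JSON: one JSON object per line. A hypothetical input file matching the sketch above would look like:

{"x": 1.0, "y": 2.1}
{"x": 2.0, "y": 3.9}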

3. Add Your Data Pipeline

To include an R script in your data pipeline, you add three tasks to your pipeline. The first pulls data down from S3, the second runs the R script, and the third uploads the results back to S3.

For example, if we wanted to run the regression.R script above, we would use the following pipeline (or add these Tasks to an existing pipeline):

# luigiscripts/my_data_pipeline.py

import os.path
import tempfile

import luigi

from mortar.luigi.mortartask import MortarRTask
from mortar.luigi.s3transfer import S3ToLocalTask, LocalToS3Task

"""
This is a template for building your own luigi data pipeline
to download data from S3, run an R script on it, and upload the
results back to S3.

Task Order:
    CopyDataFromS3
    MyRScriptTask
    CopyOutputToS3

To run:

    mortar luigi luigiscripts/<name-of-luigiscript>.py \
      --s3-input-path "s3://my-input-bucket/my-input-path/my-file.json" \
      --s3-path "s3://my-output-bucket/my-output-path/my-file.csv"
"""

# local temporary directory
LOCAL_TEMP_DIR = tempfile.gettempdir()

def get_temp_file(filename):
    """
    Helper method to get a path to a file in the local temporary
    directory.
    """
    return os.path.join(LOCAL_TEMP_DIR, filename)

class CopyDataFromS3(S3ToLocalTask):
    """
    Copy the source data from S3 down to the local filesystem.
    """

    # source path for the data in S3
    s3_path = luigi.Parameter()

    # target path for the data on local file system
    local_path = luigi.Parameter(default=get_temp_file('regression_input.json'))

    def requires(self):
        """
        Previous Tasks that this Task depends on.
        """
        return []

class MyRScriptTask(MortarRTask):
    """
    Run my R regression script.
    """

    # path for storing a token to tell Luigi that the task is complete.
    # we will not be using this, as our R script will produce actual
    # output that we can ask Luigi to check for
    token_path = luigi.Parameter(default=LOCAL_TEMP_DIR)

    # input path on S3. we pass this through
    # to the task that loads our data from S3
    s3_input_path = luigi.Parameter()

    # local file where R will write output
    output_path = luigi.Parameter()

    def rscript(self):
        """
        R script to run. This should be a path relative to the 
        root directory of your Mortar project.
        """
        return 'rscripts/regression.R'

    def arguments(self):
        """
        List of arguments to be sent to your R script.
        """
        # grab the path to the local file created by 
        # CopyDataFromS3
        input_data_file = self.input()[0][0].path

        # tell our script to read data from the local input file
        # and write it out to the local output file
        return [input_data_file, self.output_path]

    def requires(self):
        """
        This task requires that the input data
        has first been copied from S3.
        """
        # require that data be moved in from S3
        return [CopyDataFromS3(s3_path=self.s3_input_path)]

    def output(self):
        """
        The R script will output a file, so we tell Luigi to look
        for that file to know whether the task has already completed.
        """
        return luigi.LocalTarget(self.output_path)

class CopyOutputToS3(LocalToS3Task):
    """
    Copy the output of the R script to S3.
    """
    # original input data path on S3
    s3_input_path = luigi.Parameter()

    # intermediate local storage for the output
    local_path = luigi.Parameter(default=get_temp_file('regression_output.csv'))

    # target S3 path for the output
    s3_path = luigi.Parameter()

    def requires(self):
        """
        Require that the output has been generated before we
        copy it to S3.
        """
        # we pass the path for the output to MyRScriptTask so
        # R knows where to store it
        return [MyRScriptTask(
            s3_input_path=self.s3_input_path,
            output_path=self.local_path)]

if __name__ == "__main__":
    luigi.run(main_task_cls=CopyOutputToS3)

To use this template, you should only need to ensure that the rscript method references the appropriate R script, and that the arguments method matches the actual arguments accepted by your script.
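
For example, suppose you had a hypothetical script at rscripts/cluster.R that takes only a single input-path argument and writes no separate output file. The overridden methods might look like this sketch (the names are illustrative, not part of the template):

# Hypothetical variant of MyRScriptTask for a script that takes
# only an input path and produces no separate output file.
class MyClusterTask(MortarRTask):

    # with no output file to check, Luigi uses a token written to
    # this path to mark the task complete
    token_path = luigi.Parameter(default=LOCAL_TEMP_DIR)

    # S3 path of the input data, passed through to CopyDataFromS3
    s3_input_path = luigi.Parameter()

    def rscript(self):
        # path to the script, relative to the project root
        return 'rscripts/cluster.R'

    def arguments(self):
        # cluster.R takes only the local input file downloaded from S3
        return [self.input()[0][0].path]

    def requires(self):
        # require that data be moved in from S3 first
        return [CopyDataFromS3(s3_path=self.s3_input_path)]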

4. Run the Data Pipeline

You can either run your data pipeline locally or in the cloud. We'll run this example in the cloud.
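
If you'd prefer to run locally, the Mortar CLI also has a local mode; assuming your version of the CLI supports local Luigi runs, the command would use mortar local:luigi in place of mortar luigi, with the same arguments:

mortar local:luigi luigiscripts/my_data_pipeline.py \
    --s3-input-path "s3://my-input-bucket/my-input-path/my-file.json" \
    --s3-path "s3://my-output-bucket/my-output-path/my-file.csv"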

When running, you'll need to pass the S3 path where input data can be found (--s3-input-path) and where output data should be written (--s3-path):

mortar luigi luigiscripts/my_data_pipeline.py \
    --s3-input-path "s3://my-input-bucket/my-input-path/my-file.json" \
    --s3-path "s3://my-output-bucket/my-output-path/my-file.csv"

When you run the command, you should see output like the following:

Taking code snapshot... done
Sending code snapshot to Mortar... done
Requesting job execution... done
job_id: some_job_id

Job status can be viewed on the web at:

 https://app.mortardata.com/jobs/pipeline_job_detail?job_id=some_job_id

This tells you that your job has started successfully and gives you the URL to monitor its progress. If you open that URL, you should see your pipeline running, first moving data from S3 and then running your R script on it.