This article will show you how to add an R script to your Mortar Project and run it in a data pipeline.
To run an R script in Mortar, you include it inside a data pipeline in your Mortar Project. Your data pipeline (and R script) can be run locally on your computer or in the cloud on a Mortar pipeline server.
We'll walk step-by-step through how to integrate your R script.
Mortar Projects have a standardized structure that keeps the various elements of a project (Pigscripts, Luigiscripts, R scripts) organized.
We recommend putting your R script into the rscripts directory underneath your Mortar Project. To create this directory and set it to sync to the cloud, run the following from the root of your Mortar Project:
```bash
# create the rscripts folder
cd my_mortar_project
mkdir -p rscripts

# ensure it remains in git
touch rscripts/.gitkeep
git add rscripts/.gitkeep

# ensure that rscripts syncs to the cloud
echo "rscripts" >> project.manifest

# commit
git commit -m "Add rscripts folder" rscripts/.gitkeep project.manifest
```
This will create a folder named rscripts and set it up to sync to the cloud.
Save your R script into the rscripts directory you just created. For example, we might create an R script named rscripts/regression.R that looks like:
```r
# regression.R

# set the width for output
options(width=400)

# Install an example library
install.packages("rjson", repos="http://lib.stat.cmu.edu/R/CRAN/")
library(rjson)

# load up the input arguments
args <- commandArgs(TRUE)
print(args)
input_path <- args[1]
output_path <- args[2]

# Read input file
data <- readLines(input_path)

# Create data frame from parsed json
jsonData <- lapply(data, function(x) as.data.frame(fromJSON(x)))

# ...run regression here...
# resultData <- ...

# write results back to file
write.table(resultData, file=output_path, sep=",")
```
You'll generally want to test your R script by running it manually before including it in your pipeline. Assuming you've installed R on your computer, you can test your script with Rscript. For our example, we'd run:
```bash
Rscript rscripts/regression.R /path/to/input.json /path/to/output.csv
```
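If you don't yet have real input data handy, a small newline-delimited JSON file is enough to exercise the script locally. Here's a sketch in Python that writes a few records in the one-JSON-object-per-line format that readLines() and fromJSON() expect; the field names are made up for illustration:

```python
import json

# Write a few sample records as newline-delimited JSON, the format
# regression.R reads line-by-line with readLines(). The "x" and "y"
# field names here are hypothetical placeholders.
records = [
    {"x": 1.0, "y": 2.1},
    {"x": 2.0, "y": 3.9},
    {"x": 3.0, "y": 6.2},
]
with open("input.json", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")
```

You can then point the Rscript command above at this input.json to check that your script parses its input and writes output where you expect.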
To include an R script in your data pipeline, you add three tasks to your pipeline. The first pulls data down from S3, the second runs the script, and the third uploads the results back to S3.
For example, if we wanted to run the regression.R script above, we would use the following pipeline (or add these Tasks to an existing pipeline):
```python
# luigiscripts/my_data_pipeline.py

import os.path
import tempfile

import luigi
from mortar.luigi.mortartask import MortarRTask
from mortar.luigi.s3transfer import S3ToLocalTask, LocalToS3Task

"""
This is a template for building your own luigi data pipeline
to download data from S3, run an R script on it, and upload
the results back to S3.

Task Order:
    CopyDataFromS3
    MyRScriptTask
    CopyOutputToS3

To run:
    mortar luigi luigiscripts/<name-of-luigiscript>.py \
        --s3-input-path "s3://my-input-bucket/my-input-path/my-file.json" \
        --s3-path "s3://my-output-bucket/my-output-path/my-file.csv"
"""

# local temporary directory
LOCAL_TEMP_DIR = tempfile.gettempdir()


def get_temp_file(filename):
    """
    Helper method to get a path to a file in the
    local temporary directory.
    """
    return os.path.join(LOCAL_TEMP_DIR, filename)


class CopyDataFromS3(S3ToLocalTask):
    """
    Copy the source data from S3 down to the local filesystem.
    """

    # source path for the data in S3
    s3_path = luigi.Parameter()

    # target path for the data on local file system
    local_path = luigi.Parameter(
        default=get_temp_file('regression_input.json'))

    def requires(self):
        """
        Previous Tasks that this Task depends on.
        """
        # no upstream dependencies
        return []


class MyRScriptTask(MortarRTask):
    """
    Run my R regression script.
    """

    # path for storing a token to tell Luigi that the task is complete.
    # we will not be using this, as our R script will produce actual
    # output that we can ask Luigi to check for
    token_path = luigi.Parameter(default=LOCAL_TEMP_DIR)

    # input path on S3. we pass this through
    # to the task that loads our data from S3
    s3_input_path = luigi.Parameter()

    # local file where R will write output
    output_path = luigi.Parameter()

    def rscript(self):
        """
        R script to run. This should be a path relative to the
        root directory of your Mortar project.
        """
        return 'rscripts/regression.R'

    def arguments(self):
        """
        List of arguments to be sent to your R script.
        """
        # grab the path to the local file created by CopyDataFromS3
        input_data_file = self.input()[0].path

        # tell our script to read data from the local input file
        # and write it out to the local output file
        return [input_data_file, self.output_path]

    def requires(self):
        """
        This task requires that the input data has first
        been copied from S3.
        """
        # require that data be moved in from S3
        return [CopyDataFromS3(s3_path=self.s3_input_path)]

    def output(self):
        """
        The R script will output a file, so we tell Luigi to look
        for that to know whether the task has already completed.
        """
        return luigi.LocalTarget(self.output_path)


class CopyOutputToS3(LocalToS3Task):
    """
    Copy the output of the R script to S3.
    """

    # original input data path on S3
    s3_input_path = luigi.Parameter()

    # intermediate local storage for the output
    local_path = luigi.Parameter(
        default=get_temp_file('regression_output.csv'))

    # target S3 path for the output
    s3_path = luigi.Parameter()

    def requires(self):
        """
        Require that the output has been generated before
        we copy it to S3.
        """
        # we pass the path for the output to MyRScriptTask so
        # R knows where to store it
        return [MyRScriptTask(
            s3_input_path=self.s3_input_path,
            output_path=self.local_path)]


if __name__ == "__main__":
    luigi.run(main_task_cls=CopyOutputToS3)
```
To use this template, you should only need to ensure that the rscript method references the appropriate R script, and that the arguments method matches the actual arguments accepted by your script.
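The arguments list is purely positional: whatever arguments() returns shows up, in order, in commandArgs(TRUE) on the R side. If (hypothetically) your script also took an output delimiter, you would append it to the list. A minimal sketch of that mapping, with the delimiter parameter being our own invention:

```python
def build_r_arguments(input_data_file, output_path, delimiter=","):
    # Positional arguments for the R script: on the R side,
    # args[1] is the input file, args[2] is the output file, and
    # args[3] is the (hypothetical) delimiter.
    return [input_data_file, output_path, delimiter]
```

For example, build_r_arguments("/tmp/in.json", "/tmp/out.csv") returns the list ["/tmp/in.json", "/tmp/out.csv", ","], matching what the R script would read from commandArgs(TRUE).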
When running, you'll need to pass the S3 path where input data can be found (--s3-input-path) and the S3 path where output data should be written (--s3-path):

```bash
mortar luigi luigiscripts/my_data_pipeline.py \
    --s3-input-path "s3://my-input-bucket/my-input-path/my-file.json" \
    --s3-path "s3://my-output-bucket/my-output-path/my-file.csv"
```
When you run the command, you should see the following:
```
Taking code snapshot... done
Sending code snapshot to Mortar... done
Requesting job execution... done
job_id: some_job_id

Job status can be viewed on the web at:

    https://app.mortardata.com/jobs/pipeline_job_detail?job_id=some_job_id
```
This tells you that your job has started successfully and gives you the URL to monitor its progress. If you open that URL, you should see your pipeline running: first copying data down from S3, then running your R script on it, and finally uploading the results back to S3.