Mortar has joined Datadog, the leading SaaS-based monitoring service for cloud applications. Read more about what this means here.

Using JSON Data

You can load JSON data into Mortar, or tell Mortar to output data in JSON format, by using the Piggybank functions JsonLoader and JsonStorage.

Loading JSON Data with JsonLoader

Precursor

To load JSON data into Mortar, first ensure that your files are formatted with exactly one JSON object per line. This helps Pig and Hadoop to properly split your files for parallel processing.

Quick Start - No Schema

The simplest way to use the JsonLoader is with no schema definition:

json_objects = LOAD 's3n://my-s3-bucket/path/to/json/file' 
               USING org.apache.pig.piggybank.storage.JsonLoader();

This loads each JSON document into a field called object with a Pig data type of map[]. All of the top-level keys in your JSON object will appear in that map.

This will give you a quick overview of your data, but Pig won't know the data types of your fields.

Providing a Schema

Once you know which fields you need from your data, you can provide JsonLoader with a schema. You pass a schema to JsonLoader as a string parameter, like this:

json_objects = LOAD 's3n://my-s3-bucket/path/to/json/file'
               USING org.apache.pig.piggybank.storage.JsonLoader(
                   'a: int, b: tuple(i: int, j: int), c: bag{t: tuple(i: int, j: int)}'
               );

Note that this schema is passed as a parameter, not put into the LOAD statement AS clause.

JsonLoader will extract these fields from your data. If a field does not parse correctly, an error will be logged and the field will be read as null. Any fields that can't be found in the data will also be read as null.

See below for additional guidelines to defining your schema.

Simple Example

Let's read in some tweets pulled down by the Mortar twitter gardenhose as a simple example.

To load the tweets without a schema, we'd use:

tweets = LOAD 's3n://twitter-gardenhose-mortar/tweets' 
         USING org.apache.pig.piggybank.storage.JsonLoader();

However, let's provide a schema to get just the fields we want with their data types:

tweets = LOAD 's3n://twitter-gardenhose-mortar/tweets' 
         USING org.apache.pig.piggybank.storage.JsonLoader(
             'created_at:chararray, text:chararray, user:map[]'
         );

If you illustrate this script, you should see something like:

Tweets

We can now use the fields in the script as we please, say to find ones that include exclamation marks:

tweets = LOAD 's3n://twitter-gardenhose-mortar/tweets'
         USING org.apache.pig.piggybank.storage.JsonLoader(
             'created_at:chararray, text:chararray, user:map[]'
         );
exclamatory_tweets = FILTER tweets BY text matches '.*[!].*';
user_extracted = FOREACH exclamatory_tweets 
                 GENERATE created_at, 
                          text, 
                          user#'name' AS name:chararray, 
                          user#'screen_name' AS screen_name:chararray;

Storing JSON data with JsonStorage

You can store the result of a Pig script in JSON format using JsonStorage. It will store each tuple from a Pig relation as a JSON object on a single line. The schema of the relation to be stored must be fully specified.

Example:

-- Pig 0.9
STORE result INTO 's3n://my-s3-bucket/path/to/json/output' 
             USING org.apache.pig.piggybank.storage.JsonStorage();

-- Pig 0.12
STORE result INTO 's3n://my-s3-bucket/path/to/json/output'
             USING org.apache.pig.builtin.JsonStorage();

Mortar Project Example

For a full example in a Mortar Project, clone down the mortar-examples repository and check out the coffee_tweets pigscript.


FromJSON UDFs

If you have JSON data stored within a field of a larger schema (as in the case where a tsv log file might have JSON fields) there are two UDFs that can help extract that data.

FromJsonWithSchema

DEFINE FromJson org.apache.pig.piggybank.evaluation.FromJsonWithSchema('f1: int, f2: chararray');
data = LOAD 'my_data' USING TextLoader AS (text: chararray);
json = FOREACH data GENERATE FLATTEN(FromJson(text));

The schema of the json relation will be (f1:int, f2: chararray)

FromJsonInferSchema

data = LOAD 'my_data' USING TextLoader AS (text: chararray);
json = FOREACH data GENERATE org.apache.pig.piggybank.evaluation.FromJsonInferSchema(text);

The schema of the json relation will be (object: map[]), where f1 and f2 are keys in the map.


JsonLoader Schema Guidelines

A few additional guidelines for providing a schema to JsonLoader:

  • Fields in your data may be in a different order than fields in your schema, so long as the field names match.
  • JsonLoader will perform type coercion if it is possible (for example, converting "17" to 17 for an int field).
  • If you specify an object field as my_field: map[], that field will be loaded in the same way as the document would have if you had not passed JsonLoader a schema. This can be useful for variable schema elements, for example the options field in: {req_param_1, req_param_2, options: {}}.
  • To load a field with an initial underscore (eg: "_id"), use the original field name in the schema. When it loads, the field will have the prefix "underscore" because Pig does not allow names with prefixing underscores.

    e.g. 'f1: int, _f2: chararray' would load the fields (f1, underscore_f2)

  • To load a field with spaces in its name, use backslashes to escape the original field name in the schema. When it loads, the spaces will be replaced by underscores.

    e.g. 'f1: int, f\\ 2: chararray' would load the fields (f1, f_2)

  • Pig requires every value in a bag to be part of a tuple, while JSON allows arrays to hold values directly.

    • To load the flat array "arr": [1, 2, 3, 4], you would specify the schema 'arr: {t: (i: int)}'.

      It would be loaded as {(1), (2), (3), (4)}. Note that the name "t" of the inner tuple does not matter.

    • If you knew that the flat array would have only four elements, you could cast it to a tuple,

      e.g. 'arr: (i: int, j: int, k: int, l: int)'' would load (1, 2, 3, 4).

    • Nested arrays can be loaded too. You could load "arr": [[1, 2], [3, 4]]

      with the schema 'arr: {t: (arr: {t: (i: int)})}', producing {({(1), (2)}), ({(3), (4)})}.

    • Finally, you can combine nested array loading and casting arrays to tuples. You could load a JSON array-of-arrays of latitude-longitude coordinate pairs,

      "coord_arr": [[40.664167, -73.938611], [37.783333, -122.416667]],

      using the schema 'coord_arr: {t: (coords: (lat: double, long: double))}'.

JsonLoader Schema Example

Let's say you received the following objects as responses to calling some API.

{ "response": { "id": 10123, "thread": "Sloths", "comments": ["Sloths are adorable", "So chill"] }, "response_time": 0.425 }
{ "response": { "id": 13828, "thread": "Bigfoot", "comments": [] } , "response_time": 0.517 }

You could load these without a schema using:

data        =    LOAD 's3n://my-s3-bucket/path/to/responses' 
                 USING org.apache.pig.piggybank.storage.JsonLoader();
responses   =    FOREACH data GENERATE object#'response' AS response: map[];
out         =    FOREACH responses 
                 GENERATE response#'id' AS id: int, response#'thread' AS thread: chararray, 
                          response#'comments' AS comments: {t: (comment: chararray)};
STORE out INTO 's3n://path/to/output' USING PigStorage('|');

If you run this script, you will get:

10123|Sloths|{(Sloths are adorable),(So chill)}
13828|Bigfoot|{}

Suppose you had a third row though:

{ "response": { "id": 10123, "thread": "Sloths", "comments": ["Sloths are adorable", "So chill"] }, "response_time": 0.425 }
{ "response": { "id": 13828, "thread": "Bigfoot", "comments": [] } , "response_time": 0.517 }
{ "response": "Could not find any results for query \"Nessy\"", "response_time": 0.788 }

You could not cast the string value for "response" in the last object to a map as you did before.

To handle this properly (i.e. write a null for that field instead of crashing), you need to specify a schema.

data    =   LOAD 's3://my-s3-bucket/path/to/responses' 
            USING org.apache.pig.piggybank.storage.JsonLoader('
                response: (id: int, thread: chararray, comments: {t: (comment: chararray)}),
                response_time: double
            ');
out     =   FOREACH data GENERATE 
                FLATTEN(response) AS (id, thread, comments),
                response_time;
STORE data INTO 's3n://my-s3-bucket/path/to/output' USING PigStorage('|');

If you run this script, you will get:

10123|Sloths|{(Sloths are adorable),(So chill)}|0.425
13828|Bigfoot|{}|0.517
|||0.788

Note that we did not have to specify types for the fields in out, since we already specified them in the load statement.

JsonStorage Performance Tips

JsonStorage uses a dynamic buffer as part of the storage process. If you know that each row of your data will never exceed a certain size and need a performance boost, you can tell JsonStorage to use a fixed-size buffer. You specify this size in kilobytes as a string parameter which must parse to an integer.

Example:

-- Pig 0.9
STORE result INTO '$OUTPUT_PATH' USING org.apache.pig.piggybank.storage.JsonStorage('16');'

-- Pig 0.12
STORE result INTO '$OUTPUT_PATH' USING org.apache.pig.builtin.JsonStorage('16');'