Mongo-Hadoop

Loading and Storing Data To/From MongoDB

You can load data from a MongoDB database into Mortar, or have Mortar store data back into MongoDB using the MongoDB Connector for Hadoop.

Connection Options

When working with a production MongoDB database, it's important to think about the different options you have for working with the data in Hadoop and how those options will affect performance. The best strategy often depends on your particular use case and data.

Reading BSON (mongodump) backup files

Often the simplest way to process your MongoDB data is to skip the database and read your MongoDB backups directly. The MongoDB Connector for Hadoop supports loading uncompressed BSON data generated by the mongodump command. To use this data in your Mortar job, it must be stored in S3. See Connecting to Amazon S3 and Uploading Data to S3 for help.

Processing your BSON backups directly allows you to completely separate the performance concerns of your Mortar job from the performance concerns of your database. As long as your job can tolerate some staleness in your data this solution works well.
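
For example, a mongodump backup that you've uploaded to S3 can be read directly with the connector's BSONLoader, which is covered in more detail below. A minimal sketch, with the bucket and folder names as placeholders:

-- Placeholder bucket/path: read an uncompressed mongodump backup straight from S3
data = LOAD 's3://<s3_bucket>/<backup_folder>/mycollection.bson'
         USING com.mongodb.hadoop.pig.BSONLoader();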

Connecting to primary nodes

The MongoDB Connector for Hadoop offers a number of ways to connect directly to your database via the Mongo connection string.

The first option is to connect to your primary MongoDB instance directly. If the amount of data you're reading is small compared to the typical traffic your database handles, any performance impact should be minor. However, if you are reading a large amount of data, especially data outside of the working set that other clients are using, you risk degrading performance.

When writing data back to MongoDB, the same performance considerations apply. You also need to consider the effects of the database-level write lock: if you're writing a large amount of data to a database that another client is using, you can end up blocking that client for an extended period of time. As a workaround, consider writing data back to a separate database on your MongoDB instance, which uses a separate lock from your main database.
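
As a sketch of that workaround, using MongoInsertStorage (described below) with placeholder names for the separate database, collection, and id field:

-- Placeholder names: write to a separate 'analytics' database so its write lock
-- does not block clients of your main database
STORE out INTO 'mongodb://<username>:<password>@<host>:<port>/analytics.job_results'
             USING com.mongodb.hadoop.pig.MongoInsertStorage('mongo_id');

Here, out is whichever alias your Pig script produces.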

Connecting to secondary nodes

In cases where reading large amounts of data would slow down your primary MongoDB instance(s), you can modify your connection string to connect only to the secondary nodes in your replica set:

mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/[database]?readPreference=secondary

While reading from secondary nodes can mean you're running against slightly stale data (typically seconds to minutes behind your primary), it is an excellent way to read large amounts of data without affecting the working set or increasing load on your primary.
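
In a Pig LOAD this option is simply appended to the location URI. A minimal sketch, assuming the connector accepts standard connection-string options and using placeholder connection details:

-- Placeholder connection details: read from secondary members only
data = LOAD 'mongodb://<username>:<password>@<host>:<port>/<database>.<collection>?readPreference=secondary'
         USING com.mongodb.hadoop.pig.MongoLoader();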

Connecting to dedicated analytics nodes

In cases where you are regularly running large jobs against your MongoDB cluster it can be useful to have dedicated nodes that serve just these jobs and no other clients. The easiest way to do that is to use tag sets or hidden members to distinguish your analytics instances from instances being used by other clients.
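
For example, if you tag your analytics members with a tag such as use:analytics (the tag name here is just an example), a standard read preference tag set in the connection string will direct reads to those members:

mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/[database]?readPreference=secondary&readPreferenceTags=use:analytics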


MongoDB Connection Info

If you choose to connect directly to a MongoDB instance (instead of loading a BSON dump) from Mortar there are a few details you should know.

  • Your MongoDB instance must be running in EC2 (currently the US East region only).
  • If your MongoDB instance runs on a non-standard port (standard is 27017) you'll need to email support@mortardata.com so that we can open that port on your account's Mortar clusters.
  • If you restrict access to your MongoDB instance using AWS security groups you can email support@mortardata.com for information on how you can securely allow your account's Mortar clusters to access your MongoDB instance.

Loading data from MongoDB

To load MongoDB data into Mortar, you'll use a specialized Pig loader called the MongoLoader. The MongoLoader is quite flexible and will allow you to load your data in several different ways.

No schema

The fastest way to load data is to load each document as a map. This looks like:

data = LOAD 'mongodb://<username>:<password>@<host>:<port>/<database>.<collection>'
         USING com.mongodb.hadoop.pig.MongoLoader();

This gives you a map named "document" whose keys are the top-level field names in your MongoDB documents. While this is a great way to get started with your data, the downside is that Pig doesn't know your data types, so you will often need a second step where you pull fields out of the map and set their data types.
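
That second step might look something like this (the field names and types are purely illustrative):

-- Illustrative field names: pull values out of the 'document' map and cast them
typed = FOREACH data GENERATE (chararray) document#'name' AS name,
                              (int) document#'age' AS age;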

Loading as a String

Sometimes you have a complex MongoDB document that you're unsure how to load, or that the MongoLoader isn't loading properly as a map. In these situations you can load your whole document as a string to help debug the problem. This looks like:

data = LOAD 'mongodb://<username>:<password>@<host>:<port>/<database>.<collection>'
         USING com.mongodb.hadoop.pig.MongoLoader('document:chararray', '', '--loadaschararray');

One example of where this is useful is when you have a field in your collection that takes many different data types. If the field sometimes has a simple value like an integer and other times has a complex embedded object, the MongoLoader may error when trying to load the data. By using the loadaschararray option to look at your data, you can find the problem field and potentially fix it in your source data.
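
A quick way to inspect a sample of the documents loaded this way (the alias name is arbitrary):

-- Print a small sample of the raw documents so you can spot the problem field
sample_docs = LIMIT data 20;
DUMP sample_docs;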

Providing a Schema

Another way to load your data is to provide the MongoLoader with the schema of the data you want to load. This looks like:

data = LOAD 'mongodb://<username>:<password>@<host>:<port>/<database>.<collection>'
         USING com.mongodb.hadoop.pig.MongoLoader('<pig_schema>', '<id_alias>')
  • <pig_schema> is the pig schema (described below) of the data you want to load.
  • <id_alias> is the alias you would like to use in Pig to replace the _id alias used by MongoDB. For more information read "Pig Schema and the Id Alias" section below.

Loading BSON

The MongoDB Connector for Hadoop also supports loading the BSON data generated by a mongodump command. Similar to the MongoLoader you can load BSON data with or without a schema.

-- Without schema
data = LOAD 's3://my-s3-bucket/my-folder/mycollection.bson'
         USING com.mongodb.hadoop.pig.BSONLoader();

-- With schema
data = LOAD 's3://my-s3-bucket/my-folder/mycollection.bson'
         USING com.mongodb.hadoop.pig.BSONLoader('<id_alias>','<pig_schema>');
  • <pig_schema> is the pig schema (described below) of the data you want to load.
  • <id_alias> is the alias you would like to use in Pig to replace the _id alias used by MongoDB. For more information read "Pig Schema and the Id Alias" section below.

Pig Schema and the Id Alias

When providing a schema for the MongoLoader or BSONLoader there are a few things to know:

  • If your MongoDB document contains a field not provided in the schema, the MongoLoader will ignore that field.
  • If your MongoDB document is missing a field that is provided in the schema, the MongoLoader will set that field to null for the corresponding row.
  • If your MongoDB document contains a field with a value that cannot be converted into the declared data type, the MongoLoader will set that field to null for the corresponding row and log an error message in the TaskTracker logs.

When providing a schema you should also provide an id_alias value. MongoDB uses '_id' as the primary id of all of its documents. Unfortunately, Pig does not support field names that start with an underscore. As a result you need to provide the loader with the name you want to use for the '_id' field in Pig.

For more information about declaring a schema in Pig, read Pig Schemas.


Simple Example

Let's say you have the following simple two-document collection called "simple_example" in MongoDB:

[
  {
    "_id":ObjectId("507f1f77bcf86cd799439011"),
    "key1": 1.0,
    "key2": "Key 2 Value",
    "key3": [ { "innerKey" : 1 },
              { "innerKey" : 2 } ],
    "extra": "Extra Field"
  },
  {
    "_id":ObjectId("507f1f77bcf86cd799439012"),
    "key1": 2.0,
    "key3": "Error"
  }
]

To load this collection into Pig without a schema you could do:

data = LOAD 'mongodb://<username>:<password>@<host>:<port>/<database>.simple_example'
         USING com.mongodb.hadoop.pig.MongoLoader();
out = FOREACH data GENERATE document#'key1' as key1:double, document#'key2' as key2:chararray;
STORE out INTO 's3n://<s3_bucket>/out' USING PigStorage('\t');

Here's what illustrating this script gives you:

[Image: ILLUSTRATE output for the MongoLoader with no schema]

And if you run this script, you will get the output:

1.0    Key 2 Value
2.0

Notice how the second row has a null value for the second column ('key2'). This is because the second document had no field named 'key2'.

Also notice that you have a problem if you want to work with 'key3'. Since this field has two incompatible types in different documents, you can't just declare one of those types in your Pig script; if you try, Pig will throw an error when it hits a document whose value can't be converted into that type. In this case you can either write your own UDF to handle the conversion and error handling for you, or you can provide the schema you want to the MongoLoader and let it null out any incompatible fields. Continuing with our example, you could do:

data = LOAD 'mongodb://<username>:<password>@<host>:<port>/<database>.simple_example'
         USING com.mongodb.hadoop.pig.MongoLoader
            ('mongo_id:chararray, key1:double, key2:chararray, key3:bag{t:(innerKey:chararray)}', 'mongo_id');
STORE data INTO 's3n://<s3_bucket>/data' USING PigStorage('\t');

Here's what illustrating this script gives you:

[Image: ILLUSTRATE output for the MongoLoader with a schema]

And if you run this script on a cluster, you will get the output:

    507f1f77bcf86cd799439011    1.0    Key 2 Value    {(1.0),(2.0)}
    507f1f77bcf86cd799439012    2.0

Notice how the 'key3' column is null for the second row even though the second document had a value there. In this case, because the MongoLoader was unable to convert the string "Error" to the bag you defined, it simply used null and logged an error message about the invalid conversion.


Storing data to MongoDB

There are two different storage options provided by the MongoDB Connector for Hadoop.

Inserting Data

To insert your data from Pig into MongoDB you can use MongoInsertStorage. This will store each tuple it receives from Pig into a document in Mongo, mapping the Pig field name to the field name in Mongo.

STORE data INTO 'mongodb://<username>:<password>@<host>:<port>/<database>.<collection>'
             USING com.mongodb.hadoop.pig.MongoInsertStorage('<id_alias>');
  • <id_alias> is the Pig field that MongoInsertStorage should use for the '_id' field in Mongo.
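
For example, the data alias from the schema example above already contains a mongo_id field, so it could be written back out like this (the output collection name is a placeholder):

STORE data INTO 'mongodb://<username>:<password>@<host>:<port>/<database>.<output_collection>'
             USING com.mongodb.hadoop.pig.MongoInsertStorage('mongo_id');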

Updating Data

To update an existing MongoDB collection with data from Pig you can use MongoUpdateStorage.

STORE data INTO 'mongodb://<username>:<password>@<host>:<port>/<database>.<collection>'
             USING com.mongodb.hadoop.pig.MongoUpdateStorage(
                    '<find_query>',
                    '<update_query>',
                    ['<pig_schema>',]
                    ['<field_to_ignore>',]
                    ['<update_options>']);
  • <find_query> is the valid JSON of the query used to find the documents you want to update.
  • <update_query> is the valid JSON of the update you want to perform on the document(s) in the collection.
  • <pig_schema> Optional. This is the Pig schema of the alias you're storing; it must include every field in that alias. If it is not provided, MongoUpdateStorage will try to determine the fields and types dynamically.
  • <field_to_ignore> Optional. This is a fieldname to ignore in <pig_schema>.
  • <update_options> Optional. This is the valid JSON of update options you would like to use similar to what can be passed in through the MongoDB JavaScript shell. Ex: "{upsert : true, multi : true}"

Example:

STORE result INTO 'mongodb://<username>:<password>@<host>:<port>/<database>.<collection>' USING
    com.mongodb.hadoop.pig.MongoUpdateStorage(
        '{key1:"\$key1"}',
        '{\$set:{key2:"\$key2"}}',
        'mongo_id:chararray, key1:double, key2:chararray',
        '',
        '{unique:false, dropDups: false}'
    );

Troubleshooting

Calculating Input Splits

In order to process large amounts of data quickly, Hadoop needs to know how to divide it into separate pieces (input splits) that can be processed in parallel. To do so, the MongoDB Connector for Hadoop uses MongoDB features that require reading the admin database. If your MongoDB database requires authentication and your user does not have access to the admin database you will see an error like this:

java.lang.RuntimeException: Unable to calculate input splits from collection stats: ns not found

One way to deal with this error is to set a separate connection string for the admin database. To do that you would just set the mongo property 'mongo.auth.uri':

SET mongo.auth.uri 'mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/admin'
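
In a Pig script this SET goes before the LOAD that needs it. A minimal sketch with placeholder host and credentials for a user that can read the admin database; depending on your setup you may or may not also need credentials on the LOAD URI itself:

-- Placeholder credentials: a user with access to the admin database
SET mongo.auth.uri 'mongodb://<admin_username>:<admin_password>@<host>:<port>/admin';
data = LOAD 'mongodb://<host>:<port>/<database>.<collection>'
         USING com.mongodb.hadoop.pig.MongoLoader();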

Another way to address this error is to disable input splits when you load data. Disabling input splits can significantly slow down loading large amounts of data from MongoDB, but it can work in some cases. To do this, add the following to your Pig script:

SET mongo.input.split.create_input_splits false;

This error only applies to loading your data directly from MongoDB. If you're loading BSON data, the MongoDB Connector already knows how to split the data.