Bulk updating, inserting, or copying documents into MongoDB using collection.initializeUnorderedBulkOp

The Error

Processing large amounts of data in MongoDB with Node.js will probably lock your single-threaded application. This is just my reference for when I need to process a large number of documents using collection.initializeUnorderedBulkOp.

You may also get something like this:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory

<--- Last few GCs --->

  289729 ms: Mark-sweep 1307.9 (1457.6) -> 1308.0 (1457.6) MB, 94.9 / 0 ms [allocation failure] [GC in old space requested].
  289823 ms: Mark-sweep 1308.0 (1457.6) -> 1307.9 (1457.6) MB, 94.0 / 0 ms [allocation failure] [GC in old space requested].
  289918 ms: Mark-sweep 1307.9 (1457.6) -> 1308.0 (1457.6) MB, 94.4 / 0 ms [last resort gc].
  290013 ms: Mark-sweep 1308.0 (1457.6) -> 1307.9 (1457.6) MB, 95.1 / 0 ms [last resort gc].


<--- JS stacktrace --->

==== JS stack trace =========================================

Security context: 0x3e4e4a8b4629 
    2: serialize [/data/qaserve/app/node_modules/mongoose/node_modules/bson/lib/bson/bson.js:47] [pc=0x10f9c0cfc6c4] (this=0x277dbb0fe329 ,object=0x20c5ec9c5cd1 ,checkKeys=0x3e4e4a804251 ,asBuffer=0x3e4e4a804211 ,serializeFunctions=0x3e4e4a804251 ,index=0,ignoreUndefined=0x3e4e4a804251 )
    3: toBin [/data/qaserve/app/node...

I use this approach when copying large amounts of data from collection to collection while doing a bunch of reformatting and processing. By large, I mean anything over 1,000 documents; however, I've used this method for jobs of up to 1 million documents.

Things to note

Make sure you're familiar with Node.js and working within its single thread. Also, there are many ways to do this; this is just my way.

1. Don't overcomplicate your application. If you'll never need to process more than a few documents, there's no reason to do this.
2. MongoDB currently will automatically batch your jobs in sets of 1,000 (they say this may change in the future).
3. Even though MongoDB does its own batching, every queued operation stays in memory until it executes, so it can use up all your RAM. This is why we need to execute our bulk ops ourselves.
4. I'm using forEach for reading my documents because it's easier for me to follow what's happening. You may find it easier to use a stream (a rough sketch follows this list).
5. I’m using mongoose for my MongoDB stuff.
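
If you'd rather read with a stream (note 4), here's a minimal sketch. It assumes a mongoose version where Query#cursor() returns a Readable stream of documents (4.5+), plus the same superComplexAsyncUpdate helper and callback used in the examples below; it's an alternative, not what the rest of this post uses.


    // stream-based read: pause while a document is processed, resume afterwards
    var stream = Item.find({}).lean().cursor();
    var updated = 0;

    stream.on('data', function(item){
        stream.pause(); // stop pulling documents while this one is processed
        superComplexAsyncUpdate(item, function(itemUpdated){
            Item.update({_id: item._id}, {$set: itemUpdated}, function(err, res){
                updated++;
                stream.resume(); // pull the next document
            });
        });
    });

    stream.on('end', function(){
        // fires once the stream resumes past the last document
        callback({updated: updated});
    });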

Use Cases

1. Reading documents from a collection, processing each document, then saving to another collection (see the insert sketch after this list).
2. Reading documents from a collection, modifying each document, then updating the original document.
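
For the first use case, the bulk op queues inserts instead of updates. Here's a minimal sketch, assuming a hypothetical ItemCopy model pointing at the target collection, plus the anySchema, Item, and callback from the code below:


    // hypothetical target collection for the copy
    var ItemCopy = mongoose.model('items_copy', anySchema);

    var copyCursor = Item.collection.find({});
    var copyBulk = ItemCopy.collection.initializeUnorderedBulkOp();
    var queued = 0;

    copyCursor.batchSize(100).forEach(function(item){
        delete item._id;       // let the target collection assign its own _id
        copyBulk.insert(item); // queue an insert instead of a find().update()
        queued++;
    }, function(err){
        if(!queued){ return callback({inserted: 0}); } // execute() errors on an empty queue
        copyBulk.execute(function(err, res){
            callback({inserted: res.nInserted});
        });
    });

For a big copy you'd flush the bulk op periodically, exactly as in the final code below.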

The Code

This first bit is a simplified version of what we're trying to loop through. At each step I'll make a few changes to the same loop to fix problems we may run into.


    var mongoose = require('mongoose');
    var Schema = mongoose.Schema;
    var anySchema = new Schema({}, { strict: false }); // schemaless, accepts any shape of document

    var Item = mongoose.model('items', anySchema);

    Item.find({}, {}).lean().exec(function(err, items){
        if(items.length){

            var totalResults = 0;  // documents handed to the async update
            var totalComplete = 0; // documents whose update has finished

            items.forEach(item => {
                totalResults++;
                superComplexAsyncUpdate(item, itemUpdated => {
                    Item.update({_id: item._id}, {$set: itemUpdated}, function(err, res){
                        totalComplete++;
                        if(totalResults == totalComplete){
                            callback({updated: totalResults});
                        }
                    });
                });
            });
        }else{
            callback({updated: 0});
        }
    });
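
One thing that's easy to miss: callback and superComplexAsyncUpdate aren't defined in the snippet. The update helper is shown at the end of the post, and I'm assuming the loop lives inside a wrapper function along these lines (updateAllItems is a made-up name):


    // hypothetical wrapper around the snippet above
    function updateAllItems(callback){
        // ...the Item.find() loop goes here...
    }

    updateAllItems(function(result){
        console.log(result); // { updated: <number of documents updated> }
    });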

Now that we've got the loop going, what happens if we have to update a large number of documents? It will definitely lock our application, so we need to stream or use cursors to page through the MongoDB query. I'm using forEach, but you can use a concurrency framework like co and promises, or a stream, if that's easier for you to read. Also, note the batchSize variable: it tells MongoDB to return documents in batches instead of all at once, which would still lock your application.


    var itemCursor = Item.collection.find({});

    const batchSize = 100; // how many documents the cursor pulls from MongoDB at a time

    var totalResults = 0;
    var totalComplete = 0;
    var endStream = false;

    itemCursor.batchSize(batchSize).forEach(item => {
        totalResults++;
        superComplexAsyncUpdate(item, itemUpdated => {
            Item.update({_id: item._id}, {$set: itemUpdated}, function(err, res){
                totalComplete++;
                if(endStream && totalComplete == totalResults){
                    callback({updated: totalComplete});
                }
            });
        });
    }, function(err){ // called once the cursor has been exhausted
        endStream = true;

        if(!totalResults){
            callback({updated: 0});
        }
    });

Now that we've got document selection working, let's optimize saving them. This is where the initializeUnorderedBulkOp() function comes in. It queues up all the MongoDB operations, then executes them all at once when you call execute(). This is much more efficient than making individual calls and will greatly improve your write performance. Note: if you need your operations to execute in order, there's a db.collection.initializeOrderedBulkOp() function available. See https://docs.mongodb.com/manual/reference/method/Bulk/ for more info.


    var itemCursor = Item.collection.find({});
    var itemBulk = Item.collection.initializeUnorderedBulkOp();

    const batchSize = 100;

    var totalResults = 0;
    var totalUpdated = 0;
    var endStream = false;

    itemCursor.batchSize(batchSize).forEach(item => {
        totalResults++;

        superComplexAsyncUpdate(item, itemUpdated => {
            itemBulk.find({_id: item._id}).update({$set: itemUpdated}); // queue the update, nothing is sent yet
            totalUpdated++;
            if(endStream && totalUpdated == totalResults){
                itemBulk.execute(function(err, res){ // send everything queued so far
                    callback({updated: totalResults});
                });
            }
        });

    }, function(err){ // cursor exhausted

        // note: if not using an async update, you need to put the itemBulk.execute() here instead

        endStream = true;

        if(!totalResults){
            callback({updated: 0});
        }

    });


Final Code

The last step is to make sure our bulk operation queue doesn't fill up. MongoDB will automatically split your operations into batches of 1,000 (this may change in the future) when it executes, but it keeps everything in memory until execute() is called. Because of this, when working with large amounts of data, the queue will fill up and you'll run out of memory. To solve this, let's call execute() periodically as the queue fills up, which starts writing to the DB and frees up memory for the incoming update operations.


    var itemCursor = Item.collection.find({});
    var itemBulk = Item.collection.initializeUnorderedBulkOp();

    const batchSize = 100;

    var totalResults = 0;  // documents read from the cursor
    var totalUpdated = 0;  // updates queued on the bulk op
    var totalWritten = 0;  // updates confirmed written by execute()
    var endStream = false;

    itemCursor.batchSize(batchSize).forEach(item => {
        totalResults++;

        superComplexAsyncUpdate(item, itemUpdated => {
            itemBulk.find({_id: item._id}).update({$set: itemUpdated});
            totalUpdated++;

            if(totalUpdated % batchSize == 0){ // run execute as the batch fills up
                itemBulk.execute(function(err, res){
                    totalWritten += res.nModified;

                    if(endStream && totalWritten == totalUpdated){
                        callback({updated: totalUpdated});
                    }
                });
                itemBulk = Item.collection.initializeUnorderedBulkOp(); // start a fresh queue
            }
            if(endStream && totalUpdated == totalResults){
                if(totalUpdated % batchSize != 0){ // flush whatever is left in the final, partial batch
                    itemBulk.execute(function(err, res){
                        totalWritten += res.nModified;

                        if(totalWritten == totalUpdated){
                            callback({updated: totalUpdated});
                        }
                    });
                }
            }
        });

    }, function(err){ // cursor exhausted

        // note: if not using an async update, you need to put the itemBulk.execute() if statement here as well

        endStream = true;

        if(!totalResults){
            callback({updated: 0});
        }

    });

Here's our async update function, if it helps you visualize what's happening.


    // check for duplicates, run calculations, compare items, etc.
    function superComplexAsyncUpdate(item, callback){
        item.test = 1;
        setTimeout(() => {
            callback(item);
        }, 123);
    }

Conclusion

As you've seen, there's a lot of counting and keeping track of things that needs to happen. If you never need to update that many documents, there's no reason to complicate your application. Each of these steps can be used on its own. Keep it simple, but make sure your code can handle everything that will be thrown at it. Hopefully MongoDB will implement something like the last step in the future, so we won't have to call execute multiple times. Maybe a streaming bulk execute function?!
