Real-time Search with MongoDB and Elasticsearch

Something I worked on a couple of weeks ago at Stripe was overhauling our entire search infrastructure. If you’ve ever used the search feature in manage, it may have felt sluggish or even timed out on you - that’s mostly because our search consisted of running a query directly over our database (which, as you can imagine, is very slow for full-text search).

We finally decided to make the switch to a dedicated full-text search engine and chose Elasticsearch, which is a distributed, RESTful search engine built on top of Apache Lucene. I won’t go into the details here, but you can read more about Elasticsearch on the official website.

Setting up Elasticsearch

To set up Elasticsearch, I followed a third-party tutorial - it’s pretty straightforward, although you should be sure to swap in the current version number.
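Once it’s up, Elasticsearch listens on port 9200 by default. If your search server isn’t running locally, Tire (the Elasticsearch Ruby DSL we use later on) can be pointed at it - a minimal sketch, with a made-up hostname:

Tire.configure do
  # Tire defaults to http://localhost:9200; override it to reach a
  # dedicated search box (the hostname here is just for illustration)
  url "http://search1.example.com:9200"
end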

Hooking up MongoDB

Assuming you’re already using MongoDB for the data that you want to index^1, you’re probably wondering how to hook it up so that documents get indexed in real time - this was one of the major hurdles we faced. Elasticsearch has a built-in feature called rivers, which are essentially plugins for specific services that continually stream new updates in for indexing.

Unfortunately, there’s no MongoDB River (probably due to the lack of built-in database triggers), so I did some research and realized that I could use the MongoDB oplog to continually capture updates to our main databases.

The oplog is a special collection that MongoDB keeps for replication purposes - I was able to use a tailable cursor (which behaves much like the tail -f command) to ship new updates from MongoDB to our search server (we’re using Tire as our Elasticsearch Ruby DSL).
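For reference, each document the cursor yields is just a raw oplog entry; the two fields the code below cares about are ns (the "database.collection" namespace) and o (the document that was written). An insert looks roughly like this - the field names are the standard oplog ones, the values are made up:

{
  "ts" => BSON::Timestamp.new(1348782438, 1),  # when the operation happened
  "op" => "i",                                 # i = insert, u = update, d = delete
  "ns" => "maindb.customers",                  # database.collection
  "o"  => { "_id" => BSON::ObjectId.new, "name" => "Amber" }  # the document itself
}

With that in mind, here’s the cursor setup: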

# This assumes require 'mongo' and require 'tire' up top, plus an
# include Mongo so that Connection, ReplSetConnection, Cursor, and
# Constants resolve without the Mongo:: prefix.
def initialize
  @host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
  @port = ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT

  @db_whitelist = %w(payments customers invoices)

  @stream = Queue.new
  @max_bulk = 10000

  # Or, just set up a connection the normal way if you’re not
  # using a repl set
  @connection = ReplSetConnection.new(['database1', 27017],
                                      ['database2', 27017],
                                      ['database3', 27017],
                                      :read => :secondary)

  local = @connection.db('local')

  # Change to oplog.$main if running mongod as a master instance
  oplog = local.collection('oplog.rs')

  @tail = Cursor.new(oplog,
                      :tailable => true,
                      :order => [['$natural', 1]])

  while true
    tailer
    indexer

    # In production, we keep this constantly running in a busy
    # loop; we constantly have new things to index
    # sleep 5
  end
end

The code here basically tails the oplog and reads off new operations, pushing them onto @stream, until there are none left or we’ve reached the max number of entries that we want to batch at a time:

def tailer
  # While there are new entries
  while @tail.has_next?
    _next = @tail.next

    # We only want to index specific collections
    match = /some namespace regex/.match(_next["ns"])
    if match && @db_whitelist.include?(match[1])
      @stream << [match[1], _next]
    end

    # Upon hitting the maximum bulk size we want to send over
    break if @stream.length >= @max_bulk
  end
end
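The ns field on each entry is just "database.collection", so the namespace regex only needs a capture group around the collection name to check against the whitelist. Purely for illustration (the real regex and database name are Stripe-specific), it could be as simple as:

# Hypothetical example - assumes everything we care about lives in a
# single database called "maindb"
match = %r{\Amaindb\.(\w+)\z}.match(_next["ns"])
match[1]  # => "payments", "customers", etc.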

Afterwards, we pull rows out of @stream, transform them into hashes that we actually want to index into Elasticsearch, and bulk index them via Tire. Rinse and repeat.

def indexer
  bulk_index = []
  while !@stream.empty?
    _next = @stream.pop
    item = _next[1]
    type = _next[0]

    # Our TransformRow.transform_for method just returns a hash
    # to index. Ex. {:type => "customer", :name => "Amber"}
    new_row = TransformRow.transform_for(type, item['o'])

    # Add keywords if they exist on the database object
    bulk_index << new_row
  end

  # Bulk index the data via Tire
  if !bulk_index.empty?
    Tire.index "search" do
      import bulk_index
    end
  end
end

Of course, this is a very basic synchronous example - you could imagine making this into a more flexible asynchronous producer-consumer model.
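Since @stream is already a thread-safe Queue, one way to do that is to run the tailer and the indexer in separate threads, so slow bulk indexing never blocks reading the oplog - a rough sketch (the run_async name is made up):

def run_async
  # Producer: keep pulling new oplog entries onto @stream
  producer = Thread.new do
    loop do
      tailer
      sleep 1 unless @tail.has_next?  # back off a little when the oplog is idle
    end
  end

  # Consumer: drain @stream and bulk index whatever has accumulated
  consumer = Thread.new do
    loop do
      indexer
      sleep 1 if @stream.empty?
    end
  end

  [producer, consumer].each(&:join)
end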

In addition, this sort of naive first pass doesn’t save any state, so if the driver crashes due to a timeout on the database side, or if the search server goes down, the restarted driver will start again from the very beginning of the oplog. To combat this, I simply wrote a timestamp to /tmp to keep track of the last record indexed, and queried the collection starting at that timestamp the next time around.

(Update: I recently changed this to write to permanent storage instead; although not necessary, it's simple enough to do and is a better idea in general.)
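Persisting the checkpoint only takes a few lines - a minimal sketch, where the file path and method names are arbitrary and ts is the BSON::Timestamp from the last entry we indexed:

CHECKPOINT_FILE = '/tmp/oplog_tailer.checkpoint'  # arbitrary location

def save_checkpoint(ts)
  # Store just the seconds portion of the timestamp
  File.open(CHECKPOINT_FILE, 'w') { |f| f.write(ts.seconds.to_s) }
end

def load_checkpoint
  File.exist?(CHECKPOINT_FILE) ? File.read(CHECKPOINT_FILE).to_i : 0
end

On restart, @last_ts = load_checkpoint seeds the query: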

@tail = oplog.find({
  'ts' => { '$gte' => BSON::Timestamp.new(@last_ts, 0) }
})
@tail.add_option(Constants::OP_QUERY_TAILABLE)

But wait, there’s more! Since the oplog doesn’t allow any indexes, that query has to scan the entire collection to find its starting point on most restarts, which can get pretty expensive if your oplog is large like ours, at 2GB.

Fortunately, there’s a special flag designed for exactly this kind of use, called OPLOG_REPLAY. The flag is meant for replication - it first starts from the bottom of the log and searches upward for around 100MB or so, on the assumption that you just restarted (which is usually the case if the tailer crashes and restarts immediately). If the timestamp isn’t found there, it continues by traversing extents in the most efficient manner possible.

@tail.add_option(Constants::OP_QUERY_OPLOG_REPLAY)

Running this tailer as a daemontools service, we were able to attain near real-time search: it takes around 3-5 seconds from the time a record is created or updated to when it gets indexed (and becomes searchable) in Elasticsearch - pretty cool!
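And once documents are flowing in, querying the index from Ruby is just as easy. A quick sanity check through Tire (reusing the example hash from the transform comment above, with a made-up query string) might look something like:

# Search the "search" index for "amber"
search = Tire.search "search" do
  query { string "amber" }
end

search.results.each { |r| puts "#{r.type}: #{r.name}" }  # => "customer: Amber"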

#### Footnotes

1. It’s possible to use Elasticsearch as a data store itself (it’ll store the document data as well as the index data), but I’m not convinced this is a good idea in practice.