March 15, 2012

Real-time Search with MongoDB and Elasticsearch

Something I worked on a couple of weeks ago at Stripe was overhauling the entire search infrastructure. If you’ve ever used the search feature in manage, it may have felt sluggish or even timed out on you, mostly because our search ran queries directly against our database (which, as you can imagine, is very slow for full-text search).

We finally decided to make the switch to a dedicated full-text search engine and chose Elasticsearch, a distributed RESTful search engine built on top of Apache Lucene. I won’t go into the details, but you can read about Elasticsearch on the official website.

Setting up Elasticsearch

To set up Elasticsearch, I followed a third-party tutorial; it’s pretty straightforward, although you should be sure to swap in the current version number.

Hooking up MongoDB

Assuming you’re already using MongoDB for the data that you want to index[1], you’re probably wondering how to hook it up to index documents in real time; this was one of the major hurdles that we faced. Elasticsearch has a built-in feature called rivers: plugins for specific services that continuously stream in new updates for indexing.

Unfortunately, there’s no MongoDB River (probably due to the lack of built-in database triggers), so I did some research and realized that I could use the MongoDB oplog to continually capture updates to our main databases.

The oplog is a special capped collection that MongoDB keeps for replication purposes. Because it’s capped, I was able to use a tailable cursor (which behaves similarly to the tail -f command) to ship new updates from MongoDB to our search server (we’re using Tire as our Elasticsearch Ruby DSL).

require 'mongo'   # mongo-ruby-driver 1.x API (Connection, ReplSetConnection, Cursor)
require 'tire'
require 'thread'  # Queue
include Mongo

def initialize
  @host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
  @port = (ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).to_i

  @db_whitelist = %w(payments customers invoices)

  @stream = Queue.new
  @max_bulk = 10000

  # Connect to the replica set, reading from secondaries. If you’re not
  # using a repl set, just set up a connection the normal way instead:
  #   @connection = Connection.new(@host, @port)
  @connection = ReplSetConnection.new(['database1', 27017],
                                      ['database2', 27017],
                                      ['database3', 27017],
                                      :read => :secondary)

  local = @connection.db('local')

  # Change to oplog.$main if running mongod as a master instance
  oplog = local.collection('oplog.rs')

  @tail = Cursor.new(oplog,
                     :tailable => true,
                     :order => [['$natural', 1]])

  loop do
    tailer
    indexer

    # In production, we keep this constantly running in a busy
    # loop; we constantly have new things to index
    # sleep 5
  end
end

The tailer method reads new operations off the oplog and pushes them onto @stream until there are none left or we’ve reached the maximum number of entries that we want to batch at a time:

def tailer
  # While there are new entries
  while @tail.has_next?
    _next = @tail.next

    # We only want to index specific collections
    match = /some namespace regex/.match(_next["ns"])
    if match && @db_whitelist.include?(match[1])
      @stream << [match[1], _next]
    end

    # Upon hitting the maximum bulk size we want to send over
    break if @stream.length >= @max_bulk
  end
end
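To make the filtering and batching steps concrete, here’s a self-contained sketch that runs the same loop over an in-memory array instead of a live oplog cursor. The namespace regex is an assumption (the post leaves it unspecified): it treats ns as "database.collection" and captures the collection name. ArrayCursor and tail_into are hypothetical names used only for illustration.

```ruby
require 'thread' # Queue (already loaded on modern Rubies; harmless to require)

# Hypothetical namespace pattern: captures the collection name from an
# oplog "ns" value of the form "database.collection".
NS_PATTERN = /\A[^.]+\.([^.]+)\z/

# Drains entries from `cursor` (anything with has_next?/next) into `stream`
# as [collection, entry] pairs, skipping non-whitelisted collections and
# stopping once `max_bulk` items are queued.
def tail_into(cursor, stream, whitelist, max_bulk)
  while cursor.has_next?
    entry = cursor.next
    match = NS_PATTERN.match(entry["ns"])
    stream << [match[1], entry] if match && whitelist.include?(match[1])
    break if stream.length >= max_bulk
  end
  stream
end

# Minimal stand-in for a tailable cursor over an in-memory array.
class ArrayCursor
  def initialize(entries)
    @entries = entries.dup
  end

  def has_next?
    !@entries.empty?
  end

  def next
    @entries.shift
  end
end
```

Anything left on the cursor after the batch limit is hit simply waits for the next call, which mirrors how the real tailer picks up where it left off.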

Afterwards, we pull rows out of @stream, transform them into hashes that we actually want to index into Elasticsearch, and bulk index them via Tire. Rinse and repeat.

def indexer
  bulk_index = []
  until @stream.empty?
    type, item = @stream.pop

    # Our TransformRow.transform_for method just returns a hash
    # to index. Ex. {:type => "customer", :name => "Amber"}
    new_row = TransformRow.transform_for(type, item['o'])

    # Queue the transformed row for the bulk request
    bulk_index << new_row
  end

  # Bulk index the data via Tire
  unless bulk_index.empty?
    Tire.index "search" do
      import bulk_index
    end
  end
end
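TransformRow itself isn’t shown in the post. As a rough idea of what such a transform could look like, here’s a hypothetical TransformRowSketch that keeps a per-collection whitelist of fields and carries _id along as :id so the indexer has a stable document id. The field names are made up, and the sketch assumes item['o'] is a full document, which holds for oplog inserts but not for updates applied with modifiers like $set (those entries carry the modifier, not the whole document).

```ruby
# Hypothetical stand-in for TransformRow (the real class isn't shown):
# keep only whitelisted fields per collection and expose _id as :id.
module TransformRowSketch
  # Illustrative field lists; real ones depend on your schema.
  FIELDS = {
    "customers" => %w(_id email description),
    "payments"  => %w(_id amount currency description),
    "invoices"  => %w(_id customer amount_due)
  }.freeze

  def self.transform_for(collection, doc)
    row = { :type => collection.chomp("s") } # "customers" -> "customer"
    FIELDS.fetch(collection, []).each do |field|
      next unless doc.key?(field)
      if field == "_id"
        row[:id] = doc[field].to_s # e.g. a BSON::ObjectId becomes a string
      else
        row[field.to_sym] = doc[field]
      end
    end
    row
  end
end
```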

Of course, this is a very basic synchronous example - you could imagine making this into a more flexible asynchronous producer-consumer model.
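One way to move toward that producer-consumer model, sketched under the assumption of a single tailer thread feeding a single indexer thread: Ruby’s SizedQueue blocks the producer when the indexer falls behind, and the consumer drains whatever is already queued (up to a maximum batch size) into one bulk request. PipelineSketch and its index_batch callback are hypothetical; in the real tailer, the callback would wrap the Tire import.

```ruby
require 'thread' # SizedQueue

class PipelineSketch
  STOP = Object.new # sentinel telling the consumer to finish

  def initialize(max_bulk, capacity = 10_000, &index_batch)
    @max_bulk = max_bulk
    @queue = SizedQueue.new(capacity) # producer blocks when this is full
    @index_batch = index_batch
  end

  # Producer: push every entry, then the sentinel.
  def produce(entries)
    entries.each { |e| @queue << e }
    @queue << STOP
  end

  # Consumer: block for the first item, then take whatever else is already
  # queued (up to max_bulk) and index it as one batch.
  def consume
    done = false
    until done
      batch = []
      item = @queue.pop
      loop do
        if item.equal?(STOP)
          done = true
          break
        end
        batch << item
        break if batch.size >= @max_bulk || @queue.empty?
        item = @queue.pop # safe: single consumer and queue isn't empty
      end
      @index_batch.call(batch) unless batch.empty?
    end
  end

  def run(entries)
    consumer = Thread.new { consume }
    produce(entries)
    consumer.join
  end
end
```

With a queue capacity smaller than the batch size the producer simply waits; nothing is dropped, and order is preserved because there is exactly one producer and one consumer.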

In addition, this naive first pass doesn’t save any state, so if the tailer crashes because of a timeout on the database side, or if the search server is down, the restarted tailer will start again from the very beginning of the oplog. To combat this, I simply wrote the timestamp of the last indexed record to /tmp, and the next time around queried the oplog starting from that timestamp.

(Update: I recently changed this to write to permanent storage instead; although not necessary, it's simple enough to do and is a better idea in general.)

@tail = oplog.find({
  'ts' => {'$gte'=> BSON::Timestamp.new(@last_ts, 0) }
})
@tail.add_option(Constants::OP_QUERY_TAILABLE)
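Here’s a minimal sketch of that checkpoint, assuming it’s stored as the seconds and increment of the last indexed ts in a small text file. The Checkpoint module and its file format are hypothetical; the write-then-rename pattern is one common way to avoid leaving a half-written file behind if the tailer crashes mid-save.

```ruby
# Hypothetical checkpoint helpers for the tailer: persist the last indexed
# oplog timestamp as "seconds increment" so a restart can resume from it.
module Checkpoint
  # Write to a temp file, fsync it, then rename it over the old checkpoint.
  # The rename replaces the target in one step, so a crash mid-write leaves
  # the previous checkpoint intact instead of a truncated one.
  def self.save(path, seconds, increment)
    tmp = "#{path}.tmp"
    File.open(tmp, "w") do |f|
      f.write("#{seconds} #{increment}")
      f.fsync
    end
    File.rename(tmp, path)
  end

  # Returns [seconds, increment], or [0, 0] if no checkpoint exists yet,
  # meaning "start from the beginning of the oplog".
  def self.restore(path)
    return [0, 0] unless File.exist?(path)
    File.read(path).split.map { |s| Integer(s) }
  end
end
```

On startup you’d restore the pair and pass it to BSON::Timestamp.new in the query above instead of (@last_ts, 0), and after each successful bulk import you’d save the ts of the last entry in the batch. Resuming with $gte re-indexes that one entry, which is harmless as long as indexing the same document twice just overwrites it.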

But wait, there’s more! In most restart scenarios, finding that starting timestamp means scanning the entire collection, which can be pretty expensive (the oplog doesn’t allow any indexes), especially if your oplog is large like ours, at 2GB.

You can take advantage of a special query flag meant for exactly this kind of use, called OPLOG_REPLAY. The flag exists for replication: rather than scanning from the beginning, it starts from the end of the log (the newest entries) and searches backward for around 100MB or so, assuming that you just restarted (which is usually the case if the tailer crashes and restarts immediately). If the starting point can’t be found there, it continues by traversing extents in the most efficient manner possible.

@tail.add_option(Constants::OP_QUERY_OPLOG_REPLAY)
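As a rough intuition for why this helps after a quick restart, here’s a toy model over an in-memory, ts-ordered array: look backward from the newest entry through a bounded window first, and fall back to a binary search over the whole log only if the resume point isn’t in that window. This is not how mongod implements the flag (the real thing walks on-disk extents); resume_index and its window parameter are made up for illustration. Timestamps are [seconds, increment] pairs, which Ruby compares element by element.

```ruby
# Toy model of OPLOG_REPLAY's strategy (NOT mongod's implementation):
# return the index of the first entry whose ts >= last_ts in a ts-ordered
# log, checking a bounded window at the newest end before falling back to
# a binary search over the whole log.
def resume_index(log, last_ts, window)
  stop = [log.length - window, 0].max
  i = log.length - 1
  # Fast path: walk backward from the newest entry.
  while i >= stop
    return i + 1 if (log[i] <=> last_ts) < 0
    i -= 1
  end
  # Resume point not in the window: binary search instead (Ruby 2.3+).
  log.bsearch_index { |ts| (ts <=> last_ts) >= 0 } || log.length
end
```

When the tailer restarts seconds after crashing, the answer is almost always inside the window, so the cost is proportional to how far behind the tailer fell rather than to the size of the log.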

Running this tailer as a daemontools service, we were able to attain nearly real-time search latency: it takes around 3-5 seconds from the time a record is created or updated to when it gets indexed (and is searchable) in Elasticsearch. Pretty cool!

Footnotes

  1. It’s possible to use Elasticsearch as a data store itself (it’ll store the document data as well as the index data), but I’m not convinced this is a good idea in practice.

January 3, 2012

2012 CS Resolutions

Matt Might's "What every computer science major should know" article is fantastic. I regret not reading it at the beginning of the holiday break, but I guess I can use it as a sort of "2012 CS Resolutions" list, since everyone seems to be making New Year's resolutions right about now.

  1. Master LaTeX

    In school, I never really used LaTeX for typing up assignments until last semester, for our project design docs. I actually don't know how feasible this will be now that I'm in the "real world", but I'll take any chance I can get to write up documents in pretty LaTeX!

  2. Be better at UNIX/Sysadmin-ing

    Being traditionally a frontend person, I don't have a crazy amount of experience working with command line or doing sysadmin-y things. Sure, I know how to do some elementary things and I'm a vim fanatic (by the way, vim > emacs). However, I'd like to be able to count myself as an expert at writing shell scripts or configuring a web server.

  3. Learn a couple of new languages

    Not for the sake of "adding new languages on my resume", which is absolutely useless, but for exploring and mastering new programming paradigms. A few that sparked my interest/I've been meaning to pick up: Scala, Haskell, Erlang.

  4. "Understand a computer from the transistors up"

    Being an EECS major has exposed me to all of the "levels" of a computer, but I never made much effort to piece them together, and much of my EE knowledge is long forgotten.

  5. Get some hands-on experience with Operating Systems

    The semester I took it, Berkeley dramatically changed their traditional Operating Systems class into a hybrid (read: less in-depth/useful) systems class that covered minimal multi-programming and memory/scheduler topics but didn't actually give us any practical experience with how operating systems work. In particular, I'd love to learn more about file systems and possibly tinker around with the Linux kernel.

  6. Catch up on reading systems papers

    I have a giant list of systems papers to go through/that I semi-went through during winter break courtesy of my friend Sunil and the graduate cloud computing course at Berkeley. In addition, Stripe has super cute academic paper reading lunches, which I'm really excited about! I'm sure this will be an easy resolution to keep up with. (:

After reading the article, I'm not super satisfied with what I learned in my 3.5 years as a Berkeley EECS major. I'm looking forward to filling in the gaps now that I've graduated!

December 2, 2011

Paxos/Multi-paxos Algorithm

A couple of posts ago, I talked about the distributed systems programming class I was taking here in my last semester at Berkeley. Our projects are really cool - we've done everything from a quorum KVS to a distributed lock manager, and for our final project, my group chose to implement Multi-paxos. I'll start by explaining general Paxos first.

Basically, Paxos is a protocol (or technically, a "family" of protocols, says Wikipedia) that solves the problem of getting a group of nodes to agree, or reach consensus, on a single value.

Why is this important?

If there's a stable master, there's hardly any use for this, because the master can just decide and propose values itself and have the other nodes always follow it. However, what if leader election fails (and it sometimes does), and there are two nodes proposing values?

The Paxos algorithm ensures that only one value is chosen out of all of the proposed values. Where this algorithm comes in handy is when you'd like your distributed system to decide on a sequence of values, or in other words, a consistent ordering over a set of values. As Ayende Rahien describes, if these values are events, then you know that all of the machines in the cluster have the same state or subset of the state.

Algorithm

The Paxos algorithm is divided into two phases: the prepare phase and the accept phase.

Disclaimer: the text in this section is partially paraphrased from Leslie Lamport's excellent Paxos Made Simple paper.

Prepare Phase

In the prepare phase, the proposer selects a proposal number n (which is greater than any n it has previously sent, and distinct from the n values used by any other proposer; I'll discuss this more later) and sends a PREPARE request with that n to a majority of the acceptors.

On the acceptor side, if an acceptor receives a PREPARE request with a proposal number n greater than that of any PREPARE request it has previously received, it responds with a "promise" not to accept any more proposals numbered less than n, along with the number and value of the highest-numbered proposal that it has already accepted (if any).

Starting to sound confusing? I had to read this over several times before I actually understood it, so maybe the pseudocode below will help.

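# Proposer: pick one fresh proposal number for this round
n = next_n()
for acceptor in acceptors
  send acceptor, :prepare_req, n
end

# Acceptor: only promise if n beats every earlier PREPARE
if req.n > highest_proposed_n
  highest_proposed_n = req.n
  # highest_acc is nil until this acceptor has accepted something
  reply :prepare_resp, {
    :n => highest_acc && highest_acc[:n],
    :value => highest_acc && highest_acc[:value]
  }
end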

Accept Phase

After a proposer has received responses to its PREPARE request from a majority of the acceptors, it sends an ACCEPT request to those acceptors with proposal number n and a value v, where v is the value of the highest-numbered proposal among the responses, or any value the proposer likes if the responses reported no accepted proposals.

On the acceptor side, when an acceptor receives an ACCEPT request for a proposal numbered n, it accepts the proposal unless it has already responded to a PREPARE request with a number greater than n.

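# Proposer: adopt the value of the highest-numbered proposal any
# responder had accepted; otherwise we're free to use our own value
prior = responses.select { |r| r[:n] }
v = prior.empty? ? my_value : prior.max_by { |r| r[:n] }[:value]
for acceptor in acceptors
  send acceptor, :accept_req, {:n => n, :value => v}
end

# Acceptor: accept unless a higher-numbered PREPARE was promised
if req.n >= highest_proposed_n
  highest_proposed_n = req.n
  highest_acc = {:n => req.n, :value => req.value}
  reply :accept_resp
end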
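To see both phases working together, here’s a small single-decree Paxos simulation in Ruby, assuming reliable in-process message delivery (method calls instead of network messages) and no persistence. PaxosAcceptor and propose are hypothetical names; a proposer that fails to gather a majority just returns nil here, where a real one would retry with a higher n.

```ruby
# In-memory acceptor for single-decree Paxos (no networking, no disk).
class PaxosAcceptor
  attr_reader :accepted

  def initialize
    @promised = -1  # highest PREPARE number promised so far
    @accepted = nil # {:n => ..., :value => ...} once something is accepted
  end

  # Prepare phase: promise only if n beats every earlier PREPARE, and
  # report the highest-numbered proposal accepted so far (if any).
  def prepare(n)
    return nil unless n > @promised
    @promised = n
    { :accepted => @accepted }
  end

  # Accept phase: accept unless we've promised a higher number.
  def accept(n, value)
    return false unless n >= @promised
    @promised = n
    @accepted = { :n => n, :value => value }
    true
  end
end

# Runs both phases for one proposal number. Returns the chosen value, or
# nil if either phase failed to reach a majority.
def propose(acceptors, n, my_value)
  majority = acceptors.size / 2 + 1

  # Phase 1: collect promises, remembering which acceptors responded.
  responses = acceptors.map { |a| [a, a.prepare(n)] }.select { |_, r| r }
  return nil if responses.size < majority

  # Adopt the highest-numbered accepted value, else use our own.
  prior = responses.map { |_, r| r[:accepted] }.compact
  value = prior.empty? ? my_value : prior.max_by { |p| p[:n] }[:value]

  # Phase 2: ask the responders to accept (n, value).
  acks = responses.count { |a, _| a.accept(n, value) }
  acks >= majority ? value : nil
end
```

Running propose with n = 1 and value "A", then with n = 2 and value "B", chooses "A" both times: the second proposer learns about the accepted proposal from the promises and adopts its value, which is exactly the accept-phase rule described above.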

Multi-paxos

All this talk about Paxos. How does Multi-paxos fit in?

I mentioned in the introduction that one of the most useful applications of the Paxos algorithm is having the group of participants decide on a sequence of values. Since one round of Paxos results in a decision on one value, the naive way to decide on a sequence of values would be to run Paxos many times, right?

One optimization that can be made in this case, assuming a single stable leader, is to skip the prepare phase. If we assume that the leadership will remain "sticky", there is no need to continue sending out proposal numbers - the first proposal number sent out will never be "overridden" since there is only one leader.

Thus, we only need to do the prepare phase once. In subsequent rounds of Paxos, we can just send the ACCEPT messages, with n as the proposal number used in the original PREPARE request and an additional parameter that indicates the sequence number (the current round we're in). We don't have to worry about the worst case where leadership isn't stable (or distinct), because the algorithm will degrade gracefully into the general Paxos algorithm (both prepare and accept phases for each round). Cool!
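A minimal sketch of that optimization, under the same in-process assumptions: acceptors keep one promise that covers every slot and store accepted proposals per sequence number, and the leader prepares once, then only sends ACCEPTs with the same n and an increasing sequence number. MultiAcceptor and Leader are hypothetical names. When a new leader takes over, it re-proposes the highest-numbered value it learns for each slot, as in single-decree Paxos; to stay short, this sketch doesn’t fill gaps between recovered slots with no-ops, which a real implementation would need to do.

```ruby
# Hypothetical Multi-Paxos acceptor: one promise covers every slot, and
# accepted proposals are stored per sequence number.
class MultiAcceptor
  attr_reader :log

  def initialize
    @promised = -1
    @log = {} # seq => {:n => ..., :value => ...}
  end

  # One PREPARE for all slots: reply with everything accepted so far so a
  # new leader can re-propose those values.
  def prepare(n)
    return nil unless n > @promised
    @promised = n
    @log.dup
  end

  def accept(n, seq, value)
    return false unless n >= @promised
    @promised = n
    @log[seq] = { :n => n, :value => value }
    true
  end
end

# Leader that runs the prepare phase once, then only sends ACCEPTs.
class Leader
  def initialize(acceptors, n)
    @acceptors = acceptors
    @n = n
    @seq = 0
    @majority = acceptors.size / 2 + 1
  end

  # Prepare once. For each slot some acceptor already accepted, re-propose
  # the highest-numbered value reported for it.
  def prepare!
    replies = @acceptors.map { |a| a.prepare(@n) }.compact
    raise "no majority for n=#{@n}" if replies.size < @majority
    recovered = {}
    replies.each do |log|
      log.each do |seq, prop|
        best = recovered[seq]
        recovered[seq] = prop if best.nil? || prop[:n] > best[:n]
      end
    end
    recovered.sort_by { |seq, _| seq }.each { |seq, prop| accept_slot(seq, prop[:value]) }
    @seq = (recovered.keys.max || -1) + 1
  end

  # Every later round skips PREPARE and just uses the next sequence number.
  def append(value)
    seq = @seq
    @seq += 1
    accept_slot(seq, value)
  end

  private

  # Returns seq on success; raises if a higher-numbered leader took over.
  def accept_slot(seq, value)
    acks = @acceptors.count { |a| a.accept(@n, seq, value) }
    raise "lost leadership at seq #{seq}" if acks < @majority
    seq
  end
end
```

The graceful degradation shows up in accept_slot: once another leader prepares with a higher n, the old leader’s ACCEPTs stop getting a majority, and it would have to go back to a full prepare phase.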

Other Considerations

Persistent Storage

Machines must keep certain things in persistent storage to be able to recover from failures. In particular, acceptors need to remember which PREPARE requests they have promised to honor and which ACCEPT requests they have accepted, so that after a restart they can still decide which proposals to promise and pass the necessary information back to the proposers.
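As a sketch of what that might look like for an acceptor, assuming state is kept in a small JSON file: the key rule is to write the promise or accepted proposal to disk before replying, so a restarted acceptor can’t break a promise it made before crashing. DurableAcceptorState and its file layout are hypothetical.

```ruby
require 'json'

# Hypothetical durable acceptor state. Callers record a promise or an
# accept (which writes it to disk) and only then send their reply.
class DurableAcceptorState
  attr_reader :promised, :accepted

  def initialize(path)
    @path = path
    data = File.exist?(path) ? JSON.parse(File.read(path)) : {}
    @promised = data.fetch("promised", -1)
    @accepted = data["accepted"] # nil or {"n" => ..., "value" => ...}
  end

  def record_promise(n)
    @promised = n
    persist
  end

  def record_accept(n, value)
    @promised = n
    @accepted = { "n" => n, "value" => value }
    persist
  end

  private

  # Write-then-rename so a crash mid-write leaves the previous state intact.
  def persist
    tmp = "#{@path}.tmp"
    File.open(tmp, "w") do |f|
      f.write(JSON.generate({ "promised" => @promised, "accepted" => @accepted }))
      f.fsync
    end
    File.rename(tmp, @path)
  end
end
```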

Distinct Proposal Numbers n

In order for the algorithm to work, each proposer must propose monotonically increasing proposal numbers n that are distinct from any other proposers' numbers n.

To achieve this, we can assign a disjoint set of numbers to each proposer and have each one choose only from its own set (ex. assign each node a unique prime number and have it choose powers of that prime; plain multiples wouldn't work, since 6 is a multiple of both 2 and 3).

Or, if we're assuming static membership of participants, assign each node a unique number i between 0 and k - 1, where k is the total number of participants, and let n = i + (k * round_number).
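A small sketch of the static-membership scheme, using a hypothetical ProposalNumbers helper: node i (with 0 <= i < k) uses n = i + k * round, so two nodes never produce the same number and each node’s numbers grow with its round. The optional seen argument goes slightly beyond the scheme as stated: it jumps the round forward far enough that the next n beats the highest number observed from another proposer.

```ruby
# Hypothetical generator for n = i + k * round (static membership).
class ProposalNumbers
  def initialize(node_id, cluster_size)
    unless (0...cluster_size).cover?(node_id)
      raise ArgumentError, "need 0 <= node_id < cluster_size"
    end
    @i = node_id
    @k = cluster_size
    @round = 0
  end

  # Returns a number strictly greater than both this node's previous one
  # and `seen` (e.g. a higher n learned from a rejected PREPARE).
  def next_n(seen = -1)
    @round = [@round + 1, seen / @k + 1].max
    @i + @k * @round
  end
end
```

Because every n from node i leaves remainder i when divided by k, two nodes can never collide, and choosing round = seen / k + 1 makes k * round, and therefore n, larger than seen.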

Conclusion

tl;dr Paxos is for getting a group of machines to agree on a value (or more usefully, provide a consistent ordering on a sequence of values). Multi-paxos is an optimization on using Paxos for consecutive rounds, where you can skip one of the phases if you assume a stable leader.

Hope this post was interesting and/or informative! Similar to the eventual consistency post, I don't claim to be an expert in this area, so I'd love to hear any comments or corrections.

October 29, 2011

Using Javascript Timeouts for Performance

Wait, what? Doesn't setting a timeout in Javascript delay a piece of code from running for X amount of time? How could that possibly help with performance?

Imagine writing a Javascript app that does some large, computationally expensive task, like adding five thousand elements to the DOM:

var list = document.getElementsByTagName("ul")[0];
for (var i = 0; i < 5000; i++) {
  var newElement = document.createElement("li");
  list.appendChild(newElement);
}

Since Javascript is single-threaded, running Javascript suspends any user interface interaction and updates to the page; some browsers, like Chrome, will even try to kill the script if it's been running for too long (have you ever seen that sad face "Oops" page?).

With the use of timers, we can split up the code into different segments (ex. 10 loops doing 500 DOM additions each):

var curr = 0;
var max = 5000;
var step = 500;
var list = document.getElementsByTagName("ul")[0];

// Add one chunk of up to `step` elements, then yield back to the
// browser and schedule the next chunk
function addChunk() {
  for (var end = Math.min(curr + step, max); curr < end; curr++) {
    var newElement = document.createElement("li");
    list.appendChild(newElement);
  }
  if (curr < max) { setTimeout(addChunk, 0); }
}
setTimeout(addChunk, 0);

Setting the timer to 0 will tell the browser to fire off the timers as quickly as possible (in most browsers, the minimum timer delay is around 10-15ms). However, since the code is broken up into smaller pieces, it will allow the browser to do things like render updates to the page and respond to user events - basically, being more responsive (this is good!).

October 25, 2011

Hiree

Disclaimer: These screenshots in no way represent my actual interest/interview status with companies. I have randomly selected companies to demo the smiley face moods, so the sad/happy faces do not reflect my actual sentiments towards those companies.

You know how companies have these online systems in place to keep track of all their job candidates?

In the last semester of my senior year, I felt so overwhelmed by all of the companies I was interviewing with that I decided to create the reverse: an online system for me, the job candidate, to keep track of all the companies. Side note: I thought I was solving a greater need until a couple of my friends told me, "Amber, nobody else has these problems".

Basically, you can create buckets and companies to go into those buckets, annotating each company with your current "mood" in happy/sad faces (IMHO, this is the most awesome feature). By default, I have "Interested", "Waiting", "Interviewing", and "Received Offer" buckets but everything is renamable and editable so you can feel free to use this application for anything you need (e.g. graduate school application tracking).

I'm currently in the process of adding new features: the ability to add notes for each company (deadlines, locations, etc.) and backend storage (the app currently uses local storage, so don't clear your browser data) are at the top of my list, but feel free to contact me if you'd like to see anything else.

This project was also an awesome opportunity for me to teach myself Backbone.js, which I've been meaning to learn for a while. I will blog about my experiences sometime and/or open source the code!