Node.js, redis, and resque

Update (3/2/10): Updated code to work with version 0.1.30 of node.js

I’ve continued to play with node.js, and I’ve decided to do a follow-up spike to my previous one, "Web proxy in node.js for high availability".

The previous spike used node to proxy requests directly to a web server. This spike uses node to put messages into a (redis) queue. Ruby background workers read from the queue, process the requests, and respond on a different queue. When node receives the response from the background worker, it sends the response back to the waiting user.

Just like my first spike, this type of architecture can be used for high availability web sites. Since all messages go into a queue and node holds the connections from the users, the site can be upgraded (including database migrations or infrastructure changes) as long as node and redis stay the same. Once the upgrade is finished, the workers can resume working from the queue. Users would see an extra-long request, but as long as the upgrade is short (e.g., less than a minute), they should not know the site was down.

A queue has a lot of advantages over a straight proxy:

  1. Easy to scale up and down by adding or removing workers
  2. Can use priority queues to prioritize more important web requests
  3. Easy to monitor (e.g., how many messages are in the queue and how fast they are being added)
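Points 2 and 3 can be sketched in a few lines. This is only an illustration, not part of the spike: the two queue names and the "/checkout" rule are hypothetical, and plain arrays stand in for redis lists so the code runs without a server (push, shift, and length play the roles of RPUSH, LPOP, and LLEN).

```javascript
// Hypothetical queue names, listed from highest to lowest priority.
const QUEUES = ['resque:queue:requests_high', 'resque:queue:requests_low'];

// In-memory stand-in for redis lists, so the sketch runs without a server.
const lists = new Map(QUEUES.map((name) => [name, []]));

// Assumed rule for illustration: checkout pages outrank everything else.
function queueFor(url) {
  return url.startsWith('/checkout') ? QUEUES[0] : QUEUES[1];
}

// Append a request to the queue chosen for its URL (RPUSH equivalent).
function push(url) {
  lists.get(queueFor(url)).push(url);
}

// Return the next request from the first non-empty queue, or null if all
// queues are empty (LPOP on each queue in priority order).
function popByPriority() {
  for (const name of QUEUES) {
    const list = lists.get(name);
    if (list.length > 0) return list.shift();
  }
  return null;
}

// Monitoring: number of waiting messages per queue (LLEN equivalent).
function queueDepths() {
  const depths = {};
  for (const name of QUEUES) depths[name] = lists.get(name).length;
  return depths;
}
```

After pushing '/things', '/checkout/1', and '/about', the first pop returns '/checkout/1' even though it arrived second. On the worker side, resque accepts a comma-separated QUEUE list and checks the queues in the order given, which is one way to get the same effect there.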

Here is a very simple version of the code. First, the node webserver (using redis-node-client):

var sys = require('sys'),
   http = require('http'),
  redis = require("./redisclient");

var queuedRes = {}
var counter = 1;

http.createServer(function (req, res) {
  pushOnQueue(req, res);
}).listen(8000);

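function pushOnQueue(req, res) {
  // Declare locals with var so they do not leak into the global scope.
  var requestNumber = counter++;

  // resque expects each job as {"class": ..., "args": [...]}.
  var message = JSON.stringify({
    "class": "RequestProcessor",
    "args": [ {"node_id": requestNumber, "url": req.url} ]
  });

  client.rpush('resque:queue:requests', message);
  // Keep the response object so it can be found when the worker replies.
  queuedRes[requestNumber] = res;
}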

var client = new redis.Client();
client.connect(function() {
  popFromQueue();
});

function popFromQueue() {
  client.lpop('responses', handleResponse);
}

function handleResponse(err, result) {
  if (result == null) {
    // Nothing on the queue: poll again in 100 ms.
    setTimeout(function() { popFromQueue(); }, 100);
  } else {
    var json = JSON.parse(result);
    var requestNumber = json.node_id;
    var body = unescape(json.body);
    var res = queuedRes[requestNumber];
    // Skip responses with no waiting request (e.g., left over from a restart).
    if (res) {
      res.writeHeader(200, {'Content-Type': 'text/plain'});
      res.write(body);
      res.close();
      delete queuedRes[requestNumber];
    }
    popFromQueue();
  }
}

sys.puts('Server running at http://127.0.0.1:8000/');

Also available as a gist.

pushOnQueue() is called on incoming web requests. This creates a JSON message and pushes it on the resque:queue:requests queue. It also puts the res object into a hash so it can be retrieved again on the way back.
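To make the message shape concrete, here is a small standalone sketch of the same step, written for a current Node.js. The names pending, buildMessage, enqueue, and the pushToList callback are made up for this example; pushToList stands in for client.rpush so the code runs without redis.

```javascript
// Waiting response objects, keyed by request number (plays the role of
// queuedRes in the server code).
const pending = new Map();
let nextId = 1;

// Build the JSON payload in the shape resque expects: a class name plus args.
function buildMessage(id, url) {
  return JSON.stringify({
    class: 'RequestProcessor',
    args: [{ node_id: id, url: url }],
  });
}

// pushToList(queueName, message) is a hypothetical stand-in for client.rpush.
function enqueue(url, res, pushToList) {
  const id = nextId++;
  pending.set(id, res); // remembered so the reply can be routed back
  pushToList('resque:queue:requests', buildMessage(id, url));
  return id;
}
```

For a request to /things, the first message pushed is {"class":"RequestProcessor","args":[{"node_id":1,"url":"/things"}]}. The node_id is the only link between the queued job and the open connection, so the worker has to echo it back unchanged.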

At the same time, a queue listener is set up using redis.Client(). On connect, popFromQueue() is called. This method pops messages from the responses queue and calls handleResponse(). If the pop did not find a message, it is scheduled to call again in 100 milliseconds. If it did find a message, the message is parsed with JSON, the requestNumber is pulled out, and the original res object is pulled out of the queuedRes hash. The res object is then sent the body of the message from the queue, which makes it back to the user.
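The decision made on each pop can be isolated into one function, which makes it easy to try out without redis or a real connection. This is a sketch under a few assumptions: handlePopped and its return value are invented for the example, pending is a Map of waiting response objects, the response methods are the current Node.js names (writeHead and end) rather than the 0.1.30 ones used above, and the body is assumed to be UTF-8 percent-encoded text.

```javascript
// Handle the result of one pop from the responses queue.
// Returns the delay in milliseconds before the next poll:
// 100 when the queue was empty, 0 when a message was consumed.
function handlePopped(raw, pending) {
  if (raw === null) return 100;

  const message = JSON.parse(raw);
  const res = pending.get(message.node_id);

  // A message may arrive for a request this process no longer knows about
  // (for example after a restart); in that case it is simply dropped.
  if (res) {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end(decodeURIComponent(message.body));
    pending.delete(message.node_id);
  }
  return 0;
}
```

A caller would feed the return value to setTimeout (or call the pop again immediately when it is 0), which matches the 100 millisecond retry described above.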

On the other side, I have a ruby worker using resque:

class RequestProcessor
  @queue = :requests

  APP = Rack::Builder.new do
    use Rails::Rack::Static
    use Rack::CommonLogger
    run ActionController::Dispatcher.new
  end

  RACK_BASE_REQUEST = {
    "PATH_INFO" => "/things",
    "QUERY_STRING" => "",
    "REQUEST_METHOD" => "GET",
    "SERVER_NAME" => "localhost",
    "SERVER_PORT" => "3000",
    "rack.errors" => STDERR,
    "rack.input" => StringIO.new(""),
    "rack.multiprocess" => true,
    "rack.multithread" => false,
    "rack.run_once" => false,
    "rack.url_scheme" => "http",
    "rack.version" => [1, 0],
  }

  def self.perform(hash)
    url = hash.delete("url")

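    # req.url from node includes the query string; Rack expects it separately.
    path, query = url.split("?", 2)
    request = RACK_BASE_REQUEST.clone
    request["PATH_INFO"] = path
    request["QUERY_STRING"] = query.to_s
    response = APP.call(request)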

    body = ""
    response.last.each { |part| body << part }

    hash["body"] = URI.escape(body)
    cmd = "redis-cli rpush responses #{hash.to_json.inspect}"
    system cmd
  end
end

Also available as a gist.

The worker can be started with:

env QUEUE=requests INTERVAL=1 rake environment resque:work

This worker uses resque, which polls the queue and calls perform when a message is received. The perform method builds the Rack request and runs the URL from the message through Rails. It then pushes the response body onto the responses queue using redis-cli.
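The response travels back as JSON with a percent-encoded body, so both sides have to agree on the encoding. The round trip can be sketched in JavaScript alone. Here encodeResponse is a stand-in for the Ruby side: it uses encodeURIComponent where the worker uses URI.escape, which escapes a different set of characters but, assuming the Rails response is UTF-8, produces the same kind of percent-encoding. Both function names are invented for this example.

```javascript
// Worker side (stand-in for the Ruby code): wrap the body for the queue.
// encodeURIComponent is used here in place of Ruby's URI.escape.
function encodeResponse(nodeId, body) {
  return JSON.stringify({ node_id: nodeId, body: encodeURIComponent(body) });
}

// Node side: recover the request number and the original body.
// decodeURIComponent assumes the percent-encoded bytes are valid UTF-8
// and throws a URIError if they are not.
function decodeResponse(raw) {
  const message = JSON.parse(raw);
  return { nodeId: message.node_id, body: decodeURIComponent(message.body) };
}
```

One detail worth knowing: unescape, which the server code above uses, decodes each %XX as a single character. That round-trips ASCII bodies correctly, but a multi-byte UTF-8 character such as "é" (encoded as %C3%A9) comes back as two characters. decodeURIComponent handles that case, at the cost of throwing on input that is not valid UTF-8.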

As before, to keep the code simple, this spike only works with GET requests and does not pass any headers through. Comments and forks are welcome.
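For anyone who wants to take it further, one possible approach (not something this spike does) is to have node send the method and headers along in the message, already named the way Rack expects. The sketch below assumes a hypothetical helper, toRackEnv, and follows the Rack/CGI convention that request headers become HTTP_* keys, except Content-Type and Content-Length, which have no prefix. Request bodies are left out.

```javascript
// Translate a request's method, URL, and headers into Rack-style env keys.
// toRackEnv is a hypothetical helper; `headers` is a plain object such as
// the one Node exposes as req.headers (lower-cased names).
function toRackEnv(method, url, headers) {
  // req.url carries the query string, so split it off for Rack.
  const queryAt = url.indexOf('?');
  const env = {
    REQUEST_METHOD: method,
    PATH_INFO: queryAt === -1 ? url : url.slice(0, queryAt),
    QUERY_STRING: queryAt === -1 ? '' : url.slice(queryAt + 1),
  };

  for (const [name, value] of Object.entries(headers)) {
    const key = name.toUpperCase().replace(/-/g, '_');
    // Rack (following CGI) leaves these two without the HTTP_ prefix.
    if (key === 'CONTENT_TYPE' || key === 'CONTENT_LENGTH') {
      env[key] = value;
    } else {
      env['HTTP_' + key] = value;
    }
  }
  return env;
}
```

The worker could then merge such a hash over its base request instead of setting PATH_INFO alone. Response headers and status codes would need the same treatment in the other direction.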