Node.js, redis, and resque

Update (3/2/10): Updated code to work with version 0.1.30 of node.js

I’ve continued to play with node.js, and I’ve decided to do a follow-up
spike to my previous one, "Web proxy in node.js for high availability".

The previous spike used node to proxy requests directly to a web server.
This spike uses node to put messages into a
(redis) queue. Ruby background workers read from the queue, process the requests, and respond on a different
queue. When node receives the response from the background worker, it
sends the response back to the waiting user.

Just like my first spike, this type of architecture can be used for high
availability web sites. Since all messages go into a queue and node
holds the connections from the users, the site can be upgraded
(including database migrations or infrastructure changes) as long as
node and redis stay the same. Once the upgrade is finished, the workers
can resume working from the queue. Users would see an extra-long
request, but as long as the upgrade was short (e.g., less than a
minute), the user should not know the site was down.

A queue has a lot of advantages over a straight proxy:

  1. Easy to scale up and down by adding or removing workers
  2. Can use priority queues to prioritize more important web requests
  3. Easy to monitor (e.g., how many messages are in the queue and how
    fast they are being added)
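
Points 2 and 3 can be sketched in a few lines. This is only an illustration under stated assumptions: plain in-memory arrays stand in for redis lists, and the queue names requests:high and requests:low are made up for the example. The idea is that a consumer checks the queues in priority order and takes the first message it finds, while monitoring amounts to reading list lengths (LLEN in redis).

```javascript
// In-memory stand-in for redis lists, keyed by queue name.
// Assumption: a real setup would use rpush/lpop/llen against redis.
var queues = {
  'requests:high': [],
  'requests:low': []
};

// Queue names in priority order; these names are hypothetical.
var PRIORITY_ORDER = ['requests:high', 'requests:low'];

function push(queueName, message) {
  queues[queueName].push(message);
}

// Return the next message from the highest-priority non-empty queue,
// or null when every queue is empty.
function popByPriority() {
  for (var i = 0; i < PRIORITY_ORDER.length; i++) {
    var queue = queues[PRIORITY_ORDER[i]];
    if (queue.length > 0) {
      return queue.shift();
    }
  }
  return null;
}

// Monitoring: number of waiting messages per queue.
function queueDepths() {
  var depths = {};
  for (var name in queues) {
    depths[name] = queues[name].length;
  }
  return depths;
}

// The low-priority request arrives first but is served second.
push('requests:low', '/reports');
push('requests:high', '/checkout');
```

With resque itself, a similar effect should be possible by listing several queues in the QUEUE variable, since a worker checks them in the order given.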

Here is a very simple version of the code. First, the node webserver
(using redis-node-client):

var sys = require('sys'),  
   http = require('http'),
  redis = require("./redisclient");

var queuedRes = {};  // request number -> response object waiting for a reply
var counter = 1;

http.createServer(function (req, res) {  
  pushOnQueue(req, res);
}).listen(8000);

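function pushOnQueue(req, res) {
  // Declare locals with var so they do not leak into the global scope.
  var requestNumber = counter++;

  // resque job format: a class name plus the arguments for perform.
  var message = JSON.stringify({
    "class": "RequestProcessor",
    "args": [ {"node_id": requestNumber, "url": req.url} ]
  });

  client.rpush('resque:queue:requests', message);
  queuedRes[requestNumber] = res;
}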

var client = new redis.Client();  
client.connect(function() {  
  popFromQueue();
});

function popFromQueue() {  
  client.lpop('responses', handleResponse);
}

function handleResponse(err, result) {
  if (err || result == null) {
    // Nothing to read (or the pop failed): poll again in 100 ms.
    setTimeout(popFromQueue, 100);
  } else {
    var json = JSON.parse(result);
    var requestNumber = json.node_id;
    var body = unescape(json.body);
    var res = queuedRes[requestNumber];
    if (res) {  // skip replies for requests this process is not holding
      res.writeHeader(200, {'Content-Type': 'text/plain'});
      res.write(body);
      res.close();
      delete queuedRes[requestNumber];
    }
    popFromQueue();
  }
}

sys.puts('Server running at http://127.0.0.1:8000/');  

Also available as a gist.

pushOnQueue() is called on incoming web requests. This creates a JSON
message and pushes it on the resque:queue:requests queue. It also puts
the res object into a hash so it can be retrieved again on the way back.
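
To make the message format concrete, here is a small sketch of just the bookkeeping part, with the redis call left out. The names buildJob, enqueue, and pending are made up for this example, and the res argument is a placeholder object rather than a real node response.

```javascript
// Build the JSON payload in the shape resque expects: a job class name
// plus an args array that is handed to RequestProcessor.perform.
function buildJob(requestNumber, url) {
  return JSON.stringify({
    'class': 'RequestProcessor',
    'args': [{ 'node_id': requestNumber, 'url': url }]
  });
}

var pending = {};  // request number -> waiting response object
var counter = 1;

// Hypothetical helper: remember the response object and return the
// message that would be pushed onto resque:queue:requests.
function enqueue(url, res) {
  var requestNumber = counter++;
  pending[requestNumber] = res;
  return buildJob(requestNumber, url);
}

var message = enqueue('/things', { placeholder: true });
console.log(message);
// prints {"class":"RequestProcessor","args":[{"node_id":1,"url":"/things"}]}
```

The node_id is the only link between the queued job and the open connection, so the worker has to send it back unchanged.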

At the same time, a queue listener is set up using redis.Client(). On
connect, popFromQueue() is called. This method pops messages from the
responses queue and calls handleResponse(). If the pop did not find a
message, it is scheduled to call again in 100 milliseconds. If it did
find a message, the message is parsed with JSON, the requestNumber is
pulled out, and the original res object is pulled out of the queuedRes
hash. The res object is then sent the body of the message from the
queue, which makes it back to the user.
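
The return path can be tried without redis or a live connection. The sketch below is an assumption-laden illustration: FakeResponse is a made-up stand-in that only records calls, deliver is a hypothetical name for the parsing and dispatch step, and the message is hard-coded instead of being popped from the responses queue.

```javascript
var pending = {};  // request number -> waiting response object

// Made-up stand-in for node's response object; it only records calls.
function FakeResponse() {
  this.status = null;
  this.chunks = [];
  this.closed = false;
}
FakeResponse.prototype.writeHeader = function (status, headers) {
  this.status = status;
};
FakeResponse.prototype.write = function (chunk) {
  this.chunks.push(chunk);
};
FakeResponse.prototype.close = function () {
  this.closed = true;
};

// Parse a message from the responses queue and hand the body to the
// matching response. Returns false when no request is waiting for it.
function deliver(rawMessage) {
  var json = JSON.parse(rawMessage);
  var res = pending[json.node_id];
  if (!res) {
    return false;
  }
  res.writeHeader(200, { 'Content-Type': 'text/plain' });
  res.write(unescape(json.body));  // the worker escaped the body
  res.close();
  delete pending[json.node_id];
  return true;
}

var waiting = new FakeResponse();
pending[7] = waiting;
var delivered = deliver('{"node_id":7,"body":"Hello%20world"}');
var unknown = deliver('{"node_id":99,"body":"nobody%20home"}');
```

Returning false for an unknown node_id is a choice made for this sketch; it keeps a stray message from crashing the polling loop.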

On the other side, I have a ruby worker using resque:

class RequestProcessor  
  @queue = :requests

  APP = Rack::Builder.new do
    use Rails::Rack::Static
    use Rack::CommonLogger
    run ActionController::Dispatcher.new
  end

  RACK_BASE_REQUEST = {
    "PATH_INFO" => "/things",
    "QUERY_STRING" => "",
    "REQUEST_METHOD" => "GET",
    "SERVER_NAME" => "localhost",
    "SERVER_PORT" => "3000",
    "rack.errors" => STDERR,
    "rack.input" => StringIO.new(""),
    "rack.multiprocess" => true,
    "rack.multithread" => false,
    "rack.run_once" => false,
    "rack.url_scheme" => "http",
    "rack.version" => [1, 0],
  }

  def self.perform(hash)
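    # req.url from node may include a query string; Rack wants it separate.
    path, query = hash.delete("url").split("?", 2)

    request = RACK_BASE_REQUEST.clone
    request["PATH_INFO"] = path
    request["QUERY_STRING"] = query.to_s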
    response = APP.call(request)

    body = ""
    response.last.each { |part| body << part }

    hash["body"] = URI.escape(body)
    # Pass the arguments separately so the JSON never goes through a shell.
    system "redis-cli", "rpush", "responses", hash.to_json
  end
end  

Also available as a gist.

The worker can be started with:

env QUEUE=requests INTERVAL=1 rake environment resque:work  

This worker uses resque, which polls the queue and calls perform when a
message is received. The perform method builds the Rack request and runs
the URL from the message through Rails. It then pushes the response body
onto the responses queue using redis-cli.
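
To show the contract between the two sides, here is the worker's job mirrored in JavaScript so it matches the node code. This is not the Ruby worker itself: app is a made-up stand-in for the Rails application, and JavaScript's escape() is used where the Ruby code calls URI.escape, so the exact escaping may differ.

```javascript
// Made-up stand-in for the Rails app: maps a URL to a response body.
function app(url) {
  return '<h1>You asked for ' + url + '</h1>';
}

// Mirror of RequestProcessor.perform: take the job's first argument,
// run the URL through the app, and build the message that would be
// pushed onto the responses queue.
function perform(rawJob) {
  var job = JSON.parse(rawJob);
  var hash = job.args[0];

  var url = hash.url;
  delete hash.url;  // the reply carries node_id and body only

  hash.body = escape(app(url));
  return JSON.stringify(hash);
}

var job = '{"class":"RequestProcessor","args":[{"node_id":3,"url":"/things"}]}';
var reply = JSON.parse(perform(job));
```

The reply keeps the node_id it was given, which is what lets node match it to the waiting connection, and node's unescape() call recovers the original body.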

As before, to keep the code simple, this spike only works with GET
requests and does not pass any headers through.
Comments and forks are welcome.