Node.js, redis, and resque

written by paul on February 28th, 2010 @ 12:24 PM

Update (3/2/10): Updated code to work with version 0.1.30 of node.js

I’ve continued to play with node.js, and I’ve decided to do a follow-up to my previous spike: Web proxy in node.js for high availability

The previous spike used node to proxy requests directly to a web server. This spike uses node to put messages into a (redis) queue. Ruby background workers read from the queue, process the requests, and respond on a different queue. When node receives the response from the background worker, it sends the response back to the waiting user.

Just like my first spike, this type of architecture can be used for high-availability web sites. Since all messages go into a queue and node holds the connections from the users, the site can be upgraded (including database migrations or infrastructure changes) as long as node and redis stay the same. Once the upgrade is finished, the workers can resume working from the queue. Users would see an extra-long request, but as long as the upgrade was short (e.g., less than a minute), they should not notice that the site was down.

A queue has a lot of advantages over a straight proxy:
  1. Easy to scale up and down by adding or removing workers
  2. Can use priority queues to prioritize more important web requests
  3. Easy to monitor (e.g., how many messages are in the queue and how fast they are being added)
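
As a rough illustration of points 2 and 3, a poller can check several queues in priority order and take the first message it finds, and queue depth is just the length of each list. This sketch uses in-memory arrays in place of redis lists. The queue names and the popByPriority() and queueDepths() helpers are made up for the example and are not part of the spike.

```javascript
// In-memory stand-ins for redis lists; the queue names are hypothetical.
var queues = {
  'requests:high': [],
  'requests:low': []
};

// Check the queues in priority order and return the first message found,
// or null when every queue is empty.
function popByPriority(queues, order) {
  for (var i = 0; i < order.length; i++) {
    var list = queues[order[i]];
    if (list && list.length > 0) {
      return { queue: order[i], message: list.shift() };
    }
  }
  return null;
}

// Monitoring: report how many messages are waiting in each queue.
function queueDepths(queues) {
  var depths = {};
  for (var name in queues) { depths[name] = queues[name].length; }
  return depths;
}

queues['requests:low'].push('GET /reports');
queues['requests:high'].push('GET /checkout');

var order = ['requests:high', 'requests:low'];
var depthsBefore = queueDepths(queues);
var first = popByPriority(queues, order);   // taken from requests:high
var second = popByPriority(queues, order);  // taken from requests:low
var third = popByPriority(queues, order);   // null, both queues are empty
console.log(depthsBefore, first.queue, second.queue, third);
```

Resque workers support the same idea: when QUEUE is a comma-separated list, the queues are checked in the order given.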

Here is a very simple version of the code. First, the node webserver (using redis-node-client):


var sys = require('sys'),
    http = require('http'),
    redis = require("./redisclient");

// Response objects for in-flight requests, keyed by request number.
var queuedRes = {};
var counter = 1;

http.createServer(function (req, res) {
  pushOnQueue(req, res);
}).listen(8000);

// Queue a resque job for the request and remember its response object.
function pushOnQueue(req, res) {
  var requestNumber = counter++;

  var message = JSON.stringify({
    "class": "RequestProcessor",
    "args": [ {"node_id": requestNumber, "url": req.url} ]
  });

  client.rpush('resque:queue:requests', message);
  queuedRes[requestNumber] = res;
}

var client = new redis.Client();
client.connect(function() {
  popFromQueue();
});

function popFromQueue() {
  client.lpop('responses', handleResponse);
}

// Send a worker's response to the waiting user, or poll again if the
// responses queue was empty.
function handleResponse(err, result) {
  if (result == null) {
    setTimeout(function() { popFromQueue(); }, 100);
  } else {
    var json = JSON.parse(result);
    var requestNumber = json.node_id;
    var body = unescape(json.body);
    var res = queuedRes[requestNumber];
    // Skip responses with no waiting request (e.g., left over from a restart).
    if (res) {
      res.writeHeader(200, {'Content-Type': 'text/plain'});
      res.write(body);
      res.close();
      delete queuedRes[requestNumber];
    }
    popFromQueue();
  }
}

sys.puts('Server running at http://127.0.0.1:8000/');

Also available as a gist.

pushOnQueue() is called on incoming web requests. This creates a JSON message and pushes it on the resque:queue:requests queue. It also puts the res object into a hash so it can be retrieved again on the way back.

At the same time, a queue listener is set up using redis.Client(). On connect, popFromQueue() is called. This function pops a message from the responses queue and passes the result to handleResponse(). If the pop did not find a message, another pop is scheduled 100 milliseconds later. If it did find a message, the message is parsed as JSON, the requestNumber is pulled out, and the original res object is looked up in the queuedRes hash. The body of the message is then written to that res object, which sends it back to the waiting user.
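
The round trip can be tried without redis or a web server. The following is a minimal sketch under some assumptions: the two redis lists are replaced by in-memory arrays, the res objects are plain objects, fakeWorker() is a hypothetical stand-in for the Ruby worker, and encodeURIComponent()/decodeURIComponent() are swapped in for the escaping used in the real code. It only shows how node_id ties a response back to the request that is waiting for it.

```javascript
// In-memory stand-ins for the two redis lists (no real redis here).
var requestsQueue = [];
var responsesQueue = [];

var queuedRes = {};   // requestNumber -> waiting response object
var counter = 1;

// Park the response object and enqueue a job, as pushOnQueue() does.
function pushOnQueue(url, res) {
  var requestNumber = counter++;
  requestsQueue.push(JSON.stringify({
    "class": "RequestProcessor",
    "args": [ { "node_id": requestNumber, "url": url } ]
  }));
  queuedRes[requestNumber] = res;
  return requestNumber;
}

// Handle one popped response. Returns false when the queue was empty
// (the real server would schedule another pop 100 ms later).
function handleOne() {
  var result = responsesQueue.shift();
  if (result === undefined) { return false; }
  var json = JSON.parse(result);
  var res = queuedRes[json.node_id];
  if (res) {
    res.body = decodeURIComponent(json.body);
    delete queuedRes[json.node_id];
  }
  return true;
}

// Hypothetical worker: answers the newest job first, so responses arrive
// in a different order than the requests did.
function fakeWorker() {
  while (requestsQueue.length > 0) {
    var job = JSON.parse(requestsQueue.pop()).args[0];
    responsesQueue.push(JSON.stringify({
      node_id: job.node_id,
      body: encodeURIComponent('response for ' + job.url)
    }));
  }
}

var resA = {}, resB = {};
pushOnQueue('/a', resA);
pushOnQueue('/b', resB);
fakeWorker();
while (handleOne()) {}
console.log(resA.body, '|', resB.body);
// prints: response for /a | response for /b
```

Even though the fake worker answers /b before /a, each body lands on the right res object because the lookup goes through node_id rather than arrival order.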

On the other side, I have a ruby worker using resque:


require "uri"
require "stringio"

class RequestProcessor
  @queue = :requests

  # Rack app that runs each request through the Rails stack.
  APP = Rack::Builder.new do
    use Rails::Rack::Static
    use Rack::CommonLogger
    run ActionController::Dispatcher.new
  end

  # Minimal Rack environment for a GET request. PATH_INFO and QUERY_STRING
  # are overwritten for each job.
  RACK_BASE_REQUEST = {
    "PATH_INFO" => "/things",
    "QUERY_STRING" => "",
    "REQUEST_METHOD" => "GET",
    "SERVER_NAME" => "localhost",
    "SERVER_PORT" => "3000",
    "rack.errors" => STDERR,
    "rack.input" => StringIO.new(""),
    "rack.multiprocess" => true,
    "rack.multithread" => false,
    "rack.run_once" => false,
    "rack.url_scheme" => "http",
    "rack.version" => [1, 0],
  }

  def self.perform(hash)
    url = hash.delete("url")
    # node sends the path and query string together, so split them for Rack.
    path, query = url.split("?", 2)

    request = RACK_BASE_REQUEST.clone
    request["PATH_INFO"] = path
    request["QUERY_STRING"] = query.to_s
    response = APP.call(request)

    body = ""
    response.last.each { |part| body << part }

    hash["body"] = URI.escape(body)
    # Pass the arguments separately so the JSON never goes through a shell.
    system "redis-cli", "rpush", "responses", hash.to_json
  end
end

Also available as a gist.

The worker can be started with:


env QUEUE=requests INTERVAL=1 rake environment resque:work

This worker uses resque, which polls the queue and calls perform when a message is received. The perform method builds the Rack request and runs the URL from the message through Rails. It then pushes the response body onto the responses queue using redis-cli.
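
For readers who do not read Ruby, here is roughly what the perform step does, transliterated to JavaScript as an illustration only (the real worker is the Ruby class above). The app argument is a hypothetical function standing in for the Rack app and returns a Rack-style [status, headers, bodyParts] triple. BASE_REQUEST is a trimmed-down copy of the base environment, and splitting the query string out of the URL is an assumption of this sketch about how the URL should map onto the Rack environment.

```javascript
// Subset of the base Rack environment from the Ruby worker.
var BASE_REQUEST = {
  "PATH_INFO": "/things",
  "QUERY_STRING": "",
  "REQUEST_METHOD": "GET"
};

// Build a per-job environment so that jobs never share a request hash.
function buildRequest(url) {
  var request = {};
  for (var key in BASE_REQUEST) { request[key] = BASE_REQUEST[key]; }
  var queryStart = url.indexOf("?");
  request["PATH_INFO"] = queryStart === -1 ? url : url.slice(0, queryStart);
  request["QUERY_STRING"] = queryStart === -1 ? "" : url.slice(queryStart + 1);
  return request;
}

// Run one job through `app` and return the JSON string that would be
// pushed onto the responses queue.
function perform(hash, app) {
  var response = app(buildRequest(hash.url));   // [status, headers, bodyParts]
  var body = response[2].join("");
  return JSON.stringify({ node_id: hash.node_id, body: encodeURIComponent(body) });
}

// Hypothetical app that echoes what it was asked for.
function echoApp(request) {
  return [200, {}, ["path=", request["PATH_INFO"], " query=", request["QUERY_STRING"]]];
}

var reply = JSON.parse(perform({ node_id: 3, url: "/things?page=2" }, echoApp));
console.log(reply.node_id, decodeURIComponent(reply.body));
// prints: 3 path=/things query=page=2
```

The node_id is passed through untouched, which is what lets the node side find the waiting connection again.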

As before, to keep the code simple, this spike only works with GET requests and does not pass any headers through. Comments and forks are welcome.
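
One possible way to lift those limits, sketched here with plenty of assumptions: the message could carry the method, headers, and request body along with the URL, and the worker could map the headers onto Rack environment keys. The extra field names ("method", "headers", "body") and both helper functions are hypothetical and not part of the spike; fakeReq only mimics the url, method, and headers properties of a node request.

```javascript
// Build a job payload that carries method, headers, and body as well as the URL.
function buildMessage(requestNumber, req, body) {
  return JSON.stringify({
    "class": "RequestProcessor",
    "args": [ {
      "node_id": requestNumber,
      "url": req.url,
      "method": req.method,
      "headers": req.headers,
      "body": body || ""
    } ]
  });
}

// Map HTTP header names onto Rack-style environment keys (HTTP_ACCEPT and so on).
function toRackEnvKeys(headers) {
  var env = {};
  for (var name in headers) {
    var key = name.toUpperCase().replace(/-/g, "_");
    // Rack keeps these two headers without the HTTP_ prefix.
    if (key !== "CONTENT_TYPE" && key !== "CONTENT_LENGTH") {
      key = "HTTP_" + key;
    }
    env[key] = headers[name];
  }
  return env;
}

// Plain object standing in for a node request.
var fakeReq = {
  url: "/things",
  method: "POST",
  headers: { "content-type": "application/json", "accept": "text/html" }
};

var job = JSON.parse(buildMessage(1, fakeReq, '{"name":"widget"}')).args[0];
var env = toRackEnvKeys(job.headers);
console.log(job.method, env);
```

On the Ruby side, the worker would then need to merge these keys into the request hash, set REQUEST_METHOD, and wrap the body in rack.input.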

Comments

  • Peter Mescalchin on 28 Feb 16:49

    Paul, such a brilliant idea - will be following your progress with this one for sure. Being able to add new workers live without downtime is awesome. Such a simple concept when you think about it - but I guess all good ideas are :)
  • TJ Holowaychuk on 28 Feb 20:25

    requestNumber is a global :p
  • Paul Gross on 28 Feb 22:53

    TJ, I needed some way to share state between the incoming web request and the handleResponse() function. For simplicity, I decided to use a global hash and counter, but there are definitely other solutions.
  • Chris Wanstrath on 02 Mar 19:23

    Freaking sweet!
  • Rdb on 03 Mar 05:08

    I have done mainly java dev: is that counter++ update safe? In Java that is not an atomic action so more than one request could conceivably share the same request id. Is this possible in JavaScript? Is there no chance of interleaving requests to that operation? I am asking from the point of view of total ignorance: I don't know what node does with your server code and I don't know how the VM works so sorry if this is just a dumb question. Cheers
  • Paul Gross on 03 Mar 11:02

    Rdb, node.js does not use threads. It uses an event loop, so only one thing is running at a time. You do not have to worry about multiple requests running at the same time.
  • Orlin Bozhinov on 05 Mar 10:29

    This is great! Exactly the kind of solution I'm looking for. The only thing I'm not sure about is how realtime (immediate processing of the requests) this is... What is INTERVAL=1? The worker is polling every 1 second? Resque is nice and I prefer it to RabbitMQ. Compare with http://github.com/somic/rabbitbal (a good README as well) -- I'm thinking persistent TCP connection (with AMQP) vs Resque workers polling... On the other hand, I could be processing with daemon-kit. Please help me make up my mind.
  • Paul Gross on 05 Mar 16:58

    Orlin, INTERVAL is the amount of time in seconds the worker sleeps if it does not find a message in the queue. If there are messages in the queue, there is no sleeping, so it will process them quickly. Also, if you have many workers polling, the amount of time a message will wait will be very low. You could always start with a solution like mine above, and if the latency is too high, move to a worker that blocks and see if that helps. You can do blocking reads with redis. See http://code.google.com/p/redis/wiki/BlpopCommand. Rabbitbal looks far more polished than my spike, however.
  • khadijah on 03 Apr 10:39

    congratulations, good luck with the article that you always bring
  • Avatar Oyunları on 22 Jun 12:26

    the article is very good Paul, congratulations
  • David Evans on 28 Jul 10:29

    Just a minor point, but isn't the function created in the following line redundant? setTimeout(function() { popFromQueue(); }, 100); Wouldn't it be more efficient to just do: setTimeout(popFromQueue, 100);
  • Michael Bridgen on 28 Jul 17:16

    Paul, handleResponse and popFromQueue appear to be mutually recursive, and don't bottom out unless the pop is on a timer. Won't this eat up the stack?
  • Paul Gross on 28 Jul 22:43

    David, you are correct. Thanks.
  • Paul Gross on 28 Jul 22:46

    Michael, the setTimeout prevents the recursion from continuing by registering a new event.
  • Michael Bridgen on 31 Jul 17:23

    Yes, as I noted. What are the semantics of lpop? You only set the timer (and thereby unwind the stack) if it returns nothing.
