everburning

home quotes

Riding the Rabbit

13 Dec 2008

img_5523We've started using RabbitMQ at work for some of our messaging needs. We've been using the Ruby AMQP library for all of our communication with Rabbit so far. So, I thought I'd write up a quick article on how to use the Ruby AMQP library.

I'll be showing some code from a fork of the Ruby AMQP code that adds a bit of reconnect logic. I'll call out this difference when we get there.

The Ruby AMQP bindings are written using the EventMachine framework so we'll be working under that for this article.

We're going to look at a couple simple examples, adding to an exchange and draining messages off an exchange.

img_5677First, a quick note on AMQP terminology, this isn't complete but should cover us for our purpose. When you publish a message to an AMQP server (I'm just going to use Rabbit from now on) it gets published onto an exchange. A client who wishes to listen to the exchange attaches a queue to the exchange. Queues and exchanges can be set as durable which means they'll survive a restart of Rabbit. Messages can be marked as persistent which means they'll sit on the queues attached to the exchange until the consumer picks them up. If a message isn't persistent it will just disappear from the queue if there is no client listening.

The first step is setting up our Rabbit instance. I'm not going to explain this stuff, the Rabbit documentation can do a better job then I can.

[dj2@titania ~]# rabbitmqctl add_vhost /my_vhost
Creating vhost "/my_vhost" ...done.

[dj2@titania ~]# rabbitmqctl add_user dj2 secret_password
Creating user "dj2" ...done.
[dj2@titania ~]# rabbitmqctl add_user reader some_pass
Creating user "reader" ...done.

[dj2@titania ~]# rabbitmqctl map_user_vhost dj2 /my_vhost
Mapping user "dj2" to vhost "/my_vhost" ...done.
[dj2@titania ~]# rabbitmqctl map_user_vhost reader /my_vhost
Mapping user "reader" to vhost "/my_vhost" ...done.

[dj2@titania ~]# rabbitmqctl set_permissions dj2 /my_vhost /data all
Setting permissions for user "dj2", vhost "/my_vhost", realm "/data" ...done.
[dj2@titania ~]# rabbitmqctl set_permissions reader /my_vhost /data all
Setting permissions for user "reader", vhost "/my_vhost", realm "/data" ...done.

With that out of the way, lets move to something a bit more interesting.

#!/usr/local/bin/ruby

require 'rubygems'
require 'mq'

EM.run do
  AMQP.connect(:user => 'dj2', :pass => 'secret_password',
               :host => 'localhost', :vhost => '/my_vhost') do |conn|
    @connection = conn
    channel = MQ.new(@connection)
    @xchange = channel.fanout('my_exchange', :durable => true)
  end

  EM.add_timer(1) do
    @xchange.publish("data to publish")
    EM.add_timer(1) { EM.stop_event_loop }
  end
end

As I mentioned, everything runs under EventMachine, so our main code is wrapped inside an EM#run block. The first thing we're doing is connecting to Rabbit. This is done with AMQP#connect. In the official Ruby AMQP code you don't provide a block to AMQP#connect it returns the connection object. If you're using the reconnect fork you pass a block to AMQP#connect that will be executed when we reconnect to the server (it also returns the connection object if you want).

Once we've got the connection we need to create the channel that we're going to work with. This is done with MQ#new. With the channel in hand we can create our exchange. In this case we're creating a fanout exchange. The exchange will be named my_exchange and will be durable, so the exchange will survive a Rabbit restart.

I'm setting up an EventMachine timer to publish my message. This gives everything a few seconds to startup and connect to Rabbit. We then use the publish method to send the message to the exchange.

We kill off EventMachine a second later so the application will exit.

#!/usr/local/bin/ruby

require 'rubygems'
require 'mq'

EM.run do
  trap("INT") { EM.stop }
  trap("TERM") { EM.stop }

  AMQP.connect(:user => 'reader', :pass => 'some_pass',
               :host => 'localhost', :vhost => '/my_vhost') do |conn|
    @connection = conn
    channel = MQ.new(@connection)

    xchange = channel.fanout('my_exchange', :durable => false)
    q = MQ::Queue.new(channel, 'my_queue', :durable => false)
    q.bind(xchange)

    q.subscribe do |header, msg|
      puts "GOT #{msg}"
    end
  end
end

You'll notice the similarities between this code and the last block of code. Once we've created our exchange we create a queue on the exchange. This is done using MQ::Queue#new We give the queue a unique name and set it to a non-durable. We then bind the queue to the exchange. With the queue bound we use the subscribe method to listen on the queue. When messages arrive the code block will be executed with the header and message provided.

That's it. It's all pretty simple. Well, at least in these examples it is. This is just the tip of the AMQP iceberg but hopefully you've got enough information to start digging in.

Have fun.

Tags — AMQP, Computers, EventMachine, Programming, RabbitMQ, and Ruby