Dec 13 2008
Riding the Rabbit
We’ve started using RabbitMQ at work for some of our messaging needs. We’ve been using the Ruby AMQP library for all of our communication with Rabbit so far. So, I thought I’d write up a quick article on how to use the Ruby AMQP library.
I’ll be showing some code from a fork of the Ruby AMQP code that adds a bit of reconnect logic. I’ll call out this difference when we get there.
The Ruby AMQP bindings are written using the EventMachine framework so we’ll be working under that for this article.
We’re going to look at a couple simple examples, adding to an exchange and draining messages off an exchange.
First, a quick note on AMQP terminology, this isn’t complete but should cover us for our purpose. When you publish a message to an AMQP server (I’m just going to use Rabbit from now on) it gets published onto an exchange. A client who wishes to listen to the exchange attaches a queue to the exchange. Queues and exchanges can be set as durable which means they’ll survive a restart of Rabbit. Messages can be marked as persistent which means they’ll sit on the queues attached to the exchange until the consumer picks them up. If a message isn’t persistent it will just disappear from the queue if there is no client listening.
The first step is setting up our Rabbit instance. I’m not going to explain this stuff, the Rabbit documentation can do a better job then I can.
[dj2@titania ~]# rabbitmqctl add_vhost /my_vhost
Creating vhost "/my_vhost" ...done.
[dj2@titania ~]# rabbitmqctl add_user dj2 secret_password
Creating user "dj2" ...done.
[dj2@titania ~]# rabbitmqctl add_user reader some_pass
Creating user "reader" ...done.
[dj2@titania ~]# rabbitmqctl map_user_vhost dj2 /my_vhost
Mapping user "dj2" to vhost "/my_vhost" ...done.
[dj2@titania ~]# rabbitmqctl map_user_vhost reader /my_vhost
Mapping user "reader" to vhost "/my_vhost" ...done.
[dj2@titania ~]# rabbitmqctl set_permissions dj2 /my_vhost /data all
Setting permissions for user "dj2", vhost "/my_vhost", realm "/data" ...done.
[dj2@titania ~]# rabbitmqctl set_permissions reader /my_vhost /data all
Setting permissions for user "reader", vhost "/my_vhost", realm "/data" ...done.
With that out of the way, lets move to something a bit more interesting.
#!/usr/local/bin/ruby
require 'rubygems'
require 'mq'
EM.run do
AMQP.connect(:user => 'dj2', :pass => 'secret_password',
:host => 'localhost', :vhost => '/my_vhost') do |conn|
@connection = conn
channel = MQ.new(@connection)
@xchange = channel.fanout('my_exchange', :durable => true)
end
EM.add_timer(1) do
@xchange.publish("data to publish")
EM.add_timer(1) { EM.stop_event_loop }
end
end
As I mentioned, everything runs under EventMachine, so our main code is wrapped inside an EM#run block. The first thing we’re doing is connecting to Rabbit. This is done with AMQP#connect. In the official Ruby AMQP code you don’t provide a block to AMQP#connect it returns the connection object. If you’re using the reconnect fork you pass a block to AMQP#connect that will be executed when we reconnect to the server (it also returns the connection object if you want).
Once we’ve got the connection we need to create the channel that we’re going to work with. This is done with MQ#new. With the channel in hand we can create our exchange. In this case we’re creating a fanout exchange. The exchange will be named my_exchange and will be durable, so the exchange will survive a Rabbit restart.
I’m setting up an EventMachine timer to publish my message. This gives everything a few seconds to startup and connect to Rabbit. We then use the publish method to send the message to the exchange.
We kill off EventMachine a second later so the application will exit.
#!/usr/local/bin/ruby
require 'rubygems'
require 'mq'
EM.run do
trap("INT") { EM.stop }
trap("TERM") { EM.stop }
AMQP.connect(:user => 'reader', :pass => 'some_pass',
:host => 'localhost', :vhost => '/my_vhost') do |conn|
@connection = conn
channel = MQ.new(@connection)
xchange = channel.fanout('my_exchange', :durable => false)
q = MQ::Queue.new(channel, 'my_queue', :durable => false)
q.bind(xchange)
q.subscribe do |header, msg|
puts "GOT #{msg}"
end
end
end
You’ll notice the similarities between this code and the last block of code. Once we’ve created our exchange we create a queue on the exchange. This is done using MQ::Queue#new We give the queue a unique name and set it to a non-durable. We then bind the queue to the exchange. With the queue bound we use the subscribe method to listen on the queue. When messages arrive the code block will be executed with the header and message provided.
That’s it. It’s all pretty simple. Well, at least in these examples it is. This is just the tip of the AMQP iceberg but hopefully you’ve got enough information to start digging in.
Have fun.
December 15th, 2008 at 10:37
Very cool! I have forwarded this to Aman Gupta who wrote this excellent Ruby client, and will tell folks on the RabbitMQ mailing list about your article. I have one question though … Did you mean “Riding the Rabbit” or “Ridding the Rabbit”? ;-)
Cheers, alexis
December 15th, 2008 at 11:16
Sigh, I always have to get something wrong, heh. Riding, it’s fixed now. Thanks.
December 16th, 2008 at 12:19
thanks!
December 16th, 2008 at 12:34
Nice article.
We also are working on integrating RabbitMQ into our application. I have a fork of the amqp library on github (http://github.com/dougbarth/amqp) that integrates our changes (client controlled ack) as well as your changes.
Also, I started another related project called amqp-utils (http://github.com/dougbarth/amqp-utils). It is a collection of CLI programs for interacting with the AMQP backed queue. I use it for things like deleting queues, peeking into their contents, and checking status.
December 16th, 2008 at 13:52
Doug, I’ve been looking for something like your amqp-utils. I’ll definitely check it out, thanks.