Routing - Filter Based (Topic)

For filter based routing, a producer declares the topic exchange when publishing a message. Messages sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. Filter based routing provides a method to use filter policies on routing key for choosing the recipients of messages.
* (star) can substitute for exactly one word.
example: 'topic.*' can be : topic1, topic2, topic3 etc.
# (hash) can substitute for zero or more words.
example: "#.topic" can be: topic, Ftopic, Secondtopic, 123topic etc.

Diagram of Routing - Filter based messaging

Browse the chapter of AMQP Introduction first if you're new to AMQP.


Python

Prerequisites

Python client AMQP library

The Python library we use for this example can be found at https://github.com/pika/pika.

You can install it through sudo pip install pika.

Finally, import this library in your program.

import pika

The full documentation of this library is at https://pika.readthedocs.org/en/0.9.14/.

pika library is not thread safe. Do not use a connection or channel across threads.

Producer

The first thing we need to do is to establish a connection with RoboMQ broker.
Set heartbeat to 60 seconds, so that client will confirm the connectivity with broker.

credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials, heartbeat_interval = 60))
channel = connection.channel()

Then producer can publish messages to a topic exchange where messages will be delivered to queues whose routing key matches. The essential difference between normal routing key and topic is that consumer can subscribe a topic with wild cards inside.
Delivery mode = 1 means it's a non-persistent message.

properties = pika.spec.BasicProperties(content_type = "text/plain", delivery_mode = 1)
channel.basic_publish(exchange = exchangeName, routing_key = routingKey, body = "Hello World!", properties = properties)

At last, producer will disconnect with the RoboMQ broker.

connection.close()

Consumer

The same as producer, consumer needs to first connect to RoboMQ broker.

Then consumer will declare a direct exchange, a queue, and bind the queue to the exchange with a routing key (topic). The routing key can contain wildcards to receive messages sent through different routing keys.
Auto-delete means after all consumers have finished consuming it, the exchange or queue will be deleted by broker.
Exclusive means no other consumer can consume the queue when this one is consuming it.

channel.exchange_declare(exchange = exchangeName, exchange_type = "topic", auto_delete = True)
channel.queue_declare(queue = queueName, exclusive = True, auto_delete = True)
channel.queue_bind(exchange = exchangeName, queue = queueName, routing_key = routingKey)

Finally, consumer can consume messages from the queue.
The no_ack parameter indicates if consumer needs to explicitly send acknowledgment back to broker when it has received the message. In this example, no_ack equals to true, so producer does not explicitly acknowledge received messages.
The start_consuming() function will be blocking the process until stop_consuming() is invoked or exception happens.

channel.basic_consume(consumer_callback = onMessage, queue = queueName, no_ack = True)
channel.start_consuming()

When messages are received, a callback function onMessage() will be invoked to print the message content.

def onMessage(channel, method, properties, body):
    print body

Putting it all together

producer.py

import pika

server = "hostname"
port = 5672
vhost = "yourvhost"
username = "username"
password = "password"
exchangeName = "testEx"
routingKey = "test.any"

try:
    #connect
    credentials = pika.PlainCredentials(username, password)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials, heartbeat_interval = 60))
    channel = connection.channel()

    #send message
    properties = pika.spec.BasicProperties(content_type = "text/plain", delivery_mode = 1)
    channel.basic_publish(exchange = exchangeName, routing_key = routingKey, body = "Hello World!", properties = properties)

    #disconnect
    connection.close()
except Exception, e:
    print e

consumer.py

import pika
import time

server = "hostname"
port = 5672
vhost = "yourvhost"
username = "username"
password = "password"
exchangeName = "testEx"
queueName = "testQ1"
routingKey = "test.#" #topic with wildcard

#callback funtion on receiving messages
def onMessage(channel, method, properties, body):
    print body

while True:
    try:
        #connect
        credentials = pika.PlainCredentials(username, password)
        connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials, heartbeat_interval = 60))
        channel = connection.channel()

        #declare exchange and queue, bind them and consume messages
        channel.exchange_declare(exchange = exchangeName, exchange_type = "topic", auto_delete = True)
        channel.queue_declare(queue = queueName, exclusive = True, auto_delete = True)
        channel.queue_bind(exchange = exchangeName, queue = queueName, routing_key = routingKey)
        channel.basic_consume(consumer_callback = onMessage, queue = queueName, no_ack = True)
        channel.start_consuming()
    except Exception, e:
        #reconnect on exception
        print "Exception handled, reconnecting...\nDetail:\n%s" % e
        try:
            connection.close()
        except:
            pass
        time.sleep(5)

Node.js

Prerequisites

Node.js client AMQP library

The Node.js library we use for this example can be found at https://github.com/squaremo/amqp.node.

You can install the library through sudo npm install amqplib.

Finally, require this library in your program.

var amqp = require("amqplib");

The full documentation of this library is at https://www.squaremobius.net/amqp.node/doc/channel_api.html

Producer

The first thing we need to do is to establish a connection with RoboMQ broker.
Set heartbeat to 60 seconds, so that client will confirm the connectivity with broker.
As shown in the code, this library provides chainable callback API in the form of .then(callback).

For the default vhost "/", you will need to insert "%2f" (its hexadecimal ASCII code) to the AMQP URI, instead of "/" itself.

producer = amqp.connect("amqp://" + username + ":" + password + "@" + server + ":" + port + "/" + vhost + "?heartbeat=60");
producer.then(function(conn) {
    return conn.createConfirmChannel().then(successCallback);
}).then(null, failureCallback);

Then producer can publish messages to a topic exchange where messages will be delivered to queues whose routing key matches. The essential difference between normal routing key and topic is that consumer can subscribe a topic with wild cards inside.
Delivery mode = 1 means it's a non-persistent message.

ch.publish(exchangeName, routingKey, content = new Buffer("Hello World!"), options = {contentType: "text/plain", deliveryMode: 1},  callback);

At last, producer will disconnect with the RoboMQ broker.

conn.close();

Consumer

The same as producer, consumer needs to first connect to RoboMQ broker.
The difference is that consumer uses conn.createChannel() function, while producer uses conn.createConfirmChannel() because the latter one is only useful for publish confirm.

Then consumer will declare a direct exchange, a queue, and bind the queue to the exchange with a routing key (topic). The routing key can contain wildcards to receive messages sent through different routing keys.
Durable means the exchange or queue will survive possible broker failover. It's false in this example.
Auto-delete means after all consumers have finished consuming it, the exchange or queue will be deleted by broker.
Exclusive means no other consumer can consume the queue when this one is consuming it.

ch.assertExchange(exchangeName, "topic", {durable: false, autoDelete: true});
ch.assertQueue(queueName, {durable: false, autoDelete: true, exclusive: true});
ch.bindQueue(queueName, exchangeName, routingKey);

Finally, consumer can consume messages from the queue.
The noAck option indicates if consumer needs to explicitly send acknowledgment back to broker when it has received the message. In this example, noAck is true, so producer does not explicitly acknowledge received messages.
The second parameter of consume() function is the callback on receiving messages. In this example, when messages are received, the callback function will be invoked to print the message content.

ch.consume(queueName, function(message) {
    console.log(message.content.toString());
}, {noAck: true});  

Putting it all together

producer.js

var amqp = require("amqplib");

var server = "hostname";
var port = "5672";
var vhost = "yourvhost"; //for "/" vhost, use "%2f" instead
var username = "username";
var password = "password";
var exchangeName = "testEx";
var routingKey = "test.any";

producer = amqp.connect("amqp://" + username + ":" + password + "@" + server + ":" + port + "/" + vhost + "?heartbeat=60");
producer.then(function(conn) {
    return conn.createConfirmChannel().then(function(ch) {
        ch.publish(exchangeName, routingKey, content = new Buffer("Hello World!"), options = {contentType: "text/plain", deliveryMode: 1}, function(err, ok) {
            if (err != null) {
                console.error("Error: failed to send message\n" + err);
            }
            conn.close();
        });
    });
}).then(null, function(err) {
    console.error(err);
});

consumer.js

var amqp = require("amqplib");
var domain = require("domain");

var server = "hostname";
var port = "5672";
var vhost = "yourvhost"; //for "/" vhost, use "%2f" instead
var username = "username";
var password = "password";
var exchangeName = "testEx";
var queueName = "testQ1";
var routingKey = "test.#"; //topic with wildcard

//use domain module to handle reconnecting
var consumer = null;
var dom = domain.create();
dom.on("error", relisten);
dom.run(listen);

function listen() {
    consumer = amqp.connect("amqp://" + username + ":" + password + "@" + server + ":" + port + "/" + vhost + "?heartbeat=60");
    consumer.then(function(conn) {
        return conn.createChannel().then(function(ch) {
            ch.assertExchange(exchangeName, "topic", {durable: false, autoDelete: true});
            ch.assertQueue(queueName, {durable: false, autoDelete: true, exclusive: true});
            ch.bindQueue(queueName, exchangeName, routingKey);
            ch.consume(queueName, function(message) {
                //callback funtion on receiving messages
                console.log(message.content.toString());
            }, {noAck: true});
        });
    }).then(null, function(err) {
        console.error("Exception handled, reconnecting...\nDetail:\n" + err);
        setTimeout(listen, 5000);
    });
}

function relisten() {
    consumer.then(function(conn) {
        conn.close();
    }); 
    setTimeout(listen, 5000);
}

PHP

Prerequisite

PHP client AMQP library

The PHP library we use for this example can be found at https://github.com/videlalvaro/php-amqplib.

It uses composer to install in a few steps.

  1. Add a composer.json file to your project:
{
    "require": {
        "videlalvaro/php-amqplib": "2.2.*"
    }
}
  1. Download the latest composer in the same path:
curl -sS https://getcomposer.org/installer | php
  1. Install the library through composer:
./composer.phar install

Finally, require this library in your program and use the classes.

    require_once __DIR__ . '/../vendor/autoload.php'; //directory of library folder
    use PhpAmqpLib\Connection\AMQPConnection;
    use PhpAmqpLib\Message\AMQPMessage;

Producer

The first thing we need to do is to establish a connection with RoboMQ broker.
Set heartbeat to 60 seconds, so that client will confirm the connectivity with broker.

$connection = new AMQPConnection($server, $port, $username, $password, $vhost, $heartbeat = 60);
$channel =  $connection->channel();

Then producer can publish messages to a topic exchange where messages will be delivered to queues whose routing key matches. The essential difference between normal routing key and topic is that consumer can subscribe a topic with wild cards inside.
Delivery mode = 1 means it's a non-persistent message.

$message = new AMQPMessage("Hello World!", array("content_type" => "text/plain", "delivery_mode" => 1));
$channel->basic_publish($message, $exchangeName, $routingKey);

At last, producer will disconnect with the RoboMQ broker.

$connection->close();

Consumer

The same as producer, consumer needs to first connect to RoboMQ broker.

Then consumer will declare a direct exchange, a queue, and bind the queue to the exchange with a routing key (topic). The routing key can contain wildcards to receive messages sent through different routing keys.
Auto-delete means after all consumers have finished consuming it, the exchange or queue will be deleted by broker.
Exclusive means no other consumer can consume the queue when this one is consuming it.

$channel->exchange_declare($exchangeName, $type = "topic", false, false, $auto_delete = true);
$channel->queue_declare($queueName, false, false, $exclusive = true, $auto_delete = true);
$channel->queue_bind($queueName, $exchangeName, $routingKey);

Finally, consumer can consume messages from the queue.
The no_ack parameter indicates if consumer needs to explicitly send acknowledgment back to broker when it has received the message. In this example, no_ack equals to true, so producer does not explicitly acknowledge received messages.
The while loop will be blocking the process and listening for messages until exception happens.

$channel->basic_consume($queueName, "", false, $no_ack = true, false, false, $callback = $onMessage);

while(count($channel->callbacks)) {
    $channel->wait();
}

When messages are received, a callback function will be invoked to print the message content.

$onMessage = function ($message) {
    echo $message->body.PHP_EOL;
};

Putting it together

producer.php

<?php
require_once __DIR__ . '/../vendor/autoload.php'; //directory of library folder
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$server = "hostname";
$port = 5672;
$vhost = "yourvhost";
$username = "username";
$password = "password";
$exchangeName = "testEx";
$routingKey = "test.any";

try {
    //connect
    $connection = new AMQPConnection($server, $port, $username, $password, $vhost, $heartbeat = 60);
    $channel =  $connection->channel(); 

    //send message
    $message = new AMQPMessage("Hello World!", array("content_type" => "text/plain", "delivery_mode" => 1));
    $channel->basic_publish($message, $exchangeName, $routingKey);

    //disconnect
    $connection->close();
} catch(Exception $e) {
    echo $e.PHP_EOL;
}
?>

consumer.php

<?php
require_once __DIR__."/../vendor/autoload.php"; //directory of library folder
use PhpAmqpLib\Connection\AMQPConnection;

$server = "hostname";
$port = 5672;
$vhost = "yourvhost";
$username = "username";
$password = "password";
$exchangeName = "testEx";
$queueName = "testQ1";
$routingKey = "test.#"; //topic with wildcard

//callback funtion on receiving messages
$onMessage = function ($message) {
    echo $message->body.PHP_EOL;
};

while (true) {
    try {
        //connect
        $connection = new AMQPConnection($server, $port, $username, $password, $vhost, $heartbeat = 60);
        $channel = $connection->channel();

        //declare exchange and queue, bind them and consume messages
        $channel->exchange_declare($exchangeName, $type = "topic", false, false, $auto_delete = true);
        $channel->queue_declare($queueName, false, false, $exclusive = true, $auto_delete = true);
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        $channel->basic_consume($queueName, "", false, $no_ack = true, false, false, $callback = $onMessage);

        //start consuming
        while(count($channel->callbacks)) {
            $channel->wait();
        }
    } catch(Exception $e) {
        //reconnect on exception
        echo "Exception handled, reconnecting...\nDetail:\n".$e.PHP_EOL;
        if ($connection != null) {
            try {
                $connection->close();
            } catch (Exception $e1) {}
        }
        sleep(5);
    }
}
?>

Ruby

Prerequisites

Ruby client AMQP library

The Ruby library we use for this example can be found at http://rubybunny.info/.

With Ruby version >= 2.0, you can install it through sudo gem install bunny.

Finally, import this library in your program.

require "bunny"

The full documentation of this library is at http://rubybunny.info/articles/guides.html.

We recommend combining the documentation with the source code of this library when you use it because some of the documentation out there is not being updated timely from our observation.

Producer

The first thing we need to do is to establish a connection with RoboMQ broker.
Set heartbeat to 60 seconds, so that client will confirm the connectivity with broker.
Although the library provides a connection property named recover_from_connection_close, we discourage you to use it. The reason will be explained in the Consumer section.

connection = Bunny.new(:host => server, :port => port, :vhost => vhost, :user => username, :pass => password, :heartbeat => 60, :recover_from_connection_close => false)
connection.start
channel = connection.create_channel

Then producer can publish messages to a topic exchange where messages will be delivered to queues whose routing key matches. The essential difference between normal routing key and topic is that consumer can subscribe a topic with wild cards inside.
Delivery mode = 1 means it's a non-persistent message.

exchange = channel.topic(exchangeName, :auto_delete => true)
exchange.publish("Hello World!", :routing_key => routingKey, :content_type => "text/plain", :delivery_mode => 1)

At last, producer will disconnect with the RoboMQ broker.

connection.close

Consumer

The same as producer, consumer needs to first connect to RoboMQ broker.

Then consumer will declare a direct exchange, a queue, and bind the queue to the exchange with a routing key (topic). The routing key can contain wildcards to receive messages sent through different routing keys.
Auto-delete means after all consumers have finished consuming it, the exchange or queue will be deleted by broker.
Exclusive means no other consumer can consume the queue when this one is consuming it.

exchange = channel.topic(exchangeName, :auto_delete => true)
queue = channel.queue(queueName, :exclusive => true, :auto_delete => true)
queue.bind(exchange, :routing_key => routingKey)

After that, consumer can consume messages from the queue.
The manual_ack parameter indicates if consumer needs to manually send acknowledgment back to broker when it has received the message. In this example, manual_ack equals to false, so producer does not manually acknowledge received messages.
The subscribe() function is followed by a callback which will be invoked to print the message payload on receiving a message.

queue.subscribe(:block => false, :manual_ack => false) do |delivery_info, metadata, payload|
  puts payload
end

As we mentioned in the Producer section, recover_from_connection_close is set to false when connecting to RoboMQ broker. It matters for consumers because recover_from_connection_close will only recover the connection, it won't recreate exchange and queue in case they are gone. Therefore, a more robust approach is letting your code handle reconnecting on its own and keep checking the existence of the subscribed queue.

while true
  raise "Lost the subscribed queue %s" % queueName unless connection.queue_exists?(queueName)
  sleep 1
end

Putting it all together

producer.rb

require "bunny"

server = "hostname"
port = 5672
vhost = "yourvhost"
username = "username"
password = "password"
exchangeName = "testEx"
routingKey = "test.any"

begin
  #connect
  connection = Bunny.new(:host => server, :port => port, :vhost => vhost, :user => username, :pass => password, :heartbeat => 60, :recover_from_connection_close => false)
  connection.start
  channel = connection.create_channel

  #send message
  exchange = channel.topic(exchangeName, :auto_delete => true)
  exchange.publish("Hello World!", :routing_key => routingKey, :content_type => "text/plain", :delivery_mode => 1)

  #disconnect
  connection.close
rescue Exception => e
  puts e
end

consumer.rb

require "bunny"

server = "hostname"
port = 5672
vhost = "yourvhost"
username = "username"
password = "password"
exchangeName = "testEx"
queueName = "testQ1"
routingKey = "test.#" #topic with wildcard

while true
  begin
    #connect, disable auto-reconnect so as to manually reconnect
    connection = Bunny.new(:host => server, :port => port, :vhost => vhost, :user => username, :pass => password, :heartbeat => 60, :recover_from_connection_close => false)
    connection.start
    channel = connection.create_channel

    #declare exchange and queue, bind them and consume messages
    exchange = channel.topic(exchangeName, :auto_delete => true)
    queue = channel.queue(queueName, :exclusive => true, :auto_delete => true)
    queue.bind(exchange, :routing_key => routingKey)
    queue.subscribe(:block => false, :manual_ack => false) do |delivery_info, metadata, payload|
      puts payload
    end
    #keep checking the existence of the subscribed queue
    while true
      raise "Lost the subscribed queue %s" % queueName unless connection.queue_exists?(queueName)
      sleep 1
    end
  rescue Exception => e
    #reconnect on exception
    puts "Exception handled, reconnecting...\nDetail:\n%s" % e
    #blindly clean old connection
    begin
      connection.close
    end
    sleep 5
  end
end

Java

Prerequisites

Java client AMQP library

The Java library we use for this example can be found at https://www.rabbitmq.com/java-client.html.

Download the library jar file, then import this library in your program import com.rabbitmq.client.*; and compile your source code with the jar file. For example,

javac -cp ".:./rabbitmq-client.jar" Producer.java Consumer.java 

Run the producer and consumer classes. For example,

java -cp ".:./rabbitmq-client.jar" Consumer
java -cp ".:./rabbitmq-client.jar" Producer

Of course, you can eventually compress your producer and consumer classes into jar files.

Producer

The first thing we need to do is to establish a connection with RoboMQ broker.
Set heartbeat to 60 seconds, so that client will confirm the connectivity with broker.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(server);
factory.setPort(port);
factory.setVirtualHost(vhost);
factory.setUsername(username);
factory.setPassword(password);
factory.setRequestedHeartbeat(60);
connection = factory.newConnection();
channel = connection.createChannel();

Then producer can publish messages to a topic exchange where messages will be delivered to queues whose routing key matches. The essential difference between normal routing key and topic is that consumer can subscribe a topic with wild cards inside.

String message = "Hello World!";
channel.basicPublish(exchangeName, routingKey, MessageProperties.TEXT_PLAIN, message.getBytes());

At last, producer will disconnect with the RoboMQ broker.

connection.close();

Consumer

The same as producer, consumer needs to first connect to RoboMQ broker.

Then consumer will declare a direct exchange, a queue, and bind the queue to the exchange with a routing key (topic). The routing key can contain wildcards to receive messages sent through different routing keys.
The fourth parameter of exchangeDeclare() and queueDeclare() are auto-delete. That means after all consumers have finished consuming it, the exchange or queue will be deleted by broker.
The third parameter of queueDeclare() is exclusive. That means no other consumer can consume the queue when this one is consuming it.

channel.exchangeDeclare(exchangeName, "topic", false, true, false, null);
channel.queueDeclare(queueName, false, true, true, null);
channel.queueBind(queueName, exchangeName, routingKey, null);

Finally, consumer can consume messages from the queue.
The second parameter of basicConsume() function no-ack indicates if consumer needs to explicitly send acknowledgment back to broker when it has received the message. In this example, no-ack equals to true, so producer does not explicitly acknowledge received messages.
The while loop will be blocking the process and listening for messages until exception happens. When messages are received, it will print the message content.

QueueingConsumer qc = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, qc);
while (true) {
    QueueingConsumer.Delivery delivery = qc.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(message);
}

Putting it all together

Producer.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class Producer {

    private Connection connection;
    private Channel channel;
    private static String server = "hostname";
    private static int port = 5672;
    private static String vhost = "yourvhost";
    private static String username = "username";
    private static String password = "password";
    private static String exchangeName = "testEx";
    private static String routingKey = "test.any";

    private void produce() {
        try {
            //connect
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(server);
            factory.setPort(port);
            factory.setVirtualHost(vhost);
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setRequestedHeartbeat(60);
            connection = factory.newConnection();
            channel = connection.createChannel();

            //send message
            String message = "Hello World!";
            channel.basicPublish(exchangeName, routingKey, MessageProperties.TEXT_PLAIN, message.getBytes());

            //disconnect
            connection.close();
        } catch(Exception e) {
            System.out.println(e);
            System.exit(-1);            
        }   
    }

    public static void main(String[] args) {
        Producer p = new Producer();
        p.produce();
    }
}

Consumer.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {

    private Connection connection;
    private Channel channel;
    private static String server = "hostname";
    private static int port = 5672;
    private static String vhost = "yourvhost";
    private static String username = "username";
    private static String password = "password";
    private static String exchangeName = "testEx";
    private static String queueName = "testQ1";
    private static String routingKey = "test.#"; //topic with wildcard

    private void consume() {
        while (true) {
            try {
                //connect
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost(server);
                factory.setPort(port);
                factory.setVirtualHost(vhost);
                factory.setUsername(username);
                factory.setPassword(password);
                factory.setRequestedHeartbeat(60);
                connection = factory.newConnection();
                channel = connection.createChannel();

                //declare exchange and queue, bind them and consume messages
                channel.exchangeDeclare(exchangeName, "topic", false, true, false, null);
                channel.queueDeclare(queueName, false, true, true, null);
                channel.queueBind(queueName, exchangeName, routingKey, null);
                QueueingConsumer qc = new QueueingConsumer(channel);
                channel.basicConsume(queueName, true, qc);
                while (true) {
                    QueueingConsumer.Delivery delivery = qc.nextDelivery();
                    String message = new String(delivery.getBody());
                    System.out.println(message);
                }
            } catch(Exception e) {
                //reconnect on exception
                System.out.printf("Exception handled, reconnecting...\nDetail:\n%s\n", e);
                try {
                    connection.close();
                } catch (Exception e1) {}
                try {
                    Thread.sleep(5000); 
                } catch(Exception e2) {}
            }
        }
    }

    public static void main(String[] args) {
        Consumer c = new Consumer();
        c.consume();
    }
}

Go

Prerequisites

Go client AMQP library

The Go library we use for this example can be found at https://github.com/streadway/amqp.

You can install it through go get github.com/streadway/amqp.

Finally, import this library in your program.

import "github.com/streadway/amqp"

The full documentation of this library is at https://godoc.org/github.com/streadway/amqp.

Producer

The first thing we need to do is to establish a connection with RoboMQ broker.
Set heartbeat to 60 seconds, so that client will confirm the connectivity with broker.

connection, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s:%d/%s", username, password, server, port, vhost), amqp.Config{Heartbeat: 60 * time.Second})
channel, err := connection.Channel()

Then producer can publish messages to a topic exchange where messages will be delivered to queues whose routing key matches. The essential difference between normal routing key and topic is that consumer can subscribe a topic with wild cards inside.
Delivery mode = 1 means it's a non-persistent message.

err = channel.Publish(exchangeName, routingKey, false, false, amqp.Publishing{ContentType:  "text/plain", DeliveryMode: 1, Body: []byte("Hello World!")})

At last, producer will disconnect with the RoboMQ broker.

connection.Close()

Consumer

The same as producer, consumer needs to first connect to RoboMQ broker.

Then consumer will declare a direct exchange, a queue, and bind the queue to the exchange with a routing key (topic). The routing key can contain wildcards to receive messages sent through different routing keys.
Durable means the exchange or queue will survive possible broker failover. It's false in this example.
Auto-delete means after all consumers have finished consuming it, the exchange or queue will be deleted by broker.
Exclusive means no other consumer can consume the queue when this one is consuming it.

// audo-delete = true
err = channel.ExchangeDeclare(exchangeName, "topic", false, true, false, false, nil)

// durable = false; auto-delete = true; exclusive = true
queue, err := channel.QueueDeclare(queueName, false, true, true, false, nil)

err = channel.QueueBind(queueName, routingKey, exchangeName, false, nil)

Finally, consumer can consume messages from the queue.
Consumer-tag can be later used to Cancel() this consumer when it's no longer needed.
Auto-ack parameter indicates if consumer needs to explicitly send acknowledgment back to broker when it has received the message. In this example, auto-ack equals to true, so producer does not explicitly acknowledge received messages.

// consumer-tag = "consumer"; auto-ack = true
messageChan, err := channel.Consume(queue.Name, "consumer", true, true, false, false, nil)

Note a message channel is returned by the Consume() function. Incoming messages will be received through that channel.

Channel in Golang is a typed conduit through which you can send and receive values. Sends and receives block until the other side is ready.

for message := range messageChan {
    fmt.Println(string(message.Body))
}

Putting it all together

producer.go

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "os"
    "time"
)

var server = "hostname"
var port = 5672
var vhost = "yourvhost"
var username = "username"
var password = "password"
var exchangeName = "testEx"
var routingKey = "test.any"

func main() {
    connection, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s:%d/%s", username, password, server, port, vhost),
        amqp.Config{Heartbeat: 60 * time.Second})
    if err != nil {
        fmt.Printf("Failed to connect, err: %v\n", err)
        os.Exit(1)
    }
    defer connection.Close()

    channel, err := connection.Channel()
    if err != nil {
        fmt.Printf("Failed to create channel, err: %v\n", err)
        os.Exit(1)
    }
    defer channel.Close()

    err = channel.Publish(
        exchangeName, // exchange
        routingKey,   // routing key
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            ContentType:  "text/plain",
            DeliveryMode: 1,
            Body:         []byte("Hello World!"),
        })
    if err != nil {
        fmt.Printf("Failed to publish message, err: %v\n", err)
        os.Exit(1)
    }
}

consumer.go

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

var server = "hostname"
var port = 5672
var vhost = "yourvhost"
var username = "username"
var password = "password"
var exchangeName = "testEx"
var queueName = "testQ1"
var routingKey = "test.#" // topic with wildcard

func main() {
    // Infinite loop to auto-reconnect on failure
Loop:
    for {
        fmt.Println("Starting in 5 seconds...")
        time.Sleep(5 * time.Second)

        connection, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s:%d/%s", username, password, server, port, vhost),
            amqp.Config{Heartbeat: 60 * time.Second})
        if err != nil {
            fmt.Printf("Failed to connect, err: %v\n", err)
            continue Loop
        }
        defer connection.Close()

        channel, err := connection.Channel()
        if err != nil {
            fmt.Printf("Failed to create channel, err: %v\n", err)
            continue Loop
        }
        defer channel.Close()

        err = channel.ExchangeDeclare(
            exchangeName, // name
            "topic",      // type
            false,        // durable
            true,         // audo-delete
            false,        // internal
            false,        // no-wait
            nil,          // args
        )
        if err != nil {
            fmt.Printf("Failed to declare exchange, err: %v\n", err)
            continue Loop
        }

        queue, err := channel.QueueDeclare(
            queueName, // name
            false,     // durable
            true,      // auto-delete
            true,      // exclusive
            false,     // no-wait
            nil,       // args
        )
        if err != nil {
            fmt.Printf("Failed to declare queue, err: %v\n", err)
            continue Loop
        }

        err = channel.QueueBind(
            queueName,    // queue
            routingKey,   // key
            exchangeName, // exchange
            false,        // no-wait
            nil,          // args
        )
        if err != nil {
            fmt.Printf("Failed to bind queue with exchange, err: %v\n", err)
            continue Loop
        }

        messageChan, err := channel.Consume(
            queue.Name, // queue
            "consumer", // consumer tag
            true,       // auto-ack
            true,       // exclusive
            false,      // no-local
            false,      // no-wait
            nil,        // args
        )
        if err != nil {
            fmt.Printf("Failed to consume messages, err: %v\n", err)
            continue Loop
        }

        fmt.Println("Started consuming messages.")
        for message := range messageChan {
            fmt.Println(string(message.Body))
        }
    }
}

C

Prerequisites

C client AMQP library

RoboMQ is built on AMQP, an open, general-purpose protocol for messaging. There are a number of clients for AMQP in many different languages. However, we'll choose a simple C-language AMQP client library written for use with v2.0+ of the RabbitMQ broker.

https://github.com/alanxz/rabbitmq-c/tree/master/librabbitmq

You can copy librabbitmq subfolder from latest release located here on GitHub:

https://github.com/alanxz/rabbitmq-c

Alternatively, thanks to Subversion support in GitHub, you can use svn export directly:

svn export https://github.com/alanxz/rabbitmq-c/trunk/librabbitmq

Copy the librabbitmq package into your working directory:

cp librabbitmq ./

Also copy all source files and Makefile from RoboMQ SDK at https://github.com/robomq/robomq.io/tree/master/sdk/AMQP/C into the same directory.

Now your working directory should have the content as bellow:
broadcast config.h librabbitmq Makefile one-to-one request-reply routing-key topic

Use the Makefile to compile under a Linux terminal.

  • Run make type={sub-directory} to compile the producer and consumer under the sub-directory.
  • Before compiling the next sub-directory, run make clean to clean up the compiled files.

Note that these examples provide a simple client implementation to get started but does not go into detailed description of all flags passed into the AMQP methods. A complete reference to RabbitMQ's implementaton of version 0-9-1 of the AMQP specification can be found in this guide. https://www.rabbitmq.com/amqp-0-9-1-reference.html

Producer

For filter based routing, the producer should publish messages to the topic type exchange. All messages sent with the routing key, "mytopic.new", will be delivered to all the queues that are bound with a matching binding key. Note that the routing key must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message. A few valid routing key examples: "log.warning", "log.error".

amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 1; /* non-persistent delivery mode */
amqp_boolean_t mandatory = 0;
amqp_boolean_t immediate = 0;
char exchange_name[] = "topic-exchange";
char routing_key[] = "mytopic.new";
int result;

// Sending message
result = amqp_basic_publish(conn,
        channel,
        amqp_cstring_bytes(exchange_name),
        amqp_cstring_bytes(routing_key),
        mandatory,
        immediate,
        &props,
        amqp_cstring_bytes("Hello"));

Consumer

Then the consumer should create an exchange and subscribe to a queue. This exchange will be defined similarly to the one-to-one example, however, the topic exchange type is specified below as exchange_type.

amqp_bytes_t queue;
amqp_channel_t channel = 1;
amqp_boolean_t passive = 0;
amqp_boolean_t durable = 0;
amqp_boolean_t exclusive = 0;
amqp_boolean_t auto_delete = 1;
char exchange_name[] = "topic-exchange";
char exchange_type[] = "topic";
char queue_name[] = "hello-queue";
char binding_key[] = "mytopic.new";

// Declaring exchange
amqp_exchange_declare(conn, channel, amqp_cstring_bytes(exchange_name), amqp_cstring_bytes(exchange_type),
        passive, durable, amqp_empty_table);

// Declaring queue
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_cstring_bytes(queue_name),
        passive, durable, exclusive, auto_delete, amqp_empty_table);

queue = amqp_bytes_malloc_dup(r->queue);

// Binding to queue
amqp_queue_bind(conn, channel, queue, amqp_cstring_bytes(exchange_name), amqp_cstring_bytes(binding_key),
        amqp_empty_table);

At this point, consumer should start consuming messages.

Putting it all together

The full code below includes some basic AMQP error handling for consumer that is useful when declaring exchanges and queues. In addition, main receiver loop attempts to reconnect upon network connection failure.

producer.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

amqp_connection_state_t mqconnect() {

    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t *socket = NULL;
    char hostname[] = "localhost"; // RoboMQ hostname
    int port = 5672; //default
    char user[] = "guest"; // RoboMQ username
    char password[] = "guest"; // RoboMQ password
    char vhost[] = "/"; // RoboMQ account vhost
    amqp_channel_t channel = 1;
    int channel_max = 0;
    int frame_max = 131072;
    int heartbeat = 60;
    int status = 0;

    // Opening socket
    socket = amqp_tcp_socket_new(conn);

    status = amqp_socket_open(socket, hostname, port);
    if (status) {
        printf("Error opening TCP socket, status = %d, exiting.", status);
    }

    amqp_login(conn, vhost, channel_max, frame_max, heartbeat, AMQP_SASL_METHOD_PLAIN, user, password);
    amqp_channel_open(conn, channel);

    return conn;
}

int main(int argc, char const *const *argv)
{
    amqp_connection_state_t conn;
    amqp_channel_t channel = 1;
    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 1; /* non-persistent delivery mode */
    amqp_boolean_t mandatory = 0;
    amqp_boolean_t immediate = 0;
    char exchange_name[] = "topic-exchange";
    char routing_key[] = "mytopic.new";
    char *msg_body = "Hello\n";
    int result;

    conn = mqconnect();

    // Sending message
    result = amqp_basic_publish(conn,
            channel,
            amqp_cstring_bytes(exchange_name),
            amqp_cstring_bytes(routing_key),
            mandatory,
            immediate,
            &props,
            amqp_cstring_bytes(msg_body));

    // Closing connection
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

    return 0;
}

consumer.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

amqp_connection_state_t mqconnect() {

    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t *socket = NULL;
    char hostname[] = "localhost"; // RoboMQ hostname
    int port = 5672; //default
    char user[] = "guest"; // RoboMQ username
    char password[] = "guest"; // RoboMQ password
    char vhost[] = "/"; // RoboMQ account vhost
    amqp_channel_t channel = 1;
    amqp_rpc_reply_t reply;
    int channel_max = 0;
    int frame_max = 131072;
    int heartbeat = 60;
    int status = 0;

    // Opening socket
    socket = amqp_tcp_socket_new(conn);

    status = amqp_socket_open(socket, hostname, port);
    if (status) {
        printf("Error opening TCP socket, status = %d\n", status);
    }

    reply = amqp_login(conn, vhost, channel_max, frame_max, heartbeat, AMQP_SASL_METHOD_PLAIN, user, password);
    if(reply.reply_type != AMQP_RESPONSE_NORMAL) {
        fprintf(stderr, "%s: server connection reply code: %d\n",
                "Error logging in", reply.reply_type);
    }

    amqp_channel_open(conn, channel);

    return conn;
}

amqp_bytes_t mqdeclare(amqp_connection_state_t conn, const char *exchange_name, const char *queue_name) {
    amqp_bytes_t queue;
    amqp_channel_t channel = 1;
    amqp_boolean_t passive = 0;
    amqp_boolean_t durable = 0;
    amqp_boolean_t exclusive = 0;
    amqp_boolean_t auto_delete = 1;
    amqp_boolean_t internal = 0;
    char exchange_type[] = "topic";
    char binding_key[] = "mytopic.*";
    amqp_rpc_reply_t reply;

    // Declaring exchange
    amqp_exchange_declare(conn, channel, amqp_cstring_bytes(exchange_name), amqp_cstring_bytes(exchange_type),
            passive, durable, auto_delete, internal, amqp_empty_table);

    reply = amqp_get_rpc_reply(conn);
    if(reply.reply_type != AMQP_RESPONSE_NORMAL) {
        amqp_connection_close_t *m = (amqp_connection_close_t *) reply.reply.decoded;
        if(NULL != m) {
            fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
                    "Error declaring exchange",
                    m->reply_code,
                    (int) m->reply_text.len, (char *) m->reply_text.bytes);
        }
    }

    // Declaring queue
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, amqp_cstring_bytes(queue_name),
            passive, durable, exclusive, auto_delete, amqp_empty_table);

    reply = amqp_get_rpc_reply(conn);
    if(reply.reply_type != AMQP_RESPONSE_NORMAL) {
        fprintf(stderr, "%s: server connection reply code: %d\n",
                "Error declaring queue", reply.reply_type);
    }
    else {
        queue = amqp_bytes_malloc_dup(r->queue);

        // Binding to queue
        amqp_queue_bind(conn, channel, queue, amqp_cstring_bytes(exchange_name), amqp_cstring_bytes(binding_key),
                amqp_empty_table);
    }

    return queue;
}

int main(int argc, char const *const *argv)
{
    amqp_connection_state_t conn;
    amqp_bytes_t queue;
    amqp_channel_t channel = 1;
    amqp_boolean_t no_local = 0;
    amqp_boolean_t no_ack = 1;
    amqp_boolean_t exclusive = 0;
    char exchange_name[] = "topic-exchange";
    char queue_name[] = "hello-queue";
    int retry_time = 5; // retry time in seconds

    conn = mqconnect();
    queue = mqdeclare(conn, &exchange_name[0], &queue_name[0]);

    // Consuming the message
    amqp_basic_consume(conn, channel, queue, amqp_empty_bytes, no_local, no_ack, exclusive, amqp_empty_table);

    while (1) {
        amqp_rpc_reply_t result;
        amqp_envelope_t envelope;

        amqp_maybe_release_buffers(conn);
        result = amqp_consume_message(conn, &envelope, NULL, 0);

        if (AMQP_RESPONSE_NORMAL != result.reply_type) {
            printf("Consumer AMQP failure occurred, response code = %d, retrying in %d seconds...\n",
                    result.reply_type, retry_time);

            // Closing current connection before reconnecting
            amqp_connection_close(conn, AMQP_CONNECTION_FORCED);
            amqp_destroy_connection(conn);

            // Reconnecting on exception
            conn = mqconnect();
            queue = mqdeclare(conn, &exchange_name[0], &queue_name[0]);
            amqp_basic_consume(conn, channel, queue, amqp_empty_bytes, no_local, no_ack, exclusive, amqp_empty_table);
            sleep(retry_time);
        }
        else {
            printf("Received message size: %d\nbody: %s\n", (int)envelope.message.body.len, (char *)envelope.message.body.bytes);

            amqp_destroy_envelope(&envelope);
        }
    }

    return 0;
}