Introduction

Before reading this chapter, you should already be familiar with basic message queue concepts such as broker, exchange, queue, producer, and consumer. Familiarity with the AMQP protocol will make MQTT much easier to understand.

RoboMQ supports MQTT 3.1 as an extension to the AMQP broker. The MQTT port is 1883; the SSL port is 8883.

MQTT stands for Message Queue Telemetry Transport. It is an extremely simple and lightweight publish/subscribe messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimize network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also make the protocol ideal for the emerging "machine-to-machine" (M2M) or "Internet of Things" (IoT) world of connected devices, and for mobile applications where bandwidth and battery power are at a premium. See the full documentation of MQTT for details.

RoboMQ builds its MQTT adapter on top of AMQP exchanges and queues. Messages published to MQTT topics use a topic exchange (amq.topic by default) internally. Subscribers consume from queues bound to the topic exchange. This both enables interoperability with other protocols and makes it possible to use the Management GUI to inspect queue sizes, message rates, and so on.
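Because MQTT topics are routed through an AMQP topic exchange, an AMQP client that wants to receive MQTT messages needs a binding key written in AMQP syntax. The following is a minimal sketch that assumes the adapter follows the convention used by RabbitMQ's MQTT plugin (the MQTT level separator "/" becomes ".", and the single-level wildcard "+" becomes "*"). This mapping is not stated in this document, and the function name mqtt_to_amqp_key is hypothetical, so verify the behavior against your broker before relying on it.

```python
def mqtt_to_amqp_key(mqtt_topic):
    """Translate an MQTT topic or topic filter into an AMQP routing/binding key.

    Assumed mapping (not confirmed by this document):
      "/" -> "."   level separator
      "+" -> "*"   single-level wildcard
      "#" -> "#"   multi-level wildcard (unchanged)
    Topics that themselves contain "." are not handled by this sketch.
    """
    levels = mqtt_topic.split("/")
    return ".".join("*" if level == "+" else level for level in levels)
```

Under this assumption, an AMQP queue bound to amq.topic with the key returned for "test/#" would receive what an MQTT subscriber to "test/#" receives.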

Vhost specification

The MQTT protocol itself has no concept of a vhost, so MQTT libraries do not provide a vhost argument.
However, the RoboMQ broker supplements this feature. You can optionally specify a vhost when connecting by prepending it to the username, separated by a colon, for example /:guest. If no vhost is specified, the default vhost "/" is used.
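The username convention can be captured in a small helper. This is only an illustrative sketch; the function name mqtt_username is hypothetical and not part of any MQTT library.

```python
def mqtt_username(username, vhost=None):
    """Build the username string sent in the MQTT CONNECT packet.

    When a vhost is given it is prepended and separated by a colon.
    When it is omitted the bare username is returned, in which case the
    broker falls back to the default vhost "/".
    """
    if vhost is None:
        return username
    return vhost + ":" + username
```

For example, mqtt_username("guest", "/") yields "/:guest", which is the form shown above.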

Durability and Persistence

QoS stands for quality of service in MQTT. RoboMQ supports QoS up to 1.

The RoboMQ MQTT adapter assumes two primary usage scenarios:

  • Transient clients that use transient messages (non-persistent, QoS=0). They use non-durable, auto-delete queues that are deleted when the client disconnects.
  • Stateful clients that use durable subscriptions (non-clean sessions, QoS=1). They use durable queues. Whether the queues are auto-deleted is controlled by the client's clean session flag: clients with clean sessions use auto-deleted queues, others use non-auto-deleted ones.

For transient (QoS=0) publishes, RoboMQ will publish messages as transient (non-persistent). Naturally, for durable (QoS=1) publishes, persistent messages will be used internally.

Queues created for MQTT subscribers will have names starting with mqtt-subscription-, one per subscription QoS level.
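The rules in this section can be restated as a small lookup. The sketch below does not talk to the broker; it only encodes the behavior described above so that you can reason about which combination of QoS and clean session flag you need. The function names are hypothetical.

```python
def subscription_queue_properties(qos, clean_session):
    """Return the queue properties described above for a subscription."""
    if qos not in (0, 1):
        raise ValueError("RoboMQ supports QoS 0 and 1 only")
    if qos == 0:
        # Transient clients: non-durable queue, deleted on disconnect
        return {"durable": False, "auto_delete": True}
    # Stateful clients: durable queue; the clean session flag decides auto-delete
    return {"durable": True, "auto_delete": bool(clean_session)}


def publish_is_persistent(qos):
    """QoS 0 publishes are transient; QoS 1 publishes are persistent."""
    if qos not in (0, 1):
        raise ValueError("RoboMQ supports QoS 0 and 1 only")
    return qos == 1
```

A client that must not lose messages while it is offline would therefore subscribe with QoS 1 and connect with the clean session flag set to false.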

MQTT use cases

We will provide examples in seven languages: Python, Node.js, PHP, Ruby, Java, Go and C++.

In the examples, the MQTT producer first asks the user for the number of messages, then publishes that many test messages to a particular topic through the MQTT broker. The MQTT consumer subscribes to the same topic (or to a wildcard filter that matches it) and prints the topic and payload of each message it receives.
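Most of the examples publish to test/any and subscribe to test/#. The sketch below illustrates why that works by implementing the standard MQTT wildcard rules ("+" matches exactly one level, "#" matches the remaining levels and must be last). It is a simplified illustration with a hypothetical function name; it ignores the special handling of topics that start with "$", and real clients do not need it because the broker performs the matching.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a subscription filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level of the filter
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # No "#" seen: both must have the same number of levels
    return len(filter_levels) == len(topic_levels)
```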

All examples implement automatic reconnection, which is crucial in production.
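The reconnect pattern used by the consumers can be sketched independently of any MQTT library. In this hedged illustration, session is a hypothetical callable that connects, subscribes, and blocks while the connection is healthy, raising an exception when something goes wrong. The names run_with_reconnect, retry_delay and max_attempts are assumptions made for this sketch.

```python
import time


def run_with_reconnect(session, retry_delay=5.0, max_attempts=None, sleep=time.sleep):
    """Run session() and start it again after every failure.

    session      -- callable that connects and blocks; raises on failure
    retry_delay  -- seconds to wait before the next attempt
    max_attempts -- give up after this many attempts (None means retry forever)
    sleep        -- injectable for testing; defaults to time.sleep
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return session()
        except Exception as exc:
            if max_attempts is not None and attempts >= max_attempts:
                raise  # out of attempts: surface the last error
            print("Exception handled, reconnecting...\nDetail:\n%s" % exc)
            sleep(retry_delay)
```

A fixed delay keeps the sketch short. In production you may prefer a delay that grows after repeated failures so that a broker outage does not cause a flood of reconnect attempts.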

The example code below may be a shortened version that omits some advanced details. For the full code, please see our SDK repository on GitHub.

Before testing the example code, replace hostname, yourvhost, username and password with the actual values for your environment.
Always run the consumer first so that it creates the exchange and queue that the producer sends messages to.

Python

Prerequisite

The Python library we use for this example can be found at https://eclipse.org/paho/clients/python/. Its source code is at https://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.python.git/.

You can install it through sudo pip install paho-mqtt.

Finally, import this library in your program.

import paho.mqtt.client as mqtt

The full documentation of this library is at https://pypi.python.org/pypi/paho-mqtt.

This library is built on the basis of a C++ library mosquitto. The documentation of mosquitto is at https://mosquitto.org.

Producer

The first thing we need to do is establish a connection with the RoboMQ broker.
RoboMQ allows you to specify a vhost along with the username. See the Vhost specification section for details.
Set the keep-alive interval to 60 seconds so that the client periodically confirms its connectivity with the broker.
Many MQTT libraries, including this one, require a network loop to complete and maintain the connection with the broker. There are several loop functions to choose from. If none of them is called, incoming network data will not be processed and outgoing network data may not be sent in a timely fashion.

client = mqtt.Client(client_id="", clean_session=True, userdata=None, protocol=mqtt.MQTTv31)
client.username_pw_set(vhost + ":" + username, password)
client.connect(server, port, keepalive=60, bind_address="")
client.loop_start()

After that, producer can send messages to a particular topic.

client.publish(topic, payload=message, qos=1, retain=False)
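With QoS 1 the broker acknowledges each publish. If the producer needs to know that a message was acknowledged before moving on, a small wrapper like the following can be used. This is a sketch, not part of the example code: it assumes client is a connected paho client whose network loop is already running, and it relies on the wait_for_publish() and is_published() methods of the object returned by publish(). The function name publish_and_confirm is hypothetical.

```python
def publish_and_confirm(client, topic, message):
    """Publish with QoS 1 and block until the publish has completed.

    client is expected to behave like paho.mqtt.client.Client: publish()
    returns an object offering wait_for_publish() and is_published().
    The network loop (for example loop_start()) must already be running,
    otherwise wait_for_publish() would never return.
    """
    info = client.publish(topic, payload=message, qos=1, retain=False)
    info.wait_for_publish()      # blocks until the acknowledgement is processed
    return info.is_published()
```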

Finally, the producer stops the loop and disconnects from the RoboMQ broker.

client.loop_stop()
client.disconnect()

Consumer

Like the producer, the consumer needs to connect to the RoboMQ broker and start the loop. Unlike the producer, the consumer loops forever.

client.loop_forever()

The on-connect callback subscribes to a topic so that the consumer knows what to listen to.
The second element of each tuple passed to the subscribe() function is the QoS.

def onConnect(client, userdata, flags, rc):
    client.subscribe([(topic, 1)])
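The rc argument of the on-connect callback carries the CONNACK return code defined by MQTT 3.1. The helper below is a small sketch (the names CONNACK_MEANINGS, describe_connack and should_subscribe are hypothetical) that turns the code into a readable message, which is handy when a connection is refused, for instance because the vhost:username pair or the password is wrong.

```python
# CONNACK return codes as defined by the MQTT 3.1 specification
CONNACK_MEANINGS = {
    0: "connection accepted",
    1: "connection refused: unacceptable protocol version",
    2: "connection refused: identifier rejected",
    3: "connection refused: server unavailable",
    4: "connection refused: bad user name or password",
    5: "connection refused: not authorized",
}


def describe_connack(rc):
    """Return a human-readable description of a CONNACK return code."""
    return CONNACK_MEANINGS.get(rc, "unknown return code %d" % rc)


def should_subscribe(rc):
    """Only subscribe when the broker accepted the connection."""
    return rc == 0
```

Inside the callback you could print describe_connack(rc) and call client.subscribe() only when should_subscribe(rc) is true.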

Once it receives a message from the queue bound by the topic, it will trigger the callback function onMessage() to print the topic and message payload.

def onMessage(client, userdata, message):
    print("Topic: " + message.topic + ", Message: " + message.payload.decode("utf-8"))

The callback functions should be set before connecting to the RoboMQ broker.

client.on_connect = onConnect
client.on_message = onMessage

When you no longer need a subscription, you can unsubscribe from the topic.

client.unsubscribe(topic)

Putting it together

producer.py

import time
import paho.mqtt.client as mqtt

server = "hostname"
port = 1883
vhost = "yourvhost"
username = "username"
password = "password"
topic = "test/any"

try:
    client = mqtt.Client(client_id="", clean_session=True, userdata=None, protocol=mqtt.MQTTv31)
    client.username_pw_set(vhost + ":" + username, password)
    client.connect(server, port, keepalive=60, bind_address="") #connect
    client.loop_start() #start loop
    msgNum = int(input("Quantity of test messages: "))
    for i in range(msgNum):
        message = "test msg " + str(i + 1)
        client.publish(topic, payload=message, qos=1, retain=False) #publish
        time.sleep(1)
    client.loop_stop()  #stop loop
    client.disconnect()
except Exception as e:
    print(e)

consumer.py

import time
import paho.mqtt.client as mqtt

server = "hostname"
port = 1883
vhost = "yourvhost"
username = "username"
password = "password"
topic = "test/#"

def onConnect(client, userdata, flags, rc):    #event on connecting
    """Callback on connecting to the broker: subscribes to the target topic."""
    client.subscribe([(topic, 1)])  #subscribe with QoS 1

def onMessage(client, userdata, message):   #event on receiving message
    """Callback on receiving a message: prints its topic and payload."""
    #the payload arrives as bytes, so decode it before printing
    print("Topic: " + message.topic + ", Message: " + message.payload.decode("utf-8"))

while True:
    try:
        client = mqtt.Client(client_id="", clean_session=True, userdata=None, protocol=mqtt.MQTTv31)
        client.username_pw_set(vhost + ":" + username, password)
        client.on_connect = onConnect
        client.on_message = onMessage
        client.connect(server, port, keepalive=60, bind_address="") #connect
        client.loop_forever()   #reconnects automatically once the loop is running
    except Exception as e:
        #connect() can raise before the loop starts; retry after a pause
        print("Exception handled, reconnecting...\nDetail:\n%s" % e)
        time.sleep(5)

Node.js

Prerequisite

The Node.js library we use for this example can be found at https://github.com/adamvr/MQTT.js.

You can install the library through sudo npm install mqtt.

Finally, require this library in your program.

var mqtt = require("mqtt");

The full documentation of this library is at https://github.com/mqttjs/MQTT.js/wiki.

Producer

The first thing we need to do is establish a connection with the RoboMQ broker.
RoboMQ allows you to specify a vhost along with the username. See the Vhost specification section for details.
Set the keep-alive interval to 60 seconds so that the client periodically confirms its connectivity with the broker.

var client = mqtt.connect("mqtt://" + server + ":" + port, {username: vhost + ":" + username, password: password, keepalive: 60, clean: true, will: null});

With this library, you will typically place most other calls inside the connect callback.

client.on("connect", callback); 

After that, producer can send messages to a particular topic.

client.publish(topic, message, {qos: 1, retain: false});

Finally, the producer disconnects from the RoboMQ broker. The end() function includes disconnecting.

client.end();

Consumer

The first step is the same as for the producer: the consumer needs to connect to the RoboMQ broker.

In the connect callback, the next step is to subscribe to a topic so that the consumer knows what to listen to. A callback function handles incoming messages: once the consumer receives a message from the queue bound to the topic, it prints the topic and message payload.

client.subscribe(topic, {qos: 1, dup: false})
.on("message", function(topic, payload, packet) {
    console.log("Topic: " + topic + ", Message: " + payload);
});

When you no longer need a subscription, you can unsubscribe from the topic.

client.unsubscribe(topic, callback);

Putting it together

producer.js

var mqtt = require("mqtt");

var server = "hostname";
var port = "1883";
var vhost = "yourvhost";
var username = "username";
var password = "password";
var topic = "test/any";

var client = mqtt.connect("mqtt://" + server + ":" + port, {username: vhost + ":" + username, password: password, keepalive: 60, clean: true, will: null});
client.on("connect", function() {   //this library automatically reconnects on errors
    //ask user to input the number of test messages
    process.stdout.write("Quantity of test messages: ");
    process.stdin.on("data", function (input) {
        //stdin delivers a Buffer, so parse it into the number of messages to send
        var msgNum = parseInt(input.toString(), 10);
        try {
            for(var i = 1; i <= msgNum; i++){   
                var message = "test msg " + i;
                client.publish(topic, message, {qos: 1, retain: false});
            }
        } catch(ex) {
            console.log(ex);
            process.exit(-1);
        }
        //shut down producer after messages sent
        setTimeout(function() {
            client.end();   //includes disconnect()
            process.exit(0);
        }, msgNum);
    });
});

consumer.js

var mqtt = require("mqtt");

var server = "hostname";
var port = "1883";
var vhost = "yourvhost";
var username = "username";
var password = "password";
var topic = "test/#";

var client = mqtt.connect("mqtt://" + server + ":" + port, {username: vhost + ":" + username, password: password, keepalive: 60, clean: true, will: null});
client.on("connect", function() {   //this library automatically reconnects on errors
    try {
        client.subscribe(topic, {qos: 1, dup: false})   //chainable API
        .on("message", function(topic, payload, packet) {   //event handling
            console.log("Topic: " + topic + ", Message: " + payload);
        });
    } catch(ex) {
        console.log(ex);
    }
});

PHP

Prerequisite

The PHP library we use for this example can be found at https://github.com/mgdm/Mosquitto-PHP/.

This library depends on PHP 5.3+ and libmosquitto, so first ensure that you have them installed.
You can obtain the package using PECL: sudo pecl install Mosquitto-alpha.
Now you should see mosquitto.so in your PHP shared library directory, e.g. /usr/lib/php5/20121212/. Finally, edit your php.ini: in the Dynamic Extensions section, add the line extension=mosquitto.so.

After installation, you don't need to explicitly require this library in your PHP script. The PHP interpreter loads it for you.

Producer

The first thing we need to do is establish a connection with the RoboMQ broker.
RoboMQ allows you to specify a vhost along with the username. See the Vhost specification section for details.
In the client constructor, the first parameter is the client ID and the second is the boolean clean session flag.
The third parameter of the connect function is the keep-alive interval in seconds. Set it to 60 seconds so that the client periodically confirms its connectivity with the broker.

$client = new Mosquitto\Client("1", true);
$client->setCredentials($vhost.":".$username, $password);
$client->connect($server, $port, 60);

After that, the producer can send messages to a particular topic.
The third parameter is the QoS and the fourth is the boolean retain flag.

$client->publish($topic, $message, 1, false);

Many MQTT libraries, including this one, require a network loop to complete and maintain the connection with the broker. There are several loop functions to choose from. If none of them is called, incoming network data will not be processed and outgoing network data may not be sent in a timely fashion.
It is strongly recommended that you call loop() each time you send a message.

$client->loop();

Finally, the producer disconnects from the RoboMQ broker.

$client->disconnect();

Consumer

The first step is the same as for the producer: the consumer needs to connect to the RoboMQ broker. Unlike the producer, the consumer loops forever.

$client->loopForever();

The next step is to subscribe to a topic so that the consumer knows what to listen to.
The second argument of the subscribe() function is the QoS.

$client->subscribe($topic, 1);

Once it receives a message from the queue bound by the topic, it will trigger the callback function onMessage() to print the topic and message payload.

function onMessage($message) {
    printf("Topic: %s, Message: %s\n", $message->topic, $message->payload);
}

The callback functions should be set before connecting to the RoboMQ broker. For example,

$client->onMessage("onMessage");

When you no longer need a subscription, you can unsubscribe from the topic.

$client->unsubscribe($topic, $qos);

Putting it together

producer.php

<?php
$server = "hostname";
$port = "1883";
$vhost = "yourvhost";
$username = "username";
$password = "password";
$topic = "test/any";

try {
    $client = new Mosquitto\Client("1", true); //clientid="1", clean_session=true
    $client->setCredentials($vhost.":".$username, $password);
    $client->connect($server, $port, 60); //keepalive=60
    echo "Quantity of test messages: ";
    $msgNum = rtrim(fgets(STDIN), PHP_EOL);
    for ($i = 1; $i <= $msgNum; $i++) {
        $message = "test msg ".$i;
        $client->publish($topic, $message, 1, false); //publish test messages to the topic
        $client->loop(); //loop frequently to keep communicating with the broker
        sleep(1);
    }
    $client->disconnect();
} catch (Exception $e) {
    echo $e;
}
?>

consumer.php

<?php
$server = "hostname";
$port = "1883";
$vhost = "yourvhost";
$username = "username";
$password = "password";
$topic = "test/#";
//$client and $topic are global variables, so the callbacks below can reach them through $GLOBALS

function subscribe() {
    $GLOBALS["client"]->subscribe($GLOBALS["topic"], 1); //qos=1
}

/**
 * This method is the callback on receiving messages.
 * @ It prints the message topic and payload on console.
 */
function onMessage($message) {
    printf("Topic: %s, Message: %s\n", $message->topic, $message->payload);
}

while (true) {
    try {
        $client = new Mosquitto\Client("0", true); //clientid="0", clean_session=true
        $client->setCredentials($vhost.":".$username, $password);
        $client->onConnect("subscribe");
        $client->onMessage("onMessage");
        $client->connect($server, $port, 60); //keepalive=60
        $client->loopForever(); //automatically reconnect when loopForever
    } catch (Exception $e) {
        //when initialize connection, reconnect on exception
        echo "Exception handled, reconnecting...\nDetail:\n".$e."\n";
        sleep(5);
    }
}
?>

Ruby

Prerequisite

The Ruby gem we use for this example can be found at https://rubygems.org/gems/mqtt. Its source code is at https://github.com/njh/ruby-mqtt.

You can install it through gem install mqtt.

Finally, require this gem in your program.

require 'mqtt'

The full documentation of this gem is at https://www.rubydoc.info/gems/mqtt/.

Producer

The first thing we need to do is establish a connection with the RoboMQ broker.
RoboMQ allows you to specify a vhost along with the username. See the Vhost specification section for details.
Set the keep-alive interval to 60 seconds so that the client periodically confirms its connectivity with the broker.

client = MQTT::Client.connect(
  :host => server,
  :port => port,
  :username => "#{vhost}:#{username}",
  :password => password,
  :version => "3.1.0",
  :keep_alive => 60,
  :clean_session => true,
  :client_id => "",
  :will_qos => 1,
  :will_retain => false
)

After that, producer can send messages to a particular topic.

client.publish(topic, msg)

Finally, the producer disconnects from the RoboMQ broker.

client.disconnect

Consumer

The first step is the same as for the producer: the consumer needs to connect to the RoboMQ broker.

The next step is to subscribe to a topic so that the consumer knows what to listen to.

subscription = client.subscribe([topic,1])

To receive a message, use the get method. This method will block until a message is available. If you give it a block, then the block will be executed for every message received.

client.get do |topic, message|
  onMessage(topic, message)
end

When you no longer need a subscription, you can unsubscribe from the topic.

client.unsubscribe(topic)

Putting it together

producer.rb

require "mqtt"

# connection options
server = "hostname"
port = 1883
vhost = "yourvhost"
username = "username"
password = "password"
topic = "test/any"

print "Quantity of test messages: "
msgNum = gets.to_i

# create connection
begin
  client = MQTT::Client.connect(
      :host => server,
      :port => port,
      :username => "#{vhost}:#{username}",
      :password => password,
      :version => "3.1.0",
      :keep_alive => 60,
      :clean_session => true,
      :client_id => "",
      :will_qos => 1,
      :will_retain => false
  )

  # publish messages
  (1..msgNum).each do |counter|
    msg = "test msg  #{counter}"
    client.publish(topic, msg)
    sleep 1
  end

  client.disconnect
end

consumer.rb

require "mqtt"

# connection options
server = "hostname"
port = 1883
vhost = "yourvhost"
username = "username"
password = "password"
topic = "test/any"

# event on receiving message
def onMessage(topic, message)
  puts "Topic: #{topic}, Message: #{message}"
end

# create connection and keep getting messages
loop do
  begin
    # connect
    client = MQTT::Client.connect(
      :host => server,
      :port => port,
      :username => "#{vhost}:#{username}",
      :password => password,
      :version => "3.1.0",
      :keep_alive => 60,
      :clean_session => true,
      :client_id => "",
      )

    # subscribe
    client.subscribe([topic,1])

    client.get do |topic, message|
      onMessage(topic, message)
    end
  rescue MQTT::Exception, StandardError => e
    # also covers socket errors such as a refused or dropped connection
    puts "Exception handled, reconnecting...\nDetail:\n#{e.message}"
    sleep 5
  end
end

Java

Prerequisite

The Java library we use for this example can be found at https://www.eclipse.org/paho/clients/java/.

Download the library jar file from https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/mqtt-client/0.4.0/mqtt-client-0.4.0.jar, import the library in your program with import org.eclipse.paho.client.mqttv3.*; and compile your source code with the jar file on the classpath. For example,

javac -cp ".:./mqtt-client-0.4.0.jar" Producer.java Consumer.java 

Run the producer and consumer classes. For example,

java -cp ".:./mqtt-client-0.4.0.jar" Consumer
java -cp ".:./mqtt-client-0.4.0.jar" Producer

Of course, you can eventually package your producer and consumer classes into jar files.

The full documentation of this library is at http://www.eclipse.org/paho/files/javadoc/index.html.

Producer

The first thing we need to do is establish a connection with the RoboMQ broker.
RoboMQ allows you to specify a vhost along with the username. See the Vhost specification section for details.
Set the keep-alive interval to 60 seconds so that the client periodically confirms its connectivity with the broker.

private String broker = "tcp://" + server + ":" + port;
private String clientId = MqttClient.generateClientId();
private MemoryPersistence persistence = new MemoryPersistence();

client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(vhost + ":" + username);
connOpts.setPassword(password.toCharArray());
connOpts.setKeepAliveInterval(60);
connOpts.setCleanSession(true);
client.connect(connOpts);

After that, producer can send messages to a particular topic.
Note that the message argument of the publish() function is not a String. Instead, it is an instance of the MqttMessage class. The message payload, as a byte array, is passed to the constructor of MqttMessage. The class has public methods to set the message properties, e.g. setQos(), setRetained(), etc.

client.publish(topic, message);

Finally, the producer disconnects from the RoboMQ broker.

client.disconnect();

Consumer

The first step is the same as for the producer: the consumer needs to connect to the RoboMQ broker.

The next step is to subscribe to a topic so that the consumer knows what to listen to. You need to set the message callback before subscribing. Once the consumer receives a message from the queue bound to the topic, it calls the overridden function messageArrived() to print the topic and message payload.
The second parameter of the subscribe() function is the QoS.

private class onMessage implements MqttCallback {
    public void messageArrived(String topic, MqttMessage message) {     
        System.out.println("Topic: " + topic + ", Message: " + (new String(message.getPayload())));
    }
    public void connectionLost(Throwable cause) {}
    public void deliveryComplete(IMqttDeliveryToken token) {}
}

onMessage callback = new onMessage();
client.setCallback(callback);
client.subscribe(topic, 1);

When you no longer need a subscription, you can unsubscribe from the topic.

client.unsubscribe(topic);

Putting it together

Producer.java

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Scanner;

public class Producer {

    private MqttClient client;
    private String server = "hostname";
    private String port = "1883";
    private String broker = "tcp://" + server + ":" + port;
    private String vhost = "yourvhost";
    private String username = "username";
    private String password = "password";
    private String topic = "test/any";
    private String clientId = MqttClient.generateClientId();
    private MemoryPersistence persistence = new MemoryPersistence();

    private void produce() {
        try {
            client = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(vhost + ":" + username);
            connOpts.setPassword(password.toCharArray());
            connOpts.setKeepAliveInterval(60);
            connOpts.setCleanSession(true);
            client.connect(connOpts);
            System.out.print("Quantity of test messages: ");
            Scanner scanner = new Scanner(System.in);
            int msgNum = scanner.nextInt();
            for (int i = 0; i < msgNum; i ++) {
                MqttMessage message = new MqttMessage(("test msg " + Integer.toString(i + 1)).getBytes());
                message.setQos(1);
                message.setRetained(false);
                client.publish(topic, message);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {}
            }
            client.disconnect();
        } catch(MqttException me) {
            System.out.println(me);
            System.exit(-1);            
        }
    }

    public static void main(String[] args) {
        Producer p = new Producer();
        p.produce();
    }
}

Consumer.java

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

public class Consumer {

    private MqttClient client;
    private String server = "hostname";
    private String port = "1883";
    private String broker = "tcp://" + server + ":" + port;
    private String vhost = "yourvhost";
    private String username = "username";
    private String password = "password";
    private String topic = "test/#";
    private String clientId = MqttClient.generateClientId();
    private MemoryPersistence persistence = new MemoryPersistence();
    private volatile boolean connected = false; //written by the callback thread, read by consume()

    /**
     * This method is the overridden callback on receiving messages.
     * @ It is event-driven. You don't call it in your code.
     * @ It prints the message topic and payload on console.
     * @ There're other callback functions provided by this library.
     */
    private class onMessage implements MqttCallback {

        public void messageArrived(String topic, MqttMessage message) {     
            System.out.println("Topic: " + topic + ", Message: " + (new String(message.getPayload())));
        }

        public void connectionLost(Throwable cause) {
            System.out.printf("Exception handled, reconnecting...\nDetail:\n%s\n", cause.getMessage());
            connected = false; //reconnect on exception
        }

        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    }

    private void consume() {
        while (true) {
            try {
                client = new MqttClient(broker, clientId, persistence);
                MqttConnectOptions connOpts = new MqttConnectOptions();
                connOpts.setUserName(vhost + ":" + username);
                connOpts.setPassword(password.toCharArray());
                connOpts.setKeepAliveInterval(60);
                connOpts.setCleanSession(true);
                client.connect(connOpts);
                onMessage callback = new onMessage();
                client.setCallback(callback);
                client.subscribe(topic, 1); //qos=1
                connected = true;
                while (connected) { //check connection status
                    try {
                        Thread.sleep(5000);
                    } catch (Exception e) {}
                } 
            } catch(MqttException me) {
                //reconnect on exception
                System.out.printf("Exception handled, reconnecting...\nDetail:\n%s\n", me); 
                try {
                    Thread.sleep(5000); 
                } catch(Exception e) {}
            }
        }
    }

    public static void main(String[] args) {
        Consumer c = new Consumer();
        c.consume();
    }
}

Go

Prerequisite

The Go library we use for this example can be found at https://eclipse.org/paho/clients/golang/. Its GitHub repository is at https://github.com/eclipse/paho.mqtt.golang.

You can install it through go get github.com/eclipse/paho.mqtt.golang.

Finally, import this library in your program.

import MQTT "github.com/eclipse/paho.mqtt.golang"

The full documentation of this library is at https://godoc.org/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git.

This library depends on Google's websockets package, which is installed with go get golang.org/x/net/websocket.

Producer

The first thing we need to do is establish a connection with the RoboMQ broker.
RoboMQ allows you to specify a vhost along with the username. See the Vhost specification section for details.
Set the keep-alive interval to 60 seconds so that the client periodically confirms its connectivity with the broker.
Although the library provides an AutoReconnect connection option, we discourage using it. The reason is explained in the Consumer section.

connOpts := MQTT.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:%d", server, port))
connOpts.SetUsername(fmt.Sprintf("%s:%s", vhost, username))
connOpts.SetPassword(password)
connOpts.SetClientID("0")
connOpts.SetCleanSession(true)
connOpts.SetKeepAlive(60 * time.Second)
connOpts.SetAutoReconnect(false)

client := MQTT.NewClient(connOpts)
client.Connect()

After that, the producer can send messages to a particular topic.
The second parameter is the QoS and the third is the boolean retain flag.

client.Publish(topic, 1, false, message)

Finally, the producer disconnects from the RoboMQ broker.
The parameter 250 is the number of milliseconds to wait for existing work to be completed.

client.Disconnect(250)

Consumer

The first step is the same as for the producer: the consumer needs to connect to the RoboMQ broker.
As mentioned in the Producer section, AutoReconnect is set to false when connecting to the RoboMQ broker. This matters for consumers because AutoReconnect only recovers the connection; it does not resubscribe to the topics. Therefore, a more robust approach is to let your code handle reconnecting and resubscribing on its own.

The next step is to subscribe to a topic so that the consumer knows what to listen to. The second argument of the Subscribe() function is the QoS, and the third is the callback function that handles incoming messages.

client.Subscribe(topic, 1, OnMessage)

Once the consumer receives a message from the queue bound to the topic, it triggers the callback function OnMessage to print the topic and message payload.

var OnMessage MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    fmt.Printf("Topic: %s, Message: %s\n", msg.Topic(), msg.Payload())
}

When you no longer need a subscription, you can unsubscribe from the topic.

client.Unsubscribe(topic)

Putting it together

producer.go

package main

import (
    "fmt"
    MQTT "github.com/eclipse/paho.mqtt.golang"
    "os"
    "time"
)

var server = "hostname"
var port = 1883
var vhost = "yourvhost"
var username = "username"
var password = "password"
var topic = "test/any"

func main() {
    connOpts := MQTT.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:%d", server, port))
    connOpts.SetUsername(fmt.Sprintf("%s:%s", vhost, username))
    connOpts.SetPassword(password)
    connOpts.SetClientID("1")
    connOpts.SetCleanSession(true)
    connOpts.SetKeepAlive(60 * time.Second)
    connOpts.SetAutoReconnect(false)

    // Create and start a client using the above ClientOptions
    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        fmt.Printf("Failed to connect, err: %v\n", token.Error())
        os.Exit(1)
    }

    var msgNum int
    fmt.Print("Quantity of test messages: ")
    fmt.Scanf("%d", &msgNum)
    for i := 0; i < msgNum; i++ {
        message := fmt.Sprintf("test msg %d", i+1)
        // QoS = 1, retained = false
        token := client.Publish(topic, 1, false, message)
        // Use the PublishToken to confirm receipt by the broker
        token.Wait()
        if token.Error() != nil {
            fmt.Printf("Failed to publish, err: %v\n", token.Error())
            os.Exit(1)
        }
        time.Sleep(time.Second)
    }

    client.Disconnect(250)
}

consumer.go

package main

import (
    "fmt"
    MQTT "github.com/eclipse/paho.mqtt.golang"
    "time"
)

var server = "hostname"
var port = 1883
var vhost = "yourvhost"
var username = "username"
var password = "password"
var topic = "test/#"

// OnMessage is the callback invoked for each received message.
// It prints the message topic and payload to the console.
var OnMessage MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    fmt.Printf("Topic: %s, Message: %s\n", msg.Topic(), msg.Payload())
}

func main() {
    connOpts := MQTT.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:%d", server, port))
    connOpts.SetUsername(fmt.Sprintf("%s:%s", vhost, username))
    connOpts.SetPassword(password)
    connOpts.SetClientID("0")
    connOpts.SetCleanSession(true)
    connOpts.SetKeepAlive(60 * time.Second)
    connOpts.SetAutoReconnect(false)

    // Infinite loop to reconnect on failure
    for {
        // Create and start a client using the above ClientOptions
        client := MQTT.NewClient(connOpts)
        if token := client.Connect(); token.Wait() && token.Error() != nil {
            fmt.Printf("Failed to connect, err: %v\n", token.Error())
            fmt.Println("Restarting in 5 seconds...")
            time.Sleep(5 * time.Second)
            // Nothing to subscribe to without a connection, so retry
            continue
        }

        // QoS = 1
        if token := client.Subscribe(topic, 1, OnMessage); token.Wait() && token.Error() != nil {
            fmt.Printf("Failed to subscribe, err: %v\n", token.Error())
            // Drop the connection so that the check below fails and we retry
            client.Disconnect(250)
        }

        // Constantly checking connectivity
        for client.IsConnected() {
            time.Sleep(time.Second)
        }

        fmt.Println("Restarting in 5 seconds...")
        time.Sleep(5 * time.Second)
    }
}

C++

Prerequisite

The C++ library we use for this example can be found at http://mosquitto.org/.

You will find a detailed installation guide at https://mosquitto.org/download/. Install the library according to your operating system. The recommended approach is to install from source. First, download the latest source package, uncompress it, and enter its root directory. Then run the following two commands:

make
sudo make install

Include the library in your program with #include <mosquitto.h> and compile with:

g++ producer.cpp -o producer -lmosquitto
g++ consumer.cpp -o consumer -lmosquitto

See the full documentation of this library at https://mosquitto.org/documentation/.

Producer

The first thing we need to do is to establish a connection with the RoboMQ broker.
RoboMQ allows you to specify a vhost along with the username. See the Vhost specification section for details.
Remember to call mosquitto_lib_init() before creating the mosquitto instance.
Many MQTT libraries, including this one, require a network loop to complete and maintain the connection with the broker. There are several loop functions to choose from. If none of them is called, incoming network data will not be processed and outgoing network data may not be sent in a timely fashion. With this library, you usually start the loop right after connecting.
The second parameter of the mosquitto_new() function is the boolean flag for clean session.
The fourth parameter of the mosquitto_connect() function is the keep-alive time in seconds. Set keep alive to 60 seconds so that the client periodically confirms its connectivity with the broker.

string vhusn = vhost + ":" + usn;
const char *username = vhusn.c_str();

struct mosquitto *mosq = NULL;

mosquitto_lib_init();
mosq = mosquitto_new(NULL, true, NULL);
mosquitto_username_pw_set(mosq, username, password);
mosquitto_connect(mosq, host, port, 60);
mosquitto_loop_start(mosq);
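
The vhost-qualified username used above can also be built by a small helper. The following is a minimal sketch, not part of the mosquitto library: the function name makeLogin is hypothetical, and it assumes that an empty vhost means the plain username should be sent so that the broker falls back to its default vhost.

```cpp
#include <string>

// Hypothetical helper (not part of libmosquitto): builds the login string
// in the "vhost:username" form described in the Vhost specification section.
// Assumption: when vhost is empty, the plain username is returned so that
// the broker uses its default vhost.
std::string makeLogin(const std::string &vhost, const std::string &user) {
    if (vhost.empty()) {
        return user;
    }
    return vhost + ":" + user;
}
```

For example, makeLogin("yourvhost", "username") yields "yourvhost:username", and the result of c_str() on that string can be passed to mosquitto_username_pw_set().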

After that, the producer can send messages to a particular topic.
The fourth argument is the length of the payload in bytes (computed here with strlen() from <string.h>), the sixth argument is the QoS, and the seventh argument is the boolean retain flag.

mosquitto_publish(mosq, NULL, topic, (int)strlen(payload), payload, 1, false);
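
The payload length passed to mosquitto_publish() should be the number of bytes actually written rather than the capacity of the buffer, otherwise subscribers may receive trailing bytes that are not part of the message. A minimal sketch of one way to do this follows; the helper name makePayload is hypothetical, and the "test msg N" format is taken from the full listing further down.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>

// Hypothetical helper: formats the i-th test message and returns it as a
// std::string, so that size() gives the exact number of payload bytes.
std::string makePayload(int i) {
    char buf[32];  // large enough for "test msg " plus any 32-bit int
    int len = std::snprintf(buf, sizeof(buf), "test msg %d", i);
    if (len < 0) {
        return std::string();  // encoding error: return an empty payload
    }
    return std::string(buf, static_cast<std::size_t>(len));
}
```

With a std::string p = makePayload(i), the publish call would then take (int)p.size() as the length and p.data() as the payload.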

Finally, the producer stops the loop and disconnects from the RoboMQ broker.

mosquitto_loop_stop(mosq, true); 
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

Consumer

The first step is the same as for the producer: the consumer needs to connect to the RoboMQ broker and start the loop. Unlike the producer, the consumer loops forever.

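// A negative timeout selects the library default of 1000 ms;
// a timeout of 0 returns instantly and makes the loop busy-wait
while(!mosquitto_loop_forever(mosq, -1, 1)){
}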

Then you need to set some callback functions, which play a significant role in this library. The callback for received messages is indispensable.

void onMessage(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) {
    if(message->payloadlen) {
        printf("Topic: %s, Message: %s\n", (char*)message->topic, (char*)message->payload);
    } else {
        printf("Topic: %s, Message: (null)\n", message->topic);
    }
    fflush(stdout);
}

mosquitto_message_callback_set(mosq, onMessage);
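
An MQTT payload is a byte array with an explicit length (payloadlen), so printing it with %s relies on a terminating '\0' being present. As a hedged sketch of a more defensive approach, the hypothetical helper below (payloadToString is not a library function) copies exactly the reported number of bytes into a std::string before printing.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: copies exactly `len` bytes of a received payload into
// a std::string, so that printing does not depend on a terminating '\0'.
// A null pointer or a non-positive length yields an empty string.
std::string payloadToString(const void *payload, int len) {
    if (payload == NULL || len <= 0) {
        return std::string();
    }
    return std::string(static_cast<const char *>(payload),
                       static_cast<std::size_t>(len));
}
```

Inside the callback, this could be used as payloadToString(message->payload, message->payloadlen).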

Finally, you need to subscribe to a topic so that the consumer knows where to listen. Once it receives a message from the queue bound to the topic, the library calls the onMessage() function, which prints the topic and message payload.

mosquitto_subscribe(mosq, NULL, topic, 1);
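
The full listings below subscribe to test/# while publishing to test/any, which works because of MQTT topic wildcards: + stands for exactly one topic level and # for all remaining levels. The following simplified sketch illustrates that matching rule under standard MQTT semantics. The names splitLevels and topicMatches are hypothetical, the broker performs the real matching, and filter validation and topics starting with $ are deliberately ignored.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Splits a topic or filter into its '/'-separated levels.
// Empty levels are kept, e.g. "a//b" -> {"a", "", "b"}.
static std::vector<std::string> splitLevels(const std::string &s) {
    std::vector<std::string> levels;
    std::string::size_type start = 0;
    while (true) {
        std::string::size_type pos = s.find('/', start);
        if (pos == std::string::npos) {
            levels.push_back(s.substr(start));
            break;
        }
        levels.push_back(s.substr(start, pos - start));
        start = pos + 1;
    }
    return levels;
}

// Simplified sketch of MQTT topic filter matching (hypothetical helper).
// '+' matches exactly one level, '#' matches the remaining levels
// (including none). Malformed filters and '$' topics are not handled.
bool topicMatches(const std::string &filter, const std::string &topic) {
    std::vector<std::string> f = splitLevels(filter);
    std::vector<std::string> t = splitLevels(topic);
    for (std::size_t i = 0; i < f.size(); ++i) {
        if (f[i] == "#") {
            return true;   // multi-level wildcard swallows the rest
        }
        if (i >= t.size()) {
            return false;  // filter has more levels than the topic
        }
        if (f[i] != "+" && f[i] != t[i]) {
            return false;  // literal level mismatch
        }
    }
    return f.size() == t.size();
}
```

Under these rules, a consumer subscribed to test/# receives messages published to test/any, while one subscribed to test/+ would not receive messages published to test/any/more.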

When you no longer need the subscription, you can unsubscribe from the topic.

mosquitto_unsubscribe(mosq, NULL, topic);

Putting it together

producer.cpp

#include <stdio.h>
#include <iostream>
#include <string>
#include <mosquitto.h>
#include <exception>
#include <stdlib.h>
#include <unistd.h>

using namespace std;

//The library automatically reconnects to broker

string hst = "hostname";
const char *host = hst.c_str();
int port = 1883;
string vhost = "yourvhost";
string usn = "username";
string vhusn = vhost + ":" + usn;
const char *username = vhusn.c_str();
string pwd = "password";
const char *password = pwd.c_str();
string tpc = "test/any";
const char *topic = tpc.c_str();

/**
 * This is the main method which creates and runs producer instance.
 * @Looping is essential for this MQTT library to work.
 * @Returns 1 on connection or publish error.
 */
int main(int argc, char *argv[]) {
    int keepalive = 60;
    bool clean_session = true;
    struct mosquitto *mosq = NULL;

    //create producer and connect to broker
    mosquitto_lib_init();
    mosq = mosquitto_new(NULL, clean_session, NULL);
    mosquitto_username_pw_set(mosq, username, password);
    if(mosquitto_connect(mosq, host, port, keepalive)) {
        printf("Error: Failed to connect\n");
        return 1;
    }
    //usually start loop right after connecting
    mosquitto_loop_start(mosq); 

    //send certain number of test messages
    int msgNum;
    cout << "Quantity of test messages: ";
    cin >> msgNum;
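    char payload[20];
    for (int i = 1; i <= msgNum; i++) {
        //snprintf returns the number of characters written, excluding the '\0'
        int len = snprintf(payload, sizeof(payload), "test msg %d", i);
        //libmosquitto is a C library: it reports errors via return codes, not exceptions
        if (mosquitto_publish(mosq, NULL, topic, len, payload, 1, false) != MOSQ_ERR_SUCCESS) {
            printf("Error: Failed to publish message\n");
            return 1;
        }
        sleep(1);
    }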

    //stop producer
    mosquitto_loop_stop(mosq, true); 
    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}

consumer.cpp

#include <stdio.h>
#include <iostream>
#include <string>
#include <mosquitto.h>
#include <exception>
#include <stdlib.h>
#include <unistd.h>

using namespace std;

//The library automatically reconnects to broker

string hst = "hostname";
const char *host = hst.c_str();
int port = 1883;
string vhost = "yourvhost";
string usn = "username";
string vhusn = vhost + ":" + usn;
const char *username = vhusn.c_str();
string pwd = "password";
const char *password = pwd.c_str();
string tpc = "test/#";
const char *topic = tpc.c_str();

/**
 * This method is the callback on connecting broker. 
 * @It is event-driven. You don't call it in your code.
 * @It subscribes the specific topic.
 * @There're other callback functions provided by this library.
 */
void onConnect(struct mosquitto *mosq, void *userdata, int result) {
    if (!result) {
        //libmosquitto is a C library: it reports errors via return codes, not exceptions
        if (mosquitto_subscribe(mosq, NULL, topic, 1) != MOSQ_ERR_SUCCESS) {
            printf("Error: Failed to subscribe\n");
        }
    } else {
        printf("Error: Failed to connect\n");
    }
}

/**
 * This method is the callback on receiving messages. 
 * @It is event-driven. You don't call it in your code.
 * @It prints the message topic and payload on console.
 * @There're other callback functions provided by this library.
 */
void onMessage(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) {
    if(message->payloadlen) {
        printf("Topic: %s, Message: %s\n", (char*)message->topic, (char*)message->payload);
    } else {
        printf("Topic: %s, Message: (null)\n", message->topic);
    }
    fflush(stdout);
}

/**
 * This is the main method which creates and sets consumer instance.
 * @Looping is essential for this MQTT library to work.
 * @Connection and subscription errors are reported by the onConnect callback.
 */
int main(int argc, char *argv[]) {
    int keepalive = 60;
    bool clean_session = true;
    struct mosquitto *mosq = NULL;
    mosquitto_lib_init();
    mosq = mosquitto_new(NULL, clean_session, NULL);
    mosquitto_username_pw_set(mosq, username, password);     
    mosquitto_connect_callback_set(mosq, onConnect);
    mosquitto_message_callback_set(mosq, onMessage);
    mosquitto_connect(mosq, host, port, keepalive);
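    //looping is essential for consumer to work
    //a negative timeout selects the library default of 1000 ms;
    //a timeout of 0 returns instantly and makes the loop busy-wait
    while(!mosquitto_loop_forever(mosq, -1, 1)){
    }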
    return 0;
}