Sign up for RoboMQ
This guide covers the basics of creating messaging applications using RoboMQ. You need to have the RoboMQ service account created before proceeding with client application development - please see the Getting Started.
First application in under 10 lines!
AMQP client
Now we are going to build our first AMQP application.
Prerequisite
The Python library we use for this example can be found at https://github.com/pika/pika.
You can install it through sudo pip install pika
.
Finally, import this library in your program.
import pika
The full documentation of this library is at https://pika.readthedocs.org/en/0.9.14/.
Producer
The first thing we need to do is to establish a connection with RoboMQ broker.
connection = pika.BlockingConnection(pika.ConnectionParameters(host = hostname, port = 5672, virtual_host = yourvhost, credentials = pika.PlainCredentials(username, password)))
channel = connection.channel()
Then producer can publish messages to the direct exchange where messages will be delivered to queues whose routing key matches.
channel.basic_publish(exchange = "amq.direct", routing_key = "test", body = "Hello World!", properties = None)
At last, producer will disconnect with the RoboMQ broker.
connection.close()
Consumer
The first step is the same as producer, consumer needs to connect to RoboMQ broker.
Then consumer will declare a queue, and bind the queue to the direct exchange with a routing key. The routing key decides what messages will the queue receive.
channel.queue_declare(queue = "testQ")
channel.queue_bind(exchange = "amq.direct", queue = "testQ", routing_key = "test")
Finally, consumer can consume messages from the queue.
channel.basic_consume(consumer_callback = onMessage, queue = "testQ", no_ack = True)
channel.start_consuming()
When messages are received, a callback function onMessage()
will be invoked to print the message content.
def onMessage(channel, method, properties, body):
print body
Putting it together
Before testing the example code, replace hostname, yourvhost, username and password with the real variables in your network environment.
producer.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host = hostname, port = 5672, virtual_host = yourvhost, credentials = pika.PlainCredentials(username, password)))
channel = connection.channel()
channel.basic_publish(exchange = "amq.direct", routing_key = "test", body = "Hello World!", properties = None)
connection.close()
consumer.py
import pika
def onMessage(channel, method, properties, body):
print body
connection = pika.BlockingConnection(pika.ConnectionParameters(host = hostname, port = 5672, virtual_host = yourvhost, credentials = pika.PlainCredentials(username, password)))
channel = connection.channel()
channel.queue_declare(queue = "testQ")
channel.queue_bind(exchange = "amq.direct", queue = "testQ", routing_key = "test")
channel.basic_consume(consumer_callback = onMessage, queue = "testQ", no_ack = True)
channel.start_consuming()
MQTT client
Now we are going to build our first MQTT application.
Prerequisites
The Python library we use for this example can be found at https://eclipse.org/paho/clients/python/. Its source code is at https://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.python.git/.
You can install it through sudo pip install paho-mqtt
.
Finally, import this library in your program.
import paho.mqtt.client as mqtt
The full documentation of this library is at https://pypi.python.org/pypi/paho-mqtt.
This library is built on the basis of a C++ library mosquitto. The documentation of mosquitto is at http://mosquitto.org.
Producer
The first thing we need to do is to establish a connection with RoboMQ broker and start looping then.
RoboMQ allows you to specify vhost along with username. See Vhost specification section for the detail.
Many MQTT libraries, including this one, require network looping to complete and maintain the connection with broker. There could be several loop functions for you to choose. If none of them are called, incoming network data will not be processed and outgoing network data may not be sent in a timely fashion.
client = mqtt.Client()
client.username_pw_set(yourvhost + ":" + username, password)
client.connect(hostname, 1883)
client.loop_start()
After that, producer can send messages to a particular topic.
In this example, the topic is "test"; It lets user input the message to send.
message = raw_input("Input message to send: ")
client.publish(topic = "test", payload = message)
At last, producer will stop looping and disconnect with the RoboMQ broker.
client.loop_stop()
client.disconnect()
Consumer
The same as producer, consumer needs to connect to RoboMQ broker and start looping. The difference is consumer loops forever.
client.loop_forever()
After connecting, consumer will subscribe a topic, so that consumer knows where to listen to.
client.subscribe("test")
Once it receives a message from the queue bound by the topic, it will call the callback function onMessage()
to print the topic and message payload.
def onMessage(client, userdata, message):
print("Topic: " + message.topic + ", Message: " + message.payload)
The callback functions should be preset before connecting to RoboMQ broker.
client.on_message = onMessage
Putting it all together
Before testing the example code, replace hostname, yourvhost, username and password with the real variables in your network environment.
producer.py
import sys, paho.mqtt.client as mqtt
client = mqtt.Client()
client.username_pw_set(yourvhost + ":" + username, password)
client.connect(hostname, 1883)
client.loop_start()
message = raw_input("Input message to send: ")
client.publish(topic = "test", payload = message)
client.loop_stop()
client.disconnect()
consumer.py
import sys, paho.mqtt.client as mqtt
def onMessage(client, userdata, message):
print("Topic: " + message.topic + ", Message: " + message.payload)
client = mqtt.Client()
client.username_pw_set(yourvhost + ":" + username, password)
client.on_message = onMessage
client.connect(hostname, 1883)
client.subscribe("test")
client.loop_forever()
STOMP client
Now we are going to build our first STOMP application.
Prerequisite
The Python library we use for this example can be found at https://pypi.python.org/pypi/stompest/. Its GitHub repository is at https://github.com/nikipore/stompest.
It supports STOMP version 1.0, 1.1 and 1.2.
You can install it through sudo pip install stompest
.
The full documentation of this library is at http://nikipore.github.io/stompest/.
Producer
The first thing we need to do is to establish a connection with RoboMQ broker.
In STOMP, username is called login and password is called passcode.
client = Stomp(StompConfig("tcp://" + hostname + ":61613", login = username, passcode = password, version = "1.2"))
client.connect(versions = ["1.2"], host = yourvhost)
After that, producer can send messages to a particular destination. In this example, it is a queue bound to the default exchange, but it can be replaced by other types of destinations to perform the corresponding messaging. The Message destinations section in STOMP chapter elaborates it.
client.send(destination = "/queue/test", body = "Hello World!", headers = None)
At last, producer will disconnect with the RoboMQ broker.
client.disconnect()
Consumer
The first step is the same as producer, consumer needs to connect to RoboMQ broker.
Next step is to subscribe a destination, so that consumer knows where to listen to. Once it receives a message from the destination, it will print the message body.
subscription = client.subscribe("/queue/test", {StompSpec.ACK_HEADER: StompSpec.ACK_AUTO, StompSpec.ID_HEADER: '0'})
while True:
frame = client.receiveFrame()
print frame.body
Putting it together
Before testing the example code, replace hostname, yourvhost, username and password with the real variables in your network environment.
producer.py
import sys
from stompest.config import StompConfig
from stompest.sync import Stomp
client = Stomp(StompConfig("tcp://" + hostname + ":61613", login = username, passcode = password, version = "1.2"))
client.connect(versions = ["1.2"], host = yourvhost)
client.send(destination = "/queue/test", body = "Hello World!", headers = None)
client.disconnect()
consumer.py
import sys
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.sync import Stomp
client = Stomp(StompConfig("tcp://" + hostname + ":61613", login = username, passcode = password, version = "1.2"))
client.connect(versions = ["1.2"], host = yourvhost)
subscription = client.subscribe("/queue/test", {StompSpec.ACK_HEADER: StompSpec.ACK_AUTO, StompSpec.ID_HEADER: '0'})
while True:
frame = client.receiveFrame()
print frame.body