Message Queues in Golang Via RabbitMQ

Introduction:

In this article, we shall see how to implement message queues in Golang using RabbitMQ.

 

Message queues are a vital component of many architectures and applications. Some tasks simply take a long time to run, and occasionally a time-consuming job needs to be postponed for a while. In such cases, the task needs to be queued for later execution.

 

For this to happen, you need a broker: a component that accepts messages (e.g. jobs, tasks) from various senders (e.g. a web application), queues them up, and distributes them to the relevant parties (e.g. workers) for processing, all asynchronously and on demand.
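To make the broker idea concrete before bringing in RabbitMQ, here is a minimal in-process sketch in which a buffered Go channel stands in for the queue. The function name runJobs and the job names are hypothetical, and a real broker runs as a separate service rather than inside the sender's process.

```go
package main

import (
	"fmt"
	"sync"
)

// runJobs queues every job on a buffered channel (standing in for the broker's
// queue) and lets `workers` goroutines consume them concurrently.
// The results come back in no particular order.
func runJobs(jobs []string, workers int) []string {
	queue := make(chan string, len(jobs))
	var (
		mu      sync.Mutex
		results []string
		wg      sync.WaitGroup
	)

	// Workers: pull jobs off the queue until it is closed and drained.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for job := range queue {
				mu.Lock()
				results = append(results, fmt.Sprintf("worker %d handled %s", id, job))
				mu.Unlock()
			}
		}(i)
	}

	// Sender: enqueue the jobs without waiting for any of them to finish.
	for _, job := range jobs {
		queue <- job
	}
	close(queue)

	wg.Wait()
	return results
}

func main() {
	for _, line := range runJobs([]string{"resize-image", "send-email", "build-report"}, 2) {
		fmt.Println(line)
	}
}
```

The sender returns as soon as the jobs are queued; which worker picks up which job is decided at run time, which is the same property a broker gives you across processes and machines.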

 

We can use NSQ for this purpose, which is written entirely in Go. Alternatively, we can use RabbitMQ, which is written in Erlang but has good Go client support.

 

As mentioned above, this article walks you through a RabbitMQ queue implementation in Golang. It was written on Ubuntu; the Go code is the same on other Linux distributions, and only the installation commands differ.

Why use message queues?

Here are the main reasons:

 

  • Decoupling: Producers and consumers can be extended and modified independently of each other.
  • Elasticity & Spikability: The queue absorbs bursts of traffic, which reduces the load on consumers and keeps response times low for senders.
  • Persistence: A message stays in the queue until it has been processed, which guards against data loss while information is passed from one place to another.
  • Scalability: It is easy to scale up the rate at which messages are added to the queue or processed.
  • Asynchronous Communication: A message can be put on the queue without being processed immediately. Queue up as many messages as you like, then process them at your leisure.
  • Delivery Guarantee: The broker can hold on to a message until a consumer acknowledges it.

 

RabbitMQ:

 

RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP) to pass messages between or within systems.

How does it work?

RabbitMQ (the broker) connects message senders (publishers) with receivers (consumers). Publishers send messages to an exchange, the exchange routes each message to the relevant message queues, and consumers read from those queues.
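The flow from publisher to exchange to queue to consumer can be illustrated with a toy in-memory model. This is only a sketch for intuition: the broker type and its bind, publish and consume methods are hypothetical names, not part of any RabbitMQ client, and the routing shown is the simple exact-match case.

```go
package main

import "fmt"

// broker is a toy, in-memory stand-in for RabbitMQ: it copies each message
// to every queue bound to the message's routing key.
type broker struct {
	bindings map[string][]string // routing key -> queue names
	queues   map[string][]string // queue name -> pending messages (FIFO)
}

func newBroker() *broker {
	return &broker{bindings: map[string][]string{}, queues: map[string][]string{}}
}

// bind tells the exchange to deliver messages with this routing key to the queue.
func (b *broker) bind(routingKey, queue string) {
	b.bindings[routingKey] = append(b.bindings[routingKey], queue)
}

// publish routes the message to every queue bound to the routing key.
// In this toy model, a message with no matching binding is dropped.
func (b *broker) publish(routingKey, msg string) {
	for _, q := range b.bindings[routingKey] {
		b.queues[q] = append(b.queues[q], msg)
	}
}

// consume removes and returns the oldest message in the queue, if any.
func (b *broker) consume(queue string) (string, bool) {
	msgs := b.queues[queue]
	if len(msgs) == 0 {
		return "", false
	}
	b.queues[queue] = msgs[1:]
	return msgs[0], true
}

func main() {
	b := newBroker()
	b.bind("orders", "billing")
	b.bind("orders", "shipping")

	b.publish("orders", "order #1 created")

	msg, _ := b.consume("billing")
	fmt.Println("billing got:", msg)
	msg, _ = b.consume("shipping")
	fmt.Println("shipping got:", msg)
}
```

Note that the publisher never names a queue directly; it only supplies a routing key, and the bindings decide where the message ends up.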

 

Installing RabbitMQ

Install RabbitMQ with the following commands:


## Enable RabbitMQ application repository ##

echo 'deb http://www.rabbitmq.com/debian/ testing main' |
sudo tee /etc/apt/sources.list.d/rabbitmq.list

 

## Add the verification key for the package ##

curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -

 

## Update ##

sudo apt-get update

 

## Install RabbitMQ ##

sudo apt-get install rabbitmq-server

 

## To manage the maximum number of connections upon launch, open and edit the following configuration file ##

sudo nano /etc/default/rabbitmq-server

 

Uncomment the ulimit line (i.e. remove the leading #).


## To enable RabbitMQ Management ##

sudo rabbitmq-plugins enable rabbitmq_management

 

The management interface can then be accessed from a web browser at http://localhost:15672/

 

## To start/stop the service ##

sudo service rabbitmq-server start

sudo service rabbitmq-server stop

Using RabbitMQ:

The following steps explain how a message gets published and how it gets consumed:

 

  1. Requirement
  2. Making a connection
  3. Making a channel
  4. Declaring a queue
  5. Publishing a message
  6. Consuming a message

Requirement

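To implement a message queue, we need a Go RabbitMQ client. This article uses github.com/streadway/amqp; the RabbitMQ team now maintains a fork of it, github.com/rabbitmq/amqp091-go.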


go get github.com/streadway/amqp

Making a connection

AMQP is an application-level protocol that uses TCP for reliable delivery, and AMQP connections are typically long-lived. Connections use authentication and can be protected using TLS (SSL). When an application no longer needs to be connected to an AMQP broker, it should gracefully close the AMQP connection instead of abruptly closing the underlying TCP connection.
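AMQP clients usually take the credentials, host and transport security as a single URI. Here is a minimal sketch of assembling one with the standard library, so that special characters in the password are escaped correctly. The helper name amqpURI is hypothetical; the amqp and amqps schemes select plain TCP and TLS respectively, and the example hosts and credentials are placeholders.

```go
package main

import (
	"fmt"
	"net/url"
)

// amqpURI assembles a connection URI of the form scheme://user:password@host/.
// It uses the "amqps" scheme when useTLS is true and "amqp" otherwise.
func amqpURI(useTLS bool, user, password, host string) string {
	scheme := "amqp"
	if useTLS {
		scheme = "amqps"
	}
	u := url.URL{
		Scheme: scheme,
		User:   url.UserPassword(user, password), // escapes characters such as '@'
		Host:   host,
		Path:   "/",
	}
	return u.String()
}

func main() {
	// Placeholder values: the default guest account on a local broker,
	// and a made-up TLS endpoint.
	fmt.Println(amqpURI(false, "guest", "guest", "localhost:5672"))
	fmt.Println(amqpURI(true, "app", "p@ss", "mq.example.com:5671"))
}
```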

 

The following call creates a long-lived TCP connection between an AMQP client and the queue broker:

 


conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

Making a channel

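A channel is a lightweight virtual connection that shares the underlying TCP connection with other channels. Most operations, such as declaring queues and publishing messages, are performed on a channel.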


ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

 

Declaring a queue

A queue is a first-in, first-out (FIFO) holder of messages.

We can handle several queues at the same time. This facilitates categorisation.


q, err := ch.QueueDeclare(
	"retaildash-scrapy", // name
	false,               // durable
	false,               // delete when unused
	false,               // exclusive
	false,               // no-wait (do not wait for the server's confirmation)
	nil,                 // arguments
)
failOnError(err, "Failed to declare a queue")

 

Publishing a message

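Publish() is the method that sends (publishes) a message to an exchange. Here the exchange name is empty, which selects the default exchange; it routes the message to the queue whose name matches the routing key.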

 


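body := "Hello World!" // placeholder payload
err = ch.Publish(
	"",     // exchange
	q.Name, // routing key
	false,  // mandatory
	false,  // immediate
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})
// Check the error before logging, so a failed publish is not reported as sent.
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)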
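The message body is a byte slice, so a structured job has to be serialized before publishing and parsed again by the consumer. Below is a minimal sketch using JSON; the scrapeJob type and its fields are hypothetical, and with a payload like this the ContentType would more naturally be "application/json".

```go
package main

import (
	"encoding/json"
	"fmt"
)

// scrapeJob is a hypothetical payload; the field names are illustrative only.
type scrapeJob struct {
	URL     string `json:"url"`
	Retries int    `json:"retries"`
}

// encodeJob serializes a job into the bytes that would go in the message body.
func encodeJob(j scrapeJob) ([]byte, error) {
	return json.Marshal(j)
}

// decodeJob is the consumer-side inverse of encodeJob.
func decodeJob(body []byte) (scrapeJob, error) {
	var j scrapeJob
	err := json.Unmarshal(body, &j)
	return j, err
}

func main() {
	body, err := encodeJob(scrapeJob{URL: "https://example.com/products", Retries: 3})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Printf("body: %s\n", body)

	job, err := decodeJob(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("decoded: %+v\n", job)
}
```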

 

Consuming a message

 

Now that the message has been published to the queue, it has to be consumed before it can be processed.


msgs, err := ch.Consume(
	q.Name, // queue
	"",     // consumer
	true,   // auto-ack
	false,  // exclusive
	false,  // no-local
	false,  // no-wait
	nil,    // args
)
failOnError(err, "Failed to register a consumer")

// msgs is a channel of deliveries; the loop blocks and handles each message as it arrives.
for d := range msgs {
	log.Printf("Received a message: %s", d.Body) // any further processing goes here
}

 

This is a sample use of RabbitMQ for message queueing. As described above, queueing messages has many advantages, and the Go client makes RabbitMQ easy to use from Golang.

For more examples, see https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go

To learn more, see RabbitMQ.