Saltar a contenido

Documentación RabbitMQ 🐇

Autor Gabriel Molinas.

Introducción

El presente trabajo de investigación parte de la necesidad de evaluar una herramienta de transferencia de mensajes por medio de Colas, se selecciona RabbitMQ para hacer esta investigación ya que es la herramienta de colas con más utilización actualmente y la cual posee una comunidad muy activa. RabbitMQ es un broker de mensajería de código abierto, distribuido y escalable, que sirve como intermediario para la comunicación eficiente entre productores y consumidores.

El objetivo es hacer una prueba de concepto con esta herramienta con el objetivo de complementar la implementación de Apache Kafka orientando a tener una herramienta de mensajería orientado a colas.

Librerías y Herramientas

Para realizar esta investigación utilizamos las librerías nativas que provee la comunidad de RabbitMQ, cabe destacar que esta dispone y da mantenimiento a librerías para las principales tecnologías en la que tenemos 100% de compatibilidad.

Instalación de RabbitMQ Local

Logo

Para realizar prubas en entorno local se levanta un docker compose con la imagen de rabbitmq expuesta en los puertos 5672 y 15672.

 version: "3.5"

# Docker services
services:
    rabbitmq:
        environment:
            RABBITMQ_DEFAULT_USER: arquitectura
            RABBITMQ_DEFAULT_PASS: arquitectura
        image: rabbitmq:3.10-management
        container_name: rabbitmq
        restart: always
        tty: true
        stdin_open: true
        volumes:
            - ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/
            - ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq
        ports:
            # HTTP DASHBOARD FOR MANAGEMENT
            - "15672:15672"
            # AMQP PROTOCOL
            - "5672:5672"

Info

Esta imagen contiene un management, una UI donde se puede ver información desde el localhost:15672.


PoC

Hola Mundo

En esta introduccion se repasan los conceptos basicos sobre el funcionamiento de un broker de mensajeria, y un ejemplo simple de como crear un publisher y consumer en .NET c#.

¿Que es un productor?

productor

Producir no significa nada más que enviar. Un programa que envía mensajes es un productor

¿Que es una cola?

cola

Una cola es el nombre de un buzón de correo que vive dentro de RabbitMQ. Una cola solo está limitada por los límites de memoria y disco del host, es esencialmente un gran búfer de mensajes. Muchos productores pueden enviar mensajes que van a una cola y muchos consumidores pueden intentar recibir datos de una cola. Para más info sobre las colas

¿Que es un Consumidor?

Consumir tiene un significado similar a recibir. Un consumidor es un programa que en su mayoría espera recibir mensajes:

cola


Tutorial

En esta parte del tutorial escribiremos dos programas en C#; a productor que envía un solo mensaje, y un consumidor que recibe mensajes y los imprime. Es un "Hola mundo" de mensajería.

Enviar (Productor)

Empezamos creando un programa de consola y agregamos las siguientes dependencias:

using System;
using RabbitMQ.Client;
using System.Text;
Para establecer una conexión con el servidor se realiza de la siguiente forma:
class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "arquitectura", Password = "arquitectura" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            ...
        }
    }
}

La ConnectionFactory abstrae la conexión del socket y se ocupa de la autenticación. Para nosotros. Aquí nos conectamos a un nodo de RabbitMQ en el Docker local; de ahí el servidor local . Si quisiéramos conectarnos a un nodo de una diferente máquina simplemente especificaríamos su nombre de host o dirección IP, contraseña aquí.

A continuación, creamos un canal, que es donde se encuentra la mayor parte de la API para obtener las cosas hechas residen.

Para enviar mensajes, debemos crear una cola, de la siguiente forma:

channel.QueueDeclare(queue: "NombreDeLaCola",
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

Entonces si se junta todo queda algo asi:

using System;
using RabbitMQ.Client;
using System.Text;

class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "arquitectura", Password = "arquitectura" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "NombreDeLaCola",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                 routingKey: "NombreDeLaCola",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

Warning

Declarar una cola es idempotente: solo se creará si no existe ya. El contenido del mensaje es una matriz de bytes, por lo que puede codificar lo que quieras allí.

Recibir (Consumidor)

En cuanto al consumidor, está escuchando mensajes de Rabbit. A diferencia del enviador que publica un solo mensaje, nosotros mantenemos al consumidor funcionando continuamente para escuchar mensajes e imprimirlos.

Creamos otro programa de consola y agregamos las siguientes dependencias:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

La configuración es la misma que la del Enviador; abrimos una conexión y un channel, y declaramos la cola de la que vamos a consumir.

class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "arquitectura", Password = "arquitectura" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "NombreDeLaCola",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            ...
        }
    }
}
Declaramos la cola acá también porque podríamos correr el consumidor antes que el enviador, queremos asegurarnos de que exista la cola antes de que intentemos consumir mensajes de él.

Estamos a punto de decirle al servidor que nos entregue los mensajes. Dado que nos enviará mensajes de forma asíncrona, proporcionamos un llamar de vuelta. Eso es lo hacemos con EventingBasicConsumer.Received

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "arquitectura", Password = "arquitectura" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "NombreDeLaCola",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "NombreDeLaCola",
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

Publicador Subscriptor

La suposición detrás de una cola de trabajo es que cada mensaje es entregado a exactamente un consumidor. En esta parte haremos algo. completamente diferente: enviaremos un mensaje a varios consumidores Este patrón se conoce como "publicar/suscribir".

publicador subscriptor

Tip

- Un productor es una aplicación de usuario que envía mensajes.
- Una cola es un búfer que almacena mensajes.
- Un consumidor es una aplicación de usuario que recibe mensajes.

La idea central en el modelo de mensajería en RabbitMQ es que el productor nunca envía ningún mensaje directamente a una cola. En realidad, muy a menudo el productor ni siquiera sabe si un mensaje será entregado a algúna cola en absoluto.

En cambio, el productor solo puede enviar mensajes a un exchange. El Exchange es una cosa muy simple. Por un lado recibe mensajes de productores y al otro lado los empuja a las colas. El Exchange debe saber exactamente qué hacer con un mensaje que recibe. Deberia ser agregado a una cola en particular? ¿Debería agregarse a muchas colas? O debería desecharse. Las reglas para eso están definidas por el tipo de cambio.

Warning

En partes anteriores del tutorial no sabíamos nada sobre los Exchange, pero aun así pudieron enviar mensajes a las colas. eso fue posible porque estábamos usando un intercambio predeterminado, que identificamos por la cadena vacía ( "" ).

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",              /   *exchangevacio*/
                     routingKey: "NombreDeLaCola",
                     basicProperties: null,
                     body: body);

Ahora, podemos publicar en nuestro Exchange con nombre en su lugar:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "NombreDelExchange",
                     routingKey: "",
                     basicProperties: null,
                     body: body);
Anteriormente al crear la cola de dedicabamos un nombre al cual nuestro consumidor se iba a conectar, en el caso del exchange esto no sera necesario. En primer lugar, cada vez que nos conectamos a Rabbit, necesitamos una cola nueva y vacía. Para ello podríamos crear una cola con un nombre aleatorio, o, incluso mejor: deje que el servidor elija un nombre de cola aleatorio para nosotros.

En segundo lugar, una vez que desconectemos al consumidor, la cola debería ser eliminada automáticamente.

En .NET, cuando no proporcionamos parámetros a QueueDeclare() creamos una cola de eliminación automática exclusiva y no duradera con un nombre generado:

var  queueName = channel.QueueDeclare().QueueName;

Ya hemos creado un Exchange y una cola. Ahora tenemos que dígale al Exchange que envíe mensajes a nuestra cola. esa relacion entre el Exchange y una cola se llama enlace o binding.

channel.QueueBind(queue: queueName,
                  exchange: "NombreDelExchange",
                  routingKey: "");

Todo junto

using System;
using RabbitMQ.Client;
using System.Text;

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "arquitectura", Password = "arquitectura" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "NombreDelExchange", type: ExchangeType.Fanout);

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "NombreDelExchange",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}
Como ve, después de establecer la conexión declaramos el exchange. Este paso es necesario ya que la publicación en un sitio no existente Prohibido el exchange.

Los mensajes se perderán si aún no hay una cola vinculada al exchange, pero eso está bien para nosotros; si ningún consumidor está escuchando todavía, podemos descartar el mensaje con seguridad.

El código para Suscriptor
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "NombreDelExchange", type: ExchangeType.Fanout);

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "NombreDelExchange",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

Consumir OnDemand

OnDemand se refiere a consumir un único mensaje de una cola, el primero en entrar es el primero en salir (FiFo).

En este caso no se utilizara un exchange, como en el ejemplo del publicador/suscriptor, y utilizaremos únicamente una cola.

Esto no quiere decir que no se pueda utilizar un exchange para crear un FiFo, simplemente es por practicidad

Comenzamos creando un proyecto e implementado la libreria

using RabbitMQ.Client;

Establecemos las conexiones y creamos una cola:

class OnDemandConsumer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "arquitectura", Password = "arquitectura" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "NombreDeLaCola",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            ...
        }
    }
}

Para consumir un unico mensaje es muy simple, desde channel podemos acceder a una funcion llamada BasicGet a la cual le vamos a pasar dos parametros. 1. El nombre de la cola 2. autoAck

channel.BasicGet( **NombreDeLaCola** , **autoAck** );

Warning

El autoAck cuando esta activado (true) el mensaje se eliminará de la cola después de el BasicGet o el BasicConsume. Este uso suele ser para mensajes de bajo valor, si el procesamiento falla después de consumir, el mensaje se perderá.

autoAck cuando esta desactivado (false) se usaría para mensajes importantes, donde la pérdida de un mensaje no es una opción. Si su proceso de consumo falla y no puede reconocer el mensaje, el mensaje se devuelve a la cola.

Entonces el código agregando un poco mas de lógica quedaría algo así:

class OnDemandConsumer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "arquitectura", Password = "arquitectura" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "NombreDeLaCola",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            while (true)
            {
                var data = channel.BasicGet("NombreDeLaCola", true);

                if (data != null)
                {
                    var message = System.Text.Encoding.UTF8.GetString(data.Body.ToArray());

                    Console.WriteLine(" [x] Received {0}", message);
                }
                else
                {
                    Console.WriteLine(" [x] No hay mensajes para consumir");

                }

                Console.WriteLine(" Press [enter] to recive");
                Console.ReadLine();
                Console.Clear();

            }
        }
    }
}

Para finalizar, le recomendamos que lea las diferencias del Basic Consumer y Basic Get de RabbitMQ aquí


Info

Para consultar más información puede acceder a la página oficial de [rabbitMQ](https://www.rabbitmq.com/getstarted.html)

Conclusión

A lo largo de la prueba pudimos comprobar que la utilización de la herramienta por parte de las librerías de .NET es relativamente sencilla y cumple con las principales necesidades.

Cabe aclarar que las pruebas que se realizaron están lejos de ser pruebas para un entorno productivo y no se llego a profundizar con lo necesario para disponibilizar una estrategia cross-company para su utilización. Por otro lado no se avanzo con la implementación de la herramienta en entornos productivos de OCP o servidores Andreani esto debido a la falta de necesidad de implementar esta herramienta y el desconocimiento del los equipos (Arquitectura – Infraestructura – Operaciones - Monitoreo) para dar soporte y mantenimiento de la herramienta.

Para finalizar este trabajo, podemos definir que para el momento que se encuentre una necesidad puntal para el uso de esta herramienta debemos capacitar a los equipos para dar soporte y mantenimiento, ya que es una herramienta open source, otra posibilidad es avanzar con la contratación de un partner para la implementación y la capacitación de la herramienta a los equipos de Andreani.

Anexo

PoC

Pueden acceder al código de la prueba desde git