Skip to content

Consumidor

Ejemplo de consumidor básico.

Installar libreria

Para construir un publicar debemos instalar la libreri Confluent.kafka

PM> Install-Package Confluent.Kafka
o
> dotnet add package Confluent.Kafka

Creamos el Consumer

importamos la libreria

using Confluent.Kafka;
using System.Net;
//(...)

Creamos la clase y heredamos de BackgroundService para que se ejecute cuando se levanta la app.

    public class ConsumerService : BackgroundService
    {
        private readonly ILogger<ConsumerService> _logger;

        public ConsumerService(ILogger<ConsumerService> logger)
        {
            _logger = logger;
        }
        //(...)
    }

Implementamos el metodo ExecuteAsync.

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "Test",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        var topics = new List<string>(){"test-documentation"};
        // Consumer builder<Tkey, TValue> el value es el evento que estamos esperando.
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            // nos subscribimos a los topicos
            consumer.Subscribe(topics);
            while (true)
            {
                try
                {
                // escuchamos si hay mensjaes
                var consumeResult = consumer.Consume(stoppingToken);

                // manejamos los mensajes.
                _logger.LogInformation(
                    consumeResult.Message.Timestamp.UtcDateTime.ToString("yyyy-MM-dd HH:mm:ss")
                    + $": [{consumeResult.Message.Value}]");
                }
                catch (ConsumeException ex)
                {
                    _logger.LogError($"an error occured: {ex.Error.Reason}");
                }
            }

            consumer.Close();
        };
    }

La propiedad GroupId es obligatoria y especifica a qué grupo de consumidores pertenece el consumidor. La propiedad AutoOffsetReset especifica desde qué compensación debe comenzar a leer el consumidor en caso de que no haya compensaciones comprometidas para una partición, o la compensación comprometida no sea válida (quizás debido al truncamiento del registro).

Configuramos el Startup

En el Startup.cs agregamos la configuración

            builder.ConfigureServices((ctx, c) =>
            {
                //...
                c.AddHostedService<ConsumerService>();
            }

Corremos la Aplicación

Corremos la aplicación y deberiamos ver algo así en la consola

 ____  _       _    __                      _____  _
|  _ \| | __ _| |_ / _| ___  _ __ _ __ ___ |___ / / |
| |_) | |/ _` | __| |_ / _ \| '__| '_ ` _ \  |_ \ | |
|  __/| | (_| | |_|  _| (_) | |  | | | | | |___) || |
|_|   |_|\__,_|\__|_|  \___/|_|  |_| |_| |_|____(_)_|

[17:44:11 INF] Try to start Server in Version: 1.0.0.0
[17:44:11 INF] Informing Listeners about OnStart()
[17:44:16 INF] 2021-07-02 20:16:04: [Test documentación]