Skip to content

Publicador

Ejemplos básicos de productores.

Installar libreria

Para construir un publicar debemos instalar la libreri Confluent.kafka

PM> Install-Package Confluent.Kafka
o
> dotnet add package Confluent.Kafka

Creamos el Publicador

importamos la libreria

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

//(...)

Creamos la clase y heredamos de BackgroundService para que se ejecute cuando se levanta la app.

    public class ProducerService : BackgroundService
    {
        private readonly ILogger<ProducerService> _logger;

        public ProducerService(ILogger<ProducerService> logger)
        {
            _logger = logger;
        }
        //(...)
    }

Implementamos el metodo ExecuteAsync.

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            SecurityProtocol = SecurityProtocol.Plaintext,
        };

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            string topic = "test-documentation";
            // creamos el mensaje <Key, Value>, el Value será nuestro evento.
            var message = new Message<Null, string>
            {
                Value="Test documentación"
            };
            try
            {
                // publicamos el evento.
                var dr = await producer.ProduceAsync(topic, message);
                _logger.LogInformation($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex.Message);
            }
        }
    }

Configuramos el Startup

En el Startup.cs agregamos la configuración

            builder.ConfigureServices((ctx, c) =>
            {
                //...
                c.AddHostedService<ProducerService>();
            }

Corremos la Aplicación

Corremos la aplicación y deberiamos ver algo así en la consola

 ____  _       _    __                      _____  _
|  _ \| | __ _| |_ / _| ___  _ __ _ __ ___ |___ / / |
| |_) | |/ _` | __| |_ / _ \| '__| '_ ` _ \  |_ \ | |
|  __/| | (_| | |_|  _| (_) | |  | | | | | |___) || |
|_|   |_|\__,_|\__|_|  \___/|_|  |_| |_| |_|____(_)_|

[17:15:59 INF] Try to start Server in Version: 1.0.0.0
[17:15:59 INF] Informing Listeners about OnStart()
[17:16:04 INF] Environment Name: [Production]
[17:16:06 INF] Delivered 'Test documentación' to 'test-documentation [[2]] @0'

Si entramos al Kafdrop deberiamos ver el mensaje publicado.

Crear Topic

Hay que tener cuidado, debemos asegurarnos que el topic existe, para eso podemos crearlo antes de publicar.

    private async void CreateTopic(string name, 
    int numPartitions, 
    short replicationFactor, 
    ClientConfig cloudConfig)
    {
    using (var adminClient = new AdminClientBuilder(cloudConfig).Build())
            {
                try
                {
                    await adminClient.CreateTopicsAsync(
                        new List<TopicSpecification> 
                        {
                            new TopicSpecification 
                            { 
                                Name = name, 
                                NumPartitions = numPartitions, 
                                ReplicationFactor = replicationFactor
                            }
                        });
                }
                catch (CreateTopicsException e)
                {
                    if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                    {
                        _logger.LogError($"An error occured creating topic {name}: {e.Results[0].Error.Reason}");
                    }
                    else
                    {
                        _logger.LogWarning("Topic already exists");
                    }
                }
            }
        }