Publicador
Ejemplos básicos de productores.¶
Installar libreria¶
Para construir un publicar debemos instalar la libreri Confluent.kafka
PM> Install-Package Confluent.Kafka
> dotnet add package Confluent.Kafka
Creamos el Publicador¶
importamos la libreria
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
//(...)
Creamos la clase y heredamos de BackgroundService
para que se ejecute cuando se levanta la app.
public class ProducerService : BackgroundService
{
private readonly ILogger<ProducerService> _logger;
public ProducerService(ILogger<ProducerService> logger)
{
_logger = logger;
}
//(...)
}
Implementamos el metodo ExecuteAsync
.
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
SecurityProtocol = SecurityProtocol.Plaintext,
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
string topic = "test-documentation";
// creamos el mensaje <Key, Value>, el Value será nuestro evento.
var message = new Message<Null, string>
{
Value="Test documentación"
};
try
{
// publicamos el evento.
var dr = await producer.ProduceAsync(topic, message);
_logger.LogInformation($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (Exception ex)
{
_logger.LogError(ex.Message);
}
}
}
Configuramos el Startup¶
En el Startup.cs agregamos la configuración
builder.ConfigureServices((ctx, c) =>
{
//...
c.AddHostedService<ProducerService>();
}
Corremos la Aplicación¶
Corremos la aplicación y deberiamos ver algo así en la consola
____ _ _ __ _____ _
| _ \| | __ _| |_ / _| ___ _ __ _ __ ___ |___ / / |
| |_) | |/ _` | __| |_ / _ \| '__| '_ ` _ \ |_ \ | |
| __/| | (_| | |_| _| (_) | | | | | | | |___) || |
|_| |_|\__,_|\__|_| \___/|_| |_| |_| |_|____(_)_|
[17:15:59 INF] Try to start Server in Version: 1.0.0.0
[17:15:59 INF] Informing Listeners about OnStart()
[17:16:04 INF] Environment Name: [Production]
[17:16:06 INF] Delivered 'Test documentación' to 'test-documentation [[2]] @0'
Si entramos al Kafdrop deberiamos ver el mensaje publicado.
Crear Topic¶
Hay que tener cuidado, debemos asegurarnos que el topic existe, para eso podemos crearlo antes de publicar.
private async void CreateTopic(string name,
int numPartitions,
short replicationFactor,
ClientConfig cloudConfig)
{
using (var adminClient = new AdminClientBuilder(cloudConfig).Build())
{
try
{
await adminClient.CreateTopicsAsync(
new List<TopicSpecification>
{
new TopicSpecification
{
Name = name,
NumPartitions = numPartitions,
ReplicationFactor = replicationFactor
}
});
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
_logger.LogError($"An error occured creating topic {name}: {e.Results[0].Error.Reason}");
}
else
{
_logger.LogWarning("Topic already exists");
}
}
}
}