Consumidor
Ejemplo de consumidor básico.¶
Installar libreria¶
Para construir un publicar debemos instalar la libreri Confluent.kafka
PM> Install-Package Confluent.Kafka
> dotnet add package Confluent.Kafka
Creamos el Consumer¶
importamos la libreria
using Confluent.Kafka;
using System.Net;
//(...)
Creamos la clase y heredamos de BackgroundService
para que se ejecute cuando se levanta la app.
public class ConsumerService : BackgroundService
{
private readonly ILogger<ConsumerService> _logger;
public ConsumerService(ILogger<ConsumerService> logger)
{
_logger = logger;
}
//(...)
}
Implementamos el metodo ExecuteAsync
.
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "Test",
AutoOffsetReset = AutoOffsetReset.Earliest
};
var topics = new List<string>(){"test-documentation"};
// Consumer builder<Tkey, TValue> el value es el evento que estamos esperando.
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
// nos subscribimos a los topicos
consumer.Subscribe(topics);
while (true)
{
try
{
// escuchamos si hay mensjaes
var consumeResult = consumer.Consume(stoppingToken);
// manejamos los mensajes.
_logger.LogInformation(
consumeResult.Message.Timestamp.UtcDateTime.ToString("yyyy-MM-dd HH:mm:ss")
+ $": [{consumeResult.Message.Value}]");
}
catch (ConsumeException ex)
{
_logger.LogError($"an error occured: {ex.Error.Reason}");
}
}
consumer.Close();
};
}
La propiedad
GroupId
es obligatoria y especifica a qué grupo de consumidores pertenece el consumidor. La propiedadAutoOffsetReset
especifica desde qué compensación debe comenzar a leer el consumidor en caso de que no haya compensaciones comprometidas para una partición, o la compensación comprometida no sea válida (quizás debido al truncamiento del registro).
Configuramos el Startup¶
En el Startup.cs agregamos la configuración
builder.ConfigureServices((ctx, c) =>
{
//...
c.AddHostedService<ConsumerService>();
}
Corremos la Aplicación¶
Corremos la aplicación y deberiamos ver algo así en la consola
____ _ _ __ _____ _
| _ \| | __ _| |_ / _| ___ _ __ _ __ ___ |___ / / |
| |_) | |/ _` | __| |_ / _ \| '__| '_ ` _ \ |_ \ | |
| __/| | (_| | |_| _| (_) | | | | | | | |___) || |
|_| |_|\__,_|\__|_| \___/|_| |_| |_| |_|____(_)_|
[17:44:11 INF] Try to start Server in Version: 1.0.0.0
[17:44:11 INF] Informing Listeners about OnStart()
[17:44:16 INF] 2021-07-02 20:16:04: [Test documentación]