Andreani.ARQ.AMQStreams¶
Autor Olivera Lucas.¶
Objetivo¶
El objetivo de esta librería es abstraer al desarrollador de la implementación de Apache Kafka y brindarle herramientas para que, rápidamente, pueda integrar AMQStream en sus proyectos.
La actual librería utiliza las librerías de Confluent kafka como base para poder interactuar con Apache Kafka. ver confluent-kafka
Configuración¶
Configuración inicial¶
ApplicationName: Example
AMQStreams:
BootstrapServer: "localhost:9092"
SchemaUrl: "http://localhost:8081/apis/ccompat/v6"
name | descripción |
---|---|
ApplicationName | Valor que se utiliza para definir el remitente y el consumer group |
BootstrapServer | URL de acceso al broker de kafka |
SchemaUrl | URL de acceso a apicurio registry |
Configuración avanzada¶
ApplicationName: Example
AMQStreams:
BootstrapServer: "SSL://cluster-kafka-bootstrap-amq-streams-test.apps.ocptest.andreani.com.ar:443"
SchemaUrl: "http://apicurioregistry.apps.ocptest.andreani.com.ar/apis/ccompat/v6"
Protocol: Ssl
AutoOffsetReset: Earliest
AutoRegisterSchemas: true
MaxRetry: 3
name | descripción |
---|---|
GroupId | representa el valor de consumer group. Default: ApplicationName |
Protocol | Protocolo de conexión al broker opciones: Ssl, Plaintext Default: Plaintext |
SslCertificateLocation | Path de ubicación de certificado SSL. Default: "" |
EnableSslCertificateVerification | La aplicación puede ampliar esta verificación mediante la implementación de un certificate_verify_cb. Default: false |
AutoOffsetReset | Permite establecer la estrategia de offset inicial para la lectura de los Consumers. Opciones Latest, Earliest, Error. Default: Earliest |
AutoRegisterSchemas | Si está seteado en true, permite registrar, en caso que no existan, el schema en apicurio registry. Default: false |
MessageMaxBytes | Cantidad de bytes que se permite consumir o publicar. Default: 1000000 (1MB) |
MillisecondsTimeout | Cantidad de tiempo que un consumidor espera el ingreso de un mensaje. Default: 10000 (10 seg) |
MaxRetry | Cantidad de reitentos permitidos para un mensaje antes de marcarse como mensaje corrupto. Default: 3 |
MaxPollIntervalMs | Configuración de intervalo de tiempo habilitado entre dos consumos de mensajes, en caso de que no se consuma un mensaje durante este intervalo de tiempo el coordinador de Kafka entenderá que el consumer está caído por lo que eliminará al consumer. Default: 300000 |
SessionTimeoutMs | El consumer envía señales de vida al coordinador, esto ocurre cada cierto tiempo, si el coordinador no recibe estas señales eliminara al consumer. Este valor siempre debe ser menor al MaxPollIntervalMs Default: 45000 |
ConsumerDebug | Configuración para ver el log del consumer, es un string en forma de array, dependiendo de lo que necesites que se muestre. Ejemplos: "consumer, topic", opciones: consumer,cgrp,topic,fetch |
AutoCommitIntervalMs | Configuración de intervalo de commits. Default: 100 |
EnableAutoCommit | Configurar en False para consumidores que requieren un tiempo prolongado al procesar los mensajes. Default: True . Aclaración: en ambos casos los commit se continuan gestionando dentro de la libreria que proporcionamos |
Para más información de la configuración de kafka, consumers o producers ver -> KafkaConfig, ConsumerConfig, ProducerConfig
Producer¶
El producer es un agente que permite realizar la publicación de un evento en un topic determinado.
La librería dispone de la interface IPublisher la cual posee la responsabilidad de realizar las publicaciones.
Configuración¶
En nuestro archivo de startup (si tenemos la arquitectura vieja) debemos importar la librería Andreani.ARQ.AMQStreams
Simplemente llamar al método de extensión AddKafka().
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddKafka(builder.Configuration).ToProducer<CustomEvent>("my-topic").Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
//...
services.AddKafka(Configuration)
.ToProducer<CustomEvent>("my-topic")
.Build();
}
Para configurar un producer debemos especificar el evento que queremos publicar (CustomEvent)
y a que topic vamos a publicar ese evento (my-topic)
.
La librería nos permite configurar múltiples producer y publicar el mismo evento en n-topics
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddKafka(builder.Configuration)
.ToProducer<CustomEvent>("my-topic")
.ToProducer<CustomEvent2>("my-topic", "my-topic-2")
.Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
//...
services.AddKafka(Configuration)
.ToProducer<CustomEvent>("my-topic")
.ToProducer<CustomEvent2>("my-topic", "my-topic-2")
.Build();
}
Cómo publicar¶
Para publicar un mensaje simplemente debemos inyectar la interface IPublisher
private readonly IPublisher _publisher;
public ProducerJob(IPublisher publisher)
{
_publisher = publisher;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var message = new CustomEvent
{
codigo = 1,
mensaje = "example producer"
};
await _publisher.To(message, "Key");
}
To()
del publisher
.
La librería posee una alternativa en cuanto a la necesidad del desarrollador planteando diferentes escenarios
public interface IPublisher
{
Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(TEvent @event,
string key,
string exactTopic = null,
List<Header> headers = null)
where TEvent : class, ISpecificRecord;
Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(
IEnumerable<TEvent> events,
Func<TEvent, string> setKey,
string exactTopic = null,
List<Header> headers = null,
Action<TEvent> beforePublish = null)
where TEvent : class, ISpecificRecord;
}
Otros Ejemplos¶
Agregar Headers al mensaje:
List<Header> headers = new List<Header>
{
new Header("keyHeader", Encoding.UTF8.GetBytes("1")),
};
...
await _publisher.To(message, "key", headers: headers);
Publicar un mensaje en un topic especifico (Ignora la configuración).
await _publisher.To(message, "key", "ConTopicoExacto");
Procesamiento Batch¶
en el caso que necesitemos publicar n cantidad de eventos, la librería realiza un proceso en batch para facilitar esto.
El proceso de batch realiza publicaciones de a 1000 mensajes por llamado a kafka.
await _publisher.To(messages, (m) => "key");
await _publisher.To(messages, (m) => m.codigo);
La función recibe:
- La lista de mensajes.
- Una función para definir la key de cada evento. Esta función recibe por parametros el mensaje por lo que puede seleccionar una propiedad del mensaje como key. Ej: id, codigo, número.
- ExactTopic y Headers cual como el metodo de publicación de un mensaje.
beforePublish
es una Action que se ejecuta antes de publicar cada mensaje, en caso de necesitar realizar una acción previa a la publicación.
Ejemplo:
await _publisher.To(messages,
(m) => m.codigo,
beforePublish: (e) => _logger.LogInformation($"publicando el evento {e.codigo}"));
Consumer¶
El consumer es un agente que se subscribe a un topic y reacciona a los eventos que suceden en ese topic.
Para crear un consumer con la librería debemos simplemente declarar cuál será el objeto suscrito, que evento espera y en qué topic. La librería se encargará de ejecutar la acción adecuada.
Configuración¶
En nuestro archivo de startup debemos importar la librería Andreani.ARQ.AMQStreams
Simplemente llamar al método de extensión AddKafka()
.
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddKafka(builder.Configuration)
.ToConsumer<Subscriber, CustomEvent>("my-topic")
.Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
//...
services.AddKafka(Configuration)
.ToConsumer<Subscriber, CustomEvent>("my-topic")
.Build();
}
El metodo ToConsumer
necesita que le declaren el objeto de tipo ISubscriber (Subscriber
), el evento que va a esperar (CustomEvent
) y el topic al cual va a estar suscrito (my-topic
).
Como consumir¶
Para consumir un evento el objeto que será el suscriptor debe implementar la interface ISubscriber
public class Subscriber : ISubscriber
Luego el objeto suscriptor debe contener al menos un método que reciba por parámetro el evento.
public Task ReciveCustomEvent(CustomEvent @event)
{
Console.WriteLine($"Content event codigo: {@event.codigo} - message {@event.mensaje}");
return Task.CompletedTask;
}
La librería invocará a este método cuando reciba el evento que se declaró.
Tip
Si necesitamos conocer más información del evento, simplemente podemos crear un método que reciba el objeto ```ConsumerMetadata```
public Task ReciveCustomEvent(CustomEvent @event, ConsumerMetadata metadata)
{
Console.WriteLine($"Recived event with key: {metadata.Key} and Timestamp: {metadata.Timestamp.UtcDateTime.ToString()}");
Console.WriteLine($"Content event codigo: {@event.codigo} - message {@event.mensaje}");
return Task.CompletedTask;
}
Warning
**Los métodos deben siempre devolver un `Task`**
Manejo de Topics¶
A la hora de trabajar con topics debemos definir que estructura tendrán estos. Es muy importante saber que cantidad de mensajes vamos a procesar como para saber que configuración de topic es la correcta.
Nombrado¶
Los topics deben tener nombres descriptivos que referencien a quienes son los owner del mismo. Siendo el nombre del proyecto el prefix de sus topic. {proyecto}-evento
Ejemplo: notificaciones-alta-automatica, fisa-facturacion, sce-alta-de-pedido
Crear Topic.¶
Podemos realizar la configuración en la app con la librería para crear o actualizar el topic del que sea publicador la app.
Warning
solo aquellos topicos que seamos owner se deben configurar en la applicación.
services.AddKafka(Configuration)
.CreateOrUpdateTopic(3, "Pedidos-Prueba")
.Build();
CreateOrUpdateTopic()
recibe la cantidad de particiones que queremos configurar y el listado de topic al cual querramos aplicar esa configuración. En caso de que el topic no exista, creara el mismo con la cantidad de particiones solicitadas y un factor de replicación de 3
. En caso de que el topic ya exista se actualizara el número de particiones.
Danger
**Solo se puede incrementar la cantidad de particiones, ya que decrementar las mismas provocaría una perdida parcial o total de los mensajes.**
Estrategias de consumer¶
La librería dispone actualmente de las siguientes estrategias de consumo.
- RealTime: El proceso se subscribe al topic y consume los mensajes, en caso de no haber mensajes queda esperando la llegada de uno, es el consumidor por defecto.
- SleepAndGo: Este consumer es similar al RealTime salvo que cuando consume un mensaje espera cierto tiempo antes de procesarlo, esta pensado para procesos de reintentos en donde no queremos procesar un mensaje que fallo automáticamente.
- FullScan: Consumer que se subscribe al topic, consume todos los mensajes y cuando no hay más mensajes se des subscribe. Está pensado para utilizarlo en procesos del estilo CronJob.
- Tiempo de procesamiento extendido: Esta estrategia se recomienda para consumers que por su naturaleza requieren un tiempo de procesamiento prolongado o tienen tiempos variados al procesar cada mensaje.
SleepAndGo¶
Como mencionamos este consumer esta orientado a procesos de reitentos.
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddKafka(builder.Configuration)
.ToConsumerSleepAndGo<Subscriber, CustomEvent>(new TimeSpan(0, 1, 0), "Pedidos-Prueba-retry")
.Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
//...
services.AddKafka(Configuration)
.ToConsumerSleepAndGo<Subscriber, CustomEvent>(new TimeSpan(0, 1, 0), "Pedidos-Prueba-retry")
.Build();
}
FullScan¶
Este consumer se utiliza con la configuración de un cron por lo que se debe especificar cuando es el momento de iniciar el CronJob.
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
IAmqTrigger trigger = new AmqTrigger
{
WithIntervalInSeconds = 60,
}
builder.Services.AddKafka(builder.Configuration)
.ToConsumerJob<Subscriber, CustomEvent>(trigger, "Pedidos-Prueba-retry")
.Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
//...
IAmqTrigger trigger = new AmqTrigger
{
WithIntervalInSeconds = 60,
}
services.AddKafka(Configuration)
.ToConsumerJob<Subscriber, CustomEvent>(trigger, "Pedidos-Prueba-retry")
.Build();
}
La struct AmqTrigger
tiene la configuración del trigger a ejecutar.
IAmqTrigger trigger = new AmqTrigger
{
WithIntervalInSeconds = 60
};
Propiedad | Descripción |
---|---|
StartNow | (bool) En caso de se true el Cronjob se ejecutará en el momento que se levante la app. Default: true |
WithIntervalInSeconds | (int) Intervalo en segundos de despertar el Cronjob |
StartTimeUtc | (DateTimeOffset) Fecha de inicio para el CronJob, se tomará en caso de que StartNow=false |
Tiempo de procesamiento extendido¶
Como se menciono es para consumidores que requieren tiempos extendidos para procesar los mensajes.
Al estar activa
, le indica al coordinador de kafka que cada miembro
del grupo de consumidores recibira el proximo mensaje cuando termine de procesar el actual
.
Recomendamos ajustar el MaxPollIntervalMs cuando el tiempo de procesamiento del mensaje supere este intervalo.
Es compatible con las estrategias actuales
Realtime, SLeepAndGo, FullScan por lo que no es necesario modificarlas.
Simplemente se activa agregando el EnableAutoCommit
a la configuración
ApplicationName: Example
AMQStreams:
BootstrapServer: "SSL://cluster-kafka-bootstrap-amq-streams-test.apps.ocptest.andreani.com.ar:443"
SchemaUrl: "http://apicurioregistry.apps.ocptest.andreani.com.ar/apis/ccompat/v6"
AutoOffsetReset: Earliest
EnableAutoCommit: False ## Activa el consumo extendido
Warning
Disponible a partir de la versión 6.5.10
.
Políticas de reintentos¶
En Apache kafka no existe el concepto de republicar un mensaje como se venía manejando en MQ, por lo que cada aplicación debe ser capaz de asegurar la transaccionalidad del mensaje que consume. Con esto me refiero a tener:
- Asegurar puntos de falla - llamado a Apis externas, base de datos, etc.
En caso de que la app arroje una Exception, la librería interpretara como falla y comenzara el proceso de reintentos.
Al ejecutarse un error se moverá el mensaje al topic de reintento configurado en el Move
y se marcara ese mensaje.
Este mensaje puede saltar de tópicos las veces que sea configurado en el campo MaxRetry
de la configuración.
Cuando el mensaje sobrepase esta cantidad se moverá al tópico de deadline automáticamente y será marcado para no volverse a consumir.
En caso de agotar las instancias se podrá volver a intentar el flujo desde la app de deadline (Actualmente en Desarrollo).
Re Intento consumo mensajes¶
Cada app debe manejar su política de reintentos, especificando en su configuración que acción tomar en caso de error en algún topic.
Se debe especificar la cantidad máxima de reintentos y el tópico donde se moverá el mensaje.
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddKafka(builder.Configuration)
.ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
.Move("Example-Pedidos-Prueba-retry")
.Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
//...
services.AddKafka(Configuration)
.ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
.Move("Example-Pedidos-Prueba-retry")
.Build();
}
El método Move
es el que realiza la configuración, en el ejemplo configura que en caso de falla el mensaje se publique en Example-Pedidos-Prueba-retry
.
Una posible configuración completa podría ser
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
var topicRetry= "Example-Pedidos-Prueba-retry"
builder.Services.AddKafka(builder.Configuration)
.ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
.Move(topicRetry)
.ToConsumerSleepAndGo<Subscriber, CustomEvent>(new System.TimeSpan(0,15,0), topicRetry)
.Move(topicRetry)
.Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
//...
var topicRetry= "Example-Pedidos-Prueba-retry"
services.AddKafka(Configuration)
.ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
.Move(topicRetry)
.ToConsumerSleepAndGo<Subscriber, CustomEvent>(new System.TimeSpan(0,15,0), topicRetry)
.Move(topicRetry)
.Build();
}
Tip
El segundo consumer debe apuntar al mismo topic de reintentos, para poder volver a procesar el mesanje.
Donde se configura un consumer de tipo SleepAndGo
con una espera de 15 minutos para volver a procesar este mensaje.
o podemos configurar un cron que corra de noche y reprocese.
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
IAmqTrigger trigger = new AmqTrigger
{
WithIntervalInSeconds = 60 * 60 * 24 // 24hs
StartTimeUtc = new DateTimeOffset(new DateTime(2022,6,14,2,0,0)), // 2022/06/14T:02:00:00
StartNow = false
};
//...
var topicRetry= "Example-Pedidos-Prueba-retry"
builder.Services.AddKafka(Configuration)
.ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
.Move(topicRetry)
.ToConsumerJob<Subscriber, CustomEvent>(trigger, topicRetry)
.Move(topicRetry)
.Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
IAmqTrigger trigger = new AmqTrigger
{
WithIntervalInSeconds = 60 * 60 * 24 // 24hs
StartTimeUtc = new DateTimeOffset(new DateTime(2022,6,14,2,0,0)), // 2022/06/14T:02:00:00
StartNow = false
};
//...
var topicRetry= "Example-Pedidos-Prueba-retry"
services.AddKafka(Configuration)
.ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
.Move(topicRetry)
.ToConsumerJob<Subscriber, CustomEvent>(trigger, topicRetry)
.Move(topicRetry)
.Build();
}
Circuit Breaker¶
Esta funcionalidad se creo para corta y/o habilitar el consumo en Kafka, de nuestras apis o workers, con la debida configuración.
Configuración¶
Después de haber configurado el consumer como muestra el apartado Consumer, usamos el metodo de extensión Breaker<>()
esto permite que se ejecute
el Task<CircuitBreakerState> ActionAsync()
de forma parela al ciclo de vida de la aplicación.
// Ejemplo con nueva defición de Program
// Configuramos el Services para las DI
var builder = WebApplication.CreateBuilder(args);
builder.Host.ConfigureAndreaniWebHost(args);
builder.Services.ConfigureAndreaniServices();
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddKafka(Configuration)
.ToConsumer<Subscriber, CustomEvent>("my-topic")
.Breaker<CircuitBreaker>()
.Build();
var app = builder.Build();
// Configuramos el middleware
app.ConfigureAndreani();
public void ConfigureServices(IServiceCollection services)
{
//...
services.AddKafka(Configuration)
.ToConsumer<Subscriber, CustomEvent>("my-topic")
.Breaker<CircuitBreaker>()
.Build();
}
El metodo Breaker
necesita que le declaren el objeto de tipo ICircuitBreaker(CircuitBreaker
).
Como crear un CircuitBreaker¶
Para crear un CircuitBreaker debemos crear una nueva Clase que implemente la interface ICircuitBreaker
public class CircuitBreaker : ICircuitBreaker
Luego se debe implementar la función Task<CircuitBreakerState> ActionAsync()
, en la cual tendremos que crear el o los healthchecks que deben tener en cuenta contar o habilitar el consumo.
Con healthchecks
nos referimos a la dependencia externa que necesitemos validar que este funcionando, como una base de datos, una API, etc. Por ejemplo se puede validar enviando un Ping, Get o algun tipo de operación.
public class CircuitBreaker : ICircuitBreaker
{
private readonly string _connectionString = "Data Source=localhost;Initial Catalog=test;Persist Security Info=True;User ID=sa;Password=Admin#1234";
private readonly string _sql = @"SELECT TOP (1000) [id],[descripcion] FROM[test].[dbo].[estadoDelPedido]";
///Enviamos una petición a la base de datos para
///validar su disponibilidad
public async Task<CircuitBreakerState> ActionAsync()
{
try
{
using (var connection = new SqlConnection(_connectionString))
{
await connection.OpenAsync(CancellationToken.None);
using (var command = connection.CreateCommand())
{
command.CommandText = _sql;
_ = await command.ExecuteScalarAsync(CancellationToken.None).ConfigureAwait(false);
}
return CircuitBreakerState.Closed;
}
}
catch (Exception ex)
{
return CircuitBreakerState.Open;
}
}
}
Warning
CircuitBreaker funciona como un circuito electrónico, cuando está en Closed (está funcionando) y cuando está en Open(deja de funcionar).
Tip
Para algunos ejemplos de healthcheck de otros servicios pueden visitar el siguiente repo
[AspNetCore.Diagnostics.HealthChecks](https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/tree/master/src)