Saltar a contenido

Andreani.ARQ.AMQStreams

Autor Olivera Lucas.

Objetivo

El objetivo de esta librería es abstraer al desarrollador de la implementación de Apache Kafka y brindarle herramientas para que, rápidamente, pueda integrar AMQStream en sus proyectos.

La actual librería utiliza las librerías de Confluent kafka como base para poder interactuar con Apache Kafka. ver confluent-kafka

Configuración

Configuración inicial

ApplicationName: Example
AMQStreams:  
  BootstrapServer: "localhost:9092"  
  SchemaUrl: "http://localhost:8081/apis/ccompat/v6"
name descripción
ApplicationName Valor que se utiliza para definir el remitente y el consumer group
BootstrapServer URL de acceso al broker de kafka
SchemaUrl URL de acceso a apicurio registry

Configuración avanzada

ApplicationName: Example
AMQStreams:
  BootstrapServer: "SSL://cluster-kafka-bootstrap-amq-streams-test.apps.ocptest.andreani.com.ar:443"
  SchemaUrl: "http://apicurioregistry.apps.ocptest.andreani.com.ar/apis/ccompat/v6"
  Protocol: Ssl
  AutoOffsetReset: Earliest
  AutoRegisterSchemas: true
  MaxRetry: 3
name descripción
GroupId representa el valor de consumer group. Default: ApplicationName
Protocol Protocolo de conexión al broker opciones: Ssl, Plaintext Default: Plaintext
SslCertificateLocation Path de ubicación de certificado SSL. Default: ""
EnableSslCertificateVerification La aplicación puede ampliar esta verificación mediante la implementación de un certificate_verify_cb. Default: false
AutoOffsetReset Permite establecer la estrategia de offset inicial para la lectura de los Consumers. Opciones Latest, Earliest, Error. Default: Earliest
AutoRegisterSchemas Si está seteado en true, permite registrar, en caso que no existan, el schema en apicurio registry. Default: false
MessageMaxBytes Cantidad de bytes que se permite consumir o publicar. Default: 1000000 (1MB)
MillisecondsTimeout Cantidad de tiempo que un consumidor espera el ingreso de un mensaje. Default: 10000 (10 seg)
MaxRetry Cantidad de reitentos permitidos para un mensaje antes de marcarse como mensaje corrupto. Default: 3
MaxPollIntervalMs Configuración de intervalo de tiempo habilitado entre dos consumos de mensajes, en caso de que no se consuma un mensaje durante este intervalo de tiempo el coordinador de Kafka entenderá que el consumer está caído por lo que eliminará al consumer. Default: 300000
SessionTimeoutMs El consumer envía señales de vida al coordinador, esto ocurre cada cierto tiempo, si el coordinador no recibe estas señales eliminara al consumer. Este valor siempre debe ser menor al MaxPollIntervalMs Default: 45000
ConsumerDebug Configuración para ver el log del consumer, es un string en forma de array, dependiendo de lo que necesites que se muestre. Ejemplos: "consumer, topic", opciones: consumer,cgrp,topic,fetch
AutoCommitIntervalMs Configuración de intervalo de commits. Default: 100
EnableAutoCommit Configurar en False para consumidores que requieren un tiempo prolongado al procesar los mensajes. Default: True. Aclaración: en ambos casos los commit se continuan gestionando dentro de la libreria que proporcionamos

Para más información de la configuración de kafka, consumers o producers ver -> KafkaConfig, ConsumerConfig, ProducerConfig

Producer

El producer es un agente que permite realizar la publicación de un evento en un topic determinado.

La librería dispone de la interface IPublisher la cual posee la responsabilidad de realizar las publicaciones.

Configuración

En nuestro archivo de startup (si tenemos la arquitectura vieja) debemos importar la librería Andreani.ARQ.AMQStreams

Simplemente llamar al método de extensión AddKafka().

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);
      builder.Services.AddKafka(builder.Configuration).ToProducer<CustomEvent>("my-topic").Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
    public void ConfigureServices(IServiceCollection services)
    {
        //...
        services.AddKafka(Configuration)
      .ToProducer<CustomEvent>("my-topic")
      .Build();
    }

Para configurar un producer debemos especificar el evento que queremos publicar (CustomEvent) y a que topic vamos a publicar ese evento (my-topic).

La librería nos permite configurar múltiples producer y publicar el mismo evento en n-topics

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);
      builder.Services.AddKafka(builder.Configuration)
        .ToProducer<CustomEvent>("my-topic")
        .ToProducer<CustomEvent2>("my-topic", "my-topic-2")
        .Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
    public void ConfigureServices(IServiceCollection services)
    {
        //...
        services.AddKafka(Configuration)
      .ToProducer<CustomEvent>("my-topic")
      .ToProducer<CustomEvent2>("my-topic", "my-topic-2")
      .Build();
    }

Cómo publicar

Para publicar un mensaje simplemente debemos inyectar la interface IPublisher

    private readonly IPublisher _publisher;

    public ProducerJob(IPublisher publisher)
    {
        _publisher = publisher;
    }
y utilizar el metodo To()
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var message = new CustomEvent
        {
            codigo = 1,
            mensaje = "example producer"
        };
        await _publisher.To(message, "Key");
    }
En este ejemplo creamos un CustomEvent y llamamos al método To() del publisher.

La librería posee una alternativa en cuanto a la necesidad del desarrollador planteando diferentes escenarios

 public interface IPublisher
    {  
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(TEvent @event,
            string key,
            string exactTopic = null,
            List<Header> headers = null)
        where TEvent : class, ISpecificRecord;
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(
            IEnumerable<TEvent> events, 
            Func<TEvent, string> setKey, 
            string exactTopic = null, 
            List<Header> headers = null, 
            Action<TEvent> beforePublish = null) 
        where TEvent : class, ISpecificRecord;

    }

Otros Ejemplos

Agregar Headers al mensaje:

        List<Header> headers = new List<Header>
        {
            new Header("keyHeader", Encoding.UTF8.GetBytes("1")),
        };
        ...
        await _publisher.To(message, "key", headers: headers);

Publicar un mensaje en un topic especifico (Ignora la configuración).

    await _publisher.To(message, "key", "ConTopicoExacto");

Procesamiento Batch

en el caso que necesitemos publicar n cantidad de eventos, la librería realiza un proceso en batch para facilitar esto.

El proceso de batch realiza publicaciones de a 1000 mensajes por llamado a kafka.

await _publisher.To(messages, (m) => "key");    
await _publisher.To(messages, (m) => m.codigo);

La función recibe:

  1. La lista de mensajes.
  2. Una función para definir la key de cada evento. Esta función recibe por parametros el mensaje por lo que puede seleccionar una propiedad del mensaje como key. Ej: id, codigo, número.
  3. ExactTopic y Headers cual como el metodo de publicación de un mensaje.
  4. beforePublish es una Action que se ejecuta antes de publicar cada mensaje, en caso de necesitar realizar una acción previa a la publicación.

Ejemplo:

await _publisher.To(messages, 
                    (m) => m.codigo,
                    beforePublish: (e) => _logger.LogInformation($"publicando el evento {e.codigo}"));

Consumer

El consumer es un agente que se subscribe a un topic y reacciona a los eventos que suceden en ese topic.

Para crear un consumer con la librería debemos simplemente declarar cuál será el objeto suscrito, que evento espera y en qué topic. La librería se encargará de ejecutar la acción adecuada.

Configuración

En nuestro archivo de startup debemos importar la librería Andreani.ARQ.AMQStreams

Simplemente llamar al método de extensión AddKafka().

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);
      builder.Services.AddKafka(builder.Configuration)
        .ToConsumer<Subscriber, CustomEvent>("my-topic")
        .Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
    public void ConfigureServices(IServiceCollection services)
    {
        //...
        services.AddKafka(Configuration)
      .ToConsumer<Subscriber, CustomEvent>("my-topic")
      .Build();
    }

El metodo ToConsumer necesita que le declaren el objeto de tipo ISubscriber (Subscriber), el evento que va a esperar (CustomEvent) y el topic al cual va a estar suscrito (my-topic).

Como consumir

Para consumir un evento el objeto que será el suscriptor debe implementar la interface ISubscriber

public class Subscriber : ISubscriber

Luego el objeto suscriptor debe contener al menos un método que reciba por parámetro el evento.

public Task ReciveCustomEvent(CustomEvent @event)
{
    Console.WriteLine($"Content event codigo: {@event.codigo} - message {@event.mensaje}");
    return Task.CompletedTask;
}

La librería invocará a este método cuando reciba el evento que se declaró.

Tip

Si necesitamos conocer más información del evento, simplemente podemos crear un método que reciba el objeto ```ConsumerMetadata```
public Task ReciveCustomEvent(CustomEvent @event, ConsumerMetadata metadata)
{
    Console.WriteLine($"Recived event with key: {metadata.Key} and Timestamp: {metadata.Timestamp.UtcDateTime.ToString()}");
    Console.WriteLine($"Content event codigo: {@event.codigo} - message {@event.mensaje}");
    return Task.CompletedTask;
}

Warning

**Los métodos deben siempre devolver un `Task`**

Manejo de Topics

A la hora de trabajar con topics debemos definir que estructura tendrán estos. Es muy importante saber que cantidad de mensajes vamos a procesar como para saber que configuración de topic es la correcta.

Nombrado

Los topics deben tener nombres descriptivos que referencien a quienes son los owner del mismo. Siendo el nombre del proyecto el prefix de sus topic. {proyecto}-evento

Ejemplo: notificaciones-alta-automatica, fisa-facturacion, sce-alta-de-pedido

Crear Topic.

Podemos realizar la configuración en la app con la librería para crear o actualizar el topic del que sea publicador la app.

Warning

solo aquellos topicos que seamos owner se deben configurar en la applicación.

            services.AddKafka(Configuration)
                .CreateOrUpdateTopic(3, "Pedidos-Prueba")
            .Build();
el método CreateOrUpdateTopic() recibe la cantidad de particiones que queremos configurar y el listado de topic al cual querramos aplicar esa configuración. En caso de que el topic no exista, creara el mismo con la cantidad de particiones solicitadas y un factor de replicación de 3. En caso de que el topic ya exista se actualizara el número de particiones.

Danger

**Solo se puede incrementar la cantidad de particiones, ya que decrementar las mismas provocaría una perdida parcial o total de los mensajes.**

Estrategias de consumer

La librería dispone actualmente de las siguientes estrategias de consumo.

  1. RealTime: El proceso se subscribe al topic y consume los mensajes, en caso de no haber mensajes queda esperando la llegada de uno, es el consumidor por defecto.
  2. SleepAndGo: Este consumer es similar al RealTime salvo que cuando consume un mensaje espera cierto tiempo antes de procesarlo, esta pensado para procesos de reintentos en donde no queremos procesar un mensaje que fallo automáticamente.
  3. FullScan: Consumer que se subscribe al topic, consume todos los mensajes y cuando no hay más mensajes se des subscribe. Está pensado para utilizarlo en procesos del estilo CronJob.
  4. Tiempo de procesamiento extendido: Esta estrategia se recomienda para consumers que por su naturaleza requieren un tiempo de procesamiento prolongado o tienen tiempos variados al procesar cada mensaje.

SleepAndGo

Como mencionamos este consumer esta orientado a procesos de reitentos.

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);
      builder.Services.AddKafka(builder.Configuration)
        .ToConsumerSleepAndGo<Subscriber, CustomEvent>(new TimeSpan(0, 1, 0), "Pedidos-Prueba-retry")
        .Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
    public void ConfigureServices(IServiceCollection services)
    {
        //...
        services.AddKafka(Configuration)
      .ToConsumerSleepAndGo<Subscriber, CustomEvent>(new TimeSpan(0, 1, 0), "Pedidos-Prueba-retry")
      .Build();
    }

FullScan

Este consumer se utiliza con la configuración de un cron por lo que se debe especificar cuando es el momento de iniciar el CronJob.

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);

        IAmqTrigger trigger = new AmqTrigger
        {
            WithIntervalInSeconds = 60,
        }
      builder.Services.AddKafka(builder.Configuration)
        .ToConsumerJob<Subscriber, CustomEvent>(trigger, "Pedidos-Prueba-retry")
        .Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
    public void ConfigureServices(IServiceCollection services)
    {
        //...
        IAmqTrigger trigger = new AmqTrigger
        {
            WithIntervalInSeconds = 60,
        }

        services.AddKafka(Configuration)
      .ToConsumerJob<Subscriber, CustomEvent>(trigger, "Pedidos-Prueba-retry")
      .Build();
    }

La struct AmqTrigger tiene la configuración del trigger a ejecutar.

IAmqTrigger trigger = new AmqTrigger
{
    WithIntervalInSeconds = 60
};
Se puede configurar lo siguiente

Propiedad Descripción
StartNow (bool) En caso de se true el Cronjob se ejecutará en el momento que se levante la app. Default: true
WithIntervalInSeconds (int) Intervalo en segundos de despertar el Cronjob
StartTimeUtc (DateTimeOffset) Fecha de inicio para el CronJob, se tomará en caso de que StartNow=false

Tiempo de procesamiento extendido

Como se menciono es para consumidores que requieren tiempos extendidos para procesar los mensajes. Al estar activa, le indica al coordinador de kafka que cada miembro del grupo de consumidores recibira el proximo mensaje cuando termine de procesar el actual.

Recomendamos ajustar el MaxPollIntervalMs cuando el tiempo de procesamiento del mensaje supere este intervalo.

Es compatible con las estrategias actuales Realtime, SLeepAndGo, FullScan por lo que no es necesario modificarlas.

Simplemente se activa agregando el EnableAutoCommit a la configuración

ApplicationName: Example
AMQStreams:
  BootstrapServer: "SSL://cluster-kafka-bootstrap-amq-streams-test.apps.ocptest.andreani.com.ar:443"
  SchemaUrl: "http://apicurioregistry.apps.ocptest.andreani.com.ar/apis/ccompat/v6"
  AutoOffsetReset: Earliest
  EnableAutoCommit: False ## Activa el consumo extendido

Warning

Disponible a partir de la versión 6.5.10.

Políticas de reintentos

En Apache kafka no existe el concepto de republicar un mensaje como se venía manejando en MQ, por lo que cada aplicación debe ser capaz de asegurar la transaccionalidad del mensaje que consume. Con esto me refiero a tener:

  1. Asegurar puntos de falla - llamado a Apis externas, base de datos, etc.

En caso de que la app arroje una Exception, la librería interpretara como falla y comenzara el proceso de reintentos.

Al ejecutarse un error se moverá el mensaje al topic de reintento configurado en el Move y se marcara ese mensaje.

Este mensaje puede saltar de tópicos las veces que sea configurado en el campo MaxRetry de la configuración.

Cuando el mensaje sobrepase esta cantidad se moverá al tópico de deadline automáticamente y será marcado para no volverse a consumir.

En caso de agotar las instancias se podrá volver a intentar el flujo desde la app de deadline (Actualmente en Desarrollo).

Re Intento consumo mensajes

Cada app debe manejar su política de reintentos, especificando en su configuración que acción tomar en caso de error en algún topic.

Se debe especificar la cantidad máxima de reintentos y el tópico donde se moverá el mensaje.

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);
      builder.Services.AddKafka(builder.Configuration)
        .ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
        .Move("Example-Pedidos-Prueba-retry")
        .Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
    public void ConfigureServices(IServiceCollection services)
    {
        //...
        services.AddKafka(Configuration)
        .ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
        .Move("Example-Pedidos-Prueba-retry")
    .Build();
    }

El método Move es el que realiza la configuración, en el ejemplo configura que en caso de falla el mensaje se publique en Example-Pedidos-Prueba-retry.

Una posible configuración completa podría ser

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);

      var topicRetry= "Example-Pedidos-Prueba-retry"
      builder.Services.AddKafka(builder.Configuration)
        .ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
        .Move(topicRetry)
        .ToConsumerSleepAndGo<Subscriber, CustomEvent>(new System.TimeSpan(0,15,0), topicRetry)
        .Move(topicRetry)
        .Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
        public void ConfigureServices(IServiceCollection services)
    {
        //...
        var topicRetry= "Example-Pedidos-Prueba-retry"
        services.AddKafka(Configuration)
        .ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
        .Move(topicRetry)
        .ToConsumerSleepAndGo<Subscriber, CustomEvent>(new System.TimeSpan(0,15,0), topicRetry)
        .Move(topicRetry)
        .Build();
    }

Tip

El segundo consumer debe apuntar al mismo topic de reintentos, para poder volver a procesar el mesanje.

Donde se configura un consumer de tipo SleepAndGo con una espera de 15 minutos para volver a procesar este mensaje.

o podemos configurar un cron que corra de noche y reprocese.

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);

      IAmqTrigger trigger = new AmqTrigger
        {
            WithIntervalInSeconds = 60 * 60 * 24 // 24hs
            StartTimeUtc = new DateTimeOffset(new DateTime(2022,6,14,2,0,0)), // 2022/06/14T:02:00:00
            StartNow = false
        };
            //...
            var topicRetry= "Example-Pedidos-Prueba-retry"
            builder.Services.AddKafka(Configuration)
            .ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
        .Move(topicRetry)
            .ToConsumerJob<Subscriber, CustomEvent>(trigger, topicRetry)
        .Move(topicRetry)
        .Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
        public void ConfigureServices(IServiceCollection services)
    {
        IAmqTrigger trigger = new AmqTrigger
        {
            WithIntervalInSeconds = 60 * 60 * 24 // 24hs
            StartTimeUtc = new DateTimeOffset(new DateTime(2022,6,14,2,0,0)), // 2022/06/14T:02:00:00
            StartNow = false
        };
            //...
            var topicRetry= "Example-Pedidos-Prueba-retry"
            services.AddKafka(Configuration)
            .ToConsumer<Subscriber, CustomEvent>("Pedidos-Prueba")
        .Move(topicRetry)
            .ToConsumerJob<Subscriber, CustomEvent>(trigger, topicRetry)
        .Move(topicRetry)
        .Build();
    }

Circuit Breaker

Esta funcionalidad se creo para corta y/o habilitar el consumo en Kafka, de nuestras apis o workers, con la debida configuración.

Configuración

Después de haber configurado el consumer como muestra el apartado Consumer, usamos el metodo de extensión Breaker<>() esto permite que se ejecute el Task<CircuitBreakerState> ActionAsync() de forma parela al ciclo de vida de la aplicación.

      // Ejemplo con nueva defición de Program

      // Configuramos el Services para las DI

      var builder = WebApplication.CreateBuilder(args);

      builder.Host.ConfigureAndreaniWebHost(args);
      builder.Services.ConfigureAndreaniServices();
      builder.Services.AddApplication();
      builder.Services.AddInfrastructure(builder.Configuration);
      builder.Services.AddKafka(Configuration)
        .ToConsumer<Subscriber, CustomEvent>("my-topic")
        .Breaker<CircuitBreaker>()
        .Build();

      var app = builder.Build();

      // Configuramos el middleware
      app.ConfigureAndreani();
    public void ConfigureServices(IServiceCollection services)
    {
        //...
        services.AddKafka(Configuration)
      .ToConsumer<Subscriber, CustomEvent>("my-topic")
      .Breaker<CircuitBreaker>()
      .Build();
    }

El metodo Breaker necesita que le declaren el objeto de tipo ICircuitBreaker(CircuitBreaker).

Como crear un CircuitBreaker

Para crear un CircuitBreaker debemos crear una nueva Clase que implemente la interface ICircuitBreaker

public class CircuitBreaker : ICircuitBreaker

Luego se debe implementar la función Task<CircuitBreakerState> ActionAsync(), en la cual tendremos que crear el o los healthchecks que deben tener en cuenta contar o habilitar el consumo.

Con healthchecks nos referimos a la dependencia externa que necesitemos validar que este funcionando, como una base de datos, una API, etc. Por ejemplo se puede validar enviando un Ping, Get o algun tipo de operación.

  public class CircuitBreaker : ICircuitBreaker
    {
        private readonly string _connectionString = "Data Source=localhost;Initial Catalog=test;Persist Security Info=True;User ID=sa;Password=Admin#1234";
        private readonly string _sql = @"SELECT TOP (1000) [id],[descripcion] FROM[test].[dbo].[estadoDelPedido]";

        ///Enviamos una petición a la base de datos para 
        ///validar su disponibilidad
        public async Task<CircuitBreakerState> ActionAsync()
        {
            try
            {
                using (var connection = new SqlConnection(_connectionString))
                {
                    await connection.OpenAsync(CancellationToken.None);
                    using (var command = connection.CreateCommand())
                    {
                        command.CommandText = _sql;
                        _ = await command.ExecuteScalarAsync(CancellationToken.None).ConfigureAwait(false);
                    }
                    return CircuitBreakerState.Closed;
                }
            }
            catch (Exception ex)
            {
                return CircuitBreakerState.Open;
            }
        }

    }
El ejemplo de arriba es un healthcheck a sql server.

Warning

CircuitBreaker funciona como un circuito electrónico, cuando está en Closed (está funcionando) y cuando está en Open(deja de funcionar).

Tip

Para algunos ejemplos de healthcheck de otros servicios pueden visitar el siguiente repo
[AspNetCore.Diagnostics.HealthChecks](https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/tree/master/src)