Saltar a contenido

Andreani.ARQ.AMQStreams

Autor Lucero Lucas.

Objetivo

El objetivo de esta librería es abstraer al desarrollador de la implementación de Apache Kafka y brindarle herramientas para que, rápidamente, pueda integrar AMQStream en sus proyectos.

La actual librería utiliza las librerías de Confluent kafka como base para poder interactuar con Apache Kafka. ver confluent-kafka

Configuración

Configuración inicial

AMQStreams:
  ApplicationName: Example
  BootstrapServer: "localhost:9092"  
  SchemaUrl: "http://localhost:8081/apis/ccompat/v6/andreani/"
name descripción
ApplicationName Valor que se utiliza para definir el remitente y el consumer group
BootstrapServer URL de acceso al broker de kafka
SchemaUrl URL de acceso a apicurio registry

Configuración avanzada

AMQStreams:
  ApplicationName: Example
  GroupId: ExampleGroup
  BootstrapServer: "SSL://cluster-prod-kafka-bootstrap:443"
  SchemaUrl: "http://apicurioregistry/apis/ccompat/v6/andreani/"
  Protocol: Ssl
  SslCertificateLocation: "./kafka.pem"
  EnableSslCertificateVerification: false
  AutoOffsetReset: Earliest
name descripción
GroupId representa el valor de consumer group. Default: ApplicationName
Protocol Protocolo de conexión al broker opciones: Ssl, Plaintext Default: Plaintext
SslCertificateLocation Path de ubicación de certificado SSL. Default: ""
EnableSslCertificateVerification La aplicación puede ampliar esta verificación mediante la implementación de un certificate_verify_cb. Default: false
AutoOffsetReset Permite establecer la estrategia de offset inicial para la lectura de los Consumers. Opciones Latest, Earliest, Error. Default: Earliest
MessageMaxBytes Cantidad de bytes que se permite consumir o publicar. Default: 1000000 (1MB)
MillisecondsTimeout Cantidad de tiempo que un consumidor espera el ingreso de un mensaje. Default: 10000 (10 seg)
MaxRetry Cantidad de reintentos permitidos para un mensaje antes de marcarse como mensaje corrupto. Default: 3
ConsumerDebug Configuración para ver el log del consumer, es un string en forma de array, dependiendo de lo que necesites que se muestre. Ejemplos: "consumer, topic", opciones: consumer,cgrp,topic,fetch
PartitionAssignmentStrategy Estrategia que se utilizará para asignar las particiones entre las instancias del consumidor. Opciones range, roundrobin, sticky. Default sticky.

Nota

Para más información de la configuración de kafka, consumers, producers y AssignmentStrategy ***ver*** -> [KafkaConfig](https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/go.html#client), [ConsumerConfig](https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html), [ProducerConfig](https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html),  [PartitionAssignmentStrategy](https://medium.com/streamthoughts/understanding-kafka-partition-assignment-strategies-and-how-to-write-your-own-custom-assignor-ebeda1fc06f3)

Producer

El producer es un agente que permite realizar la publicación de un evento en uno o varios topic determinados.

La librería dispone de la interface IPublisher la cual posee la responsabilidad de realizar las publicaciones.

Configuración

En nuestro archivo de go.mod debemos importar la librería github.com/architecture-it/go-platform

Simplemente llamar al método de AMQStream AMQStream.AddKafka().

        func main()
        {
            //...
            config, err := AMQStream.AddKafka()
            if(err != nil){
                log.Logger.Error(err.Error())
            }

            topics := []string{"my-topic-1", "my-topic-2"}
            config.ToProducer(&CustomEvent, topics)
        }

Para configurar un producer debemos especificar el evento que queremos publicar (CustomEvent) y a que topic o topics vamos a publicar ese evento (topics).

La librería nos permite configurar múltiples producer y publicar el mismo evento en n-topics

        func main()
        {
            //...
            config, err := AMQStream.AddKafka()
            if(err != nil){
                log.Logger.Error(err.Error())
            }

            topics := []string{"my-topic-1", "my-topic-2"}
            topics2 := []string{"my-topic-3", "my-topic-4"}

            config.ToProducer(&CustomEvent, topics).
                   ToProducer(&CustomEvent2, topics2)
        }

Cómo publicar

Para publicar un mensaje simplemente debemos *** utilizar el método AMQStream.To(Event, Key)*** dentro de nuestro método

    type MyProductor struct {
    }

    func (p *MyProductor) Publicar() {
        MyEvent := Events.CustomEvent{
            Codigo: 231,
            Message: "123",
        }
        AMQStream.To(&MyEvent, "key123")

    }

En este ejemplo creamos un CustomEvent y llamamos al método AMQStream.To(Event, key) del publisher.

Eventos complejos

Se considera un evento complejo, cuando una estructura avro posee estructuras embebidas o campos opcionales, esto para Go significa definir funciones y estructuras especiales para soportar estos comportamientos.

Warning

Tenemos que prestar principal atención a la estructura del evento para poder crear correctamente esta estructura.

estructura opcional

Los eventos que poseen campos opcionales poseen una referencia a un puntero a una estructura denominada Union esta estructura posee la unión de los campos que se solicitan, habitualmente es un null con otro tipo de datos string, int, struct, etc.

Por Ejemplo: 1. UnionNullTeam la unión entre un null y un Team que corresponde a una estructura 2. UnionNullString la unión entre un string y un null.

Las estructuras Union poseen los siguientes valores

type UnionNullTeam struct {
    Null      *types.NullVal
    Team      Team
    UnionType UnionNullTeamTypeEnum
}
Donde se evidencia como maneja Go los campos opcionales

  • Null: valor nil.
  • Team: Opción de tipo estructura, en este ejemplo es una estructura pero toma el valor de la definición del evento.
  • UnionType: Este campo debe estar siempre en 1 cuando queremos identificar que vamos a enviar información en el atributo no Null.

Ejemplo de creación de una estructura con campos opcionales:

Estructura a crear:

type KafkaDemoRetro struct {
    Id int32 `json:"id"`

    Title string `json:"title"`

    Attendance int32 `json:"attendance"`

    DidYouUnderstand bool `json:"didYouUnderstand"`

    Me Person `json:"me"`

    Time int64 `json:"time"`
}

type UnionNullTeam struct {
    Null      *types.NullVal
    Team      Team
    UnionType UnionNullTeamTypeEnum
}

type Person struct {
    Name string `json:"name"`

    Surname string `json:"surname"`

    Seniority string `json:"seniority"`

    OnSite bool `json:"onSite"`

    Team *UnionNullTeam `json:"team"`

    Age int32 `json:"age"`

    Direccion *UnionNullString `json:"direccion"`
}

type Team struct {
    Tl *UnionNullString `json:"tl"`

    Boss string `json:"boss"`
}

type UnionNullString struct {
    Null      *types.NullVal
    String    string
    UnionType UnionNullStringTypeEnum
}

Tip

Como podemos ver, esta estructura posee un campo opcional de tipo estructura `Team` y otros campos opcionales de tipo string `Direccion y Tl`

Creación:

// Debemos instanciar la propiedad opcional
team := TestEvents.NewUnionNullTeam()
tl := TestEvents.NewUnionNullString()

// Declaramos que vamos a insertar datos en Tl
tl.UnionType = 1

// Valor del tl
tl.String = "sin tl"

// Declaramos que vamos a insertar datos en team
team.UnionType = 1

// Valor del team
team.Team.Tl = tl // se asigna TL a team
team.Team.Boss = "soy el jefe"

// Construimos el evento y suministramos team
evento := TestEvents.KafkaDemoRetro{
    Id:               1,
    DidYouUnderstand: true,
    Attendance:       200,
    Time:             20221008101213,
    Title:            "soy pepito o don jose",
    Me: TestEvents.Person{
        Name:      "Lucas",
        Surname:   "lucero",
        OnSite:    true,
        Seniority: "Sr.",
        Team:      team,
    },
}

Aclaración

``TestEvents`` corresponde al nombre del packages donde se encuentra el evento.

Consumer

El consumer es un agente que se subscribe a un topic y reacciona a los eventos que suceden en ese topic.

Para crear un consumer con la librería debemos simplemente declarar cuál será el objeto suscrito, que evento espera y en qué topic. La librería se encargará de ejecutar la acción adecuada.

Configuración

En nuestro archivo go.mod debemos importar la librería github.com/architecture-it/go-platform

Simplemente llamar al método de AMQStream AMQStream.AddKafka() .

func main()
{
    //...
    config, err := AMQStream.AddKafka()
    if(err != nil){
        log.Logger.Error(err.Error())
    }
    // Declaramos el evento y el manejador
    CustomEvent := Events.MyCustomEvent{}
    Subscriber := MySuscriber{}

    go config.ToConsumer(Subscriber, &CustomEvent, []string{"my-topic"}).
        Build()

}

El metodo ToConsumer necesita que le declaren el objeto de tipo ISubscriber (Subscriber), el puntero del evento que va a esperar (&CustomEvent) y el topic al cual va a estar suscrito (my-topic).

La función Build() construye la configuración de consumer y lanza go rutines para quedarse escuchando los mensajes del topic.

Warning

En este ejemplo se lanza todo en una go rutine para que no quede bloqueado el thread del metodo main.

Como consumir

Para consumir un evento el objeto que será el suscriptor debe implementar la interface ISubscriber

La interface posee esta definición

type ISuscriber interface {
    Handler(event interface{}, metadata ConsumerMetadata) error
}
Por lo que debemos generar una estructura que maneje los eventos.

type MySuscriber struct {
}

func (s MySuscriber) Handler(event interface{}, metadata AMQStream.ConsumerMetadata) error {

    // parse de evento.
    value, ok := event.(*TestEvents.KafkaDemoRetro)
    if ok {
        fmt.Printf("consumed message id: %v \n", value)
    }
    //...
}
La librería invocará a este método cuando reciba el evento que se declaró.

Tip

Si necesitamos conocer más información del evento, simplemente podemos usar el objeto ```ConsumerMetadata```
func (s MySuscriber) Handler(event AMQStream.ISpecificRecord, metadata AMQStream.ConsumerMetadata) {

    value, ok := event.(*TestEvents.KafkaDemoRetro)
    if ok {
        fmt.Printf("consumed message id: %v \n", value)
        fmt.Printf("Content Event Timestamp: %v - message %v \n", metadata.Timestamp ,metadata.Header)
    }
    //...
}

Manejo de Topics

A la hora de trabajar con topics debemos definir que estructura tendrán estos. Es muy importante saber que cantidad de mensajes vamos a procesar como para saber que configuración de topic es la correcta.

Nombrado

Los topics deben tener nombres descriptivos que referencien a quienes son los owner del mismo. Siendo el nombre del proyecto el prefix de sus topic. {proyecto}-evento

Ejemplo: notificaciones-alta-automatica, fisa-facturacion, sce-alta-de-pedido

Tip

Los topic los crea automáticamente la librería al usar los métodos `AMQStream.ToProducer() o AMQStream.ToConsumer()`

Danger

La configuración de los tópicos de QA y Producción, su factor de replicación y sus particiones deben hablar con el equipo de arquitectura para crearlos Antes de salir a produccion

Políticas de reintentos

En Apache kafka no existe el concepto de republicar un mensaje como se venía manejando en MQ, por lo que cada aplicación debe ser capaz de asegurar la transaccionalidad del mensaje que consume. Con esto me refiero a tener:

  1. Asegurar puntos de falla - llamado a Apis externas, base de datos, etc.

En caso de que la función de handler devuelva un tipo error, la librería interpretara como falla y comenzara el proceso de reintentos.

Al ejecutarse un error se moverá el mensaje al topic de reintento configurado en el Move y se marcara ese mensaje.

Este mensaje puede saltar de tópicos las veces que sea configurado en el campo MaxRetry de la configuración.

Cuando el mensaje sobrepase esta cantidad se moverá al tópico de deadline automáticamente y será marcado para no volverse a consumir.

En caso de agotar las instancias se podrá volver a intentar el flujo desde la app de deadline (Actualmente en Desarrollo).

Re Intento consumo mensajes

Cada app debe manejar su política de reintentos, especificando en su configuración que acción tomar en caso de error en algún topic.

Se debe especificar la cantidad máxima de reintentos y el tópico donde se moverá el mensaje.

func main()
{
    //...
    config, err := AMQStream.AddKafka()
    if(err != nil){
        log.Logger.Error(err.Error())
    }
    event := TestEvents.KafkaDemoRetro{}
    handler := Handler{}

    config.ToProducer(&event, topicsProducer).
            ToConsumer(&handler, &event, []string{"my-topic"}).
            Move("topicRetry")
    go config.Build()
}
El método Move es el que realiza la configuración, en el ejemplo configura que en caso de falla el mensaje se publique en topicRetry.

Una posible configuración completa podría ser

func main()
{
    //...
    config, err := AMQStream.AddKafka()
    if(err != nil){
        log.Logger.Error(err.Error())
    }
    event := TestEvents.KafkaDemoRetro{}
    handler := Handler{}
    HandlerError := HandlerError{}

    config.ToProducer(&event, topicsProducer).
            ToConsumer(&handler, &event, []string{"my-topic"}).
            Move("topicRetry").
            ToConsumer(&HandlerError, &event, []string{"topicRetry"}).
            Move("topicRetry")

    go config.Build()
}

type HandlerError struct{}

func (h *HandlerError) Handler(event interface{}, metadata AMQStream.ConsumerMetadata) error {
    value, ok := event.(*TestEvents.KafkaDemoRetro)
    time.Sleep(8 * time.Second) // si queremos esperar para reintentar.
    if ok {
        fmt.Println(value.Id)
    }
}