Andreani.ARQ.AMQStreams¶
Autor Lucero Lucas.¶
Objetivo¶
El objetivo de esta librería es abstraer al desarrollador de la implementación de Apache Kafka y brindarle herramientas para que, rápidamente, pueda integrar AMQStream en sus proyectos.
La actual librería utiliza las librerías de Confluent kafka como base para poder interactuar con Apache Kafka. ver confluent-kafka
Configuración¶
Configuración inicial¶
AMQStreams:
ApplicationName: Example
BootstrapServer: "localhost:9092"
SchemaUrl: "http://localhost:8081/apis/ccompat/v6/andreani/"
name | descripción |
---|---|
ApplicationName | Valor que se utiliza para definir el remitente y el consumer group |
BootstrapServer | URL de acceso al broker de kafka |
SchemaUrl | URL de acceso a apicurio registry |
Configuración avanzada¶
AMQStreams:
ApplicationName: Example
GroupId: ExampleGroup
BootstrapServer: "SSL://cluster-prod-kafka-bootstrap:443"
SchemaUrl: "http://apicurioregistry/apis/ccompat/v6/andreani/"
Protocol: Ssl
SslCertificateLocation: "./kafka.pem"
EnableSslCertificateVerification: false
AutoOffsetReset: Earliest
name | descripción |
---|---|
GroupId | representa el valor de consumer group. Default: ApplicationName |
Protocol | Protocolo de conexión al broker opciones: Ssl, Plaintext Default: Plaintext |
SslCertificateLocation | Path de ubicación de certificado SSL. Default: "" |
EnableSslCertificateVerification | La aplicación puede ampliar esta verificación mediante la implementación de un certificate_verify_cb. Default: false |
AutoOffsetReset | Permite establecer la estrategia de offset inicial para la lectura de los Consumers. Opciones Latest, Earliest, Error. Default: Earliest |
MessageMaxBytes | Cantidad de bytes que se permite consumir o publicar. Default: 1000000 (1MB) |
MillisecondsTimeout | Cantidad de tiempo que un consumidor espera el ingreso de un mensaje. Default: 10000 (10 seg) |
MaxRetry | Cantidad de reintentos permitidos para un mensaje antes de marcarse como mensaje corrupto. Default: 3 |
ConsumerDebug | Configuración para ver el log del consumer, es un string en forma de array, dependiendo de lo que necesites que se muestre. Ejemplos: "consumer, topic", opciones: consumer,cgrp,topic,fetch |
PartitionAssignmentStrategy | Estrategia que se utilizará para asignar las particiones entre las instancias del consumidor. Opciones range, roundrobin, sticky. Default sticky. |
Nota
Para más información de la configuración de kafka, consumers, producers y AssignmentStrategy ***ver*** -> [KafkaConfig](https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/go.html#client), [ConsumerConfig](https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html), [ProducerConfig](https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html), [PartitionAssignmentStrategy](https://medium.com/streamthoughts/understanding-kafka-partition-assignment-strategies-and-how-to-write-your-own-custom-assignor-ebeda1fc06f3)
Producer¶
El producer es un agente que permite realizar la publicación de un evento en uno o varios topic determinados.
La librería dispone de la interface IPublisher la cual posee la responsabilidad de realizar las publicaciones.
Configuración¶
En nuestro archivo de go.mod debemos importar la librería github.com/architecture-it/go-platform
Simplemente llamar al método de AMQStream AMQStream.AddKafka().
func main()
{
//...
config, err := AMQStream.AddKafka()
if(err != nil){
log.Logger.Error(err.Error())
}
topics := []string{"my-topic-1", "my-topic-2"}
config.ToProducer(&CustomEvent, topics)
}
Para configurar un producer debemos especificar el evento que queremos publicar (CustomEvent)
y a que topic o topics vamos a publicar ese evento (topics)
.
La librería nos permite configurar múltiples producer y publicar el mismo evento en n-topics
func main()
{
//...
config, err := AMQStream.AddKafka()
if(err != nil){
log.Logger.Error(err.Error())
}
topics := []string{"my-topic-1", "my-topic-2"}
topics2 := []string{"my-topic-3", "my-topic-4"}
config.ToProducer(&CustomEvent, topics).
ToProducer(&CustomEvent2, topics2)
}
Cómo publicar¶
Para publicar un mensaje simplemente debemos *** utilizar el método AMQStream.To(Event, Key)*** dentro de nuestro método
type MyProductor struct {
}
func (p *MyProductor) Publicar() {
MyEvent := Events.CustomEvent{
Codigo: 231,
Message: "123",
}
AMQStream.To(&MyEvent, "key123")
}
En este ejemplo creamos un CustomEvent y llamamos al método AMQStream.To(Event, key)
del publisher
.
Eventos complejos¶
Se considera un evento complejo, cuando una estructura avro posee estructuras embebidas o campos opcionales, esto para Go significa definir funciones y estructuras especiales para soportar estos comportamientos.
Warning
Tenemos que prestar principal atención a la estructura del evento para poder crear correctamente esta estructura.
estructura opcional
Los eventos que poseen campos opcionales poseen una referencia a un puntero a una estructura denominada Union
esta estructura posee la unión de los campos que se solicitan, habitualmente es un null
con otro tipo de datos string
, int
, struct
, etc.
Por Ejemplo:
1. UnionNullTeam
la unión entre un null y un Team que corresponde a una estructura
2. UnionNullString
la unión entre un string y un null.
Las estructuras Union
poseen los siguientes valores
type UnionNullTeam struct {
Null *types.NullVal
Team Team
UnionType UnionNullTeamTypeEnum
}
Null
: valor nil.Team
: Opción de tipo estructura, en este ejemplo es una estructura pero toma el valor de la definición del evento.UnionType
: Este campo debe estar siempre en1
cuando queremos identificar que vamos a enviar información en el atributo no Null.
Ejemplo de creación de una estructura con campos opcionales:
Estructura a crear:
type KafkaDemoRetro struct {
Id int32 `json:"id"`
Title string `json:"title"`
Attendance int32 `json:"attendance"`
DidYouUnderstand bool `json:"didYouUnderstand"`
Me Person `json:"me"`
Time int64 `json:"time"`
}
type UnionNullTeam struct {
Null *types.NullVal
Team Team
UnionType UnionNullTeamTypeEnum
}
type Person struct {
Name string `json:"name"`
Surname string `json:"surname"`
Seniority string `json:"seniority"`
OnSite bool `json:"onSite"`
Team *UnionNullTeam `json:"team"`
Age int32 `json:"age"`
Direccion *UnionNullString `json:"direccion"`
}
type Team struct {
Tl *UnionNullString `json:"tl"`
Boss string `json:"boss"`
}
type UnionNullString struct {
Null *types.NullVal
String string
UnionType UnionNullStringTypeEnum
}
Tip
Como podemos ver, esta estructura posee un campo opcional de tipo estructura `Team` y otros campos opcionales de tipo string `Direccion y Tl`
Creación:
// Debemos instanciar la propiedad opcional
team := TestEvents.NewUnionNullTeam()
tl := TestEvents.NewUnionNullString()
// Declaramos que vamos a insertar datos en Tl
tl.UnionType = 1
// Valor del tl
tl.String = "sin tl"
// Declaramos que vamos a insertar datos en team
team.UnionType = 1
// Valor del team
team.Team.Tl = tl // se asigna TL a team
team.Team.Boss = "soy el jefe"
// Construimos el evento y suministramos team
evento := TestEvents.KafkaDemoRetro{
Id: 1,
DidYouUnderstand: true,
Attendance: 200,
Time: 20221008101213,
Title: "soy pepito o don jose",
Me: TestEvents.Person{
Name: "Lucas",
Surname: "lucero",
OnSite: true,
Seniority: "Sr.",
Team: team,
},
}
Aclaración
``TestEvents`` corresponde al nombre del packages donde se encuentra el evento.
Consumer¶
El consumer es un agente que se subscribe a un topic y reacciona a los eventos que suceden en ese topic.
Para crear un consumer con la librería debemos simplemente declarar cuál será el objeto suscrito, que evento espera y en qué topic. La librería se encargará de ejecutar la acción adecuada.
Configuración¶
En nuestro archivo go.mod debemos importar la librería github.com/architecture-it/go-platform
Simplemente llamar al método de AMQStream AMQStream.AddKafka()
.
func main()
{
//...
config, err := AMQStream.AddKafka()
if(err != nil){
log.Logger.Error(err.Error())
}
// Declaramos el evento y el manejador
CustomEvent := Events.MyCustomEvent{}
Subscriber := MySuscriber{}
go config.ToConsumer(Subscriber, &CustomEvent, []string{"my-topic"}).
Build()
}
El metodo ToConsumer
necesita que le declaren el objeto de tipo ISubscriber (Subscriber
), el puntero del evento que va a esperar (&CustomEvent
) y el topic al cual va a estar suscrito (my-topic
).
La función Build()
construye la configuración de consumer y lanza go rutines para quedarse escuchando los mensajes del topic.
Warning
En este ejemplo se lanza todo en una go rutine para que no quede bloqueado el thread del metodo main.
Como consumir¶
Para consumir un evento el objeto que será el suscriptor debe implementar la interface ISubscriber
La interface posee esta definición
type ISuscriber interface {
Handler(event interface{}, metadata ConsumerMetadata) error
}
type MySuscriber struct {
}
func (s MySuscriber) Handler(event interface{}, metadata AMQStream.ConsumerMetadata) error {
// parse de evento.
value, ok := event.(*TestEvents.KafkaDemoRetro)
if ok {
fmt.Printf("consumed message id: %v \n", value)
}
//...
}
Tip
Si necesitamos conocer más información del evento, simplemente podemos usar el objeto ```ConsumerMetadata```
func (s MySuscriber) Handler(event AMQStream.ISpecificRecord, metadata AMQStream.ConsumerMetadata) {
value, ok := event.(*TestEvents.KafkaDemoRetro)
if ok {
fmt.Printf("consumed message id: %v \n", value)
fmt.Printf("Content Event Timestamp: %v - message %v \n", metadata.Timestamp ,metadata.Header)
}
//...
}
Manejo de Topics¶
A la hora de trabajar con topics debemos definir que estructura tendrán estos. Es muy importante saber que cantidad de mensajes vamos a procesar como para saber que configuración de topic es la correcta.
Nombrado¶
Los topics deben tener nombres descriptivos que referencien a quienes son los owner del mismo. Siendo el nombre del proyecto el prefix de sus topic. {proyecto}-evento
Ejemplo: notificaciones-alta-automatica, fisa-facturacion, sce-alta-de-pedido
Tip
Los topic los crea automáticamente la librería al usar los métodos `AMQStream.ToProducer() o AMQStream.ToConsumer()`
Danger
La configuración de los tópicos de QA y Producción, su factor de replicación y sus particiones deben hablar con el equipo de arquitectura para crearlos Antes de salir a produccion
Políticas de reintentos¶
En Apache kafka no existe el concepto de republicar un mensaje como se venía manejando en MQ, por lo que cada aplicación debe ser capaz de asegurar la transaccionalidad del mensaje que consume. Con esto me refiero a tener:
- Asegurar puntos de falla - llamado a Apis externas, base de datos, etc.
En caso de que la función de handler devuelva un tipo error
, la librería interpretara como falla y comenzara el proceso de reintentos.
Al ejecutarse un error se moverá el mensaje al topic de reintento configurado en el Move
y se marcara ese mensaje.
Este mensaje puede saltar de tópicos las veces que sea configurado en el campo MaxRetry
de la configuración.
Cuando el mensaje sobrepase esta cantidad se moverá al tópico de deadline automáticamente y será marcado para no volverse a consumir.
En caso de agotar las instancias se podrá volver a intentar el flujo desde la app de deadline (Actualmente en Desarrollo).
Re Intento consumo mensajes¶
Cada app debe manejar su política de reintentos, especificando en su configuración que acción tomar en caso de error en algún topic.
Se debe especificar la cantidad máxima de reintentos y el tópico donde se moverá el mensaje.
func main()
{
//...
config, err := AMQStream.AddKafka()
if(err != nil){
log.Logger.Error(err.Error())
}
event := TestEvents.KafkaDemoRetro{}
handler := Handler{}
config.ToProducer(&event, topicsProducer).
ToConsumer(&handler, &event, []string{"my-topic"}).
Move("topicRetry")
go config.Build()
}
Move
es el que realiza la configuración, en el ejemplo configura que en caso de falla el mensaje se publique en topicRetry
.
Una posible configuración completa podría ser
func main()
{
//...
config, err := AMQStream.AddKafka()
if(err != nil){
log.Logger.Error(err.Error())
}
event := TestEvents.KafkaDemoRetro{}
handler := Handler{}
HandlerError := HandlerError{}
config.ToProducer(&event, topicsProducer).
ToConsumer(&handler, &event, []string{"my-topic"}).
Move("topicRetry").
ToConsumer(&HandlerError, &event, []string{"topicRetry"}).
Move("topicRetry")
go config.Build()
}
type HandlerError struct{}
func (h *HandlerError) Handler(event interface{}, metadata AMQStream.ConsumerMetadata) error {
value, ok := event.(*TestEvents.KafkaDemoRetro)
time.Sleep(8 * time.Second) // si queremos esperar para reintentar.
if ok {
fmt.Println(value.Id)
}
}