Reprocesamiento BOQ¶
Manejo de Mensajes con errores (Poison Messages)¶
Una buena practica siempre que desarrollamos una aplicación que consume mensajes del MQ, es generar una queue extra que se llama igual a la que nos conectamos pero que finaliza con .BOQ, en esta cola vamos a mandar los mensajes que no podemos procesar con nuestro consumidor de forma normal (PosionMessages) y no queremos perder. La idea de la BOQ es que nos sirva para salir de una situación en donde no podemos procesar el mensaje por alguna condición, pero que se puede solucionar en un futuro. Por ese motivo, la idea es pasar ese mensaje a la BOQ para que luego podemos reintentar el reproceso. Hasta el momento el reproceso de la BOQ era algo artesanal y que no sigue ningún lineamiento. Con esto se trata de resolver este problema de la manera mas simple y sin trabajo extra, para que las BOQ no queden desatendidas y acumulando mensajes.
Funcionamiento¶
- La aplicación se conecta a la BOQ y empieza a recibir mensajes de la queue.
- Al recibir un mensaje de la BOQ, se trata de generar un hash de identificación único del mensaje. Por default se utiliza un componente por defecto llamado
DefaultMessageKeyGenerator
, el mismo trata de generar una key con las propiedades timestamp y numeroDeOrden del evento de negocio. Implementando la interfazIMessageKeyGenerator
uno puede variar la estrategia para la generacion de esta key. Mas sobre como utilizar una estrategia diferente aqui. - Se verifica si ya existe el evento en la base de datos interna y si no existe se crea un registro.
- Se verifica que se debe hacer con el mensaje:
- Si no cumplio con el tiempo de espera
SleepDurationBetweenRetries
desde la ultima actualización, el mensaje se vuelve a poner en la BOQ. - Si la cantidad de reintentos es mayor o igual a
RetryCount
, el mensaje se elimina de la BOQ y pasa a la tabla PoisonMessage. - Si cumplió con el tiempo de espera y no excedió la cantidad de reintentos, el mensaje pasa a la REQ.
- Ante cualquier error en el procesamiento, el mensaje se elimina de la BOQ y pasa a la tabla DeadLetterQueue.
Implementacion¶
- Generar una nueva aplicacion Platform del tipo API.
- Agregar la libreria Infra.EventBus.IbmMQ
- Registrar los componentes necesarios para el reproceso de la BOQ en el startup.
public void Configure(IWebHostBuilder builder)
{
builder.ConfigureServices((ctx, c) =>
{
c.AddIbmMQReprocessing();
});
}
- Agregar la configuración del IbmMQReprocessing al archivo appsettings.json o appsettings.yml
"IbmMQReprocessing": {
"QueueManagerName": "GA02.AR.T.QM",
"Channel": "ITG.TO.GA02.TEST",
"ConnectionInfo": "192.6.6.39(1416)",
"BackoutQueueName" : "QL.ITG.ALERTRAN.SUBSCRIBER.BOQ",
"QueueName": "QL.ITG.ALERTRAN.SUBSCRIBER.REQ",
"SleepDurationBetweenRetries" : "90",
"OlderEntriesExpirationDays" : "1"
}
IbmMQReprocessing:
QueueManagerName: GA02.AR.T.QM
Channel: ITG.TO.GA02.TEST
ConnectionInfo: "192.6.6.39(1416)"
BackoutQueueName: QL.ITG.ALERTRAN.SUBSCRIBER.BOQ
QueueName: QL.ITG.ALERTRAN.SUBSCRIBER.REQ
SleepDurationBetweenRetries: 90
OlderEntriesExpirationDays: 1
MessageKeyGenerator¶
En caso de que la estrategia por defecto no cumpla por algun motivo con lo necesario, por ejemplo: los mensajes de la BOQ no son eventos de Negocio. Esta la posibilidad de realizar una a medida. Para esto se debe implementar lo siguiente:
public class MessageKeyGenerator : IMessageKeyGenerator
{
public string GenerateKeyFor(string message)
{
return "Mi propia key basada en el mensaje";
}
}
builder.ConfigureServices((ctx, c) =>
{
c.AddIbmMQReprocessing().WithCustomMessageKeyGenerator<MessageKeyGenerator>();
});
Note
De esta manera queda en el programador, saber interpretar el mensaje y ser capaz de devolver un valor que sirva como identificador único del mensaje.
Dashboard¶
Tambien se agrega un Dashboard que queda disponible en el path /dashboard en el cual podemos visualizar los mensajes que alcanzaron los reintentos máximos.
Para utilizar el mismo debemos agregar la libreria Infra.EventBus.IbmMQ.UI
al proyecto y la siguiente linea en el startup.
public void Configure(IWebHostBuilder builder)
{
builder.ConfigureServices((ctx, c) =>
{
c.AddIbmMQReprocessing();
c.AddIbmMQReprocessingUI(); // <-----
});
}
Eliminacion de mensajes antiguos¶
Debido a que cada vez que se recibe un evento de la BOQ, el mismo se almacena en la tabla Entry, es comun ver que en la misma queden registros que se han procesado bien en algun reintento y por tal motivo no han vuelto a la BOQ. En este caso, el registro queda huérfano en la tabla sin posibilidad de ser borrado. Para poder limpiar estos registros, la libreria dispara un JOB que se ejecuta diariamente que trata de eliminar los registros de la tabla Entry donde la ultima actualizan supera la cantidad de dias configurado en OlderEntriesExpirationDays
, por defecto se mantiene en un dia.
Opciones de Configuración¶
A continuación veremos un listado de la propiedades que podemos configurar:
Nombre | Tipo | Default | Requerido | Descripción |
---|---|---|---|---|
QueueManagerName | String | Null | Si | Nombre del QM |
Channel | String | Null | Si | Nombre del Canal |
ConnectionInfo | String | Null | Si | Conexión ip(puerto) |
BackoutQueueName | String | Null | Si | Nombre de la cola BOQ a procesar |
QueueName | String | Null | No | Nombre de la cola REQ a procesar |
User | String | Null | No | Nombre del usuario. En el caso de que el QM necesite autenticacion |
Password | String | Null | No | Password. En el caso de que el QM necesite autenticacion |
RetryCount | Int | 3 | No | Cantidad de reintentos antes de marcar el mensaje como PoisonMessage |
SleepDurationBetweenRetries | Int | 0 | No | Tiempo en segundos entre reintentos |
ShowNoMessageAvailableWarning | Bool | false | No | Muestra o no un warning a no encontrar mensajes en al cola |
WaitMessageIntervalInSeconds | Int | 15 | No | Tiempo de espera para la obtener mensajes |
OlderEntriesExpirationDays | Int | 1 | No | Tiempo que debe pasar para que las entries se consideren expiradas y puedan ser eliminadas. |