Skip to content

Producers

El producer es un agente que permite realizar la publicación de un evento en un topic determinado.

La librería dispone de la interface IPublisher la cual posee la responsabilidad de realizar las publicaciones.

Example

Configuración

En nuestro archivo de startup debemos importar la librería Andreani.ARQ.AMQStreams

Simplemente llamar al método de extensión AddKafka().

public void Configure(IWebHostBuilder builder)
    {
      builder.ConfigureServices((ctx, c) =>
      {
          c.AddKafka(ctx.Configuration)
          .ToProducer<CustomEvent>("my-topic")
          .Build();
      });
    }

Para configurar un producer debemos especificar el evento que queremos publicar (CustomEvent) y a que topic vamos a publicar ese evento (my-topic).

La librería nos permite configurar múltiples producer y publicar el mismo evento en n-topics

  public void Configure(IWebHostBuilder builder)
    {
      builder.ConfigureServices((ctx, c) =>
      {
          c.AddKafka(ctx.Configuration)
          .ToProducer<CustomEvent>("my-topic")
          .ToProducer<CustomEvent2>("my-topic", "my-topic-2")
          .Build();
      });
    }

Cómo publicar

Para publicar un mensaje simplemente debemos inyectar la interface IPublisher

    private readonly IPublisher _publisher;

    public ProducerJob(IPublisher publisher)
    {
        _publisher = publisher;
    }
y utilizar el metodo To()
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var message = new CustomEvent
        {
            codigo = 1,
            mensaje = "example producer"
        };
        await _publisher.To(message, "Key");
    }
En este ejemplo creamos un CustomEvent y llamamos al método To del publisher.

La libreria posee una alternativa en cuanto a la necesidad del desarrollador planteando diferentes escenarios

 public interface IPublisher
    { //**Evento y una key**
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(TEvent @event, string key) where TEvent : class, ISpecificRecord;
    // **Evento, key y un exactTopic**
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(TEvent @event, string key, string exactTopic) where TEvent : class, ISpecificRecord;
    // **Evento,key,lista de headers**
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(TEvent @event, string key, List<Header> headers) where TEvent : class, ISpecificRecord;
    //**Evento, key, lista de headers y exactTopic**
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(TEvent @event, string key, List<Header> headers, string exactTopic) where TEvent : class, ISpecificRecord;
    //** Lista de N eventos y key**
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(IEnumerable<TEvent> events, string key) where TEvent : class, ISpecificRecord;
    //** Lista de N eventos, key y exactTopic**
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(IEnumerable<TEvent> events, string key, string exactTopic) where TEvent : class, ISpecificRecord;
    //** Lista de eventos, key, lista de headers**
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(IEnumerable<TEvent> events, string key, List<Header> headers) where TEvent : class, ISpecificRecord;
    //**Lista de eventos, key, lista de headers y exactTopic**
        Task<List<DeliveryResult<string, TEvent>>> To<TEvent>(IEnumerable<TEvent> events, string key, List<Header> headers, string exactTopic) where TEvent : class, ISpecificRecord;
    }