2017-06-26 4 views
0

J'utilise Confluent.Kafka dotnet client.Kafka consommateur commit thread-sécurité

namespace Confluent.Kafka 
{ 
    public class Consumer<TKey, TValue> : IDisposable 
    { 
     public Task<CommittedOffsets> CommitAsync(); 
    } 
} 

Comme vous le voyez, Consumer.CommitAsync est une méthode asynchrone. Est-il sûr d'appeler CommitAsync méthode sans attendre sa réponse, puis passer le prochain appel à Subscribe?

Exemple de code ci-dessous.

using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer())) 
{ 
       consumer.Subscribe(topics); 

       while (true) 
       { 
        Message<MessageKey, byte[]> msg; 
        if (consumer.Consume(out msg, TimeSpan.FromSeconds(1))) 
        { 
         // ... 

         if(msg.Offset % 100 == 0) 
         { 
          consumer.CommitAsync().ContinueWith((t) => 
          { 
           // log t.Exception 
          }, TaskContinuationOptions.OnlyOnFaulted); 
         } 
        } 
       } 
} 

Répondre

0

Je suppose que vous vouliez dire prochain appel à Consommez

Oui, il est sûr, pas de problème avec cela. Je voudrais également ajouter une fenêtre de temps pour la validation (ce qui vient en premier entre 5s et 100msgs par exemple) de sorte que si vous ne recevez pas de messages pendant un certain temps, vous les validez encore