2017-09-28 6 views
0

Dans mon application KafkaConsumer, je souhaite lire un lot de messages avec poll() et les traiter. Mais le traitement peut échouer. Dans ce cas, je veux réessayer jusqu'à ce que je réussisse mais seulement réessayer si le client possède toujours des partitions. Je ne veux pas appeler constamment poll() parce que je ne veux pas lire plus de données.Comment vérifier que KafkaConsumer a toujours attribué des partitions sans lire plus de données avec poll()

C'est un extrait de code:

consumer = new KafkaConsumer<>(consumerConfig); 
try { 
    consumer.subscribe(config.topics() /** Callback does not work as I do not call poll in between */); 
    while (true) { 
     ConsumerRecords<byte[], Value> values = consumer.poll(10000); 
     while (/* I am still owner of partitions */) { 
      try { 
       process(values); 
      } catch (Exception e) { 
       log.error("I dont care, just retry while I own the partitions", e) 
      } 
     } 
    } 
} catch (WakeupException e) { 
    // shutting down 
} finally { 
    consumer.close(); 
} 

Répondre

0

Je suis venu à une conclusion qu'il est impossible d'appeler poll() sans lire des messages avec kafka actuelle 10.2.x des consommateurs Cependant, il est possible de mettre à jour le décalage après un échec de traitement. Donc, je mets à jour OFFSET comme si les messages ont jamais lu

while (!stopped) { 
    ConsumerRecords<byte[], Value> values = consumer.poll(timeout); 
    try { 
     process(values); 
    } catch (Exception e) { 
     rewind(records); 
     // Ensure a delay after errors to let dependencies recover 
     Thread.sleep(delay); 
    } 
} 

et la méthode de retour rapide est

private void rewind(ConsumerRecords<byte[], Value> records) { 
    records.partitions().forEach(partition -> { 
     long offset = records.records(partition).get(0).offset(); 
     consumer.seek(partition, offset); 
    }); 
} 

Il résout le problème initial

0

Il existe une méthode de rappel qui vous indique quand vos clients affectations de partitions sont sur le point d'être révoqué. Continuez à traiter le message sauf si vous obtenez un événement onPartitionRevoked().

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)

+0

Mais comment cette méthode de rappel va être appelé? De quel fil? –

+0

On pense qu'il est appelé par le thread principal client Kafka Consumer. Essaye le. Il est recommandé d'implémenter ce rappel de toute façon afin que vous ne perdiez pas votre décalage et finissiez les messages de retraitement lors d'un rééquilibrage. –

+0

Le thread principal du client KafkaConsumer essaie de traiter les ressources exécutant 'process (values);' dans une boucle, n'est-ce pas? –