2017-10-08 6 views
5

je crée un sujet de kafka propriétés ci-dessousKafka clé ne pas supprimer avec la pierre tombale

min.cleanable.dirty.ratio = 0,01, delete.retention.ms = 100, segment.ms = 100, cleanup.policy = compact

Disons que j'insérer des paires de kv pour 1111: 1, 1111: 2, 1111: null, 2222: 1 Qu'est-ce qui se passe maintenant sauf le dernier message, le compactage du journal fonctionne sur le repos des messages et efface deux premiers mais conserve 1111: null

Acc à la documentation,

Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period. 

Alors, j'espère quand delete.retention.ms est atteint, le marqueur nul ne doit supprimer le message avec la clé

J'ai deux questions - Pourquoi le marqueur de pierre tombale ne fonctionne-t-il pas? Pourquoi le dernier message est-il ignoré du compactage?

C'est ce que server.properties fichier a -

log.retention.ms=100 
log.retention.bytes=1073741824 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=100 
log.cleaner.delete.retention.ms=100 
log.cleaner.enable=true 
log.cleaner.min.cleanable.ratio=0.01 

Répondre

3

dossiers de pierre tombale sont conservés plus longtemps par la conception. La raison en est que les courtiers ne suivent pas les consommateurs. Supposons qu'un consommateur se déconnecte pendant un certain temps après avoir lu le premier enregistrement. Alors que le consommateur est en panne, les coups de pied de compactage de journal. Si le compactage du journal supprimait l'enregistrement de pierre tombale, le consommateur n'apprendrait jamais le fait que l'enregistrement avait été supprimé. Si le consommateur implémente un cache, il peut arriver que l'enregistrement ne soit jamais effacé. Ainsi, les pierres tombales sont conservées plus longtemps pour permettre au consommateur hors ligne de recevoir toutes les pierres tombales pour le nettoyage local.

Pierre tombale sera supprimé seulement après delete.retention.ms (valeur par défaut est de 1 jour). Remarque: il s'agit d'une configuration au niveau du sujet et il n'y a pas de configuration au niveau du courtier. Ainsi, vous devez définir la configuration par sujet si vous voulez le changer.

+0

je l'ai décrit dans mon post que le sujet a été créé avec delete.retention.ms = 100 ce qui signifie que la clé de marqueur de pierre tombale aurait été nettoyé après 100 ms après avoir été envoyé. Une question: peut-elle être liée au paramètre compact.policy? J'ai lu quelque part que nous devons le mettre à compacter, supprimer pour activer les suppressions aussi bien. – Sam

+0

Si vous activez l'option « compact, supprimer », vous obtenez essentiellement un TTL sur le dessus et les enregistrements antérieurs alors le temps de rétention sera supprimé (même s'il n'y a pas la pierre tombale). Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist Pouvez-vous vérifier deux fois config sujet, qu'il est vraiment configuré avec delete.retention.ms = 100? Je –

+0

couru --describe sur le sujet et la sortie est Configs: min.cleanable.dirty.ratio = 0,01, delete.retention.ms = 100, cleanup.policy = compact, segment.ms = 100 – Sam

1

L'algorithme pour enlever la pierre tombale dans un compact est censé être le suivant.

  1. La pierre tombale n'est jamais retirée lorsqu'elle est encore dans la partie sale de la bûche.
  2. Après la pierre tombale est dans la partie nettoyée du journal, nous avons encore retarder la suppression de la pierre tombale par delete.retention.ms depuis l'époque de la pierre tombale est dans la partie nettoyée.

Il est possible que les pierres tombales soient toujours dans la partie sale du journal et ne soient donc pas effacées. Déclenchement quelques messages de clés différentes devrait pousser les pierres tombales dans la partie nettoyée du journal et de les supprimer.

0

sujet compactée comporte deux parties:

1) partie Nettoyé: portion de log de kafka nettoyé par nettoyant kafka au moins une fois.

2) partie sale: Partie de kafka pas nettoyé par log kafka plus propre, même une fois jusqu'à présent. Kafka maintient le décalage sale. Tous les messages avec décalage> = décalage sale appartiennent à la partie sale.Remarque: Le nettoyeur Kafka nettoie tous les segments (que le segment soit nettoyé ou sale) et les recopie chaque fois que le taux de saleté atteint min.cleanable.dirty.ratio.

pierres tombales sont supprimés segments sage. Les pierres tombales d'un segment sont supprimées si le segment satisfait aux conditions suivantes:

  1. Le segment doit être dans la partie nettoyée de la bille.

  2. La dernière heure de segment modifiée doit être < = (Dernière heure de segment modifiée contenant un message avec offset = (dirty offset - 1)) - delete.retention.ms.

Il est difficile d'élaborer deuxième point, mais en termes simples, Deuxième point implique => la taille du segment doit être égale à log.segment.bytes/segment.bytes (1 Go par défaut). Pour que la taille du segment (dans la partie nettoyante) soit égale à 1 Go, vous devez produire un grand nombre de messages avec des touches distinctives. Mais vous avez produit seulement 4 messages avec 3 messages ayant la même clé. C'est pourquoi les pierres tombales ne sont pas supprimées dans le segment contenant le message 1111: null (le segment ne satisfait pas le second point mentionné ci-dessus).

Vous avez deux options pour supprimer les pierres tombales avec 4 messages:

  1. font delete.retention.ms = 0 ou
  2. font log.segment.bytes/segment.bytes = 50.

Source Code (lecture supplémentaire): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala

try { 
     // clean segments into the new destination segment 
     for (old <- segments) { 
     val retainDeletes = old.lastModified > deleteHorizonMs 
     info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes." 
      .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) 
     cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats) 
     }