2017-02-28 2 views
1

que je voulais mettre en œuvre une rediffusion de données pour certains que nous avons, et pour cela, je dois utiliser la politique de rétention Kafka (depuis que je me sers de rejoindre et j'ai besoin le temps de fenêtre pour être précis). P.S. J'utilise la version Kafka 0.10.1.1politique de conservation Kafka ne fonctionne pas comme prévu

J'ai envoyé mes données dans le sujet comme celui-ci:

kafkaProducer.send(
        new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r) 
      ); 

Et je crée mon sujet comme celui-ci:

kafka-sujets --create - -zookeeper localhost: 2181 --replication-factor 1 --partitions 1 --top myTopic
kafka-topics --zookeeper localhost --alter --top myTopic --config rétention.ms = 172800000 kafka-topics --zookeeper localhost --alter --topic MyTopic --config segment.ms = 172800000

donc par le réglage ci-dessus, je régler le temps de rétention de mon sujet à 48 heures.

J'adresse TimestampExtractor pour enregistrer le temps réel de chaque message.

public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor { 
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class); 
    @Override 
    public long extract(ConsumerRecord<Object, Object> consumerRecord) { 
     LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp())); 
     return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis(); 
    } 
} 

Pour tester, j'ai envoyé 4 messages à mon sujet et j'obtiens ces 4 messages de journal.

2017-02-28 10:23:39 INFO ConsumerRecordOrWallclockTimestampExtractor: 21 - timestamp: 1488295086292 humaine retranscrit -Tue 28 février 10:18:06 EST 2017
2017-02-28 10:24 : 01 INFO ConsumerRecordOrWallclockTimestampExtractor: 21 - timestamp: 1483272000000 humaine retranscrit -Soleil 1 janvier 07:00:00 EST 2017
2017-02-28 10:26:11 INFO ConsumerRecordOrWallclockTimestampExtractor: 21 - timestamp: 1485820800000 humaine retranscrit -mon 30 janvier 19:00:00 EST 2017
2017-02-28 10:27:22 INFO ConsumerRecordOrWallclockTimestampExtractor: 21 - timestamp: 1488295604411 humaine retranscrit -Tue 28 février 10:26:44 EST 2017

donc basé sur Kafka's retention policy Je me attendais à voir deux de mes messaged purgés/supprimé après 5 minutes (2e et 3e messager puisqu'ils sont pour le 1er janvier et le 30 janvier). Mais j'ai essayé de consommer mon sujet pendant une heure et chaque fois que j'ai consommé mon sujet j'ai reçu tous les 4 messages.

kafka-Avro-console-consommateur --zookeeper localhost: 2181 --from-commençant --topic MyTopic

config

Mon Kafka est comme ceci:

############################# Log Retention Policy ############################# 

# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 

# The minimum age of a log file to be eligible for deletion 
log.retention.hours=168 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining 
# segments don't drop below log.retention.bytes. 
#log.retention.bytes=1073741824 

# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 

Am Je fais quelque chose de mal ou quelque chose me manque ici?

Répondre

5

Kafka met en œuvre sa politique de rétention en supprimant des segments du journal. Kafka ne supprime jamais le segment actif, qui est le segment dans lequel il ajoutera les nouveaux messages envoyés à la partition. Kafka ne supprime que les anciens segments.Kafka roule tronçon actif dans un ancien secteur lorsqu'un nouveau message est envoyé à la cloison, et soit

  • la taille du segment actif avec le nouveau message dépasserait log.segment.bytes ou
  • l'horodatage de la première message dans le segment actif est plus ancienne que log.roll.ms (valeur par défaut est de 7 jours)

donc, dans votre exemple, vous devez attendre 7 jours après le 28 Tue Feb 10:18:06 eST 2017, envoyer un nouveau message, et alors tous les 4 messages initiaux seront supprimés.

+0

Si oui, comment cela explique quand j'ai envoyé deux messages avec l'horodatage de 1970 (très vieux message) après 5 minutes les deux sont supprimés? – Am1rr3zA