2017-07-20 1 views
1

Mettez à jour TTL pour un sujet afin que les enregistrements restent dans le sujet pendant 10 jours. Je dois faire cela pour un sujet particulier en laissant tous les autres sujets TTL est la même configuration actuelle, je dois le faire en utilisant java parce que je pousse un sujet à kafka via Java. Je suis en train de propriétés suivantes pour pousser un sujet à kafkaMise à jour de TTL pour un sujet particulier dans kafka en utilisant Java

Properties props = new Properties(); 
    props.put("bootstrap.servers", KAFKA_SERVERS); 
    props.put("acks", ACKS); 
    props.put("retries", RETRIES); 
    props.put("linger.ms", new Integer(LINGER_MS)); 
    props.put("buffer.memory", new Integer(BUFFER_MEMORY)); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

Répondre

3

Vous pouvez le faire en utilisant le AdminClient, après un bout de code qui obtiennent la configuration actuelle (juste pour le test) puis mettre à jour les « retention.ms » config sur le sujet nommé "test".

Properties props = new Properties(); 
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 

    AdminClient adminClient = AdminClient.create(props); 

    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test"); 

    // get the current topic configuration 
    DescribeConfigsResult describeConfigsResult = 
      adminClient.describeConfigs(Collections.singleton(resource)); 

    Map<ConfigResource, Config> config = describeConfigsResult.all().get(); 

    System.out.println(config); 

    // create a new entry for updating the retention.ms value on the same topic 
    ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "50000"); 
    Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>(); 
    updateConfig.put(resource, new Config(Collections.singleton(retentionEntry))); 

    AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(updateConfig); 
    alterConfigsResult.all(); 

    describeConfigsResult = adminClient.describeConfigs(Collections.singleton(resource)); 

    config = describeConfigsResult.all().get(); 

    System.out.println(config); 

    adminClient.close(); 
+0

Merci pour la réponse, pas en mesure d'obtenir ces classes d'information ('AdminClient',' DescribeConfigsResult', 'AlterConfigsResult',' ConfigEntry'), je me sers 'pom' ne pas obtenir des déclarations d'importation – Sat

+0

utilisez-vous la 0.11.0 version? J'utilise celui-ci. L'API AdminClient a été introduite avec la dernière version, sinon vous ne pouvez pas le faire à partir du code Java. – ppatierno

+0

' \t \t \t org.apache.kafka \t \t \t kafka_2.10 \t \t \t 0.8.0 \t \t' J'ai essayé avec 0.11.0 aussi, mais pour moi n'a pas fonctionné – Sat