2017-07-20 1 views
-4

J'essaie d'écrire dans Kafka Topic par JAVA, comme j'ai créé le sujet, mais je veux insérer des données dans ce sujet.Ecrire dans le sujet dans Kafka via Java Code

Merci d'avance.

+2

Que voulez-vous dire par "Je suis en train"? Avez-vous un code qui ne fonctionne pas? Pouvez-vous le montrer? Sinon, il ya beaucoup d'exemples de code en ligne sur l'utilisation d'un KafkaProducer, pas besoin d'une question Stack Overflow. – ppatierno

Répondre

0

Voici un exemple de producteur synchrone. Il devrait travailler avec Kafka 0.11 (et quelques versions précédentes aussi):

import org.apache.kafka.clients.producer.*; 
import org.apache.kafka.common.serialization.LongSerializer; 
import org.apache.kafka.common.serialization.StringSerializer; 
import java.util.Properties; 

public class MyKafkaProducer { 

    private final static String TOPIC = "my-example-topic"; 
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094"; 

    private static Producer<Long, String> createProducer() { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 
     props.put(ProducerConfig.CLIENT_ID_CONFIG, "MyKafkaProducer"); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
     return new KafkaProducer<>(props); 
    } 

    static void runProducer(final int sendMessageCount) throws Exception { 
     final Producer<Long, String> producer = createProducer(); 

     try { 
      for (long index = 1; index <= sendMessageCount; index++) { 
       final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "Message " + index); 
       RecordMetadata metadata = producer.send(record).get(); 
       System.out.printf("sent record(key=%s value='%s')" + " metadata(partition=%d, offset=%d)\n", 
        record.key(), record.value(), metadata.partition(), metadata.offset()); 
      } 
     } finally { 
      producer.flush(); 
      producer.close(); 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     if (args.length == 0) { 
      runProducer(5); 
     } else { 
      runProducer(Integer.parseInt(args[0])); 
     } 
    } 
} 

Vous devrez peut-être modifier certains paramètres codés en dur.

Référence: http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html