2015-04-24 1 views
1

J'essaie d'accéder à kafka déployé sur le serveur AWS avec IP publique, mais en essayant de le connecter et d'envoyer des données je ne reçois aucune réponse et la connexion au serveur est fermée.Suivez mon code de producteur J'ai également essayé d'ajouter advertise.host.name au fichier config de server.properties. Apache kafka producteur ne stocke pas de données

public SensorDevice() { 
    Properties props = new Properties(); 
    props.put("metadata.broker.list", "myip-xyz:9092"); 
    props.put("bootstrap.servers", "myip-xyz:9092"); 
    props.put("serializer.class", "kafka.serializer.StringEncoder"); 
    props.put("key.serializer", 
      "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", 
      "org.apache.kafka.common.serialization.StringSerializer"); 
    // props.put("partitioner.class", "example.producer.SimplePartitioner"); 
    props.put("request.required.acks", "1"); 
    producer = new KafkaProducer<String, String>(props); 

} 

public void run() { 

    Object objectData = new Object(); 

    ProducerRecord<String, String> data = new ProducerRecord<String, String>(
      topic, "mytopic", objectData.toString()); 
    System.out.println(data); 
    Future<RecordMetadata> rs = producer.send(data, 
      new org.apache.kafka.clients.producer.Callback() { 

       @Override 
       public void onCompletion(RecordMetadata recordMetadata, 
         Exception arg1) { 
        System.out.println("Received ack for partition=" 
          + recordMetadata.partition() + " offset = " 
          + recordMetadata.offset()); 
       } 
      }); 

    try { 
     String msg = ""; 
     RecordMetadata rm = rs.get(); 
     msg = msg + " partition = " + rm.partition() + " offset =" 
       + rm.offset(); 
     System.out.println(msg); 
    } catch (Exception e) { 
     System.out.println(e); 
    } 
    producer.close(); 

} 
Kafka montre l'erreur suivante -

>  [2015-04-24 09:06:35,329] INFO Created log for partition [mytopic,0] in /tmp/kafka-logs with properties {segment.index.bytes -> 
> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, 
> flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, 
> index.interval.bytes -> 4096, retention.bytes -> -1, 
> min.insync.replicas -> 1, cleanup.policy -> delete, 
> unclean.leader.election.enable -> true, segment.ms -> 604800000, 
> max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, 
> min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000, 
> segment.jitter.ms -> 0}. (kafka.log.LogManager) 
>  [2015-04-24 09:06:35,330] WARN Partition [mytopic,0] on broker 0: No checkpointed highwatermark is found for partition [mytopic,0] 
> (kafka.cluster.Partition) 
>  [2015-04-24 09:07:34,788] INFO Closing socket connection to /50.156.87.157. (kafka.network.Processor) 

S'il vous plaît me aider à résoudre ce problème!

Répondre

1

EC2 Les adresses IP sont internes. Vous pouvez rencontrer quelques problèmes en traitant avec le serveur EC2 exécutant kafka et zookeeper. Essayez de définir les variables advertised.host.name et advertised.port dans votre fichier server.properties.

advertised.host.name doit être l'adresse IP du serveur EC2. advertised.port devrait être port kafka. Par défaut, il s'agit de 9092.