J'essaie d'accéder à kafka déployé sur le serveur AWS avec IP publique, mais en essayant de le connecter et d'envoyer des données je ne reçois aucune réponse et la connexion au serveur est fermée.Suivez mon code de producteur J'ai également essayé d'ajouter advertise.host.name au fichier config de server.properties. Apache kafka producteur ne stocke pas de données
public SensorDevice() {
Properties props = new Properties();
props.put("metadata.broker.list", "myip-xyz:9092");
props.put("bootstrap.servers", "myip-xyz:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
producer = new KafkaProducer<String, String>(props);
}
public void run() {
Object objectData = new Object();
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
topic, "mytopic", objectData.toString());
System.out.println(data);
Future<RecordMetadata> rs = producer.send(data,
new org.apache.kafka.clients.producer.Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata,
Exception arg1) {
System.out.println("Received ack for partition="
+ recordMetadata.partition() + " offset = "
+ recordMetadata.offset());
}
});
try {
String msg = "";
RecordMetadata rm = rs.get();
msg = msg + " partition = " + rm.partition() + " offset ="
+ rm.offset();
System.out.println(msg);
} catch (Exception e) {
System.out.println(e);
}
producer.close();
}
Kafka montre l'erreur suivante -
> [2015-04-24 09:06:35,329] INFO Created log for partition [mytopic,0] in /tmp/kafka-logs with properties {segment.index.bytes ->
> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824,
> flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000,
> index.interval.bytes -> 4096, retention.bytes -> -1,
> min.insync.replicas -> 1, cleanup.policy -> delete,
> unclean.leader.election.enable -> true, segment.ms -> 604800000,
> max.message.bytes -> 1000012, flush.messages -> 9223372036854775807,
> min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000,
> segment.jitter.ms -> 0}. (kafka.log.LogManager)
> [2015-04-24 09:06:35,330] WARN Partition [mytopic,0] on broker 0: No checkpointed highwatermark is found for partition [mytopic,0]
> (kafka.cluster.Partition)
> [2015-04-24 09:07:34,788] INFO Closing socket connection to /50.156.87.157. (kafka.network.Processor)
S'il vous plaît me aider à résoudre ce problème!