EDIT2: Enfin j'ai fait mon propre producteur en Java et fonctionne bien, donc le problème est dans le Kafka-console-producteur. Le kafka-console-consumer fonctionne bien.Kafka + Spark Lecture en continu: retard constant de 1 seconde
EDIT: J'ai déjà essayé avec la version 0.9.0.1 et a le même comportement.
Je travaille sur mon projet final de baccalauréat, une comparaison entre Spark Streaming et Flink. Avant les deux frameworks, j'utilise Kafka et un script pour générer les données (expliqué ci-dessous). Mon premier test est de comparer la latence entre les deux frameworks avec des charges de travail simples et Kafka me donne une latence vraiment élevée (1 seconde en permanence). Pour la simplicité, pour le moment je cours dans une seule machine à la fois Kafka et Spark.
J'ai déjà recherché et trouvé des problèmes similaires, et essayé les solutions qu'ils donnent mais rien n'a changé. J'ai vérifié aussi toutes les configurations Kafka dans la documentation officielle et de mettre les Importants pour le temps d'attente dans mes fichiers de configuration, voici ma configuration:
Kafka 0.10.2.1 - Spark 2.1.0
server.properties :
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=2
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=1000
log.flush.interval.ms=50
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
flush.messages=100
flush.ms=10
producer.properties:
compression.type=none
max.block.ms=200
linger.ms=50
batch.size=0
programme Streaming Spark: (qui imprime les données reçues, et la différence entre le moment où les données ont été créées et quand est en cours de traitement pour la fonction)
package com.tfg.spark1.spark1;
import java.util.Map;
import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.spark.streaming.kafka.*;
public final class Timestamp {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: Timestamp <topics> <numThreads>");
System.exit(1);
}
SparkConf conf = new SparkConf().setMaster("spark://192.168.0.155:7077").setAppName("Timestamp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
int numThreads = Integer.parseInt(args[1]);
topicMap.put(args[0], numThreads);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "192.168.0.155:2181", "grupo-spark", topicMap); //Map<"test", 2>
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
public String call (Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> newLine = lines.map(new Function<String, String>() {
private static final long serialVersionUID = 1L;
public String call(String line) {
String[] tuple = line.split(" ");
String totalTime = String.valueOf(System.currentTimeMillis() - Long.valueOf(tuple[1]));
//String newLine = line.concat(" " + String.valueOf(System.currentTimeMillis()) + " " + totalTime);
return totalTime;
}
});
lines.print();
newLine.print();
jssc.start();
jssc.awaitTermination();
}
}
Les données générées a le format suivant:
"Random bits" + " " + "current time in ms"
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
Enfin quand je lance mon programme en streaming Spark et le générateur de script, qui génère les données toutes les 200ms, Spark (intervalle de lot = 100ms) imprime des 9 lots vides, et chaque seconde (toujours 900ms moment, comme dans cet exemple : Tim e: 1496421619 ms) il en résulte:
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
1416
1006
599
1214
803
Aussi, si je lance une ligne de commande-producteur Kafka et une autre ligne de commande consommateur, il faut toujours un certain temps pour imprimer les données produites dans le consommateur.
Merci d'avance pour l'aide!
essayer le consommateur simple d'abord pour voir si son étincelle spécifique ou kafka spécifique. il y a quelques messages (également de linkedin) qui rapportent 30 ms de latence. –
Voulez-vous dire le kafka-console-consumer? J'ai déjà essayé et il reçoit les éléments aussi avec retard. J'ai également lu de plusieurs sites qu'il peut atteindre cette latence. J'essaierai aussi d'utiliser une version plus ancienne de Kafka. Merci! : D – Franmoti
Cela peut également dépendre de votre matériel (par exemple, le nombre de threads). Essayez également de voir le système dans un état stable (pas seulement un ou deux messages) peut-être que cela prend du temps pour se réchauffer –