0

EDIT2: Enfin j'ai fait mon propre producteur en Java et fonctionne bien, donc le problème est dans le Kafka-console-producteur. Le kafka-console-consumer fonctionne bien.Kafka + Spark Lecture en continu: retard constant de 1 seconde

EDIT: J'ai déjà essayé avec la version 0.9.0.1 et a le même comportement.

Je travaille sur mon projet final de baccalauréat, une comparaison entre Spark Streaming et Flink. Avant les deux frameworks, j'utilise Kafka et un script pour générer les données (expliqué ci-dessous). Mon premier test est de comparer la latence entre les deux frameworks avec des charges de travail simples et Kafka me donne une latence vraiment élevée (1 seconde en permanence). Pour la simplicité, pour le moment je cours dans une seule machine à la fois Kafka et Spark.

J'ai déjà recherché et trouvé des problèmes similaires, et essayé les solutions qu'ils donnent mais rien n'a changé. J'ai vérifié aussi toutes les configurations Kafka dans la documentation officielle et de mettre les Importants pour le temps d'attente dans mes fichiers de configuration, voici ma configuration:

Kafka 0.10.2.1 - Spark 2.1.0

server.properties :

num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
num.partitions=2 
num.recovery.threads.per.data.dir=1 
log.flush.interval.messages=1000 
log.flush.interval.ms=50 
log.retention.hours=24 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=6000 
flush.messages=100 
flush.ms=10 

producer.properties:

compression.type=none 
max.block.ms=200 
linger.ms=50 
batch.size=0 

programme Streaming Spark: (qui imprime les données reçues, et la différence entre le moment où les données ont été créées et quand est en cours de traitement pour la fonction)

package com.tfg.spark1.spark1; 

import java.util.Map; 
import java.util.HashMap; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.*; 
import scala.Tuple2; 
import org.apache.spark.streaming.kafka.*; 

public final class Timestamp { 

    public static void main(String[] args) throws Exception { 
     if (args.length < 2) { 
      System.err.println("Usage: Timestamp <topics> <numThreads>"); 
      System.exit(1); 
     } 

     SparkConf conf = new SparkConf().setMaster("spark://192.168.0.155:7077").setAppName("Timestamp"); 
     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100)); 


     Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
     int numThreads = Integer.parseInt(args[1]); 
     topicMap.put(args[0], numThreads); 

     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "192.168.0.155:2181", "grupo-spark", topicMap); //Map<"test", 2> 

     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      private static final long serialVersionUID = 1L; 

      public String call (Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 

     JavaDStream<String> newLine = lines.map(new Function<String, String>() { 
      private static final long serialVersionUID = 1L; 

      public String call(String line) { 
       String[] tuple = line.split(" "); 
       String totalTime = String.valueOf(System.currentTimeMillis() - Long.valueOf(tuple[1])); 
       //String newLine = line.concat(" " + String.valueOf(System.currentTimeMillis()) + " " + totalTime); 

       return totalTime; 
      } 
     }); 

     lines.print(); 
     newLine.print(); 

     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

Les données générées a le format suivant:

"Random bits" + " " + "current time in ms" 
01 1496421618634 
11 1496421619044 
00 1496421619451 
00 1496421618836 
10 1496421619247 

Enfin quand je lance mon programme en streaming Spark et le générateur de script, qui génère les données toutes les 200ms, Spark (intervalle de lot = 100ms) imprime des 9 lots vides, et chaque seconde (toujours 900ms moment, comme dans cet exemple : Tim e: 1496421619 ms) il en résulte:

------------------------------------------- 
Time: 1496421619900 ms 
------------------------------------------- 
01 1496421618634 
11 1496421619044 
00 1496421619451 
00 1496421618836 
10 1496421619247 
------------------------------------------- 
Time: 1496421619900 ms 
------------------------------------------- 
1416 
1006 
599 
1214 
803 

Aussi, si je lance une ligne de commande-producteur Kafka et une autre ligne de commande consommateur, il faut toujours un certain temps pour imprimer les données produites dans le consommateur.

Merci d'avance pour l'aide!

+1

essayer le consommateur simple d'abord pour voir si son étincelle spécifique ou kafka spécifique. il y a quelques messages (également de linkedin) qui rapportent 30 ms de latence. –

+0

Voulez-vous dire le kafka-console-consumer? J'ai déjà essayé et il reçoit les éléments aussi avec retard. J'ai également lu de plusieurs sites qu'il peut atteindre cette latence. J'essaierai aussi d'utiliser une version plus ancienne de Kafka. Merci! : D – Franmoti

+0

Cela peut également dépendre de votre matériel (par exemple, le nombre de threads). Essayez également de voir le système dans un état stable (pas seulement un ou deux messages) peut-être que cela prend du temps pour se réchauffer –

Répondre

1

Je viens de mettre à jour le JIRA que vous avez ouvert avec la raison pour laquelle vous voyez toujours le retard de 1000 ms.

https://issues.apache.org/jira/browse/KAFKA-5426

je signale ici la raison ...

le paramètre linger.ms est réglé en utilisant l'option --timeout sur la ligne de commande qui, si non spécifié est de 1000 ms. En même temps, le lot.Le paramètre size est défini à l'aide de l'option --max-partition-memory-bytes sur la ligne de commande qui, si elle n'est pas spécifiée, est 16384. Cela signifie que même si vous spécifiez linger.ms et batch.size en utilisant --producer-property ou --producer.config, ils seront toujours remplacé par les options "spécifiques" ci-dessus.