0

J'essaie d'exécuter le travail de streaming Flink. Je veux déterminer le débit et la latence pour le processus de streaming. J'ai commencé le serveur de courtier de Kafka et ai des messages entrants de kafka. Comment je compte des messages par seconde (débit)? (Comme rdd.count.Y a-t-il une méthode similaire pour obtenir le nombre de messages entrants)Flink streaming - latence et détection de débit

(Scénario complet: J'ai envoyé le message via Producer en tant qu'objet Json. J'ajoute des informations comme le nom de chaîne et aussi System.currentTimeMills dans l'objet Json Pendant le streaming, comment puis-je obtenir l'objet json envoyé via messageStream (DataStream)?)

Merci d'avance.

CODE:

/** * Lire Les chaînes de Kafka et de les imprimer à la sortie standard. */

public static void main(String[] args) throws Exception { 
    System.setProperty("hadoop.home.dir", "c:/winutils/"); 
    // parse input argum ents 
    final ParameterTool parameterTool = ParameterTool.fromArgs(args); 

    if(parameterTool.getNumberOfParameters() < 4) { 
     System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " + 
       "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>"); 
     return; 
    } 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.getConfig().disableSysoutLogging(); 
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); 
    env.enableCheckpointing(5000); // create a checkpoint every 5 seconds 
    env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface 

    DataStream<String> messageStream = env 
      .addSource(new FlinkKafkaConsumer010<>(
        parameterTool.getRequired("topic"), 
        new SimpleStringSchema(), 
        parameterTool.getProperties())); 


    messageStream.print(); 

    env.execute(); 
} 

Répondre

0

Il existe quelques métriques disponibles dans l'interface utilisateur de Flink dans lesquelles vous pouvez calculer le nombre d'événements par seconde et ainsi de suite.

Vous pouvez également ajouter vos propres statistiques lorsque vous calculez des nombres en fonction de vos besoins, ce qui peut être affiché dans l'interface utilisateur de Flink.

Et enfin pour le suivi spécifique de latence, vous pouvez peut-être essayer ce qui est expliqué ici - latency-tracking et de même que vous pouvez obtenir en utilisant des débits - meters