2017-06-16 4 views
2

J'ai écrit un programme Java très simple pour Apache Flink et maintenant je suis intéressé par la mesure de statistiques telles que le débit (nombre de tuples traités par seconde) et la latence (le temps que le programme doit tuple d'entrée).Débit et latence sur Apache Flink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv") 
     .map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv"); 

JobExecutionResult res = env.execute(); 

Je sais que Flink expose certains paramètres:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

Mais je ne suis pas sûr de savoir comment les utiliser afin d'obtenir ce que je veux. Du lien j'ai lu qu'un "mètre" peut être utilisé pour mesurer le débit moyen mais, après l'avoir défini, comment l'utiliser?

+0

Avec quoi traitez-vous exactement? Pour le débit, vous devez enregistrer un 'Meter' dans votre fonction' MyMapper', comme indiqué dans le lien que vous avez fourni. Vous pouvez regarder les statistiques en direct dans le tableau de bord Web Flink. – us2012

+0

Si je suis les instructions dont j'ai besoin pour implémenter la classe myMeter, j'ai essayé quelque chose mais cela ne fonctionne pas. Si j'utilise le compteur DropWizard et que j'essaie de l'exécuter en mode autonome, j'ai une erreur (java.lang.NoClassDefFoundError: com/codahale/metrics/Meter ) même si j'ai inclus la dépendance dans le fichier pom.xml. – LizardKing

Répondre

1

Nous exécutons des métriques personnalisées comme le mètre, jauge dans notre travail en continu de production fonctionnant sur le fil.

Voici les étapes:

dépendance additionnel à pom.xml

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-metrics-dropwizard</artifactId> 
    <version>${flink.version}</version> 
</dependency> 

Nous utilisons la version 1.2.1

Ensuite, ajoutez mètre à MyMapper classe.

import org.apache.flink.api.common.JobExecutionResult; 
import org.apache.flink.api.common.functions.RichMapFunction; 
import org.apache.flink.configuration.Configuration; 
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; 
import org.apache.flink.metrics.Meter; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 


public class Test { 


    public static void main(String[] args) throws Exception { 

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     env 
       .readTextFile("/home/LizardKing/Documents/Power/Prova.csv") 
       .map(new MyMapper()) 
       .writeAsCsv("/home/LizardKing/Results.csv"); 

     JobExecutionResult res = env.execute(); 
    } 


    private static class MyMapper extends RichMapFunction<String, Object> { 

     private transient Meter meter; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      this.meter = getRuntimeContext() 
        .getMetricGroup() 
        .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter())); 
     } 

     @Override 
     public Object map(String value) throws Exception {  
      this.meter.markEvent(); 
      return value; 
     } 
    } 
} 

Espérons que cela aide.

+0

Cela m'a aidé, j'ai aussi eu un autre problème: quand j'ai essayé d'exécuter ce programme en flink (et non à partir de l'IDE), j'ai découvert que cela ne suffisait pas à inclure la dépendance dans le fichier pom.xml. Je devais fournir les bibliothèques à flinker, l'approche que j'ai suggéré est d'utiliser le plugin maven-shade. Il devrait emballer les dépendances dans le jar téléchargé. – LizardKing