2017-08-23 3 views

Répondre

2

Il n'existe pas de mesure directe «Donnez-moi le débit pour cet opérateur» disponible. Vous pouvez implémenter un opérateur primitif qui accède à la métrique nTuplesProcessed au fil du temps et calcule le débit à partir de cette valeur. (Le list of available metrics.) Mais, je trouve effectivement beaucoup plus facile à utiliser l'opérateur composite suivant:

public composite PeriodicThroughputSink(input In) { 
param expression<float64> $period; 
     expression<rstring> $file; 
graph 
    stream<boolean b> Period = Beacon() { 
     param period: $period; 
    } 

    stream<float64 throughput> Throughput = Custom(In; Period) { 
     logic state: { 
      mutable uint64 _count = 0; 
      float64 _period = $period; 
     } 

     onTuple In: { 
      ++_count; 
     } 

     onTuple Period: { 
      if (_count > 0ul) { 
       submit({throughput=((float64)_count/_period)}, Throughput); 
       _count = 0ul; 
      } 
     } 

     config threadedPort: queue(Period, Sys.Wait); // ensures that the throughput calculation and file 
                 // writing is on a different thread from the rest 
                 // of the application 
    } 

    () as Sink = FileSink(Throughput) { 
     param file: $file; 
       format: txt; 
       flush: 1u; 
    } 
} 

Vous pouvez alors utiliser l'opérateur composite comme un « tap débit », où il consomme le courant de tout opérateur dont débit que vous voulez enregistrer. Par exemple, vous pouvez l'utiliser comme ceci:

stream<Data> Result = OperatorYouCareAbout(In) {} 

() as ResultThroughput = PeriodicThroughputSink(Result) { 
    param period: 5.0; 
      file: "ResultThroughput.txt"; 
} 

Bien sûr, vous pouvez toujours utiliser le flux Result ailleurs dans votre application. Gardez à l'esprit que cette méthode peut avoir un impact sur les performances de l'application: nous mettons un doigt sur le chemin de données. Mais, l'impact ne doit pas être important, surtout si vous vous assurez que les opérateurs du PeriodicThroughputSink sont fusionnés dans le même PE que n'importe quel opérateur que vous appuyez. En outre, plus la période est courte, plus il est probable que cela aura une incidence sur les performances de l'application.

Encore une fois, nous pourrions faire quelque chose de similaire dans un opérateur primitif C++ ou Java en accédant à la métrique nTuplesProcessed, mais je trouve l'approche ci-dessus beaucoup plus facile. Vous pouvez également récupérer les statistiques du système à l'extérieur de votre application. disons, vous pourriez avoir un script qui utilise périodiquement streamtool capturestate ou l'API REST, puis analyser la sortie, trouver la métrique nTuplesProcessed pour l'opérateur qui vous intéresse et l'utiliser pour calculer le débit. Mais je trouve la technique dans cet opérateur composite beaucoup plus facile.

+0

Merci Scott, d'abord pour avoir clarifié qu'il n'y a pas de façon directe de le capturer; J'avais quelque chose comme la solution que vous avez donnée, mais je voulais m'assurer que je ne la rendrais pas trop compliquée s'il y a un moyen direct. Grande aide! –