2016-07-10 2 views
0

Question: Est-ce une façon valide de tester le temps nécessaire pour construire un RDD?Apache Spark timing pourChaque opération sur JavaRDD

Je fais deux choses ici. L'approche de base est que nous avons M instances de ce que nous appelons une DropEvaluation, et N DropResults. Nous devons comparer chaque N DropResult à chacune des M DropEvaluations. Chaque N doit être vu par chaque M, pour nous donner M résultats à la fin. Si je n'utilise pas le .count() une fois que le RDD est construit, le pilote continue à la ligne de code suivante et dit qu'il ne faut presque pas de temps pour construire un RDD qui prend 30 minutes à construire.

Je veux juste m'assurer que je ne manque pas quelque chose, comme peut-être le .count() prend beaucoup de temps? Je suppose que pour chronométrer le .count() je devrais modifier la source de Spark?

M = 1000 ou 2000. N = 10^7.

C'est effectivement un problème cartésien - l'accumulateur a été choisi parce que nous devons écrire à chaque M en place. Je serais aussi moche de construire le RDD cartésien complet.

Nous construisons une liste d'accumulateurs M (ne peut pas faire un accumulateur de liste en Java non?). Ensuite, nous parcourons chacun des N dans un RDD avec un foreach.

Clarifier la question: La durée totale est mesurée correctement, je demande si le .count() sur le RDD oblige Spark à attendre que le RDD soit terminé avant de pouvoir effectuer un comptage. L'heure .count() est-elle significative?

Voici notre code:

// assume standin exists and does it's thing correctly 

// this controls the final size of RDD, as we are not parallelizing something with an existing length 
List<Integer> rangeN = IntStream.rangeClosed(simsLeft - blockSize + 1, simsLeft).boxed().collect(Collectors.toList()); 

// setup bogus array of size N for parallelize dataSetN to lead to dropResultsN  
JavaRDD<Integer> dataSetN = context.parallelize(rangeN); 

// setup timing to create N 
long NCreationStartTime = System.nanoTime(); 

// this maps each integer element of RDD dataSetN to a "geneDropped" chromosome simulation, we need N of these: 
JavaRDD<TholdDropResult> dropResultsN = dataSetN.map(s -> standin.call(s)).persist(StorageLevel.MEMORY_ONLY()); 

// **** this line makes the driver wait until the RDD is done, right? 
long dummyLength = dropResultsN.count(); 


long NCreationNanoSeconds = System.nanoTime() - NCreationStartTime; 
double NCreationSeconds = (double)NCreationNanoSeconds/1000000000.0; 
double NCreationMinutes = NCreationSeconds/60.0; 

logger.error("{} test sims remaining", simsLeft); 

// now get the time for just the dropComparison (part of accumulable's add) 
long startDropCompareTime = System.nanoTime(); 

// here we iterate through each accumulator in the list and compare all N elements of dropResultsN RDD to each M in turn, our .add() is a custom AccumulableParam 
for (Accumulable<TholdDropTuple, TholdDropResult> dropEvalAccum : accumList) { 
    dropResultsN.foreach(new VoidFunction<TholdDropResult>() { 
        @Override 
        public void call(TholdDropResult dropResultFromN) throws Exception { 
          dropEvalAccum.add(dropResultFromN); 
        } 
       }); 
      } 

    // all the dropComparisons for all N to all M for this blocksize are done, check the time... 
    long dropCompareNanoSeconds = System.nanoTime() - startDropCompareTime; 
    double dropCompareSeconds = (double)dropCompareNanoSeconds/1000000000.0; 
    double dropCompareMinutes = dropCompareSeconds/60.0; 

    // write lines to indicate timing section 
    // log and write to file the time for the N-creation 

    ... 

} // end for that goes through dropAccumList 
+0

Je noterais en réponse à la réponse de Dikei ci-dessous, ce que j'ai attribué de 1, il ne répond pas au problème de base, est-ce une façon valable de chronométrer exactement la création d'un RDD? Est-ce que le compte prend un temps significatif en soi, que cela gonfle le temps de création de RDD? Des liens vers de bons exemples de synchronisation des choses? – JimLohse

Répondre

1

programme Spark est paresseux, il ne fonctionnera pas jusqu'à ce que vous appelez toutes les actions comme count sur le RDD. Vous pouvez trouver une liste d'une action commune dans Spark's document

// **** this line makes the driver wait until the RDD is done, right? 
long dummyLength = dropResultsN.count(); 

Oui, dans ce cas count forcer le dropResultsN à calculer, donc ça va prendre beaucoup de temps. Si vous faites une seconde count, il reviendra très vite puisque le RDD est déjà calculé et mis en cache.

+0

Si le RDD n'a jamais été calculé, je serais d'accord avec vous. Puisque j'appelle 'dropResultsN.foreach' cela le fait être calculé. Foreach est dans la liste que vous avez liée. Donc, à moitié faux, je pense? Aussi je pensais que je l'ai mis en cache ainsi: 'JavaRDD dropResultsN = donnéesSetN.map (s -> standin.call (s)). Persist (StorageLevel.MEMORY_ONLY());' ... ou suis-je persiste seulement dataSetN? wow vous avez peut-être juste soufflé mon esprit :) – JimLohse

+1

Mon erreur, je n'ai pas vu l'appel 'persistent'. Appeler 'persist' avec MEMORY_ONLY revient à appeler' cache'. – Dikei

+0

Voir le commentaire ci-dessus sur ma question, vous avez frappé sur l'idée que le premier .count() prend beaucoup de temps, si vous aviez d'autres observations sur des approches valides pour chronométrer Spark ou quelques liens/exemples, je pourrais accepter cette réponse, Sinon, je vais laisser aller ça quelques jours et voir qui se rapproche. Merci. – JimLohse