Question: Est-ce une façon valide de tester le temps nécessaire pour construire un RDD?Apache Spark timing pourChaque opération sur JavaRDD
Je fais deux choses ici. L'approche de base est que nous avons M instances de ce que nous appelons une DropEvaluation, et N DropResults. Nous devons comparer chaque N DropResult à chacune des M DropEvaluations. Chaque N doit être vu par chaque M, pour nous donner M résultats à la fin. Si je n'utilise pas le .count() une fois que le RDD est construit, le pilote continue à la ligne de code suivante et dit qu'il ne faut presque pas de temps pour construire un RDD qui prend 30 minutes à construire.
Je veux juste m'assurer que je ne manque pas quelque chose, comme peut-être le .count() prend beaucoup de temps? Je suppose que pour chronométrer le .count() je devrais modifier la source de Spark?
M = 1000 ou 2000. N = 10^7.
C'est effectivement un problème cartésien - l'accumulateur a été choisi parce que nous devons écrire à chaque M en place. Je serais aussi moche de construire le RDD cartésien complet.
Nous construisons une liste d'accumulateurs M (ne peut pas faire un accumulateur de liste en Java non?). Ensuite, nous parcourons chacun des N dans un RDD avec un foreach.
Clarifier la question: La durée totale est mesurée correctement, je demande si le .count() sur le RDD oblige Spark à attendre que le RDD soit terminé avant de pouvoir effectuer un comptage. L'heure .count() est-elle significative?
Voici notre code:
// assume standin exists and does it's thing correctly
// this controls the final size of RDD, as we are not parallelizing something with an existing length
List<Integer> rangeN = IntStream.rangeClosed(simsLeft - blockSize + 1, simsLeft).boxed().collect(Collectors.toList());
// setup bogus array of size N for parallelize dataSetN to lead to dropResultsN
JavaRDD<Integer> dataSetN = context.parallelize(rangeN);
// setup timing to create N
long NCreationStartTime = System.nanoTime();
// this maps each integer element of RDD dataSetN to a "geneDropped" chromosome simulation, we need N of these:
JavaRDD<TholdDropResult> dropResultsN = dataSetN.map(s -> standin.call(s)).persist(StorageLevel.MEMORY_ONLY());
// **** this line makes the driver wait until the RDD is done, right?
long dummyLength = dropResultsN.count();
long NCreationNanoSeconds = System.nanoTime() - NCreationStartTime;
double NCreationSeconds = (double)NCreationNanoSeconds/1000000000.0;
double NCreationMinutes = NCreationSeconds/60.0;
logger.error("{} test sims remaining", simsLeft);
// now get the time for just the dropComparison (part of accumulable's add)
long startDropCompareTime = System.nanoTime();
// here we iterate through each accumulator in the list and compare all N elements of dropResultsN RDD to each M in turn, our .add() is a custom AccumulableParam
for (Accumulable<TholdDropTuple, TholdDropResult> dropEvalAccum : accumList) {
dropResultsN.foreach(new VoidFunction<TholdDropResult>() {
@Override
public void call(TholdDropResult dropResultFromN) throws Exception {
dropEvalAccum.add(dropResultFromN);
}
});
}
// all the dropComparisons for all N to all M for this blocksize are done, check the time...
long dropCompareNanoSeconds = System.nanoTime() - startDropCompareTime;
double dropCompareSeconds = (double)dropCompareNanoSeconds/1000000000.0;
double dropCompareMinutes = dropCompareSeconds/60.0;
// write lines to indicate timing section
// log and write to file the time for the N-creation
...
} // end for that goes through dropAccumList
Je noterais en réponse à la réponse de Dikei ci-dessous, ce que j'ai attribué de 1, il ne répond pas au problème de base, est-ce une façon valable de chronométrer exactement la création d'un RDD? Est-ce que le compte prend un temps significatif en soi, que cela gonfle le temps de création de RDD? Des liens vers de bons exemples de synchronisation des choses? – JimLohse