2017-01-11 2 views
8

Permettez-moi aider à clarifier au sujet aléatoire en profondeur et comment Spark utilise gestionnaires shuffle. Je signale quelques ressources très utiles:gestionnaires de lecture aléatoire Comprendre Spark

https://trongkhoanguyenblog.wordpress.com/

https://0x0fff.com/spark-architecture-shuffle/

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md

En les lisant, je compris qu'il existe différents gestionnaires de lecture aléatoire. Je veux mettre l'accent sur deux d'entre eux: hash manager et sort manager (qui est le gestionnaire par défaut).

Pour exposer ma question, je veux commencer par une transformation très commune:

val rdd = reduceByKey(_ + _) 

Cette transformation provoque l'agrégation carte côté puis Shuffle pour amener tous les mêmes clés dans la même partition .

Mes questions sont les suivantes:

  • TOTALISATION Plan-Side implémentés à l'aide d'une transformation interne de mapPartition et l'agrégation ainsi toutes les mêmes touches en utilisant la fonction de combinaison ou est-il mis en œuvre avec un AppendOnlyMap ou ExternalAppendOnlyMap?

  • Si AppendOnlyMap ou ExternalAppendOnlyMap cartes sont utilisées pour l'agrégation, ils sont également utilisés pour réduire l'agrégation côté qui se passe dans le ResultTask? Quel est le but de ces deux types de cartes (AppendOnlyMap ou ExternalAppendOnlyMap)?

  • Les adresses AppendOnlyMap ou ExternalAppendOnlyMap sont-elles utilisées par tous les gestionnaires de shuffle ou uniquement par sortManager?

  • Je l'ai lu après AppendOnlyMap ou ExternalAppendOnlyMap sont pleins, sont déversée dans un fichier, comment exactement ces étapes se produisent? En utilisant le gestionnaire de shuffle de tri, nous utilisons un appendOnlyMap pour agréger et combiner des enregistrements de partition, n'est-ce pas? Ensuite, lorsque la mémoire d'exécution est pleine, nous commençons à trier la carte, à la déverser sur le disque puis à nettoyer la carte, ma question est la suivante: quelle est la différence entre une sauvegarde sur disque et une écriture aléatoire? Ils consistent essentiellement à créer un fichier sur un système de fichiers local, mais ils sont traités différemment, les enregistrements d'écriture aléatoire, ne sont pas placés dans appendOnlyMap. Est-ce que peut expliquer en détail ce qui se passe quand reduceByKey est en cours d'exécution, en m'expliquant toutes les étapes nécessaires pour accomplir cela? Comme par exemple toutes les étapes pour l'agrégation côté carte, le brassage et ainsi de suite.

+0

@JacekLaskowski Vous êtes un expert en shuffling Spark;) –

+0

également @ 0x0FFF est un expert en shuffling Spark – Giorgio

Répondre

1

Il suit la description de reduceByKey étape par étape:

  1. reduceByKey appels combineByKeyWithTag, avec combinateur d'identité et de la valeur de fusion identique et créer de la valeur
  2. combineByKeyWithClassTag crée un Aggregator et retourne ShuffledRDD. Les agrégations latérales "map" et "reduce" utilisent un mécanisme interne et n'utilisent pas mapPartitions.
  3. Agregator utilise ExternalAppendOnlyMap pour les combineValuesByKey ("carte de réduction de côté") et combineCombinersByKey ("réduire la réduction de côté")
  4. Les deux méthodes utilisent ExternalAppendOnlyMap.insertAllMethod
  5. ExternalAppendOnlyMapkeeps track of spilled parts et la carte actuelle en mémoire (SizeTrackingAppendOnlyMap)
  6. insertAll met à jour la carte en mémoire et checks on insert si la taille estimée de la carte actuelle dépasse le seuil. Il utilise la méthode héritée Spillable.maybeSpill. Si le seuil est dépassé cette méthode appelle spill as a side effect et insertAll propres SizeTrackingAppendOnlyMap
  7. initialise
  8. spill appels spillMemoryIteratorToDisk which gets DiskBlockObjectWriter object du gestionnaire de blocs.

insertAll étapes sont appliquées à la fois la carte et réduire les concentrations latérales avec des fonctions correspondant Aggregator à l'étage de brassage entre les deux.

Au Spark 2.0, il est seul gestionnaire basé genre: SPARK-14667