2017-10-09 10 views
1

Il y a un SparkSQL qui va rejoindre 4 grandes tables (50 millions pour la première table 3 et 200 millions pour la dernière table) et faire un groupe par opération qui consomme 60 jours de Les données. et ce SQL prendra 2 heures à courir, au cours de laquelle, j'ai vérifié que Shuffle Write augmente considérablement, ce qui pourrait aller jusqu'à plus de 200GB. Par contre, lorsque je diminue la plage de consommation de 60 jours à 45 jours, il ne faut que 6,3 min pour fonctionner. J'ai vérifié sur le graphique DAG, pour 45 jours de données, il sort 1 milliard de données après le dernier sortMergeJoin.Optimisation lorsque l'écriture aléatoire est grande et la tâche d'étincelle devient super lente

Quelqu'un pourrait-il me donner une idée de la direction dans laquelle je pourrais optimiser ce scénario? Merci!

P.S.

Relatifs possible:

  • Spark.version = 2.1.0
  • spark.executor.instances = 20
  • spark.executor.memory = 6g
  • spark.yarn.executor .memoryOverhead = 5g

Répondre

0

Vous devez partitionner les données pour paralléliser correctement les travaux Assurez-vous que les partitions de l'interface Spark sont distribuées correctement .

L'implémentation par défaut d'une jointure dans Spark est une jointure hachée mélangée. La jointure de hachage shuffled garantit que les données de chaque partition contiendront les mêmes clés en partitionnant le second ensemble de données avec le même partitionneur par défaut que le premier, de sorte que les clés ayant la même valeur de hachage proviennent des deux partitions. Bien que cette approche fonctionne toujours, elle peut être plus coûteuse que nécessaire car elle nécessite un mélange. Le mélange peut être évité si:

  • Les deux RDD ont un partitionneur connu.

  • L'un des ensembles de données est assez petit pour tenir dans la mémoire, dans ce cas, nous pouvons faire un hachage de diffusion se joindre à

L'optimisation la plus simple est que si l'un des ensembles de données est assez petit pour tenir dans mémoire, il devrait être diffusé à chaque nœud de calcul. Ce cas d'utilisation est très courant car les données doivent être combinées avec des données latérales, comme un dictionnaire, tout le temps.

Généralement, les jointures sont lentes en raison de la réorganisation des données sur le réseau. Avec la jointure de diffusion, l'ensemble de données le plus petit est copié sur tous les nœuds de travail afin que le parallélisme d'origine du DataFrame plus grand soit conservé.

https://spark.apache.org/docs/latest/tuning.html

+0

hey man, merci pour la réponse détaillée. En fait, il y a une partition dans toutes les 3 premières tables (50 millions en taille pour les partitions de 60 jours) et pas de support de partition pour la dernière table (200 millions en taille pour 60 jours). et il y a de petites tables, que j'ai pu voir à partir du graphe DAG, qu'elles sont 'BroadCastJoin' comme prévu. Cependant, il y a plusieurs (6/7) 'SortMergeJoin' sur ces grandes tables pendant le comuting. C'est le phénomène. D'autres suggestions? – KAs

+0

Quel type de partition utilisez-vous? –

+0

partitionné sur 'day = aaaa-MM-jj', donc je ne récupère que les données des 60 derniers jours, qui sont 50 millions de taille totalement – KAs