2017-10-18 20 views
0

Je développe actuellement une application de diffusion en continu Spark (v2.0.0) et je suis confrontée à des problèmes liés à la façon dont Spark semble allouer du travail à travers le cluster. Cette application est soumise à AWS EMR en mode client, de sorte qu'il existe un noeud de pilote et deux nœuds de travail. Voici une capture d'écran de Ganglions qui montre l'utilisation de la mémoire dans la dernière heure:Utilisation de la mémoire Spark concentrée sur Driver/Master

Ganglia Screenshot

Le nœud le plus à gauche est le nœud « maître » ou « pilote », et les deux autres sont des nœuds de travailleurs. Il y a des pics dans l'utilisation de la mémoire pour les trois nœuds qui correspondent aux charges de travail arrivant dans le flux, mais les pics ne sont pas égaux (même lorsqu'ils sont mis à l'échelle pour% d'utilisation de la mémoire). Quand une grande charge de travail arrive, le nœud du pilote semble être surchargé de travail, et le travail se bloque avec une erreur sur la mémoire:

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x000000053e980000, 674234368, 0) failed; error='Cannot allocate memory' (errno=12)

J'ai couru aussi dans ce: Exception in thread "streaming-job-executor-10" java.lang.OutOfMemoryError: Java heap space lorsque le maître manque de mémoire, ce qui est également source de confusion, car je crois savoir que le mode "client" n'utiliserait pas le nœud pilote/maître en tant qu'exécuteur.

détails PERTINENTES:

  • Comme mentionné précédemment, cette demande est présentée en mode client: spark-submit --deploy-mode client --master yarn ....
  • Nulle part dans le programme en cours d'exécution, je suis collect ou coalesce
  • Tout travail que je soupçonne d'être exécuté sur un seul nœud (jdbc lit principalement) est repartition « d après l'achèvement.
  • Il y a quelques très petits ensembles de données persist en mémoire.
  • 1 x caractéristiques du pilote: 4 noyaux, RAM 16 Go (en instance m4.xlarge)
  • 2 x Spéc travailleurs: 4 noyaux, 30.5GB RAM (en instance r3.xlarge)
  • j'ai essayé à la fois permettant Spark choisir taille de l'exécuteur/cœurs et en les spécifiant manuellement. Les deux cas se comportent de la même manière. (J'ai manuellement spécifié 6 exécuteurs, 1 noyau, RAM de 9GB)

Je suis certainement à une perte ici. Je ne suis pas sûr de ce qui se passe dans le code pour déclencher le conducteur de monopoliser la charge de travail comme ça.

Le seul suspect que je peux penser est un extrait de code semblable au suivant:

 val scoringAlgorithm = HelperFunctions.scoring(_: Row, batchTime) 
     val rawScored = dataToScore.map(scoringAlgorithm) 

Ici, une fonction est chargée à partir d'un objet statique, et utilisé pour cartographier sur la Dataset. Je crois comprendre que Spark sérialisera cette fonction à travers le cluster (réf: http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#passing-functions-to-spark). Cependant, je me trompe peut-être et c'est simplement en train de faire cette transformation sur le conducteur.

Si quelqu'un a un aperçu de ce problème, j'aimerais l'entendre!

Répondre

0

J'ai fini par résoudre ce problème. Voici comment je l'ai adressé:

J'ai fait une assertion incorrecte en déclarant le problème: il y avait une déclaration collect au début du programme Spark.

J'avais une transaction qui nécessitait collect() pour s'exécuter telle qu'elle a été conçue. Mon hypothèse était qu'appeler repartition(n) sur les données résultantes diviserait les données entre les exécuteurs du cluster. D'après ce que je peux dire, cette stratégie ne fonctionne pas. Une fois que j'ai réécrit cette ligne, Spark a commencé à se comporter comme je l'avais prévu et à faire de l'agriculture sur des noeuds de travail.

Mon conseil à toute âme perdue qui trébuche sur ce problème: ne pas collect à moins que ce soit la fin de votre programme Spark. Vous ne pouvez pas récupérer de cela. Trouvez un autre moyen d'effectuer votre tâche. (J'ai fini par passer d'une transaction SQL de la syntaxe where col in (,,,) à une jointure sur la base de données.)