Je développe actuellement une application de diffusion en continu Spark (v2.0.0) et je suis confrontée à des problèmes liés à la façon dont Spark semble allouer du travail à travers le cluster. Cette application est soumise à AWS EMR en mode client, de sorte qu'il existe un noeud de pilote et deux nœuds de travail. Voici une capture d'écran de Ganglions qui montre l'utilisation de la mémoire dans la dernière heure:Utilisation de la mémoire Spark concentrée sur Driver/Master
Le nœud le plus à gauche est le nœud « maître » ou « pilote », et les deux autres sont des nœuds de travailleurs. Il y a des pics dans l'utilisation de la mémoire pour les trois nœuds qui correspondent aux charges de travail arrivant dans le flux, mais les pics ne sont pas égaux (même lorsqu'ils sont mis à l'échelle pour% d'utilisation de la mémoire). Quand une grande charge de travail arrive, le nœud du pilote semble être surchargé de travail, et le travail se bloque avec une erreur sur la mémoire:
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x000000053e980000, 674234368, 0) failed; error='Cannot allocate memory' (errno=12)
J'ai couru aussi dans ce: Exception in thread "streaming-job-executor-10" java.lang.OutOfMemoryError: Java heap space
lorsque le maître manque de mémoire, ce qui est également source de confusion, car je crois savoir que le mode "client" n'utiliserait pas le nœud pilote/maître en tant qu'exécuteur.
détails PERTINENTES:
- Comme mentionné précédemment, cette demande est présentée en mode client:
spark-submit --deploy-mode client --master yarn ...
. - Nulle part dans le programme en cours d'exécution, je suis
collect
oucoalesce
- Tout travail que je soupçonne d'être exécuté sur un seul nœud (
jdbc
lit principalement) estrepartition
« d après l'achèvement. - Il y a quelques très petits ensembles de données
persist
en mémoire. - 1 x caractéristiques du pilote: 4 noyaux, RAM 16 Go (en instance m4.xlarge)
- 2 x Spéc travailleurs: 4 noyaux, 30.5GB RAM (en instance r3.xlarge)
- j'ai essayé à la fois permettant Spark choisir taille de l'exécuteur/cœurs et en les spécifiant manuellement. Les deux cas se comportent de la même manière. (J'ai manuellement spécifié 6 exécuteurs, 1 noyau, RAM de 9GB)
Je suis certainement à une perte ici. Je ne suis pas sûr de ce qui se passe dans le code pour déclencher le conducteur de monopoliser la charge de travail comme ça.
Le seul suspect que je peux penser est un extrait de code semblable au suivant:
val scoringAlgorithm = HelperFunctions.scoring(_: Row, batchTime)
val rawScored = dataToScore.map(scoringAlgorithm)
Ici, une fonction est chargée à partir d'un objet statique, et utilisé pour cartographier sur la Dataset
. Je crois comprendre que Spark sérialisera cette fonction à travers le cluster (réf: http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#passing-functions-to-spark). Cependant, je me trompe peut-être et c'est simplement en train de faire cette transformation sur le conducteur.
Si quelqu'un a un aperçu de ce problème, j'aimerais l'entendre!