2017-10-03 4 views
0

Donc, fondamentalement, je veux que plusieurs tâches s'exécutent sur le même noeud/exécuteur pour lire des données à partir d'une mémoire partagée. Pour cela, j'ai besoin d'une fonction d'initialisation qui chargerait les données dans la mémoire avant le démarrage des tâches. Si Spark fournit un point d'ancrage pour un démarrage d'Executor, je peux placer ce code d'initialisation dans cette fonction de rappel, les tâches ne s'exécutant qu'une fois ce démarrage terminé. Donc, ma question est, est-ce que Spark fournit de tels hooks? Si non, avec quelle autre méthode, je peux réaliser la même chose?Y a-t-il un point d'ancrage pour Executor Startup dans Spark?

Répondre

0

Vous ne devez pas exécuter plusieurs instances de l'application pour pouvoir exécuter plusieurs tâches (par exemple, une instance d'application, une tâche Spark). Le même objet SparkSession peut être utilisé par plusieurs threads pour soumettre des tâches Spark en parallèle.

Il peut fonctionner comme ceci:

  • L'application démarre et exécute une fonction d'initialisation de charger des données partagées dans la mémoire. Dites, dans un objet de classe SharedData.
  • SparkSession est créé
  • Un pool de threads est créé, chaque thread a accès (SparkSession, shareddata) objets
  • Chaque thread crée tâche Spark à l'aide SparkSession partagée et shareddata objets.
  • En fonction de votre cas d'utilisation, l'application fait alors l'un des suivants:
    • attend toutes les tâches à accomplir et ferme puis Spark Session
    • attend dans une boucle pour les nouvelles demandes arrivent et crée une nouvelle Spark tâches si nécessaire en utilisant des threads du pool de threads.

SparkContext (sparkSession.sparkContext) est utile lorsque vous voulez faire par thread des choses comme l'attribution d'une tâche à l'aide setJobDescription ou l'attribution d'un groupe à la tâche à l'aide setJobGroup tâches afin connexes peuvent être annulées en utilisant simultanément cancelJobGroup. Vous pouvez également modifier la priorité pour les tâches qui utilisent le même pool, voir https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application pour plus de détails.

+1

tout est vrai, mais c'est impossible si "SharedData" n'est pas sérialisable, et pas trop efficace s'il est sérialisable mais grand.L'utilisation de "SharedData" créé sur l'application du pilote directement dans les transformations Spark signifierait qu'il serait sérialisé et envoyé à l'exécuteur ** par tâche **. –

+0

@TzachZohar bon point sur SharedData envoyé à l'exécuteur par tâche. Oui, l'utilisation d'une variable de diffusion pour SharedData permettrait d'éviter cela. L'exigence de sérialisation s'applique également aux deux variables dans les variables de fermeture et de diffusion, n'est-ce pas? –

+1

Oui, l'exigence de sérialisation s'applique également à la diffusion. Mais pas pour l'option d'initialisation "statique" que j'ai également mentionnée, qui (si je la lis bien) est ce que vise l'OP. –

0

solution Spark pour « données partagées » utilise diffusion - où vous chargez les données une fois dans l'application pilote et Spark sérialise et envoie à chacun des exécuteurs (une fois). Si une tâche utilise ces données, Spark s'assurera qu'elles sont présentes avant l'exécution de la tâche. Par exemple:

object MySparkTransformation { 

    def transform(rdd: RDD[String], sc: SparkContext): RDD[Int] = { 
    val mySharedData: Map[String, Int] = loadDataOnce() 
    val broadcast = sc.broadcast(mySharedData) 
    rdd.map(r => broadcast.value(r)) 
    } 
} 

Sinon, si vous voulez éviter de lire les données dans la mémoire du pilote et de l'envoyer vers les exécuteurs, vous pouvez utiliser lazy valeurs dans une Scala object pour créer une valeur qui obtient peuplée une fois par JVM, qui dans le cas de Spark est une fois par exécuteur. Par exemple:

// must be an object, otherwise will be serialized and sent from driver 
object MySharedResource { 
    lazy val mySharedData: Map[String, Int] = loadDataOnce() 
} 

// If you use mySharedData in a Spark transformation, 
// the "local" copy in each executor will be used: 
object MySparkTransformation { 
    def transform(rdd: RDD[String]): RDD[Int] = { 
    // Spark won't include MySharedResource.mySharedData in the 
    // serialized task sent from driver, since it's "static" 
    rdd.map(r => MySharedResource.mySharedData(r)) 
    } 
} 

Dans la pratique, vous aurez une copie de mySharedData dans chaque exécuteur testamentaire.

+0

Oui, je suis déjà au courant de la fonction de diffusion, mais la raison pour laquelle je ne veux pas l'utiliser est parce que mes tâches exécuteraient un fichier exécutable, le code compilé à travers un programme C. Je veux charger ces données directement à partir d'un fichier HDFS, et mettre les données dans une mémoire partagée, afin que ces tâches puissent l'utiliser. Bien sûr, je devrais aussi modifier le code C pour cela. La chose Val paresseux semble plus approprié à cet effet. Donc, je vais vérifier cela. – pythonic