Je dois lire les fichiers JSON à partir de répertoires sources distincts et créer des tables distinctes pour chaque répertoire. Je voudrais que cela soit fait en parallèle, mais Spark ne supporte pas les RDD imbriqués, donc actuellement il le fait de manière séquentielle. Existe-t-il une bonne solution pour lire/traiter ces répertoires en parallèle?Lire des répertoires séparés et créer des RDD séparés en parallèle via Scala Spark
Voici un extrait de l'échantillon de ce que je suis en train, mais il ne fonctionne pas en raison de RDD imbriquées:
def readJsonCreateTable(tableInfo: (String, String)) {
val df = spark
.read
.json(tableInfo._1)
df.createOrReplaceTempView(tableInfo._2)
}
val dirList = List(("/mnt/jsondir1", "temptable1"),
("/mnt/jsondir2", "temptable2"),
("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
Changer la dernière ligne dirRDD.collect.foreach travaux, mais le travail n'est pas distribué et s'exécute séquentiellement, donc très lent.
Également essayé dirRDD.collect.par.foreach, mais cela exécute uniquement des threads parallèles sur le pilote et n'utilise pas tous les autres nœuds.
J'ai regardé foreachAsync, mais je ne suis pas sûr qu'asynchrone soit nécessairement parallèle dans cette situation en raison de l'imbrication.
Ceci utilise Spark 2.0 & Scala 2.11 via Databricks.
===========
Addition:
J'ai essayé foreachAsync qui retourne une FutureAction Spark, mais qui a donné une erreur aussi bien.
import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
Et apparemment SimpleFutureAction n'est pas sérialisable
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
Selon http://stackoverflow.com/questions/41426576/is-using-parallel-collections-encouraged-in-spark, il est déconseillé: > "Toute exécution parallèle à l'intérieur d'une tâche est complètement opaque pour le gestionnaire de ressources et par conséquent il ne peut pas allouer automatiquement les ressources requises " – TBhimdi
Cette question liée SO discute le parallélisme _en_une tâche, ce qui est différent de l'exécution de plusieurs jobs Spark en parallèle avec du code parallèle sur le pilote. –