2017-01-12 2 views
1

Je dois lire les fichiers JSON à partir de répertoires sources distincts et créer des tables distinctes pour chaque répertoire. Je voudrais que cela soit fait en parallèle, mais Spark ne supporte pas les RDD imbriqués, donc actuellement il le fait de manière séquentielle. Existe-t-il une bonne solution pour lire/traiter ces répertoires en parallèle?Lire des répertoires séparés et créer des RDD séparés en parallèle via Scala Spark

Voici un extrait de l'échantillon de ce que je suis en train, mais il ne fonctionne pas en raison de RDD imbriquées:

def readJsonCreateTable(tableInfo: (String, String)) { 
    val df = spark 
      .read 
      .json(tableInfo._1) 
    df.createOrReplaceTempView(tableInfo._2) 
} 

val dirList = List(("/mnt/jsondir1", "temptable1"), 
        ("/mnt/jsondir2", "temptable2"), 
        ("/mnt/jsondir3", "temptable3")) 
val dirRDD = sc.parallelize(dirList) 
dirRDD.foreach(readJsonCreateTable) // Nested RDD error 

Changer la dernière ligne dirRDD.collect.foreach travaux, mais le travail n'est pas distribué et s'exécute séquentiellement, donc très lent.

Également essayé dirRDD.collect.par.foreach, mais cela exécute uniquement des threads parallèles sur le pilote et n'utilise pas tous les autres nœuds.

J'ai regardé foreachAsync, mais je ne suis pas sûr qu'asynchrone soit nécessairement parallèle dans cette situation en raison de l'imbrication.

Ceci utilise Spark 2.0 & Scala 2.11 via Databricks.

===========
Addition:

J'ai essayé foreachAsync qui retourne une FutureAction Spark, mais qui a donné une erreur aussi bien.

import scala.concurrent._ 
import scala.concurrent.duration._ 
. 
. 
. 
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable) 
Await.result(dirFuture, 1 second) 

Et apparemment SimpleFutureAction n'est pas sérialisable

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction 

Répondre

1

Vous pouvez utiliser Scala parallel collections ou futures pour paralléliser le code en cours d'exécution sur le pilote Spark. Le pilote Spark est thread-safe donc cela fonctionnera comme prévu.

Voici un exemple en utilisant des collections parallèles avec fil-pools explicitement spécifiés:

val dirList = List(
    ("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"), 
    ("dbfs:/databricks-datasets/amazon/users/", "users") 
).par 

val pool = new scala.concurrent.forkjoin.ForkJoinPool(2) 

try { 
    dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool) 
    dirList.foreach { case (filename, tableName) => 
    println(s"Starting to create table for $tableName") 
    val df = spark.read.json(filename) 
    println(s"Done creating table for $tableName") 
    df.createOrReplaceTempView(tableName) 
    } 
} finally { 
    pool.shutdown() // to prevent thread leaks. 
    // You could also re-use thread pools across collections. 
} 

Quand je courais cela dans Databricks, elle a produit la sortie du journal en continu indiquant que les deux tables étaient chargées en parallèle:

Starting to create table for departuredelays 
Starting to create table for users 
Done creating table for departuredelays 
Done creating table for users 

Ce parallélisme était également pris en compte dans la vue chronologique des travaux de l'interface utilisateur de Spark.

Bien sûr, vous pouvez également utiliser des threads Java pour cela. En un mot, il est prudent d'appeler les API du pilote Spark à partir de plusieurs threads, choisissez votre framework de concurrence JVM de votre choix et lancez des appels parallèles au pilote Spark pour créer vos tables.

+0

Selon http://stackoverflow.com/questions/41426576/is-using-parallel-collections-encouraged-in-spark, il est déconseillé: > "Toute exécution parallèle à l'intérieur d'une tâche est complètement opaque pour le gestionnaire de ressources et par conséquent il ne peut pas allouer automatiquement les ressources requises " – TBhimdi

+1

Cette question liée SO discute le parallélisme _en_une tâche, ce qui est différent de l'exécution de plusieurs jobs Spark en parallèle avec du code parallèle sur le pilote. –