2017-05-05 3 views
3

Je souhaite utiliser l'étincelle pour lire un fichier XML volumineux (51 Go) (sur un disque dur externe) dans une image (à l'aide de spark-xml plugin), effectuer un mappage/filtrage simple, le réorganiser et ensuite l'écrire sur le disque, sous la forme d'un fichier CSV.Erreur mémoire insuffisante lors de la lecture d'un fichier volumineux dans Spark 2.1.0

Mais je reçois toujours un java.lang.OutOfMemoryError: Java heap space, peu importe comment je tweak cela.

Je veux comprendre pourquoi ne pas augmenter le nombre de partitions arrêter l'erreur OOM

devrait-il pas diviser la tâche en plusieurs parties afin que chaque partie est plus petite et ne provoque pas la mémoire problèmes?

(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)

choses que j'ai essayé:

  • repartitioning/coalescent à (5.000 et 10.000 partitions) la trame de données lors de la lecture et l'écriture (valeur initiale est 1,604)
  • en utilisant un plus petit nombre de exécuteurs (6, 4, même avec exécuteurs je reçois erreur OOM!)
  • diminuer la taille des fichiers séparés (par défaut ressemble à c'est 33MB)
  • tonnes give de RAM (je l'ai)
  • augmentation spark.memory.fraction à 0,8 (valeur par défaut est de 0,6)
  • diminution spark.memory.storageFraction à 0,2 (valeur par défaut est de 0,5)
  • mis spark.default.parallelism à 30 et 40 (valeur par défaut est 8 pour moi)
  • mis spark.files.maxPartitionBytes à 64M (valeur par défaut est 128M)

Tout mon code est ici (notez que je ne suis pas quoi que ce soit la mise en cache):

val df: DataFrame = spark.sqlContext.read 
    .option("mode", "DROPMALFORMED") 
    .format("com.databricks.spark.xml") 
    .schema(customSchema) // defined previously 
    .option("rowTag", "row") 
    .load(s"$pathToInputXML") 

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 
// prints 1604 

// i pass `numPartitions` as cli arguments 
val df2 = df.coalesce(numPartitions) 

// filter and select only the cols i'm interested in 
val dsout = df2 
    .where(df2.col("_TypeId") === "1") 
    .select(
    df("_Id").as("id"), 
    df("_Title").as("title"), 
    df("_Body").as("body"), 
).as[Post] 

// regexes to clean the text 
val tagPat = "<[^>]+>".r 
val angularBracketsPat = "><|>|<" 
val whitespacePat = """\s+""".r 


// more mapping 
dsout 
.map{ 
    case Post(id,title,body,tags) => 

    val body1 = tagPat.replaceAllIn(body,"") 
    val body2 = whitespacePat.replaceAllIn(body1," ") 

    Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(",")) 

} 
.orderBy(rand(SEED)) // random sort 
.write // write it back to disk 
.option("quoteAll", true) 
.mode(SaveMode.Overwrite) 
.csv(output) 

NOTES

  • la fente d'entrée sont vraiment petites (33MB seulement), alors pourquoi ne puis-je avoir 8 threads chaque traitement une fractionnés? il ne devrait vraiment pas souffler ma mémoire (je l'ai Sé

MISE À JOUR J'ai écrit une version plus courte du code qui lit tout le fichier, puis forEachPartition (println).

j'obtenir la même erreur OOM:

val df: DataFrame = spark.sqlContext.read 
    .option("mode", "DROPMALFORMED") 
    .format("com.databricks.spark.xml") 
    .schema(customSchema) 
    .option("rowTag", "row") 
    .load(s"$pathToInputXML") 
    .repartition(numPartitions) 

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 

df 
    .where(df.col("_PostTypeId") === "1") 
    .select(
    df("_Id").as("id"), 
    df("_Title").as("title"), 
    df("_Body").as("body"), 
    df("_Tags").as("tags") 
).as[Post] 
    .map { 
    case Post(id, title, body, tags) => 
     Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase)) 
    } 
    .foreachPartition { rdd => 
    if (rdd.nonEmpty) { 
     println(s"HI! I'm an RDD and I have ${rdd.size} elements!") 
    } 
    } 

PS: J'utilise étincelle v 2.1.0 Ma machine a 8 cœurs et 16 Go de RAM..

+0

Avez-vous inspecté la taille des partitions créées dans l'interface utilisateur de Spark? – Khozzy

+0

@Khozzy C'est ce que j'ai eu quand j'ai lancé l'application avec 1604 partitions pour le DF de lecture et 50 partitions pour le DF à écrire: [screenshot-spark-ui] (http://i.imgur.com/a5LjEmc. png) –

+0

Oui, mais regardez dans l'interface utilisateur lors de l'exécution du travail. Vous trouverez combien de temps chaque tâche est exécutée et comment votre CPU est utilisée (il y a peut-être des traînards). – Khozzy

Répondre

0

Parce que vous stockez votre RDD deux fois et Votre logique doit être changement comme celui-ci ou le filtre avec SparkSql

val df: DataFrame = SparkFactory.spark.read 
     .option("mode", "DROPMALFORMED") 
     .format("com.databricks.spark.xml") 
     .schema(customSchema) // defined previously 
     .option("rowTag", "row") 
     .load(s"$pathToInputXML") 
     .coalesce(numPartitions) 

    println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 
    // prints 1604 


    // regexes to clean the text 
    val tagPat = "<[^>]+>".r 
    val angularBracketsPat = "><|>|<" 
    val whitespacePat = """\s+""".r 

    // filter and select only the cols i'm interested in 
    df 
     .where(df.col("_TypeId") === "1") 
     .select(
     df("_Id").as("id"), 
     df("_Title").as("title"), 
     df("_Body").as("body"), 
    ).as[Post] 
     .map{ 
     case Post(id,title,body,tags) => 

      val body1 = tagPat.replaceAllIn(body,"") 
      val body2 = whitespacePat.replaceAllIn(body1," ") 

      Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(",")) 

     } 
     .orderBy(rand(SEED)) // random sort 
     .write // write it back to disk 
     .option("quoteAll", true) 
     .mode(SaveMode.Overwrite) 
     .csv(output) 
+0

Faire tout un seul DF n'a pas vraiment aidé .. J'ai encore 'java.lang.OutOfMemoryError: Java heap space' –

-2

Vous pouvez changer la taille du tas en ajoutant ce qui suit dans votre variable d'environnement:

  1. Environnement nom de la variable: _JAVA_OPTIONS
  2. Environnement variable Valeur: -Xmx512M -Xms512m