2017-07-09 2 views
1

J'ai un très gros fichier. Pour chaque bigram (deux mots) pour chaque ligne du fichier, je dois vérifier tout le fichier. Ce que j'ai fait dans Scala est évidemment faux, mais je n'ai aucune idée de comment le réparer.L'itération d'une liste énorme entraîne la dépassement de la limite de surcharge gc

Cette fonction retourne toutes les lignes du fichier

def allSentences() : ArrayList[String] = { 
     val res: ArrayList[String] = new ArrayList[String]() 
     val filename = "/path/test.txt" 
     val fstream: FileInputStream = new FileInputStream(filename) 
     val br: BufferedReader = new BufferedReader(new InputStreamReader(fstream)) 
     var strLine: String = null 
     while ({strLine = br.readLine(); strLine!= null}) 
     res.add(strLine) 
     br.close() 
     return res 
    } 

Et voilà comment je l'utilise (environ 3 millions!):

val p = sc.textFile("file:///path/test.txt") 
val result11 = p 
      .flatMap(line => biTuple(line)) 
      .map(word => (word, 1)) 
      .reduceByKey(_ + _) 

      val result10 = result11 
      .flatMap { tuple => allSentences().map(tuple._1 -> _) } 
      .map(tuple => (tuple._1, count10(tuple._1,tuple._2))) 
      .reduceByKey(_ + _) 

Je suis presque sûr que le problème est ici .flatMap { tuple => allSentences().map(tuple._1 -> _) } mais y a-t-il un autre moyen de le faire?

P.S: biTuple() renvoie un ArrayList de tous les bigrammes de la ligne. count10() renvoie 1 si le premier mot de bigram existe en ligne mais pas le second. result11 est un RDD de tous bugrams et leurs chefs comme ("mot1 mot2", COUNT)

Ceci est la sortie d'erreur:

java.lang.OutOfMemoryError: GC overhead limit exceeded 
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201) 
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198) 
     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152) 
     at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58) 
     at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83) 
     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

Notez que je SPARK_WORKER_MEMORY=90G et SPARK_DRIVER_MEMORY=90G.

+0

Pouvez-vous ajouter l'erreur de sortie? Nous devons savoir si le GC est dans le travailleur ou dans le conducteur. –

+0

Merci @ThiagoBaldim Je viens d'ajouter l'erreur de sortie. –

Répondre

2

Il ressemble à ce que vous essayez de faire est un produit cartésien de result11 avec p (votre liste originale des phrases), mais vous faites en ouvrant et en lisant le fichier en mémoire pour chaque entrée dans result11. Cela ne manquera pas de stresser le garbage collector, même si je ne peux pas dire avec certitude que c'est la cause du problème du GC. Spark a une méthode cartesian sur les RDD, et si mon interprétation de ce que vous essayez de faire est correcte, cela fonctionnera probablement mieux. (Il copiera cependant beaucoup de données sur le réseau.)

Vous pouvez également déterminer si la logique count10 doit être utilisée dans une opération de filtrage, ce qui réduit le nombre d'entrées qui doivent être traitées par la version finale reduceByKey .