J'ai un très gros fichier. Pour chaque bigram (deux mots) pour chaque ligne du fichier, je dois vérifier tout le fichier. Ce que j'ai fait dans Scala est évidemment faux, mais je n'ai aucune idée de comment le réparer.L'itération d'une liste énorme entraîne la dépassement de la limite de surcharge gc
Cette fonction retourne toutes les lignes du fichier
def allSentences() : ArrayList[String] = {
val res: ArrayList[String] = new ArrayList[String]()
val filename = "/path/test.txt"
val fstream: FileInputStream = new FileInputStream(filename)
val br: BufferedReader = new BufferedReader(new InputStreamReader(fstream))
var strLine: String = null
while ({strLine = br.readLine(); strLine!= null})
res.add(strLine)
br.close()
return res
}
Et voilà comment je l'utilise (environ 3 millions!):
val p = sc.textFile("file:///path/test.txt")
val result11 = p
.flatMap(line => biTuple(line))
.map(word => (word, 1))
.reduceByKey(_ + _)
val result10 = result11
.flatMap { tuple => allSentences().map(tuple._1 -> _) }
.map(tuple => (tuple._1, count10(tuple._1,tuple._2)))
.reduceByKey(_ + _)
Je suis presque sûr que le problème est ici .flatMap { tuple => allSentences().map(tuple._1 -> _) }
mais y a-t-il un autre moyen de le faire?
P.S: biTuple()
renvoie un ArrayList
de tous les bigrammes de la ligne. count10()
renvoie 1 si le premier mot de bigram existe en ligne mais pas le second. result11
est un RDD de tous bugrams et leurs chefs comme ("mot1 mot2", COUNT)
Ceci est la sortie d'erreur:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Notez que je SPARK_WORKER_MEMORY=90G
et SPARK_DRIVER_MEMORY=90G
.
Pouvez-vous ajouter l'erreur de sortie? Nous devons savoir si le GC est dans le travailleur ou dans le conducteur. –
Merci @ThiagoBaldim Je viens d'ajouter l'erreur de sortie. –