1

J'ai 1.2GB des données orc sur S3 et je suis en train de faire ce qui suit avec le même:SnappyData: java.lang.OutOfMemoryError: GC limite de frais généraux dépassé

1) mettre en cache les données sur les clusters accrocheurs [snappydata 0,9]

2) Exécuter une requête groupby sur l'ensemble de données en mémoire cache

3) comparer la performance avec Spark 2.0.0

J'utilise un 64 GB/8 machine de base et la configuration de l'Snappy Les grappes sont les suivantes:

$ cat locators 
localhost 

$cat leads 
localhost -heap-size=4096m -spark.executor.cores=1 

$cat servers 
localhost -heap-size=6144m 
localhost -heap-size=6144m 
localhost -heap-size=6144m 
localhost -heap-size=6144m 
localhost -heap-size=6144m 
localhost -heap-size=6144m 

Maintenant, je l'ai écrit un petit script python, pour mettre en cache les données orc de S3 et exécuter un simple groupe par requête, qui est aussi la façon suivante:

from pyspark.sql.snappy import SnappyContext 
from pyspark import SparkContext,SparkConf 
conf = SparkConf().setAppName('snappy_sample') 
sc = SparkContext(conf=conf) 
sqlContext = SnappyContext(sc) 

sqlContext.sql("CREATE EXTERNAL TABLE if not exists my_schema.my_table using orc options(path 's3a://access_key:[email protected]_name/path')") 
sqlContext.cacheTable("my_schema.my_table") 

out = sqlContext.sql("select * from my_schema.my_table where (WeekId = '1') order by sum_viewcount desc limit 25") 
out.collect() 

Le script ci-dessus est exécutée à l'aide la commande suivante:

spark-submit --master local[*] snappy_sample.py 

et je reçois l'erreur suivante:

17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_5 in memory! (computed 21.2 MB so far) 
17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_0 in memory! (computed 21.2 MB so far) 
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_5 to disk instead. 
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_0 to disk instead. 
17/10/04 02:50:47 WARN storage.BlockManager: Putting block rdd_2_2 failed due to an exception 
17/10/04 02:50:47 WARN storage.BlockManager: Block rdd_2_2 could not be removed as it was not found on disk or in memory 
17/10/04 02:50:47 ERROR executor.Executor: Exception in task 2.0 in stage 0.0 (TID 2) 
java.lang.OutOfMemoryError: GC overhead limit exceeded 


at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) 
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96) 
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98) 
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
17/10/04 02:50:47 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-2,5,main] 
java.lang.OutOfMemoryError: GC overhead limit exceeded 
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) 
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96) 
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98) 
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
17/10/04 02:50:48 INFO snappystore: VM is exiting - shutting down distributed system 

En plus de l'erreur ci-dessus, comment vérifier si les données ont été mises en cache dans un cluster Snappy?

Répondre

2

1) Premièrement, il ne semble pas que vous vous connectiez au cluster SnappyData avec le script python, mais que vous l'exécutiez en mode local. Dans ce cas, la JVM lancée par le script python échoue avec OOM comme prévu. Lorsque vous utilisez python se connecter à SnappyData groupe en mode « smart connector »:

spark-submit --master local[*] --conf snappydata.connection=locator:1527 snappy_sample.py 

L'hôte: port ci-dessus est l'hôte de localisation et le port sur lequel le serveur est en cours d'exécution Thrift (1527 par défaut).

2) Deuxièmement, l'exemple que vous avez ne fera que mettre en cache en utilisant Spark. Si vous souhaitez utiliser SnappyData, charge dans une colonne de table:

from pyspark.sql.snappy import SnappySession 
from pyspark import SparkContext,SparkConf 
conf = SparkConf().setAppName('snappy_sample') 
sc = SparkContext(conf=conf) 
session = SnappySession(sc) 

session.sql("CREATE EXTERNAL TABLE if not exists my_table using orc options(path 's3a://access_key:[email protected]_name/path')") 
session.table("my_table").write.format("column").saveAsTable("my_column_table") 

out = session.sql("select * from my_column_table where (WeekId = '1') order by sum_viewcount desc limit 25") 
out.collect() 

Notez également l'utilisation de « SnappySession » plutôt que dans un contexte qui est dépréciée depuis Spark 2.0.x. Lorsque vous effectuez une comparaison avec la mise en cache de Spark, vous pouvez utiliser "cacheTable" dans un script distinct et l'exécuter sur Spark en amont. Notez que "cacheTable" fera paresseusement la mise en cache, ce qui signifie que la première requête effectuera la mise en cache réelle, donc la première exécution de la requête sera très lente avec la mise en cache Spark mais les suivantes seront plus rapides.

3) Mise à jour vers la version 1.0 qui comporte de nombreuses améliorations au lieu d'utiliser la version 0.9. Vous devrez également ajouter hadoop-aws-2.7.3 et aws-java-sdk-1.7.4 au "-classpath" dans conf/leads et conf/servers (ou dans le répertoire jars du produit) avant de lancer le cluster.