2017-03-06 1 views
2

Je cours emr-5.2.0 et j'ai un an de données stockées dans S3 sous forme de partition partitionnée par jour. Lorsque j'interroge pendant un mois, je m'attends à ce que Spark ne charge qu'un mois de données en mémoire. Cependant, l'utilisation de ma mémoire de cluster ressemble à ce que je charge les années complètes 1,7 To de données.Performance des requêtes Spark & ​​Parquet

Spark Memory Usage

Je suppose que je peux charger le lac de données complet comme celui-ci

val lakeDF = spark.sqlContext.read.parquet("s3://mybucket/mylake.parquet") 
lakeDF.cache() 
lakeDF.registerTempTable("sightings") 

Et que Spark utiliserais les dates dans la requête pour sélectionner uniquement les partitions qui correspondent là où filtre .

val leftDF = spark.sql("SELECT * FROM sightings WHERE DATE(day) BETWEEN "2016-01-09" AND "2016-01-10"") 
val audienceDF = leftDF.join(ghDF, Seq("gh9")) 
audienceDF.select(approxCountDistinct("device_id", red = 0.01).as("distinct"), sum("requests").as("avails")).show() 

Je suis curieux de savoir si la coulée de la partition que la date est à l'origine de cette question? J'ai aussi testé avec Athena/PrestoDB sur le même jeu de données et il est très clair que seulement quelques gigaoctets de données sont en cours d'analyse.

Y at-il un moyen pour Spark de me dire combien de données vont être chargées avant de soumettre une requête?

+1

avez-vous essayé de supprimer l'instruction 'lakeDF.cache()'? Vous avez aussi froid étudier le plan physique ex donné par 'df.explain()' à la fin de vos transformations (avant d'appeler une action), peut-être que cela vous donne un indice –

+0

Oui 'lakeDF.cache()' était le problème. – jspooner

Répondre

1

Le problème a été causé par l'appel lakeDF.cache() avant que le filtre a été appliqué.