2017-02-10 7 views
0

L'élagage de partition est-il activé pour les TempTables mis en cache dans l'étincelle Apache? Si oui, comment puis-je le configurer? Mes données sont un ensemble de lectures de capteur dans différentes installations, une ligne contient nomInstallation, étiquette, horodatage et valeur.Élagage de partition Spark SQL pour une table mise en cache

J'ai écrit les données au format de parquet en utilisant les commandes suivantes:

rdd.toDF("installationName", "tag", "timestamp", "value") 
    .repartition($"installationName", $"tag") 
    .write.partitionBy("installationName","tag").mode("overwrite").parquet(config.output) 

J'ai lu ces données en utilisant la commande suivante dans une table SQL en utilisant Spark HiveContext:

val parquet = hc.read.parquet("/path_to_table/tablename") 
parquet.registerTempTable("tablename") 

Maintenant, si je exécuter une requête SQL sur cette table, il effectue l'élagage de partition comme prévu:

hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'") 

Et la requête prend environ 8 secondes. Mais si je en cache la table en mémoire, puis effectuer la même requête, il faut toujours environ 50 secondes:

hc.sql("CACHE TABLE tablename") 
hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'") 

J'utilise actuellement Spark 1.6.1.

+0

Salut, merci pour votre commentaire. En effet, je fais une opération de répartition avant d'écrire des données sur le parquet. J'ai également testé la requête ci-dessus avec le repartitionnement et il est plus efficace avec un temps de requête de 20 s, mais il est quand même plus lent que la lecture des fichiers parquet sans mise en cache. Mon but est d'éviter d'écrire sur des fichiers parquet. Pourriez-vous fournir une source? Comment savez-vous que l'élagage de partition n'est pas pris en charge après la mise en cache? Si vous écrivez une réponse ici, je pourrais l'accepter. –

+0

Correction, mise en cache en mémoire réduit le temps de requête à moins de 1 seconde, ce qui est bien sûr déjà acceptable. Je me demande, si elle évolue: ce n'est qu'une partie de mon Dasta, j'ai plus de 200 fois plus et continuellement en croissance, donc plus je possède de données, plus je prends le temps de parcourir toutes les partitions, . –

Répondre

0

La raison pour laquelle cela arrive est due à la façon dont le cache fonctionne en étincelle.

Lorsque vous appelez une sorte de processus à une trame de données, RDD ou DataSet l'exécution a un plan voir ci-dessous:

val df = sc.parallelize(1 to 10000).toDF("line") 
df.withColumn("new_line", col("line") * 10).queryExecution 

La commande queryExecution retour à vous le plan. Voir le plan logique ci-dessous du code:

== Parsed Logical Plan == 
Project [*,('line * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Optimized Logical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at 

== Physical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- Scan ExistingRDD[_1#4] 

Dans ce cas, vous pouvez voir tout le processus que votre code va faire. Lorsque vous appelez une fonction cache comme ceci:

df.withColumn("new_line", col("line") * 10).cache().queryExecution 

Le résultat sera comme ceci:

== Parsed Logical Plan == 
'Project [*,('line * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Optimized Logical Plan == 
InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#8], None 

== Physical Plan == 
InMemoryColumnarTableScan [line#5,new_line#8], InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Pro... 

Cette exécution retourne à vous l'exécution d'un InMemoryRelation dans le plan logique optmized, cela permettra d'économiser structure de données dans votre mémoire, ou si vos données sont vraiment grandes, elles se déverseront sur le disque.

Le temps d'enregistrement dans votre cluster prend du temps, il sera un peu lent lors de la première exécution, mais si vous avez besoin d'accéder à nouveau aux mêmes données dans un autre endroit, le DF ou le RDD sera sauvegardé. Spark ne demandera plus l'exécution.

+0

Merci pour votre réponse! Dans la mise en cache de la table spark est une opération ardue, ce qui signifie que les données sont déjà mises en cache lorsque j'exécute ma requête pour la première fois. La mise en cache des données prend actuellement 500 secondes ici, et en effet après le cache, la performance de la requête est améliorée, il ne faut plus que 50 secondes pour parcourir toutes les partitions. Peu importe combien de fois j'exécute la requête, la performance est toujours à peu près la même. Votre réponse ne répond pas à ma question, qui concerne l'élagage de partition après le cahking. –