2017-05-24 1 views
1

J'ai une grande dataframe qui a été mis en cache commemoyen efficace de se joindre à une trame de données d'allumage en cache avec d'autres et cache à nouveau

val largeDf = someLargeDataframe.cache 

Maintenant, je dois l'union avec un minuscule et mis en mémoire cache à nouveau

val tinyDf = someTinyDataframe.cache 
val newDataframe = largeDf.union(tinyDf).cached 
tinyDf.unpersist() 
largeDf.unpersist() 

Il est très inefficace car il a besoin de re-mettre en cache toutes les données à nouveau. Existe-t-il un moyen efficace d'ajouter une petite quantité de données à une grande base de données en cache?


Après avoir lu l'explication de Teodors, je sais que je ne peux pas unpersist l'ancien dataframe avant de faire une action sur mon nouveau dataframe. Mais que faire si j'ai besoin de faire quelque chose comme ça?

def myProcess(df1: Dataframe, df2: Dataframe): Dataframe{ 
    val df1_trans = df1.map(....).cache 
    val df2_trans = df2.map(....).cache 

    doSomeAction(df1_trans, df2_trans) 

    val finalDf = df1_trans.union(df2_trans).map(....).cache 
    // df1_trans.unpersist() 
    // df2_trans.unpersist() 
    finalDf 
} 

Je veux que mes df1_trans & df2_trans à être mises en cache pour améliorer les performances dans la fonction, car ils seront appelés plus d'une fois, mais le dataframe je dois retourner à la fin est également construit par df1_trans & df2_trans, si je ne peux pas les décompresser avant de quitter la fonction, je ne peux jamais trouver d'autre endroit pour faire cela, cependant, si je les désaspire, mon FinalDf ne bénéficiera pas du cache.

Que puis-je faire dans cette situation? Merci!

+0

Une union est une opération relativement peu coûteuse, car vous n'avez pas besoin de mélanger les données à travers le cluster. Je pense que la meilleure solution est de garder le largeDf en cache et ne pas conserver le newDataframe. –

+0

Mais si j'ai besoin d'utiliser newDataframe beaucoup de temps. Si je ne l'ai pas persisté, il a besoin de faire l'union encore et encore. En fait, dans mon cas, c'est une boucle qui va union nouveau tinyDataFrame dépendra du temps encore et encore. –

+1

Oui, mais l'utilisation d'une union sur une structure de données modifie simplement une référence sans effectuer de calculs ni de shuffles. Si vous mettez en cache/persistez les deux données d'entrée, cela devrait être la solution la plus performante. Je propose que vous exécutiez les repères dans Teodors répondre et voir par vous-même. L'alternative 2 devrait être la plus rapide. –

Répondre

1
val largeDf = someLargeDataframe.cache 
val tinyDf = someTinyDataframe.cache 
val newDataframe = largeDf.union(tinyDf).cache 

Si vous appelez unpersist() maintenant avant toute action qui passe par tous vos dataframe de largeDf vous ne bénéficierez pas de la mise en cache des deux dataframes.

tinyDf.unpersist() 
largeDf.unpersist() 

Je ne vous soucier de la mise en cache du dataframe tant que filles fusionnées les deux autres dataframes sont déjà mises en cache, vous ne verrez probablement une baisse de performance.

Benchmark les éléments suivants:

========= now? ============ 
val largeDf = someLargeDataframe.cache 
val tinyDf = someTinyDataframe.cache 
val newDataframe = largeDf.union(tinyDf).cache 
tinyDf.unpersist() 
largeDf.unpersist() 
#force evaluation 
newDataframe.count() 

========= alternative 1 ============ 
val largeDf = someLargeDataframe.cache 
val tinyDf = someTinyDataframe.cache 
val newDataframe = largeDf.union(tinyDf).cache 

#force evaluation 
newDataframe.count() 
tinyDf.unpersist() 
largeDf.unpersist() 

======== alternative 2 ============== 
val largeDf = someLargeDataframe.cache 
val tinyDf = someTinyDataframe.cache 
val newDataframe = largeDf.union(tinyDf) 

newDataframe.count() 


======== alternative 3 ============== 
val largeDf = someLargeDataframe 
val tinyDf = someTinyDataframe 
val newDataframe = largeDf.union(tinyDf).cache 

#force evaluation 
newDataframe.count() 
+0

Merci! Maintenant, je sais ce qui se passe vraiment quand j'appelle unpersiste, mais je modifie ma question et je ne sais toujours pas quelle est la bonne façon de résoudre le dilemme –

+0

J'ai eu le même problème il y a quelque temps, mais je n'ai pas trouvé de solution (je n'ai pas trop étudié les alternatives.) J'ai fini par utiliser persistant avec un niveau de stockage différent: Ex: 'val df1_trans = df1.map (....). persistent (StorageLevel.MEMORY_AND_DISK_SER) 'ces données finiront par être évacuées sur le disque si vous n'avez plus de RAM, mais utiliseront également moins d'espace RAM car il sera sérialisé. [la mode la moins utilisée récemment] (http://spark.apache.org/docs/latest/programming-guide.html#removing-data) –

1

est-il un moyen efficace d'ajouter une petite quantité de données à une grande dataframe mises en cache?

Je ne pense pas que toute autre opération pourrait battre union. Je pensais que la fonction broadcast pourrait aider ici, mais après avoir regardé le plan d'exécution, je ne le pense plus.

Cela m'a conduit à écrire la réponse. Si vous voulez savoir si votre cache a un effet sur une requête, il explain:

expliquer(): Unité Imprime le plan physique à la console à des fins de débogage.

Avec l'exemple suivant, broadcast n'affecte pas union (qui est maintenant pas surprenant étant donné qu'il est un indice pour les jointures et les autres opérateurs physiques ignorent simplement).

scala> left.union(broadcast(right)).explain 
== Physical Plan == 
Union 
:- *Range (0, 4, step=1, splits=8) 
+- *Range (0, 3, step=1, splits=8) 

Il est également intéressant d'utiliser Détails pour Query sous l'onglet SQL.

enter image description here

+0

Merci, mais ma question concerne le timing de la mise en cache et de l'unissist (), pas comment mettre en œuvre l'union. –

0

Je suis juste essayer de répondre à la partie de la question qui est toujours sans réponse ici. Il y a une façon de dénuder les df1_trans et df2_trans de votre fonction myProcess(). Vous pouvez créer une vue temporaire des DataFrames en utilisant df1_trans.createOrReplaceTempView(viewName) et df2_trans.createOrReplaceTempView(viewName). Voir le Dataset API pour référence. Ensuite, après avoir effectué une action sur ces deux DataFrames et que vous êtes prêt à décompresser, vous pouvez le faire comme ceci: sqlContext.table(viewName).unpersist, où viewName est le nom que vous avez utilisé pour créer la vue temporaire.

Espérons que cela aide!