2017-10-12 2 views
0

J'écris un script Scala qui lit à partir d'une table, qui transforme les données et affiche le résultat à l'aide de Spark. J'utilise Spark 2.1.1.2 et Scala 2.11.8. Il y a une instance de dataframe que j'utilise deux fois dans le script (df2 dans le code ci-dessous.). Puisque les données sont calculées lorsqu'une action est appelée sur elles, pas quand elles sont déclarées, je prédis que cette trame de données doit être calculée deux fois. Je pensais que la persistance de cette base de données améliorerait la performance en pensant qu'elle serait calculée une fois (si elle persistait), au lieu de deux fois, si elle persistait. Cependant, l'exécution de script dure ~ 10 secondes de plus lorsque je persiste par rapport à quand je ne persiste pas. Je ne peux pas comprendre quelle est la raison de cela. Si quelqu'un a une idée, ce serait très apprécié.La persistance de DataFrame n'améliore pas les performances dans Spark

Ma ligne de commande de soumission est la suivante:

spark-submit --class TestQuery --master yarn --driver-memory 10G --executor-memory 10G --executor-cores 2 --num-executors 4 /home/bcp_data/test/target/TestQuery-1.0-SNAPSHOT.jar 

scénario Scala est ci-dessous:

val spark = SparkSession 
      .builder() 
      .appName("TestQuery") 
      .config("spark.sql.warehouse.dir", "file:/tmp/hsperfdata_hdfs/spark-warehouse/") 
      .enableHiveSupport() 
      .getOrCreate() 


val m = spark.sql("select id, startdate, enddate, status from members") 
val l = spark.sql("select mid, no, status, potential from log") 
val r = spark.sql("select mid, code from records") 

val df1 = m.filter(($"status".isin(1,2).and($"startdate" <= one_year_ago)).and((($"enddate" >= one_year_ago))) 

val df2 = df1.select($"id", $"code").join(l, "mid").filter(($"status".equalTo(1)).and($"potential".notEqual(9))).select($"no", $"id", $"code") 
df2.persist 

val df3 = df2.join(r, df2("id").equalTo(r("mid"))).filter($"code".isin("0001","0010","0015","0003","0012","0014","0032","0033")).groupBy($"code").agg(countDistinct($"no")) 


val fa = spark.sql("select mid, acode from actions") 
val fc = spark.sql("select dcode, fcode from params.codes") 

val df5 = fa.join(fc, fa("acode").startsWith(fc("dcode")), "left_outer").select($"mid", $"fcode") 
val df6 = df2.join(df5, df2("id").equalTo(df5("mid"))).groupBy($"code", $"fcode") 

println("count1: " + df3.count + " count2: " + df6.count) 
+2

Je crois que vous devez l'assigner à une nouvelle variable et l'utiliser, 'val df2_2 = df2.presist'. – Shaido

Répondre

1

en utilisant la mise en cache est le bon choix, mais votre déclaration

df2.persist 

n'a pas effet parce que vous n'utilisez pas la trame de données retournée. Je le crois

val df2 = df1.select($"id", $"code") 
.join(l, "mid") 
.filter(($"status".equalTo(1)).and($"potential".notEqual(9))) 
.select($"no", $"id", $"code") 
.persist