2017-07-26 5 views
0

J'ai un fichier comme ceci:étincelle scala RDD/dataframe enregistrer filtré et rejeté les données

id,insert_date,name 
==================== 
1,20170620,abc 
2,20170620,xyz 
1,20170621,pqr 
3,20170624,huy 
,20170624,stu 

J'ai besoin de filtrer les enregistrements avec ids nuls. Aussi, si un identificateur est répété, j'ai besoin de sélectionner l'enregistrement avec max (insert_date).

que je fais comme ceci:

val myDF = sqlContext.read.format("com.databricks.spark.csv").option("delimiter",",") 
    .schema(myschema) 
    .load(mypath) 
myDF.registerTempTable("myTable") 
val myFilteredDF=sqlContext.sql("""SELECT id,max(insert_date),name 
       FROM myTable GROUP BY id,name""").filter("length(id) >0" 
myFilteredDF.show() 

Je RESULT la façon dont je voulais. Cependant, en même temps, j'ai besoin d'obtenir les enregistrements rejetés/filtrés à un autre DataFrame/RDD, pour écrire dans le fichier des enregistrements rejetés. Quelle est la meilleure solution ici. Je comprends que je peux faire le contraire de ce que je fais pour filtrer, mais qui ne ressemble pas à une meilleure solution

+0

@mtoto merci pour la recherche, malheureusement, je ne suis pas avoir – user3124284

Répondre

1

Vous pouvez essayer except:

val otherDF = myDF.except(myFilteredDF) 
+0

Merci mtoto, mais n'obtiendra pas tous les enregistrements rejetés, si nous avons deux enregistrements identiques, qui sont filtrés par max/groupe par – user3124284

+1

si vous avez des enregistrements identiques, vous devez d'abord dédupliquer vos données. Sauf si vous aimez recueillir des redondances dans vos projets. – mtoto