2017-06-28 1 views
0

J'ai ci-dessous scénario:égalité de deux trames de données

J'ai 2 dataframes contenant seulement 1 colonne permet de dire

DF1=(1,2,3,4,5) 
DF2=(3,6,7,8,9,10) 

Fondamentalement, ces valeurs sont les clés et je crée un fichier parquet de DF1 si la les clés dans DF1 ne sont pas dans DF2 (dans l'exemple actuel, il devrait retourner false). Ma façon actuelle de parvenir à mon exigence est:

val df1count= DF1.count 
val df2count=DF2.count 
val diffDF=DF2.except(DF1) 
val diffCount=diffDF.count 
if(diffCount==(df2count-df1count)) true 
else false 

Le problème avec cette approche est que je fais appel des éléments d'action 4 fois ce qui est sûr pas la meilleure façon. Quelqu'un peut-il me suggérer le meilleur moyen d'y parvenir?

Répondre

1

Vous pouvez utiliser ci-dessous fonction:

import org.apache.spark.sql.functions._ 

def diff(key: String, df1: DataFrame, df2: DataFrame): DataFrame = { 
    val fields = df1.schema.fields.map(_.name) 
    val diffColumnName = "Diff" 

    df1 
    .join(df2, df1(key) === df2(key), "full_outer") 
    .withColumn(
     diffColumnName, 
     when(df1(key).isNull, "New row in DataFrame 2") 
     .otherwise(
      when(df2(key).isNull, "New row in DataFrame 1") 
      .otherwise(
       concat_ws("", 
       fields.map(f => when(df1(f) =!= df2(f), s"$f ").otherwise("")):_* 
      ) 
      ) 
     ) 
    ) 
    .filter(col(diffColumnName) =!= "") 
    .select(
     fields.map(f => 
     when(df1(key).isNotNull, df1(f)).otherwise(df2(f)).alias(f) 
    ) :+ col(diffColumnName):_* 
    ) 
} 

Dans votre cas courir ceci:

diff("emp_id", df1, df2) 

Exemple

import org.apache.spark.sql.{DataFrame, SparkSession} 
import org.apache.spark.sql.functions._ 

object DiffDataFrames extends App { 
    val session = SparkSession.builder().master("local").getOrCreate() 

    import session.implicits._ 

    val df1 = session.createDataset(Seq((1,"a",11),(2,"b",2),(3,"c",33),(5,"e",5))).toDF("n", "s", "i") 
    val df2 = session.createDataset(Seq((1,"a",11),(2,"bb",2),(3,"cc",34),(4,"d",4))).toDF("n", "s", "i") 

    def diff(key: String, df1: DataFrame, df2: DataFrame): DataFrame = 
    /* above definition */ 

    diff("n", df1, df2).show(false) 
} 
+0

Pouvez-vous s'il vous plaît laissez-moi savoir comment déclarer le df1 et df2. J'ai déclaré comme ci-dessous sqlContext = SQLContext (sc) df = sqlContext.sql ("select * from table1") df2 = sqlContext.sql ("select * from table2") puis copié le code ci-dessus tel quel. .. obtenir erreur de syntaxe .... Je suis très nouveau à cette étincelle scala code –

+0

Pouvez-vous me corriger ce que je fais mal, Quand j'essaie de courir le code ci-dessous je reçois une erreur: pas trouvé: valeur df1, pas trouvé df2 . .. org.apache.spark.sql import {dataframe, SqlContext} importation org.apache.spark.sql.functions._ val sc: SparkContext val SqlContext = new org.apache.spark.sql.SQLContext (sc) sqlContext = SQLContext (sc) df1 = sqlContext.sql ("select * from table1 ") DF2 = sqlContext.sql (" select * from table2 ") diff (" locataire », DF1, DF2) def diff (key: String, DF1: dataframe, DF2: dataframe): = {dataframe ......} /// diff code amusant tel que fourni –

+0

Bonjour, j'ai ajouté un petit exemple. –

0

est ici un moyen de pour obtenir les lignes hors du commun entre deux blocs de données:

val d1 = Seq((3, "Chennai", "rahman", "9848022330", 45000, "SanRamon"), (1, "Hyderabad", "ram", "9848022338", 50000, "SF"), (2, "Hyderabad", "robin", "9848022339", 40000, "LA"), (4, "sanjose", "romin", "9848022331", 45123, "SanRamon")) 
val d2 = Seq((3, "Chennai", "rahman", "9848022330", 45000, "SanRamon"), (1, "Hyderabad", "ram", "9848022338", 50000, "SF"), (2, "Hyderabad", "robin", "9848022339", 40000, "LA"), (4, "sanjose", "romin", "9848022331", 45123, "SanRamon"), (4, "sanjose", "romino", "9848022331", 45123, "SanRamon"), (5, "LA", "Test", "1234567890", 12345, "Testuser")) 

val df1 = d1.toDF("emp_id" ,"emp_city" ,"emp_name" ,"emp_phone" ,"emp_sal" ,"emp_site") 
val df2 = d2.toDF("emp_id" ,"emp_city" ,"emp_name" ,"emp_phone" ,"emp_sal" ,"emp_site") 

spark.sql("((select * from df1) union (select * from df2)) minus ((select * from df1) intersect (select * from df2))").show //spark is SparkSession