1

J'essaie d'unir deux trames de données d'étincelles avec un ensemble de colonnes différent. A cet effet, je me suis référé à lien suivant: -Union de deux trames de données d'étincelles avec différentes colonnes

how to union 2 spark dataframes with different amounts of columns

Mon code est la suivante -

val cols1 = finalDF.columns.toSet 
val cols2 = df.columns.toSet 
val total = cols1 ++ cols2 
finalDF=finalDF.select(expr(cols1, total):_*).unionAll(df.select(expr(cols2, total):_*)) 

def expr(myCols: Set[String], allCols: Set[String]) = { 
    allCols.toList.map(x => x match { 
    case x if myCols.contains(x) => col(x) 
    case _ => lit(null).as(x) 
    }) 
} 

Mais le problème que je suis confronté est quelques-unes des colonnes dans les deux dataframes sont imbriquées. J'ai des colonnes de types StructType et primitifs. Maintenant, disons que la colonne A (de StructType) est en df et non en finalDF. Mais dans expr,

case _ => lit(null).as(x) 

ne le rend pas StructType. C'est pourquoi je ne suis pas capable de les unir. Il me donne l'erreur suivante -

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. NullType <> StructType(StructField(_VALUE,StringType,true), StructField(_id,LongType,true)) at the first column of the second table. 

Des suggestions que je peux faire ici?

+0

s'il vous plaît vérifier stackoverflow [fil] (https://stackoverflow.com/questions/42530431/spark-union-fails-with-nested-json-dataframe). –

+0

@ NinjaDev82 Oui, je peux importer tous les fichiers en même temps et son fonctionnement. Mais j'ai besoin d'ajouter une colonne avec une valeur extraite de l'en-tête du même fichier (j'importe un fichier xml dans une structure de données spark). Donc, j'importe les données dans la structure de données spark, en extrayant l'en-tête et en ajoutant cette valeur à une nouvelle colonne dans cette df. Cette valeur d'en-tête est différente pour chaque df. – Ishan

Répondre

1

J'utiliserais l'inférence de schéma intégrée pour cela. Il est beaucoup plus cher, mais beaucoup plus simple que les structures complexes correspondant, les éventuels conflits:

spark.read.json(df1.toJSON.union(df2.toJSON)) 

Vous pouvez également importer tous les fichiers en même temps, et join avec des informations extraites d'en-tête, à l'aide input_file_name.

import org.apache.spark.sql.function 

val metadata: DataFrame // Just metadata from the header 
val data: DataFrame  // All files loaded together 

metadata.withColumn("file", input_file_name) 
    .join(data.withColumn("file", input_file_name), Seq("file"))