2017-10-09 7 views
0

J'utilise spark 1.6.1.Comment enregistrer GroupedDataset au parquet ou le convertir en toDF

Y at-il une API disponible pour enregistrer GroupDataset dans un fichier parquet. Ou convertissez-le en DataFrame.

E.g. J'ai un objet 'Procedure' personnalisé, j'ai converti Dataframe en objet de procédure. Après cela, je fais un groupe sur patientID. Je voulais regrouper des fichiers dans un fichier parquet ou les transmettre en tant que données à d'autres fonctions. Je n'ai pas eu d'API pour le stockage ou la convertir en Dataframe.

val procedureDs: Dataset[Procedure] = joinDf.select("patientid", "patientprocedureid", "procedurecode").as[Procedure] 
val groupedDs:GroupedDataset[Long, Procedure] = procedureDs.groupBy{ x => x.patientid } 

Après avoir appliqué mapGroups

val a = groupedDs.mapGroups{ case (k,vs) => { (k, vs.toSeq)}} 

Il donne ci-dessous erreur:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.....PatientDiagnosis 
- array element class: "com....PatientDiagnosis" 
- field (class: "scala.collection.Seq", name: "_2") 
- root class: "scala.Tuple2" 

J'avais essayé de donner explicitement Encoder

val a = groupedDigDs.mapGroups((k,vs) => (k, vs.toSeq))(org.apache.spark.sql.Encoders.bean(classOf[(Long, Seq[com....PatientDiagnosis])])) 

Puis erreur changé:

java.lang.UnsupportedOperationException: Cannot infer type for class scala.Tuple2 because it is not bean-compliant 

Répondre

1

Identique GroupedData (RelationalGroupedDataset 2.x Spark), GroupedDataset (KeyValueGroupedDataset 2.x Spark) doit être agrégée avant de pouvoir sauver.

Si votre objectif est encore une autre groupByKey vous pouvez utiliser mapGroups:

val groupedDs: GroupedDataset[K, V] = ??? 
// ... { case (k, xs) => (k, xs.toSeq) } to preserve key as well 
groupedDs.mapGroups { case (_, xs) => xs.toSeq } 

et écrire le résultat.