2017-10-17 1 views
0

Compte tenu de la trame de données suivante étincelle:Spark: trame de données d'importation MongoDB (scala)

Name,LicenseID_1,TypeCode_1,State_1,LicenseID_2,TypeCode_2,State_2,LicenseID_3,TypeCode_3,State_3  
"John","123ABC",1,"WA","456DEF",2,"FL","789GHI",3,"CA" 
"Jane","ABC123",5,"AZ","DEF456",7,"CO","GHI789",8,"GA" 

Comment pourrais-je utiliser scala à étincelle pour écrire ceci dans MongoDB comme collection de documents comme suit:

{ "Name" : "John", 
    "Licenses" : 
    { 
    [ 
     {"LicenseID":"123ABC","TypeCode":"1","State":"WA" }, 
     {"LicenseID":"456DEF","TypeCode":"2","State":"FL" }, 
     {"LicenseID":"789GHI","TypeCode":"3","State":"CA" } 
    ] 
    } 
}, 

{ "Name" : "Jane", 
    "Licenses" : 
    { 
    [ 
     {"LicenseID":"ABC123","TypeCode":"5","State":"AZ" }, 
     {"LicenseID":"DEF456","TypeCode":"7","State":"CO" }, 
     {"LicenseID":"GHI789","TypeCode":"8","State":"GA" } 
    ] 
    } 
} 

J'ai essayé de le faire, mais a obtenu au bloc le code suivant:

val customSchema = StructType(Array(StructField("Name", StringType, true), StructField("LicenseID_1", StringType, true), StructField("TypeCode_1", StringType, true), StructField("State_1", StringType, true), StructField("LicenseID_2", StringType, true), StructField("TypeCode_2", StringType, true), StructField("State_2", StringType, true), StructField("LicenseID_3", StringType, true), StructField("TypeCode_3", StringType, true), StructField("State_3", StringType, true))) 
val license = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("D:\\test\\test.csv") 
case class License(LicenseID:String, TypeCode:String, State:String) 
case class Data(Name:String, Licenses: Array[License]) 
val transformedData = license.map(data => Data(data(0),Array(License(data(1),data(2),data(3)),License(data(4),data(5),data(6)),License(data(7),data(8),data(9))))) 

<console>:46: error: type mismatch; 
found : Any 
required: String 
     val transformedData = license.map(data => Data(data(0),Array(License(data(1),data(2),data(3)),License(data(4),data(5),data(6)),License(data(7),data(8),data(9))))) 
... 
+1

s'il vous plaît être plus précis sur exactement quel problème vous avez. Peut-être poster une partie du code que vous avez déjà essayé. –

+0

Comme vous pouvez le voir, compte tenu de la multi-colonne avec des informations similaires (trois informations de licence distinctes couvrant plusieurs colonnes), je veux importer dans mongodb comme un document avec le "Licences" comme un nom d'attribut et la valeur comme un tableau de licences de paires de valeurs de nom contenant chaque information de licence. – SYL

+0

Avez-vous essayé d'écrire du code pour le faire? Si oui, affichez-le et indiquez où se situe le problème. Sinon, faites une tentative. –

Répondre

0

Je ne sais pas ce qui est vous question r, en ajoutant par exemple comment économiser l'étincelle de données et Mango

Lire

sparkSession.loadFromMongoDB() // Uses the SparkConf for configuration 
sparkSession.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig 

sparkSession.read.mongo() 
sparkSession.read.format("com.mongodb.spark.sql").load() 

// Set custom options: 
sparkSession.read.mongo(customReadConfig) 
sparkSession.read.format("com.mongodb.spark.sql").options. 
(customReadConfig.asOptions).load() 

Le connecteur offre la possibilité de conserver les données dans MongoDB.

MongoSpark.save(centenarians.write.option("collection", "hundredClub")) 
    MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection" -> 

"data"), certains (ReadConfig (sparkSession)))). Show()

Alternative pour sauvegarder les données

dataFrameWriter.write.mongo() 
dataFrameWriter.write.format("com.mongodb.spark.sql").save() 
+0

Lire et écrire sur mongo n'est pas mon problème. Le problème est de savoir comment restructurer les données (par exemple, des colonnes pour des données similaires au tableau de paire de valeurs clés), puis enregistrez-le dans mongodb pour qu'il apparaisse comme dans l'exemple json. – SYL

+0

https://stackoverflow.com/questions/39389700/spark-dataframe-is-saved-to-mongodb-in-wrong-format –

0

Ajout .toString résolu le problème et j'ai pu enregistrer à mongodb le format que je voulais.

val transformerData = license.map (data => Données (données (0) .to Chaîne, Array (Licence (data (1) .toString, data (2) .toString, data (3) .toString), Licence (data (4) .toString, data (5) .toString, données (6) .toString), licence (data (7) .toString, données (8) .toString, données (9) .toString))))