2017-10-16 4 views
1

J'ai un RDD [(Int, Iterable [Coordinates])] qui a été groupé par clé (index: Int). Coordonnées est une classe avec les membres:Comment aplatir le contenu Spark RDD groupé en lignes individuelles puis enregistrer dans le fichier

latitude: Double, longitude: Double

Je voudrais créer l'impression ou créer un fichier csv, ce serait sous la forme suivante (une ligne pour chaque point de données):

index,latitude,longitude 

Avec le RDD [(Int, coordonnées)] non regroupées, il a travaillé comme ceci:

val textOutputRDD = initialRDD.map(
    f => f._1.toString() + "," + f._2.latitude.toString() + "," + f._2.longitude.toString()) 
textOutputRDD.saveAsTextFile("TextOutput") 

Comment puis-je réussi à le faire dans ce cas?

+0

Quelle version de Spark Apache utilisez-vous? – stefanobaghino

+0

@stefanobaghino 2.1.0 – ilvo

Répondre

1

Une simple boucle imbriquée fera l'affaire. Ici, je coordonnées approximatives avec une simple paire de doubles:

val rdd = 
    sc.parallelize(
    Seq(
     1 -> Seq((4.1, 3.4), (5.6, 6.7), (3.4, 9.0)), 
     2 -> Seq((0.4, -4.1), (-3.4, 6.7), (7.0, 8.9)) 
    ) 
) 

val csvLike = 
    for ((key, coords) <- rdd; (lat, lon) <- coords) yield s"$key,$lat,$lon" 

for (row <- csvLike) println(row) 

Ce code entraînera la sortie suivante:

2,0.4,-4.1 
2,-3.4,6.7 
2,7.0,8.9 
1,4.1,3.4 
1,5.6,6.7 
1,3.4,9.0 

Modifier

Une autre approche possible consiste à échanger dans la flatMap réelle/map séquence le compilateur transformerait la compréhension for en:

rdd.flatMap { 
    case (key, coords) => 
    coords.map { 
     case (lat, lon) => s"$key,$lat,$lon" 
    } 
} 
+0

Essayé cela, à la fois avec mon propre rdd et copié le vôtre pour essayer, entraîne la même erreur: value withFilter n'est pas un membre de org.apache.spark.rdd.RDD [(Int, Seq [(Double double)])]. J'ai essayé de trouver une solution, mais sans succès. – ilvo

+1

Je l'ai couru avec succès dans le 'spark-shell', ma meilleure estimation est qu'il vous manque quelque implicite. Pouvez-vous essayer d'échanger dans cette ligne au lieu de la boucle et voir ce qui se passe? C'est sémantiquement équivalent, en fait le compilateur traduit réellement la boucle en ceci: 'rdd.flatMap {case (clé, coords) => coords.map {case (lat, lon) => s" $ clé, $ lat, $ lon "}}' – stefanobaghino

+0

Je vois un avertissement lors de l'utilisation de la variante 'for' mais pas d'erreur. Êtes-vous par hasard compiler avec l'indicateur de compilateur '-Xfatal-warnings' activé? Dans tous les cas, la variante 'flatMap' /' map' devrait résoudre le problème. – stefanobaghino

1

Essayez flatmap-

val output = rdd.flatMap(s=>{ 
     val list=List[String]() 
     for (latlon <- s._2) { 
     list.addString(s._1.toString() + "," + latlon.latitude.toString() + "," + latlon.longitude.toString()) 
     } 
     return list 
    }) 
output.save(....) 
+1

Impossible de faire fonctionner la liste avec ceci, mais en l'imprimant, 'println (s._1.toString() +", "+ latlon.latitude +", "+ latlon.longitude)' fonctionne comme prévu. De plus, addString a besoin d'un StringBuilder comme premier paramètre et la sortie devrait probablement être sauvegardée avec 'output.saveAsTextFile (....)'. Merci pour votre contribution si, la flatmap fonctionne! – ilvo