2017-07-15 3 views
0

Comment puis-je faire la gestion des exceptions dans Spark - Scala pour les enregistrements invalides Voici mon code:Apache étincelle scala Gestion des exceptions

val rawData = sc.textFile(file) 
val rowRDD = rawData.map(line => Row.fromSeq(line.split(","))) 
val rowRDMapped = rowRDD.map { x => x.get(1), x.get(10) } 
val DF = rowRDMapped.toDF("ID", "name") 

Tout fonctionne très bien si les données d'entrée est très bien, si je n'ai pas assez de champs , J'obtiens ArrayIndexOutOfBoundException.

Je suis en train de mettre try-catch autour, mais je ne suis pas en mesure de sauter les enregistrements avec des données non valides, par prises try

val rowRDMapped = rowRDD.map { try { 
            x => x.get(1), x.get(10) 
            }catch { 
             println("Invalid Data") 
             //Here it expects to return ROW, but I am not sure what to do here, since I dont want any data to be returned. 
            } 
          } 

S'il vous plaît laissez-moi savoir comment résoudre le problème avec des captures d'essai et s'il y a une meilleure solution, ce serait aussi aider beaucoup

+0

Que voulez-vous? Passer les lignes trop courtes? – Zernike

+0

@Zernike, merci de regarder, Oui, j'ai besoin d'ignorer les lignes courtes, ce qui provoque l'exception Index hors limite. – user3124284

Répondre

1

le plus simple:

val rawData = sc.textFile(file) 
val rowRDD = rawData.map(line => Row.fromSeq(line.split(","))) 
val rowRDMapped = rowRDD.filter(_.length >= 11).map(x => x.get(1), x.get(10)) 

mieux utiliser collect (ne pas confondre avec other function)

val rowRDMapped = rowRDD.collect{x if x.length >= 11 => x.get(1), x.get(10)} 
+0

'> = 9' devrait être assez bon, n'est ce pas? –

+0

Peu importe pour une réponse. Les exigences réelles peuvent être n'importe. Pour cet exemple, la valeur correcte est '11' car Row a des index basés sur zéro. – Zernike

+0

oui vous avez raison. J'allais écrire 11 aussi mais j'ai juste écrit par erreur 9. :) merci –

1

vous pouvez utiliser essayer attraper comme ci-dessous et filtrer plus tard

val rawData = sc.textFile(file) 
val rowRDD = rawData.map(line => Row.fromSeq(line.split(","))) 
val rowRDMapped = rowRDD.map(x => (Try(x.get(1).toString) getOrElse "blank", Try(x.get(10).toString) getOrElse "blank")) 
val DF = rowRDMapped.toDF("ID", "name").filter($"name" =!= "blank") 
1

Au lieu de try-catch vous pouvez utiliser Try

code ci-dessous va filtrer les lignes de données qui n'ont pas assez de champs et obtenir des données avec les autres.

val rawData = sc.textFile(line) 
val rowRDD = rawData.map(line => Row.fromSeq(line.split(","))) 
val rowRDMapped = rowRDD.flatMap{ x => Try(x.getString(1), x.getString(10)).toOption } 
val DF = rowRDMapped.toDF("ID", "name")