2017-07-05 1 views
0

J'utilise Spark 2.1.1 et Scala 2.11.8Comment mapper une colonne avec d'autres colonnes dans un fichier avro?

Cette question est une extension d'un mes de questions précédentes:

How to identify null fields in a csv file?

Le changement est que, plutôt que de lire les données à partir d'un Fichier CSV, je suis en train de lire les données d'un fichier avro. Ceci est le format du fichier Avro je lis les données à partir de:

var ttime: Long = 0; 
var eTime: Long = 0; 
var tids: String = ""; 
var tlevel: Integer = 0; 
var tboot: Long = 0; 
var rNo: Integer = 0; 
var varType: String = ""; 
var uids: List[TRUEntry] = Nil; 

Je suis l'analyse du fichier Avro dans une catégorie distincte.

Je dois mapper la colonne tids avec chacun des uids de la même manière que mentionné dans la réponse acceptée du lien posté ci-dessus, sauf cette fois d'un fichier avro plutôt que d'un fichier csv bien formaté. Comment puis-je faire ceci?

C'est le code que je suis en train de le faire avec:

val avroRow = spark.read.avro(inputString).rdd 
    val avroParsed = avroRow 
    .map(x => new TRParser(x)) 
    .map((obj: TRParser) => ((obj.tids, obj.uId),1)) 
    .reduceByKey(_+_) 
    .saveAsTextFile(outputString) 

Après obj.tids, toutes les colonnes uids doivent être mis en correspondance individuellement pour obtenir une sortie finale même comme mentionné dans la réponse acceptée du lien ci-dessus.

Voilà comment je suis analyse tous les uids dans la classe d'analyse syntaxique du fichier Avro:

this.uids = Nil 
    row.getAs[Seq[Row]]("uids") 
    .foreach((objRow: Row) => 
     this.uids ::= (new TRUEntry(objRow)) 
    ) 

this.uids  
.foreach((obj:TRUEntry) => { 
    uInfo += obj.uId + " , " + obj.initM.toString() + " , " 
}) 

PS: Excusez-moi si la question semble stupide mais c'est ma première rencontre avec le fichier Avro

Répondre

0

il peut être fait en passant le même pour le traitement de la boucle

this.uids 

dans le code principal comme:

val avroParsed = avroRow 
    .map(x => new TRParser(x)) 
    .map((obj: TRParser) => { 
     val tId = obj.source.trim 
     var retVal: String = "" 
     obj.uids 
     .foreach((obj: TRUEntry) => { 
      retVal += tId + "," + obj.uId.trim + ":" 
     }) 
     retVal.dropRight(1) 
    }) 

val flattened = avroParsed 
.flatMap(x => x.split(":")) 
.map(y => ((y),1))