2017-06-16 1 views
0

Je suis un novice à Scala et actuellement ce que je fais est de filtrer les données d'un grand ensemble de données et de les imprimer en tant que csv. Ainsi, le csv j'imprimer dans ce format:Comment ajouter le dernier enregistrement de chaque changement dans Scala

id   time        status 
___  _____       _________ 
1  2016-10-09 00:09:10     100 
1  2016-10-09 00:09:30     100 
1  2016-10-09 00:09:50     100 
1  2016-10-09 00:10:10     900 
2  2016-10-09 00:09:18     100 
2  2016-10-09 00:09:20     100 
2  2016-10-09 00:10:24     900 
3  2016-10-09 00:09:30     100 
3  2016-10-09 00:09:33     100 
3  2016-10-09 00:09:36     100 
3  2016-10-09 00:09:39     100 
3  2016-10-09 00:09:51     900 

J'utilise le code ci-dessous pour imprimer les données:

 var count=0; 

     val StatusList = ListBuffer[String](); 
     for (currentRow <- sortedRow) { 
       if (currentRow.status==100){ 
        StatusList.+=(currentRow.id+","+currentRow.time+","+currentRow.status) 
       } 
       if((count+1) < sortedRow.size && sortedRow(count+1).status==900) { 
        StatusList.+=(sortedRow(count+1).id+","+sortedRow(count+1).time+","+sortedRow(count+1).status) 
       } 
    count+=1; 

    } 

Au lieu de cela, je veux imprimer les lignes avec le statut 100 et append le dossier quand ils ont changé. Fondamentalement, je veux imprimer les données comme suit:

id  time    status id  change_time   status 
___  _____    _________ __ ______________  _______ 
1 2016-10-09 00:09:10  100  1  2016-10-09 00:10:10 900 
1 2016-10-09 00:09:30  100  1  2016-10-09 00:10:10 900 
1 2016-10-09 00:09:50  100  1  2016-10-09 00:10:10 900 
2 2016-10-09 00:09:18  100  2  2016-10-09 00:10:24 900 
2 2016-10-09 00:09:20  100  2  2016-10-09 00:10:24 900 
3 2016-10-09 00:09:30  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:33  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:36  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:39  100  3  2016-10-09 00:09:51 900 
+0

Vous pouvez séparer les deux statuts en deux csv, mais Quelles sont les règles d'ajout? ajouter au hasard ou y a-t-il des règles strictes pour l'ajout? –

+0

Je peux séparer, mais pour une analyse plus approfondie, je dois garder dans le format ci-dessus – Ricky

+0

Vous n'avez pas lu ma question attentivement. J'ai demandé quelle est la règle pour la combinaison? –

Répondre

3

Je vous suggère une solution à l'aide dataframes qui est une des œuvres optimisées et l'amélioration fait pour RDD s.

Je suppose que les données sont au format suivant avec la ligne d'en-tête

id,time,status 
1,2016-10-0900:09:10,100 
1,2016-10-0900:09:30,100 
1,2016-10-0900:09:50,100 
1,2016-10-0900:10:10,900 

Première étape serait de lire les fichiers dans dataframe en utilisant sqlContext

val sqlContext = sparkSession.sqlContext 
val dataframe = sqlContext.read.format("csv").option("header", "true").load("absolute path to the input file") 

Vous devriez avoir dataframe comme

+---+------------------+------+ 
|id |time    |status| 
+---+------------------+------+ 
|1 |2016-10-0900:09:10|100 | 
|1 |2016-10-0900:09:30|100 | 
|1 |2016-10-0900:09:50|100 | 
|1 |2016-10-0900:10:10|900 | 
|2 |2016-10-0900:09:18|100 | 
|2 |2016-10-0900:09:20|100 | 
|2 |2016-10-0900:10:24|900 | 
|3 |2016-10-0900:09:30|100 | 
|3 |2016-10-0900:09:33|100 | 
|3 |2016-10-0900:09:36|100 | 
|3 |2016-10-0900:09:39|100 | 
|3 |2016-10-0900:09:51|900 | 
+---+------------------+------+ 

La prochaine étape serait b e pour filtrer l'dataframe en deux avec status différence

val df1 = dataframe.filter(dataframe("status") === "100") 

sortie est aussi

+---+------------------+------+ 
|id |time    |status| 
+---+------------------+------+ 
|1 |2016-10-0900:09:10|100 | 
|1 |2016-10-0900:09:30|100 | 
|1 |2016-10-0900:09:50|100 | 
|2 |2016-10-0900:09:18|100 | 
|2 |2016-10-0900:09:20|100 | 
|3 |2016-10-0900:09:30|100 | 
|3 |2016-10-0900:09:33|100 | 
|3 |2016-10-0900:09:36|100 | 
|3 |2016-10-0900:09:39|100 | 
+---+------------------+------+ 

suivent la même pour 900 statut pour df2 mais avec column noms rebaptisés

val df2 = dataframe.filter(dataframe("status") === "900") 
    .withColumnRenamed("id", "id2") 
    .withColumnRenamed("time", "changed_time") 
    .withColumnRenamed("status", "status2") 

sortie doit être

+---+------------------+-------+ 
|id2|changed_time  |status2| 
+---+------------------+-------+ 
|1 |2016-10-0900:10:10|900 | 
|2 |2016-10-0900:10:24|900 | 
|3 |2016-10-0900:09:51|900 | 
+---+------------------+-------+ 

étape finale est join ces deux dataframes

val finalDF = df1.join(df2, df1("id") === df2("id2"), "left") 

sortie finale est aussi

+---+------------------+------+---+------------------+-------+ 
|id |time    |status|id2|changed_time  |status2| 
+---+------------------+------+---+------------------+-------+ 
|1 |2016-10-0900:09:10|100 |1 |2016-10-0900:10:10|900 | 
|1 |2016-10-0900:09:30|100 |1 |2016-10-0900:10:10|900 | 
|1 |2016-10-0900:09:50|100 |1 |2016-10-0900:10:10|900 | 
|2 |2016-10-0900:09:18|100 |2 |2016-10-0900:10:24|900 | 
|2 |2016-10-0900:09:20|100 |2 |2016-10-0900:10:24|900 | 
|3 |2016-10-0900:09:30|100 |3 |2016-10-0900:09:51|900 | 
|3 |2016-10-0900:09:33|100 |3 |2016-10-0900:09:51|900 | 
|3 |2016-10-0900:09:36|100 |3 |2016-10-0900:09:51|900 | 
|3 |2016-10-0900:09:39|100 |3 |2016-10-0900:09:51|900 | 
+---+------------------+------+---+------------------+-------+ 

Sauvegarde du fichier final dataframe-csv est assez facile aussi bien

finalDF.write.format("csv").save("absolute path to output filename ")