2014-06-18 3 views
1

Le format de mes données estagrégation dans PySpark

UserId\tItemId:Score,ItemId:Score 
UserId\tItemId:Score,ItemId:Score,ItemId:Score 

et ainsi de suite ..

Je suis en train de normaliser le score en soustrayant la moyenne et en divisant par l'écart-type. Mes données sont présentes sur S3, environ 300 fichiers ou 30Mb chacun. J'utilise PySpark. Voici mon essai:

lines = sc.textFile("s3n://data-files/clustering") 

Itr1 = lines.map(lambda x:str(x)) 

Itr1.take(3) 

['1\t1:0.1,2:0.2', '2\t3:0.4,4:0.6', '3\t5:0.8,6:0.1'] 


Itr2 = Itr1.map(lambda x: x.split("\t")) 

Itr2.take(3) 

[['1', '1:0.1,2:0.2'], ['2', '3:0.4,4:0.6'], ['3', '5:0.8,6:0.1']] 

ItemRecScore = Itr2.map(lambda x:[x[1]]) 

ItemRecScore.take(3) 

[['1:0.1,2:0.2'], ['3:0.4,4:0.6'], ['5:0.8,6:0.1']] 

ItemRecScoreClean = ItemRecScore.map(lambda x: x[0].replace(':',' ')) 

ItemRecScore.take(3) 

['1 0.1,2 0.2', '3 0.4,4 0.6', '5 0.8,6 0.1'] 

1) Comment puis-je extraire simplement le score afin que je puisse faire appel, mean() et stdev() pour calculer les paramètres.

2) Comment transformer le score? Je suis nouveau à PySpark, donc excuses si c'est une tâche évidente simple. Toutes les instructions ou tutoriels qui montrent comment manipuler et agréger des données dans PySpark seraient utiles.

+0

Comment vous analysez les scores de ces chaînes dans un programme Python régulier? –

Répondre

0

Puisque vous avez plusieurs scores pour chaque ligne d'entrée, nous pouvons utiliser flatMap pour extraire tous les itemID & scores de chaque ligne et récupérer un RDD où chaque élément est l'une des valeurs de score article &. De là, nous pouvons extraire juste le score et le convertir en un flotteur afin que les méthodes numériques de PySpark puissent travailler dessus. Après que nous l'avons fait, nous pouvons simplement appeler les statistiques() sur le RDD pour obtenir l'information qui vous intéresse.

inputData = sc.textFile(inputFile) 
idScores = inputData.flatMap(lambda x: x.split("\t")[1].split(",")) 
scores = idScores.map(lambda x: float(x.split(":")[1])) 
print scores.stats()