2017-04-04 1 views
0

Je suis en train de trier comment réduireByKey fonctionner mais cette affaire me déroute et je ne peux pas le comprendre du tout.ReduceByKey + Map + Seq explication

Le code est:

stream.foreachRDD((rdd: RDD[Record]) => { 
     // convert string to PoJo and generate rows as tuple group 
    val pairs = rdd 
      .map(row => (row.timestamp(), jsonDecode(row.value()))) 
      .map(row => (row._2.getType.name(), (1, row._2.getValue, row._1))) 
    val flatten = pairs 
       .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, (y._3 + x._3)/2)) 
       .map(f => Row.fromSeq(Seq(f._1, f._2._2/f._2._1, new Timestamp(f._2._3)))) 

Imaginer le revenu de données: [ "océanie", 500], [ "australie", 450], etc

Dans aplatissent variable I J'essaie d'agréger les données par type de marché ou ce premier type dans JSON. Voici générer tuple: * le premier est la valeur du compteur et cette valeur est 1, * deuxième est le taux et reçu de Kafka, * le troisième est l'heure de l'événement. par exemple 2017-05-12 16:00:00 * * sur la carte, * méthode f._1 est le nom du marché, * nous divisons taux global au point compte total f._2._2/f._2._1 * comme vous pouvez le voir f._2._3 est temps d'événement moyen

Quelqu'un peut-il me aider à expliquer ce que signifie f._2._3 (je veux dire je connais sa variable temp, mais ce qui est là ou pourrait être) et comment est le comptage total des taux en divisant f._2._2/f._2._1, que divise exactement? Merci :)

+1

Peut-être qu'il est temps de faire des 'classes de cas' qui représentent la structure JSON, alors vous pourrez répondre à votre question par vous-même :) –

+0

Ok, maintenant j'ai un _struct_ qui a 3 fils. Vous suggérez de faire _case class_like struct aaa et de remplacer 'f_2._2' et etc par des variables d'objets? –

+0

Si vous avez une structure avec un champ nommé, il est beaucoup plus lisible et vous n'aurez pas ce problème;) –

Répondre

1

Pour chaque ligne, vous définissez l'élément suivant dans votre RDD pairs:

(marketType, (counter, rate, eventTime)) 

Notez que ceci est un Tuple2 dont le second élément est un Tuple3. Tuple s sont des classes de cas spéciaux dont le 0e élément (commençant à 1) est appelé _n. Par exemple, pour accéder au rate d'un élément f, vous devrez faire f._2._2 (le deuxième élément du Tuple3, qui est le deuxième élément du Tuple2).

Étant donné que vos éléments ont une signification particulière, vous voudrez peut-être envisager de définir une classe de cas MyRow(counter: Int, rate: Int, time: Timestamp), afin d'avoir une vision plus claire sur ce que fait votre code lorsque vous écrivez quelque chose comme f._2._3 (en passant, le type de eventTime n'est pas clair pour moi, puisque vous l'avez seulement représenté comme String, mais vous faites des opérations numériques dessus).

maintenant à ce que votre code tente vraiment faire:

La fonction de réduction prend deux Tuple3 (ou MyRow, si vous changez votre code) et émet un autre (ici, votre réduction des sommes de fonction sur les compteurs, la taux, et fait la moyenne entre deux valeurs sur le eventTime).

reduceByKey applique cette fonction réductrice tant qu'elle trouve deux éléments avec la même clé: puisque la sortie de la fonction réductrice est du même type que ses entrées, elle peut être appliquée sur elle, aussi longtemps que vous avez d'autres valeurs sur votre RDD qui a la même clé.

Pour un exemple simple, si vous avez

(key1, (1, 200, 2017/04/04 12:00:00)) 
(key1, (1, 300, 2017/04/04 12:00:00)) 
(key1, (1, 500, 2017/04/04 12:00:00)) 
(key2, (1, 500, 2017/04/04 12:00:00)) 

Alors le reduceByKey sortira

(key1, (3, 1000, 2017/04/04 12:00:00)) 
(key2, (1, 500, 2017/04/04 12:00:00)) 

Et puis votre dernier map travaillera sur ce en calculant le taux global:

(key1, (333, 2017/04/04 12:00:00)) 
(key2, (500, 2017/04/04 12:00:00)) 

Vous avez peut-être remarqué que j'ai toujours utilisé la même heure dans tous les exemples. C'est parce que votre fonction de réduction sur ce champ donnera des résultats inattendus car il n'est pas associatif. Essayez de faire le même exercice que ci-dessus mais avec des horodatages différents, et vous verrez que la valeur réduite pour key1 sera différente selon l'ordre dans lequel vous appliquez la réduction.

Voyons voir: nous voulons réduire 4, 8 et 16 avec cette fonction afin que nous puissions vouloir faire comme

((4 + 8)/2 + 16)/2 

ou

(4 + (8 + 16)/2)/2 

selon que nous voulons pour commencer à gauche ou à droite (dans un cas réel, il y a beaucoup plus de possibilités différentes, et elles se produiront dans Spark, puisque vous ne savez pas toujours comment vos valeurs sont réparties sur le cluster).

En calculant les deux possibilités ci-dessus, nous obtenons des valeurs différentes: 11 et 8, donc vous voyez que cela peut causer de plus gros problèmes dans un cas réel.

Une solution simple dans votre cas serait de faire aussi la somme de tous les horodateurs (en supposant qu'ils sont Long valeurs, ou même BigInteger, pour éviter tout débordement), et diviser seulement à la fin par le nombre de valeurs à avoir la moyenne en temps réel.

+0

C'est fantastique, je vous dois beaucoup de temps ^^ Vous m'avez expliqué tout ce dont j'avais besoin et pour plus 'datetime' est dans * TimeStampType * donc si vous avez des suggestions pour cela. Merci beaucoup, je suis très reconnaissant :) –

+0

La façon la plus simple de gérer vos horodatages est d'utiliser 'getMillis' ou équivalent, pour changer votre horodatage en' Long'. –