2017-08-15 1 views
0

Je souhaite convertir des éléments de liste en map unique en tant qu'étape dans mon flux de production Akka Streams. À titre d'exemple, disons que j'avais la classe suivante.Convertir la liste en Map dans Akka Stream

case class MyClass(myString: String, myInt: Int) 

Je veux convertir un List de MyClass cas à un Map qui les relie clés par myString.

Donc, si je devais List(MyClass("hello", 1), MyClass("world", 2), MyClass("hello", 3)), je veux une carte de hello cartographie à List(1, 3) et world cartographie à List(2).

Ce qui suit est ce que j'ai jusqu'à présent.

val flowIWant = { 
    Flow[MyClass].map { entry => 
     entry.myString -> entry.myInt 
    } ??? // How to combine tuples into a single map? 
} 

En outre, il serait idéal pour l'écoulement pour finir par produire les entités cartographiques individuelles afin que je puisse travailler avec chacun individuellement pour la prochaine étape (je veux faire une opération sur chaque individu d'entité carte).

Je ne suis pas sûr si cela une opération de type fold ou quoi. Merci pour toute aide.

+0

Chacun individuellement est exactement ce que vous avez écrit: 'entry => entry.myString -> entry.myInt'. Je ne suis pas sûr de ce que vous voulez dire/vouloir accomplir. –

+0

Cela les convertirait simplement en Tuples, n'est-ce pas? Je veux agréger les données. Donc je veux "bonjour" -> Liste (1,3) au lieu de "bonjour" -> 1 et "bonjour" -> 3 –

Répondre

0

Il n'est pas vraiment clair ce que vous voulez réellement obtenir. De la façon dont vous avez déclaré votre problème, je vois au moins les transformations suivantes, vous pourriez être dire:

Flow[List[MyClass], Map[String, Int], _] 
Flow[List[MyClass], Map[String, List[Int]], _] 
Flow[MyClass, (String, Int), _] 
Flow[MyClass, (String, List[Int]), _] 

de votre texte je soupçonne que vous voulez très probablement quelque chose comme le dernier, mais il ne fait pas vraiment sens d'avoir une telle transformation, car il ne sera pas capable d'émettre quoi que ce soit - afin de combiner toutes les valeurs correspondant à une clé, vous devez lire toute l'entrée.

Si vous avez un flux entrant de MyClass et que vous voulez en obtenir un Map[String, List[Int]], alors il n'y a pas d'autre choix que de l'attacher à un évier pliant et d'exécuter le flux jusqu'à la fin. Par exemple:

val source: Source[MyClass, _] = ??? // your source of MyClass instances 

val collect: Sink[MyClass, Future[Map[String, List[Int]]] = 
    Sink.fold[Map[String, List[Int]], MyClass](Map.empty.withDefaultValue(List.empty)) { 
    (m, v) => m + (v.myString -> (v.myInt :: m(v.myString))) 
    } 

val result: Future[Map[String, List[Int]]] = source.toMat(collect)(Keep.right).run() 
0

Je pense que vous voulez scan il:

source.scan((Map.empty[String, Int], None: Option((String, Int))))((acc, next) => { val (map, _) 
    val newMap = map.updated(next._1 -> map.getOrElse(next._1, List())) 
    (newMap, Some(newMap.get(next._1)))}).map(_._2.get) 

De cette façon, vous pouvez vérifier le contenu du Map jusqu'à ce que la mémoire est épuisée. (Le contenu lié au dernier élément est dans la partie de la valeur du tuple initial enveloppé dans un Option.)

0

Cela peut être ce que vous cherchez:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 

import scala.util.{Failure, Success} 

object Stack { 

    def main(args: Array[String]): Unit = { 
    case class MyClass(myString: String, myInt: Int) 
    implicit val actorSystem = ActorSystem("app") 
    implicit val actorMaterializer = ActorMaterializer() 
    import scala.concurrent.ExecutionContext.Implicits.global 

    val list = List(MyClass("hello", 1), MyClass("world", 2), MyClass("hello", 3)) 

    val eventualMap = Source(list).fold(Map[String, List[Int]]())((m, e) => { 
     val newValue = e.myInt :: m.get(e.myString).getOrElse(Nil) 
     m + (e.myString -> newValue) 
    }).runWith(Sink.head) 
    eventualMap.onComplete{ 
     case Success(m) => { 
     println(m) 
     actorSystem.terminate() 
     } 
     case Failure(e) => { 
     e.printStackTrace() 
     actorSystem.terminate() 
     } 
    } 
    } 
} 

Avec ce code, vous aurez obtenir le résultat suivant:

Map(hello -> List(3, 1), world -> List(2)) 

Si vous souhaitez avoir la sortie suivante:

Vector(Map(), Map(hello -> List(1)), Map(hello -> List(1), world -> List(2)), Map(hello -> List(3, 1), world -> List(2))) 

Utilisez simplement scan au lieu de fold et exécuter avec Sink.seq. La différence entre fold et scan est que fold attend que l'amont se termine avant de pousser vers le bas, alors que scan pousse toutes les mises à jour vers l'aval.