2017-08-18 1 views
2

Je veux lire de nombreux fichiers .csv dans un dossier de manière asynchrone et retourner un Iterable d'une classe de cas personnalisée.lecture de plusieurs fichiers en utilisant de manière asynchrone, Scala Akka Streams

Puis-je arriver avec Akka cours d'eau et comment?

* J'ai tenté d'équilibrer en quelque sorte le travail en fonction de la documentation, mais il est un peu difficile à gérer par ...

Ou

Est-il une bonne pratique d'utiliser des acteurs au lieu? (Un parent Acteur avec enfants, chaque enfant doit lire un fichier, et retourner un Iterable à parent, puis parent combiner tous Iterables?)

+0

Question est pas très claire. 1. Voulez-vous renvoyer un seul Iterable d'une classe de cas personnalisée pour tous les fichiers CSV, ou un pour chaque fichier CSV? 2. Et s'il y a des milliers de fichiers, voulez-vous les lire tous en même temps, ou voulez-vous juste un certain niveau de parallélisme? –

Répondre

1

tout d'abord vous devez lire/apprendre comment Akka flux fonctionne, avec Source, Flow et Sink. Ensuite, vous pouvez commencer à apprendre les opérateurs.

Pour effectuer plusieurs actions en parallèle, vous pouvez utiliser l'opérateur mapAsync dans lequel vous spécifiez le nombre de parallélisme.

/** 
    * Using mapAsync operator, we pass a function which return a Future, the number of parallel run futures will 
    * be determine by the argument passed to the operator. 
    */ 
    @Test def readAsync(): Unit = { 
    Source(0 to 10)//-->Your files 
     .mapAsync(5) { value => //-> It will run in parallel 5 reads 
     implicit val ec: ExecutionContext = ActorSystem().dispatcher 
     Future { 
      //Here read your file 
      Thread.sleep(500) 
      println(s"Process in Thread:${Thread.currentThread().getName}") 
      value 
     } 
     } 
     .runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}"))) 
    } 

Vous pouvez en savoir plus sur Akka et Akka flux ici https://github.com/politrons/Akka

1

La plupart du temps la même réponse, mais comme avec @ Paul petites améliorations

def files = new java.io.File("").listFiles().map(_.getAbsolutePath).to[scala.collection.immutable.Iterable] 

Source(files).flatMapConcat(filename => //you could use flatMapMerge if you don't bother about line ordering 
    FileIO.fromPath(Paths.get(filename)) 
     .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true).map(_.utf8String)) 
).map { csvLine => 
    // parse csv here 
    println(csvLine) 
    }