2017-04-24 1 views
4

Je veux avoir une Source qui évalue une fonction à intervalles donnés et émet sa sortie. Comme une solution de contournement, je peux le faire avec un Source.queue + offer, mais n'ai pas trouvé une manière plus propre de le faire. Idéalement, j'aurais quelque chose commeAkka Stream, Source de la fonction?

def myFunction() = ....      // function with side-effects 
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick 

Des idées?

Répondre

8

Probablement la façon la plus propre est d'utiliser map

Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction()) 
1

Je suppose que, throttle est ce que vous avez besoin. exemple entièrement exécutable avec Source appliqué à Iterable, qui utilise la fonction de next():

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.ThrottleMode.Shaping 
import akka.stream.scaladsl.Source 

import scala.concurrent.duration._ 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 
var i = 0 

def myFunction(): Int = { 
    i = i + 1 
    i 
} 

import scala.collection.immutable.Iterable 

val x: Iterable[Int] = new Iterable[Int] { 
    override def iterator: Iterator[Int] = 
    new Iterator[Int] { 
     override def hasNext: Boolean = true 

     override def next(): Int = myFunction() 
    } 
} 
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println) 

paramètres throttle: source de gaz avec 1 élément par une seconde avec rafale max = 1, avec des pauses avant d'émettre des messages à respecter le taux d'étranglement (que qu'est-ce que Shaping pour).