2015-08-30 1 views
0

j'ai fait une source pour un flux Akka basé sur un ReactiveStreams Editeur comme ceci:Publisher base Source ne pas les éléments de sortie

object FlickrSource { 

    val apiKey = Play.current.configuration.getString("flickr.apikey") 
    val flickrUserId = Play.current.configuration.getString("flickr.userId") 
    val flickrPhotoSearchUrl = s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$apiKey&user_id=$flickrUserId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&page=%s&per_page=500" 

    def byDate(date: LocalDate): Source[JsValue, Unit] = { 
    Source(new FlickrPhotoSearchPublisher(date)) 
    } 
} 

class FlickrPhotoSearchPublisher(date: LocalDate) extends Publisher[JsValue] { 

    override def subscribe(subscriber: Subscriber[_ >: JsValue]) { 
    try { 
     val from = new LocalDate() 
     val fromSeconds = from.toDateTimeAtStartOfDay.getMillis 
     val toSeconds = from.plusDays(1).toDateTimeAtStartOfDay.getMillis 

     def pageGet(page: Int): Unit = { 
     val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds, page) 
     Logger.debug("Flickr search request: " + url) 
     val photosFound = WS.url(url).get().map { response => 
      val json = response.json 
      val photosThisPage = (json \ "photos" \ "photo").as[JsArray] 
      val numPages = (json \ "photos" \ "pages").as[JsNumber].value.toInt 
      Logger.debug(s"pages: $numPages") 
      Logger.debug(s"photos this page: ${photosThisPage.value.size}") 
      photosThisPage.value.foreach { photo => 
      Logger.debug(s"onNext") 
      subscriber.onNext(photo) 
      } 

      if (numPages > page) { 
      Logger.debug("nextPage") 
      pageGet(page + 1) 
      } else { 
      Logger.debug("onComplete") 
      subscriber.onComplete() 
      } 
     } 
     } 
     pageGet(1) 
    } catch { 
     case ex: Exception => { 
     subscriber.onError(ex) 
     } 
    } 
    } 
} 

Il fera une demande de recherche sur Flickr et la source des résultats comme JsValue s. J'ai essayé de câbler à beaucoup de différents flux et puits, mais ce serait la configuration la plus basique:

val source: Source[JsValue, Unit] = FlickrSource.byDate(date) 
val sink: Sink[JsValue, Future[Unit]] = Sink.foreach(println) 
val stream = source.toMat(sink)(Keep.right) 
stream.run() 

Je vois que le onNext est appelé deux ou trois fois, puis le onComplete. Cependant, l'évier ne reçoit rien. Qu'est-ce qui me manque, n'est-ce pas une façon valide de créer une Source?

Répondre

0

J'ai compris par erreur que Publisher était une interface simple comme Observable, que vous pouvez implémenter vous-même. L'équipe Akka a souligné que ce n'est pas la bonne façon d'implémenter un éditeur. En fait, Publisher est une classe compliquée qui est supposée être implémentée par les bibliothèques plutôt que par les utilisateurs finaux. Cette méthode Source.apply(Publisher) utilisée dans la question est là pour l'interopérabilité avec d'autres implémentations Reactive Streams. Le but de vouloir une implémentation de Source est que je veuille qu'une source de retour récupère les résultats de recherche de Flickr (qui est maximisé à 500 par requête) et je ne veux pas faire plus de demandes (ou plus rapides) que est nécessaire en aval. Cela peut être réalisé en implémentant un ActorPublisher.

Mise à jour

C'est le ActorPublisher qui fait ce que je veux: créer une source qui produit des résultats de recherche, mais seulement rend autant REST appels que nécessaire en aval. Je pense qu'il y a encore place à amélioration, alors n'hésitez pas à le modifier.

import akka.actor.Props 
import akka.stream.actor.ActorPublisher 
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request} 
import org.joda.time.LocalDate 
import play.api.Play.current 
import play.api.libs.json.{JsArray, JsNumber, JsValue} 
import play.api.libs.ws.WS 
import play.api.{Logger, Play} 

import scala.concurrent.ExecutionContext.Implicits.global 

object FlickrSearchActorPublisher { 
    val apiKey = Play.current.configuration.getString("flickr.apikey") 
    val flickrUserId = Play.current.configuration.getString("flickr.userId") 
    val flickrPhotoSearchUrl = s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$apiKey&user_id=$flickrUserId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&per_page=500&page=" 

    def byDate(from: LocalDate): Props = { 
    val fromSeconds = from.toDateTimeAtStartOfDay.getMillis/1000 
    val toSeconds = from.plusDays(1).toDateTimeAtStartOfDay.getMillis/1000 
    val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds) 

    Props(new FlickrSearchActorPublisher(url)) 
    } 
} 

class FlickrSearchActorPublisher(url: String) extends ActorPublisher[JsValue] { 

    var currentPage = 1 
    var numPages = 1 
    var photos = Seq[JsValue]() 

    def searching: Receive = { 
    case Request(count) => 
     Logger.debug(s"Received Request for $count results from Subscriber, ignoring as we are still searching") 
    case Cancel => 
     Logger.info("Cancel Message Received, stopping") 
     context.stop(self) 
    case _ => 
    } 

    def accepting: Receive = { 
    case Request(count) => 
     Logger.debug(s"Received Request for $count results from Subscriber") 
     sendSearchResults() 
    case Cancel => 
     Logger.info("Cancel Message Received, stopping") 
     context.stop(self) 
    case _ => 
    } 

    def getNextPageOrStop() { 
    if (currentPage > numPages) { 
     Logger.debug("No more pages, stopping") 
     onCompleteThenStop() 
    } else { 
     val pageUrl = url + currentPage 
     Logger.debug("Flickr search request: " + pageUrl) 
     context.become(searching) 
     WS.url(pageUrl).get().map { response => 
     val json = response.json 
     val photosThisPage = (json \ "photos" \ "photo").as[JsArray] 
     numPages = (json \ "photos" \ "pages").as[JsNumber].value.toInt 
     Logger.debug(s"page $currentPage of $numPages") 
     Logger.debug(s"photos this page: ${photosThisPage.value.size}") 
     photos = photosThisPage.value.seq 
     if (photos.isEmpty) { 
      Logger.debug("No photos found, stopping") 
      onCompleteThenStop() 
     } else { 
      currentPage = currentPage + 1 
      sendSearchResults() 
      context.become(accepting) 
     } 
     } 
    } 
    } 

    def sendSearchResults() { 
    if (photos.isEmpty) { 
     getNextPageOrStop() 
    } else { 
     while(isActive && totalDemand > 0) { 
     onNext(photos.head) 
     photos = photos.tail 
     if (photos.isEmpty) { 
      getNextPageOrStop() 
     } 
     } 
    } 
    } 

    getNextPageOrStop() 
    val receive = searching 
}