2017-08-24 3 views
0

J'utilise actuellement le code suivant pour convertir une source Akka (comme reçu de la lecture d'un fichier en utilisant FileIO de Akka) à un RxJava2 Flowable:Conversion de sources Akka en RxJava2 Fluide?

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data, 
     Flow<ByteString, ByteString, NotUsed> compType) { 
    final Publisher<ByteString> uncompressedData = 
     data.via(compType) 
      .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer); 
    return Flowable.fromPublisher(uncompressedData) 
     .map(bytes -> Buffer.buffer(bytes.toArray())); 
} 

Mon problème avec cette solution (de travail) est que , au moins autant que je le comprends actuellement, l'appel de la méthode .runWith() exécute déjà le code, c'est-à-dire rassemble toutes les données de la source donnée, les met en mémoire tampon et les place ensuite dans un éditeur. Y a-t-il un moyen de l'exécuter à ce stade? Je voudrais juste définir la conversion à ce stade sans le matérialisateur et seulement exécuter tout ce qui est souscrit quelque chose plus tard.

Merci!

Répondre

0

Utilisez defer (sidenote: je devais faire cela plusieurs fois parce que Akka sources sont un seul coup):

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data, 
     Flow<ByteString, ByteString, NotUsed> compType) { 

    return Flowable.defer(() -> data.via(compType) 
     .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer) 
    ).map(bytes -> Buffer.buffer(bytes.toArray())); 
}