2017-10-18 28 views
0

Je suis en train de me battre pour exprimer ce qui suit avec RxJava. Imaginons que j'avoir un flux de la liste des paquets que je veux transmettre sur un réseau:Séquence synchrone utilisant RxJava

PublishSubject<List<Packet>> packetsSubject = ...; 

Et je la fonction de transmission suivante:

public Observable<Status> transmit(Packet p) {...} 

Je veux transmettre chaque paquet d'une liste comme tant que le statut renvoyé du paquet est Status.OK. En d'autres termes, si la transmission du n-ième paquet est NOK, alors le n-ième paquet + 1 ne devrait pas être envoyé.

En outre, si une erreur est détectée:

  • une erreur doit être affiché avec l'index du paquet dans la liste
  • le transfert de la liste suivante du paquet devrait commencer

Merci de m'avoir lu

+0

Avez-vous essayé d'utiliser l'opérateur 'concat'? Pour la gestion des erreurs, la nouvelle liste a-t-elle une relation avec l'ancienne? – masp

+0

Les deux listes n'ont rien en commun, je reçois une liste de paquets en découpant une charge utile de requête. Comment l'opérateur concat entrerait-il dans l'image? – n1r3

+0

Je pense qu'avec 'concat' les demandes arrivent dans le même ordre que dans la liste. Si l'un échoue, vous n'essayez pas l'élément suivant. – masp

Répondre

1

Vous pouvez utiliser l'opérateur flatMap() pour convertir des listes en entrées observables. Utilisez l'opérateur zipWith() pour jumeler chaque entrée de liste avec l'index du paquet dans la liste individuelle. flatMap() à nouveau pour transmettre et obtenir le statut résultant. Transformez un NOK en un lanceable, et terminez la chaîne d'observateurs intérieurs. Lors de l'utilisation de l'opérateur flatMap(), l'ajout du dernier paramètre de 1 fait apparaître le mappage une fois, il n'y a donc pas de chevauchement de demandes dans cette chaîne d'observateurs.

packetsSubject 
     .flatMap(listOfPackets -> Observable.from(listOfPackets) 
      .zipWith(Observable.range(1, LARGE_NUMBER), (p, i) -> new Pair<>(p, i)) 
      .flatMap(pair -> transmits(pair.getFirst()) 
        .flatMap(status -> status.equals("OK") 
          ? Observable.empty() 
          : Observable.error(new IllegalStateException())) 
        .doOnError(error -> System.out.println("Error at index " + pair.getSecond())) 
        .onErrorResumeNext(Observable.empty()), 1)) 
    .subscribe(); 

Alors que je suis mal à l'aise avec les étapes intérieures finales de la cartographie NOK à une erreur, il vous connecter et remappage puis de nouveau le résultat, il y a une raison pour cela. Le traitement de listOfPackets doit se terminer soit lorsque tous les paquets ont été traités ou quand un Status.NOK est vu; transformer Status.NOK en un indicateur d'erreur accomplit ceci. Cependant, nous voulons seulement que le traitement d'une liste individuelle se termine, et non l'ensemble de l'observable.

+0

flatMap() cette merde! Merci, très utile. – n1r3