0

J'ai un flux de (limité) Flux que je veux transformer en un flux de Long, où le Long est la taille du flux intérieur:Comment s'abonner à Flux interne/Mono automatiquement?

Flux.just(Flux.just(1, 2, 3), Flux.just(1, 2) ) 
     .map(Flux::count) 
     .log() 
     .subscribe(); 

Le journal d'exécution est le suivant:

onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 
request(unbounded) 
onNext({ "operator" : "Count" }) 
onNext({ "operator" : "Count" }) 
onComplete() 

Flux: count renvoie un mono, pas un long. Y a-t-il des opérateurs pour déballer automatiquement ce mono interne lors de l'abonnement au flux principal?

Répondre

1

flatMap() est là pour vous:

Transform les éléments émis par cette Flux dans Publisher s de manière asynchrone, aplatirez ces éditeurs internes en un seul Flux par la fusion, qui leur permettent de intercalent.

https://projectreactor.io/docs/core/snapshot/api/reactor/core/publisher/Flux.html#flatMap

+0

Merci pour la réponse! le problème que j'essaie de résoudre est en fait légèrement différent: quel opérateur dois-je utiliser si le mono interne n'est pas directement dans le flux parent: 'Flux >>' 'Flux > 'Peut-on encore utiliser flatmap dans ce cas? –

+1

Hm. Non, je pense que celui-ci ne va pas fonctionner. C'est la valeur imbriquée du 'Tuple' et je ne suis pas sûr qu'il y ait quelque chose pour traiter sa part à la demande. –

+1

Enfin trouvé une solution sans abonnement au flux interne/mono. Après groupBy je chaîner l'opérateur suivant et je crée le tuple après avoir compté les éléments du groupe: '.flatMap (group -> group.count(). Map (count -> Tuples.of (group.key(), count))); ' –