2017-07-11 1 views
8

Supposons que j'ai ce code:flatMap toujours parallèle séquentielle

Collections.singletonList(10) 
      .parallelStream() // .stream() - nothing changes 
      .flatMap(x -> Stream.iterate(0, i -> i + 1) 
        .limit(x) 
        .parallel() 
        .peek(m -> { 
         System.out.println(Thread.currentThread().getName()); 
        })) 
      .collect(Collectors.toSet()); 

sortie est le même nom de fil, donc il n'y a aucun avantage de parallel ici - ce que je veux dire par là qu'il ya un seul thread qui fait tout le travail.

intérieur flatMap il y a ce code:

result.sequential().forEach(downstream); 

Je comprends forçais la sequential propriété si le flux « extérieur » serait parallèle (ils pourraient probablement bloc), « extérieur » devraient attendre « flatMap "pour finir et l'inverse (puisque le même pool commun est utilisé) Mais pourquoi toujours force cela?

Est-ce une de ces choses que pourrait changer dans une version ultérieure?

Répondre

8

Il existe deux aspects différents. Tout d'abord, il n'y a qu'un seul pipeline séquentiel ou parallèle. Le choix de séquentiel ou parallèle au flux interne est sans importance. Notez que le consommateur downstream que vous voyez dans l'extrait de code cité représente l'ensemble du pipeline de flux suivant, donc dans votre code, se terminant par .collect(Collectors.toSet());, ce consommateur finira par ajouter les éléments résultants à une seule instance Set qui n'est pas thread-safe. Donc, le traitement du flux interne en parallèle avec ce seul consommateur casserait toute l'opération.

Si un flux externe se scinde, ce code cité peut être appelé simultanément avec différents consommateurs ajoutant à différents ensembles. Chacun de ces appels traiterait un élément différent du mappage de flux externe vers une instance de flux interne différente. Puisque votre flux externe est constitué d'un seul élément, il ne peut pas être divisé.

La façon dont cela a été implémenté est également la raison du problème Why filter() after flatMap() is “not completely” lazy in Java streams?, car forEach est appelée sur le flux interne qui transmet tous les éléments au consommateur en aval. Comme démontré par this answer, une implémentation alternative, supportant la paresse et la division du sous-flux, est possible. Mais c'est une manière fondamentalement différente de l'implémenter. La conception actuelle de l'implémentation Stream fonctionne principalement par la composition du consommateur, donc à la fin, le splitterator source (et ceux qui en sont séparés) reçoit un Consumer représentant l'ensemble du pipeline de flux dans tryAdvance ou forEachRemaining. En revanche, la solution de la réponse liée fait la composition du spliterator, en produisant une nouvelle Spliterator déléguant aux spliterators de source. J'ai supposé, les deux approches ont des avantages et je ne suis pas sûr, combien l'implémentation d'OpenJDK perdrait en travaillant l'inverse.

+0

Salut, monsieur. est-ce que ça devrait être un bug de stream? –

+1

@ holi-java Je ne dirais pas que c'est un bug, juste une mauvaise conception de l'implémentation qui sera probablement réparée à l'avenir. –

+5

@ holi-java: la paresse manquante peut être considérée comme un bug et il existe déjà un rapport de bug. La parallélisation limitée, cependant, est juste un domaine pour l'amélioration de la performance potentielle. En pratique, cela n'affecte que les cours d'eau ayant un petit nombre d'éléments dans le cours d'eau extérieur et des cours d'eau intérieurs beaucoup plus grands. – Holger