2017-08-12 5 views
1

Le code suivant divise un flux d'objets en blocs de 1 000, les traite lors de la matérialisation et renvoie le nombre total d'objets à la fin.Le regroupement StreamEx dans des listes renvoie un nombre incorrect d'enregistrements

Dans tous les cas, le nombre affiché est correct à moins que la taille du cours d'eau se trouve être 1. Dans le cas, la taille du flux est 1, le nombre retourné est 0.

Toute aide serait grandement appréciée. J'ai également dû pirater l'appel de retour dans le cas où il n'y a pas d'enregistrements dans le flux à 0. Je voudrais résoudre ce problème aussi.

AtomicInteger recordCounter = new AtomicInteger(0); 
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) { 
     stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0) 
       .forEach((chunk) -> 
         { 
          //... process each chunk 
         } 
      ); 
    } catch(Exception e) { 
     throw new MyRuntimeException("Failure streaming...", e); 
    } finally { 
     myObjects.close(); 
    } 

return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet(); 
+0

pourquoi avez-vous revenir 'contre + 1' et non' counter'? – wargre

+0

Parce que sinon, il renvoie toujours 1 moins que ce qu'il est censé faire. –

Répondre

0

En fin de compte, je suis allé avec Iterators.partition() de goyave pour partager mon flux d'objets en morceaux:

MutableInt recordCounter = new MutableInt(); 
try { 
    Iterators.partition(myObjects.iterator(), 1000) 
      .forEachRemaining((chunk) -> { 
         //process each chunk 
         ... 
         recordCounter.add(chunk.size()); 
      }); 
} catch (Exception e) { 
    throw new MyRuntimeException("Failure streaming...", e); 
} finally { 
    myObjects.close(); 
} 

return recordCounter.getValue(); 
0

Originally compteur a été utilisé pour savoir quand diviser les morceaux et il n'est pas fiable pour compter le nombre total d'objets. Lorsque le flux a la taille 0 ou 1, la fonction groupRuns n'est pas exécutée.

Vous avez donc besoin d'un autre moyen de compter les objets. Au lieu de simplement consommer des articles dans forEach vous pouvez retourner le nombre d'objets traités chunk.size() et les sum à la fin

AtomicInteger counter = new AtomicInteger(0); 
    try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) { 
     return stream 
       .groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0) 
       .mapToLong((chunk) -> { 
        //... process each chunk 
        return chunk.size(); 
       }) 
       .sum(); 
    } catch(Exception e) { 
     throw new MyRuntimeException("Failure streaming...", e); 
    } finally { 
     myObjects.close(); 
    } 
+0

Ok, pour suivre ceci - La modification que vous mentionnez ci-dessus ajoute 100% de temps d'exécution à la méthode originale. Il aborde le problème mais prend beaucoup trop de temps! –

0

@Nazarii Bardiuk expliqué, pourquoi il ne fonctionne pas. Je rencontre les exigences similaires pour diviser le flux avant. Alors je l'ai fourchu et fait quelques changements à: StreamEx-0.8.7. Voici un exemple simple:

int count = IntStreamEx.range(0, 10).boxed().splitToList(3).mapToInt(chunk -> { 
    System.out.println(chunk); 
    return chunk.size(); 
}).sum(); 

System.out.println(count); 

Si vous êtes au début de votre projet, vous pouvez essayer, et le code sera:

try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) { 
    return stream.splitToList(1000) 
       .mapToInt((chunk) -> { 
           //... process each chunk 
        return chunk.size(); 
        }).sum(); 
} 
1

Comme JavaDoc dit:

sameGroup - un prédicat sans interférence et sans état à appliquer à la paire d'éléments adjacents qui renvoie true pour les éléments appartenant au même groupe.

Le prédicat doit être sans état, ce qui n'est pas votre cas. Vous utilisez abusivement la méthode, c'est pourquoi vous ne pouvez pas obtenir un résultat attendu. Cela fonctionne de près à ce que vous voulez par hasard, vous ne pouvez pas compter sur ce comportement, cela pourrait changer dans les versions futures de StreamEx.

+0

Nous utilisons plutôt Guava –