1

Il existe un Broadcaster qui accepte les chaînes et les ajoute à un StringBuilder.Project Reactor: attente pendant la fin du diffuseur

Je veux le tester.

Je dois utiliser Thread#sleep pour attendre, tandis que le diffuseur finit le traitement des chaînes. Je veux supprimer sleep. J'ai tenté d'utiliser Control#debug() sans succès.

public class BroadcasterUnitTest { 

@Test 
public void test() { 
    //prepare 
    Environment.initialize(); 
    Broadcaster<String> sink = Broadcaster.create(Environment.newDispatcher()); //run broadcaster in separate thread (dispatcher) 
    StringBuilder sb = new StringBuilder(); 
    sink 
      .observe(s -> sleep(100)) //long-time operation 
      .consume(sb::append); 

    //do 
    sink.onNext("a"); 
    sink.onNext("b"); 

    //assert 
    sleep(500);//wait while broadcaster finished (if comment this line then the test will fail) 
    assertEquals("ab", sb.toString()); 
} 

private void sleep(int millis) { 
    try { 
     Thread.sleep(millis); 
    } catch (InterruptedException e) { 
     throw new RuntimeException(e); 
    } 
} 
} 

Répondre

0

Je ne suis pas familier avec Broadcaster (et il est probablement obsolète puisque la question est ancienne), mais ces 3 moyens pourraient être utiles en général:

  1. Lors du test Flux de Project-Reactor es et d'autres choses, vous êtes probablement mieux d'utiliser leur bibliothèque de test faite spécialement pour cela. Their reference et la Javadoc sur cette partie sont assez bon, et je vais juste copier un exemple qui parle pour lui-même ici:

    @Test 
    public void testAppendBoomError() { 
        Flux<String> source = Flux.just("foo", "bar"); 
        StepVerifier.create( 
        appendBoomError(source)) 
        .expectNext("foo") 
        .expectNext("bar") 
        .expectErrorMessage("boom") 
        .verify(); 
    } 
    
  2. Vous pourriez block() par vous-même sur les Flux es et Mono s puis exécuter des contrôles . Et notez que si une erreur est émise, cela entraînera une exception. Mais vous avez le sentiment que vous aurez besoin d'écrire plus de code pour certains cas (par exemple, vérifier Flux a émis 2 éléments X & Y puis s'est terminé avec une erreur) et vous réinstallez StepVerifier.

    @Test 
    public void testFluxOrMono() { 
        Flux<String> source = Flux.just(2, 3); 
        List<Integer> result = source 
         .flatMap(i -> multiplyBy2Async(i)) 
         .collectList() 
         .block(); 
        // run your asserts on the list. Reminder: the order may not be what you expect because of the `flatMap` 
        // Or with a Mono: 
        Integer resultOfMono = Mono.just(5) 
         .flatMap(i -> multiplyBy2Async(i)) 
         .map(i -> i * 4) 
         .block(); 
        // run your asserts on the integer 
    } 
    
  3. Vous pouvez utiliser les solutions générales à Async tests comme CountDownLatch, mais, encore une fois, je ne recommande pas et vous donnerait du mal dans certains cas. Par exemple, si vous ne connaissez pas le nombre de récepteurs à l'avance, vous devrez utiliser autre chose.