2017-09-28 9 views
0

Lorsque j'utilise Reactive Streams (https://github.com/reactor/reactor-core) avec un Publisher personnalisé associé à la fonction publishOn, j'obtiens toujours un NPE. Quel est le problème avec mon code? Est-ce que j'utilise le Publisher dans le mauvais sens?ReactiveStreams NPE lors de l'utilisation de publishOn avec Publisher personnalisé

Flux.from(MyPublisher()) 
      .publishOn(Schedulers.single()) 
      .subscribe { println("<-- $it received") } 

class MyPublisher : Publisher<Int> { 
    override fun subscribe(sub: Subscriber<in Int>) { 
     while (true) { 
      Thread.sleep(300) 
      sub.onNext(1) 
     } 
    } 
} 

Exception est:

Exception in thread "main" java.lang.NullPointerException 
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212) 
    at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18) 
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) 
    at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6447) 
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6440) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6404) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6347) 
    at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11) 
+0

S'il vous plaît ne pas se contenter de downvote, mais laissez un commentaire utile pourquoi vous pensez que cette question est mauvaise. – guenhter

Répondre

2

Publisher est défini par la norme et a "réactif" flux d'un certain nombre d'exigences. Une de ces exigences est que Subscriber.onSubscribe doit être appelé avant l'une des autres méthodes afin de suivre le protocole. Comme vous ne l'avez pas fait, cela signifie que quelque chose n'a probablement pas été initialisé correctement, ce qui a provoqué la présence de NPE dans la classe du réacteur.

Toutefois, même si vous résolvez ce problème, la norme est conçue pour être réactive, ce qui signifie qu'elle n'émet que des données lorsque l'abonné le demande. Comme vous enverrez des données, cela entraînera probablement une exception plus tard dans la ligne. Utilisez Flux.create pour créer un émetteur capable de gérer correctement les requêtes au lieu de créer votre propre implémentation Publisher.