2017-10-12 4 views
3

Chaque intervalle Je récupère les tweets avec une certaine requête. Ces tweets doivent être transmis à des services qui calculent et manipulent ces tweets. Donc, ces services sont souscrits auprès de mon éditeur. Donc publisher.hasSubscribers() renvoie true. Mais la fonction submit ou offer n'invoque pas le onNext de mon abonné. Donc, en tant que "correctif", je parcours mes abonnés et je les invoque moi-même. Mais cela ne devrait pas être le cas.SubmissionPublisher sur submit ne pas invoquer onNext de l'abonné

Ceci est le constructeur de mon éditeur.

public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){ 
    super(executor, maxBufferCapacity); 
    this.searchQuery = searchQuery; 
    scheduler = new ScheduledThreadPoolExecutor(1); 
    this.tweetGetter = scheduler.scheduleAtFixedRate(
      () -> { 
       List<String> tweets = getTweets(searchQuery); 
       /* this.lastCall = LocalDateTime.now(); 
       for(Flow.Subscriber sub : this.getSubscribers()){ 
        sub.onNext(tweets); 
       }*/ 
       this.submit(tweets); 
       if(tweets.size() >= 20) this.close(); 
      }, 0, period, unit); 
} 

Ceci est mon abonné

package myFlowAPI; 

import Interfaces.IProcess; 
import Services.LogToFileService; 

import java.util.List; 
import java.util.concurrent.Flow; 
import java.util.concurrent.atomic.AtomicInteger; 

public class MySubscriber implements Flow.Subscriber<List<String>> { 
private Flow.Subscription subscription; 
private AtomicInteger count; 

private IProcess processor; 

private String name; 
private int DEMAND = 0; 

public MySubscriber(String name, IProcess processor){ 
    this.name = name; 
    this.processor = processor; 
} 

@Override 
public void onSubscribe(Flow.Subscription subscription) { 
    this.subscription = subscription; 
} 


@Override 
public void onNext(List<String> item) { 
    Object result = this.processor.process(item); 
    this.readResult(result); 

    switch (this.processor.getClass().getSimpleName()){ 
     case "CalculateTweetStatsService": 
      if((Integer) result >= 20){ 
       this.subscription.cancel(); 
      } 
      break; 
    } 
} 

@Override 
public void onError(Throwable throwable) { 
    System.out.println("Error is thrown " + throwable.getMessage()); 
} 

@Override 
public void onComplete() { 
    if(this.processor instanceof LogToFileService){ 
     ((LogToFileService) processor).closeResource(); 
    } 
    System.out.println("complete"); 
} 

private void readResult(Object result){ 
    System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString()); 
} 
} 

C'est la principale où je souscris à l'éditeur

public static void main(String[] args) { 
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); 

    String searchQuery; 
    try{ 
     searchQuery = args[0] != null ? args[0] : "#capgemini50"; 
    }catch (ArrayIndexOutOfBoundsException ex){ 
     searchQuery = "#capgemini50"; 
    } 

    TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery); 

    MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt")); 
    MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService()); 
    streamer.subscribe(subscriber1); 
    streamer.subscribe(subscriber2); 

} 

Répondre

4

Vous devez l'abonné de demander explicitement des données par exemple sur abonnement (voir http://download.java.net/java/jdk9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-):

@Override 
public void onSubscribe(Flow.Subscription subscription) { 
    this.subscription = subscription; 
    this.subscription.request(1); 
} 

Même lors d'un traitement dans OnNext() pour demander l'élément suivant.

+2

Mon héros. Merci beaucoup!! '@Override public void onNext (Liste article) { Résultat de l'objet = this.processor.process (item); this.readResult (résultat); commutateur (this.processor.getClass() getSimpleName (.)) { cas "CalculateTweetStatsService": if ((résultat entier)> = 20) { this.subscription.cancel(); } pause; } cette.souscription.inscription (1); } ' – user1008531