2017-08-18 1 views
0

Dites, SomeOtherService dans un Veritcle utiliser UserService dans différents Verticle, la communication se produit sur le bus d'événements. pour le représenter:Vert.x avec mise en cache Rx-Java pour un appel de service

class SomeOtherService { 

    final UserService userService = new UserService(); 

    // Mutable state 
    final Map<String, Single<String>> cache = new HashMap(); // Not Synchronized ? 

    public Single<String> getUserSessionInfo(String id) { 
     // Seems it is not save ! : 
     return cache.computeIfAbsent(id, _id -> { 
       log("could not find " + id + " in cache. Connecting to userService..."); 
       return userService.getUserSessionInfo(id); // uses generated proxy to send msg to the event bus to call it 
      } 
     ); 
    } 
} 

// Quelque part dans un autre verticle/micro service sur une autre machine.

class UserService { 

    public Single<String> getUserSessionInfo(String id) { 
     return Single.fromCallable(() -> { 

      waitForOneSecond(); 

      log("getUserSessionInfo for " + id); 

      if (id.equals("1")) 
       return "one"; 
      if (id.equals("2")) 
       return "two"; 

      else throw new Exception("could not"); // is it legal? 

      } 
     ); 
    } 

Et le code client, où nous adhérons et décider de l'ordonnanceur:

final Observable<String> obs2 = Observable.from(new String[] {"1", "1"}); 


     // Emulating sequential call of 'getUserSessionInfo' to fork in separate scheduler A 
     obs.flatMap(id -> { 
        log("flatMap"); // on main thread 
        return someOtherService.getUserSessionInfo(id) 
              .subscribeOn(schedulerA) // Forking. will thread starvation happen? (since we have only 10 threads in the pool) 
              .toObservable(); 
       } 
     ).subscribe(
       x -> log("next: " + x) 
     ); 

La question est, à quel point est la solution à utiliser HashMap pour le cache (puisqu'il est l'état partagé ici) en utilisant la méthode computeIfAbsent?

Même si nous utilisons l'événement en boucle & Event Bus il ne nous sauvera pas d'un état partagé et problème de concurrence possible, en supposant que log-opération (comme getUserSessionInfo(id) arrive dans le planificateur/threads séparés?

Dois-je utiliser ReplySubject au lieu de mettre en œuvre la mise en cache? Qu'est-ce que les meilleures pratiques pour vert.x + rx-java?

Paraît que huard cache.computeIfAbsent est exécuté sur EventLoop il est sûr car il est séquentiel?

Désolé .. beaucoup de questi ons, je pense que je peux me limiter à: Quelles sont les meilleures pratiques pour mettre en œuvre les appels Cash for the Service dans Vert.x et Rx-Java?

L'exemple est tout here:

+0

Je pense avoir trouvé ma réponse ici: http: //blog.danl ew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/ - Observable source = Observable .concat (mémoire, diskWithCache, networkWithSave) .first(); et quand je l'enregistre en utilisant map.put (..) explicitement au lieu d'utiliser computeIfAbsent – ses

Répondre

0

Je pense avoir trouvé ma réponse ici: http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/ -

Observable<Data> source = Observable .concat(memory, diskWithCache, networkWithSave) .first(); 

et quand je l'enregistrer en utilisant map.put (..) explicitement au lieu d'utiliser computeIfAbsent

et comme journal que je suis sur la boucle d'événement, je suis sûr à utiliser un-carte en espèces synchronisées à travers