2017-09-30 1 views
0

Dans ma demande, je suis en train de traiter les données dans IMap, le scénario est le suivant:EntryProcessor sans entrées de verrouillage

    demande
  1. de recieves d'application (REST par exemple) avec jeu de clés à traiter
  2. l'application
  3. traite les entrées avec clé donnée et renvoie le résultat - carte où la clé est la clé originale de l'entrée et le résultat est calculé

pour ce scénario IMap.executeOnKeys est presque parfait, avec un problème - l'entrée est verrouillée en cours de traitement - et vraiment ça fait mal thruput. L'IMap est rempli au démarrage et n'a jamais été modifié.

Est-il possible de traiter des entrées sans les verrouiller? Si possible sans envoyer des entrées à un autre nœud et sans provoquer une surcharge du réseau (envoyer 1000 tâches à un seul nœud en boucle for)

est ici mise en œuvre de référence pour démontrer ce que je suis en train de réaliser:

public class Main { 
    public static void main(String[] args) throws Exception { 
     HazelcastInstance instance = Hazelcast.newHazelcastInstance(); 

     IMap<String, String> map = instance.getMap("the-map"); 

     // populated once on startup, never modified 
     for (int i = 1; i <= 10; i++) { 
      map.put("key-" + i, "value-" + i); 
     } 

     Set<String> keys = new HashSet<>(); 
     keys.add("key-1"); // every requst may have different key set, they may overlap 

     System.out.println(" ---- processing ----"); 
     ForkJoinPool pool = new ForkJoinPool(); 
     // to simulate parallel requests on the same entry 
     pool.execute(() -> map.executeOnKeys(keys, new MyEntryProcessor("first"))); 
     pool.execute(() -> map.executeOnKeys(keys, new MyEntryProcessor("second"))); 

     System.out.println(" ---- pool is waiting ----"); 
     pool.shutdown(); 
     pool.awaitTermination(5, TimeUnit.MINUTES); 

     System.out.println(" ------ DONE -------"); 
    } 

    static class MyEntryProcessor implements EntryProcessor<String, String> { 
     private String name; 

     MyEntryProcessor(String name) { 
      this.name = name; 
     } 

     @Override 
     public Object process(Map.Entry<String, String> entry) { 
      System.out.println(name + " is processing " + entry); 
      return calculate(entry); // may take some time, doesn't modify entry 
     } 

     @Override 
     public EntryBackupProcessor<String, String> getBackupProcessor() { 
      return null; 
     } 
    } 
} 

Merci d'avance

Répondre

0

Dans executeOnKeys, les entrées ne sont pas verrouillées. Peut-être que vous voulez dire que le traitement se passe sur partitionThreads, de sorte qu'il ne peut y avoir aucun autre traitement pour la clé particulière? Quoi qu'il en soit, voici la solution:

Votre EntryProcessor devrait mettre en œuvre:

  • Offloadable Interface -> cela signifie que la partition-fil ne sera utilisée que pour la lecture de la valeur. Le calcul sera effectué dans le pool de threads de déchargement. Interface -> dans ce cas, l'EP ne sautera plus sur la partition-thread pour sauvegarder la modification que vous auriez pu faire dans l'entrée. Puisque votre EP ne modifie pas les entrées, cela augmentera les performances.
+0

de 'EntryProcessor' javadoc: « Au moment où vous le recevez l'entrée est verrouillée et non libéré jusqu'à ce que la EntryProcessor complète » et de' 'ReadOnly' et Offloadable' javadoc: Actuellement pris en charge dans: executeOnKey (..), submitToKey (..) Implémentation de ces interfaces et en utilisant executeOnKey() l'application fonctionne, mais j'ai besoin d'utiliser ensemble de clés, pas à la fois – AllanS

+0

Fondamentalement, toutes les partitions sont divisées de sorte qu'un thread de partition possède plusieurs partitions. Donc, si une opération de partition arrive, elle est exécutée par son thread propriétaire et pendant que le thread exécute cette opération, elle n'acceptera aucune autre opération. Ceci fournit d'une certaine manière à votre code un accès sécurisé à la clé pour que l'entrée ne soit pas modifiée pendant l'exécution de votre code. – wildnez

+0

Et si vous êtes après un débit plus élevé, l'utilisation de Offloadable devrait certainement aider. Convenu que ce n'est pas actuellement disponible pour executeOnKeys, mais le lancement de plusieurs EPs Offloadable pour toutes vos clés peut vous donner un meilleur débit que ce que vous obtenez pour le moment. – wildnez