Dans ma demande, je suis en train de traiter les données dans IMap, le scénario est le suivant:EntryProcessor sans entrées de verrouillage
-
demande
- de recieves d'application (REST par exemple) avec jeu de clés à traiter l'application
- traite les entrées avec clé donnée et renvoie le résultat - carte où la clé est la clé originale de l'entrée et le résultat est calculé
pour ce scénario IMap.executeOnKeys
est presque parfait, avec un problème - l'entrée est verrouillée en cours de traitement - et vraiment ça fait mal thruput. L'IMap est rempli au démarrage et n'a jamais été modifié.
Est-il possible de traiter des entrées sans les verrouiller? Si possible sans envoyer des entrées à un autre nœud et sans provoquer une surcharge du réseau (envoyer 1000 tâches à un seul nœud en boucle for)
est ici mise en œuvre de référence pour démontrer ce que je suis en train de réaliser:
public class Main {
public static void main(String[] args) throws Exception {
HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, String> map = instance.getMap("the-map");
// populated once on startup, never modified
for (int i = 1; i <= 10; i++) {
map.put("key-" + i, "value-" + i);
}
Set<String> keys = new HashSet<>();
keys.add("key-1"); // every requst may have different key set, they may overlap
System.out.println(" ---- processing ----");
ForkJoinPool pool = new ForkJoinPool();
// to simulate parallel requests on the same entry
pool.execute(() -> map.executeOnKeys(keys, new MyEntryProcessor("first")));
pool.execute(() -> map.executeOnKeys(keys, new MyEntryProcessor("second")));
System.out.println(" ---- pool is waiting ----");
pool.shutdown();
pool.awaitTermination(5, TimeUnit.MINUTES);
System.out.println(" ------ DONE -------");
}
static class MyEntryProcessor implements EntryProcessor<String, String> {
private String name;
MyEntryProcessor(String name) {
this.name = name;
}
@Override
public Object process(Map.Entry<String, String> entry) {
System.out.println(name + " is processing " + entry);
return calculate(entry); // may take some time, doesn't modify entry
}
@Override
public EntryBackupProcessor<String, String> getBackupProcessor() {
return null;
}
}
}
Merci d'avance
de 'EntryProcessor' javadoc: « Au moment où vous le recevez l'entrée est verrouillée et non libéré jusqu'à ce que la EntryProcessor complète » et de' 'ReadOnly' et Offloadable' javadoc: Actuellement pris en charge dans: executeOnKey (..), submitToKey (..) Implémentation de ces interfaces et en utilisant executeOnKey() l'application fonctionne, mais j'ai besoin d'utiliser ensemble de clés, pas à la fois – AllanS
Fondamentalement, toutes les partitions sont divisées de sorte qu'un thread de partition possède plusieurs partitions. Donc, si une opération de partition arrive, elle est exécutée par son thread propriétaire et pendant que le thread exécute cette opération, elle n'acceptera aucune autre opération. Ceci fournit d'une certaine manière à votre code un accès sécurisé à la clé pour que l'entrée ne soit pas modifiée pendant l'exécution de votre code. – wildnez
Et si vous êtes après un débit plus élevé, l'utilisation de Offloadable devrait certainement aider. Convenu que ce n'est pas actuellement disponible pour executeOnKeys, mais le lancement de plusieurs EPs Offloadable pour toutes vos clés peut vous donner un meilleur débit que ce que vous obtenez pour le moment. – wildnez