2017-05-03 2 views
0

Je travaille sur une application lourdement chargée et j'aimerais avoir la possibilité de mettre les données dans un cache de manière asynchrone. Je veux dire quelque chose comme appeler l'opération sur un cache et recevoir IgniteFuture<V> représentant le résultat. Je veux dire que j'ai besoin de la capacité de paralléliser l'extraction de données d'un stockage de persistance et de les mettre dans un cache. Si quelque chose arrive, je peux essayer d'extraire les données une fois de plus (très bien, pas trop de données, assez fin).Ignite mettre les données dans un cache asynchronosuly

+0

Quelle est votre question? On dirait que vous avez fait une introduction mais avez oublié la dernière partie de la question? –

Répondre

1

Si vous n'avez pas besoin dur sur IgniteFuture (que je crois ne devrait pas être le cas) et tout ce que vous voulez est un mécanisme pour mettre les données dans un cache et l'obtenir traité de manière asynchrone, et traiter ensuite les résultats retourné par cette opération, vous pouvez utiliser Java's Executor Service.

Si vous n'êtes pas très au courant du service d'exécuteur de Java alors vous voudrez peut-être lire la documentation ou this answer qui met en évidence les points rapides, dans l'exemple ci-dessous j'ai également ajouté des commentaires.

Voici quelques autres points rapides sur le nombre de fils:

  • Cet exemple crée un ExecutorService avec « ThreadPoolExecutor » la mise en œuvre, il y a d'autres options comme « Executors.newSingleThreadExecutor() » et « Executors.newFixedThreadPool(10) » qui vous permet de définir combien threads que vous voulez dans la JVM.
  • Vous pouvez également choisir de créer directement l'objet de ThreadPoolExecutor comme ceci return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); et vous pouvez avoir plus de contrôle sur le nombre de threads et le type de file d'attente. Vous devrez en savoir plus sur cette option si vous voulez créer ThreadPoolExecutor objet vous-même et non par ExecutorService

Quelques autres points liés à vos réalisations:

  • Lorsque vous traitez les Future objets il est processus de blocage, car Future.get() est un appel bloquant, donc votre thread principal sera bloqué jusqu'à ce que tous les objets Future soient retournés et traités.
  • Si vous ne voulez pas de blocage, il existe plusieurs options: vous pouvez créer un nouveau thread pour effectuer tout ce traitement et ainsi libérer votre thread principal. Une autre option pourrait être de ne pas traiter les objets Future, donc dès que vous faites executorService.submit(callableTask1), votre thread sera libre, puis dans votre objet CallableTask vous pouvez pousser le résultat dans une file d'attente (vous pouvez choisir Java's queue implementation selon votre besoin) puis traiter cette file d'attente à partir d'un autre thread. Une autre option pourrait être de dédier un autre thread pour le traitement de vos objets Future.

Exemple de code:

import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class ExecutorServiceFutureCallableExample { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     List<Future<String>> futuresList = new ArrayList<>(); 

     ExecutorService executorService = Executors.newCachedThreadPool(); 
     ExecutorServiceFutureCallableExample.CallableTask callableTask1 = new ExecutorServiceFutureCallableExample.CallableTask(2000); 
     ExecutorServiceFutureCallableExample.CallableTask callableTask2 = new ExecutorServiceFutureCallableExample.CallableTask(1000); 
     ExecutorServiceFutureCallableExample.CallableTask callableTask3 = new ExecutorServiceFutureCallableExample.CallableTask(3000); 

     System.out.println("### Starting submitting tasks"); 

     // submit the callable and register the returned future object so that it can be processed later. 
     futuresList.add(executorService.submit(callableTask1)); 
     futuresList.add(executorService.submit(callableTask2)); 
     futuresList.add(executorService.submit(callableTask3)); 

     System.out.println("### Finished submitting tasks"); 

     for (int i = 0; i < futuresList.size(); i++) { 
      // here "get()" waits for the future tasks to be returned. 
      System.out.println(futuresList.get(i).get()); 
     } 

     System.out.println("### Finished."); 
    } 

    static class CallableTask implements Callable<String>{ 

     private long timeToSleep; 

     CallableTask(long _timeToSleep){ 
      this.timeToSleep = _timeToSleep; 
     } 

     @Override 
     public String call() throws Exception { 
      String str = new Date() + ": Processing - " + this.hashCode() + " | " + Thread.currentThread() + ", slept for seconds - " + timeToSleep; 
      System.out.println(str); 
      Thread.sleep(timeToSleep); 
      return str + " ||||| completed at: " + new Date(); 
     } 

     public long getTimeToSleep() { 
      return timeToSleep; 
     } 

     public void setTimeToSleep(long timeToSleep) { 
      this.timeToSleep = timeToSleep; 
     } 

    } 
}