2016-09-22 2 views
4

Comme nous le savons, ThreadPoolExecutor utilise certains BlockingQueue comme une file d'attente de tâches entrantes. Ce que je veux, c'est avoir ThreadPoolExecutor qui a une seconde file d'attente pour les résultats de la tâche qui sont prêts. Je veux utiliser cette file d'attente comme source pour les services d'entrée/sortie qui envoient ou stockent ces résultats. Pourquoi je veux créer une file d'attente séparée?Concurrence: comment implémenter un exécuteur avec des files d'attente entrantes et sortantes?

Parce que je veux découpler l'action d'envoyer des résultats de l'action d'obtenir des résultats. En outre, je suppose que les exceptions et les délais qui accompagnent les opérations d'entrée/sortie ne devraient pas affecter mon ThreadPoolExecutor qui calcule le résultat.

J'ai créé une implémentation naïve de cela. J'aimerais avoir des critiques à ce sujet. Peut-être, il peut être mis en œuvre avec des classes Java out-of-the-box mieux? J'utilise Java 7.

public class ThreadPoolWithResultQueue { 
    interface Callback<T> { 
     void complete(T t); 
    } 
    public abstract static class CallbackTask<T> implements Runnable { 
     private final Callback callback; 
     CallbackTask(Callback callback) { 
      this.callback = callback; 
     }  
     public abstract T execute(); 
     final public void run() { 
      T t = execute(); 
      callback.complete(t); 
     } 
    } 
    public static class CallBackTaskString extends CallbackTask<String> { 
     public CallBackTaskString(Callback callback) { 
      super(callback); 
     } 
     @Override 
     public String execute() { 
      try { 
       Thread.sleep(3000); 
      } catch (InterruptedException e) { 
      } 
      return hashCode() + "-" + System.currentTimeMillis(); 
     } 
    }  
    public static void main(String[] args) throws InterruptedException { 
     BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); 
     final BlockingQueue<String> resultQueue = new LinkedBlockingQueue<String>(); 
     Callback<String> addToQueueCallback = new Callback<String>() { 
      @Override 
      public void complete(String s) { 
       System.out.println("Adding Result To Queue " + s); 
       resultQueue.add(s); //adding to outgoing queue. some other executor (or same one?) will process it 
      } 
     }; 
     ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1000l, TimeUnit.DAYS, workQueue); 
     for (int i = 0; i <= 5; i++) { 
      executor.submit(new CallBackTaskString(addToQueueCallback)); 
     }; 
     System.out.println("All submitted."); 
     executor.shutdown(); 
     executor.awaitTermination(10l, TimeUnit.SECONDS); 
     System.out.println("Result queue size " + resultQueue.size()); 
    } 
} 
+0

Avez-vous mis tout ce 'Callback' supplémentaire là-dedans donc le code semble plus intelligent, ou avez-vous besoin/vouloir le faire de cette façon? Une implémentation simple implique simplement une file d'attente supplémentaire, et si vous savez comment mettre les choses dans des files d'attente, cela devrait être évident. – Kayaman

+4

Un mot: 'ExecutorCompletionService'. –

+0

Kayaman, oui c'est plus simple d'ajouter des résultats à la file d'attente sortante juste dans le Runnable à exécuter. Mais j'essaie de découpler autant que possible. Quoi qu'il en soit, Marko a suggéré une bonne chose. – MiamiBeach

Répondre

0

Par souci de makinf un composant de bibliothèque, vous devez envelopper les choses ...

Vous pouvez prolonger l'exécuteur pool de threads qui a un certain nombre de méthodes pour intercepter les tâches soumises, de sorte que vous mettriez la chose en file d'attente dans une file d'attente passée dans le constructeur. C'est en gros ExecutorCompletionService, mais vous permettriez à l'utilisateur de brancher une file d'attente au lieu d'apparaître comme une seule.

Dans le cas contraire, il s'agit d'un proxy typique de la tâche. Bon travail.