2010-01-28 5 views
25

J'ai un processus qui délègue des tâches asynchrones à un pool de threads. Je dois m'assurer que certaines tâches sont exécutées dans l'ordre. Ainsi, par exempleContrôle de l'ordre d'exécution des tâches avec ExecutorService

tâches arrivent dans l'ordre

Tâches a1, b1, c1, d1, e1, a2, a3, b2, f1

Les tâches peuvent être exécutées dans un ordre quelconque, sauf lorsqu'il y a un la dépendance naturelle, donc a1, a2, a3 doivent être traitées dans cet ordre soit en allouant au même thread ou en les bloquant jusqu'à ce que je sache que la tâche précédente # a été complétée.

Actuellement, il n'utilise pas le package Java Concurrency, mais j'envisage de changer pour profiter de la gestion des threads.

Quelqu'un at-il une solution similaire ou des suggestions sur la façon d'y parvenir

Répondre

2

Lorsque vous soumettez une Runnable ou Callable à un ExecutorService vous recevez un Future en retour. Avez-les threads qui dépendent de a1 être passé Future a1 et appelez Future.get(). Cela bloquera jusqu'à ce que le thread se termine.

Alors:

ExecutorService exec = Executor.newFixedThreadPool(5); 
Runnable a1 = ... 
final Future f1 = exec.submit(a1); 
Runnable a2 = new Runnable() { 
    @Override 
    public void run() { 
    f1.get(); 
    ... // do stuff 
    } 
} 
exec.submit(a2); 

et ainsi de suite.

+4

Je ne pense pas que cela fonctionnera avec un pool de threads fixe, comme les fils pourraient tous sur le bloc 'f1.get()' à la fois et dans l'impasse. – finnw

+0

Réglez la taille de la piscine selon le cas. – cletus

+0

Ou utilisez un pool de threads mis en cache. – finnw

2

Une autre option consiste à créer votre propre exécuteur, appelez-le OrderedExecutor et créez un tableau d'objets ThreadPoolExecutor encapsulés, avec 1 thread par exécuteur interne. Vous fournissez ensuite un mécanisme pour choisir un des objets internes, par exemple, vous pouvez le faire en fournissant une interface que l'utilisateur de votre classe peut implémenter:

 
executor = new OrderedExecutor(10 /* pool size */, new OrderedExecutor.Chooser() { 
    public int choose(Runnable runnable) { 
    MyRunnable myRunnable = (MyRunnable)runnable; 
    return myRunnable.someId(); 
    }); 

executor.execute(new MyRunnable()); 

La mise en œuvre de OrderedExecutor.execute() utilisera alors le Sélecteur pour obtenir un int, vous modifiez ceci avec la taille du pool, et c'est votre index dans le tableau interne. L'idée étant que "someId()" retournera la même valeur pour tous les "a", etc.

12

Quand je l'ai fait dans le passé, j'ai généralement eu la commande traitée par un composant qui soumet ensuite callables/runnables à un Executor.

Quelque chose comme.

  • Vous avez une liste des tâches à exécuter, certaines avec dépendances
  • Créer un Exécuteur et enveloppez avec un ExecutorCompletionService
  • Rechercher toutes les tâches, tout sans dépendances, les programmer via le service d'achèvement
  • Sondage le service d'achèvement
  • Comme chaque tâche complète
    • Ajouter à un « complété » liste
    • Réévaluer toutes les tâches en attente par rapport à la "liste terminée" pour voir si elles sont "dépendance terminée".Si oui les programmer
    • Rinse répétition jusqu'à ce que toutes les tâches sont présentées/complétées

Le service d'achèvement est une belle façon d'être en mesure d'obtenir les tâches qu'ils accomplissent plutôt que d'essayer d'interroger un groupe de Futures. Cependant, vous voudrez probablement conserver un Map<Future, TaskIdentifier> qui est rempli lorsqu'une tâche est planifiée via le service d'achèvement, de sorte que lorsque le service d'achèvement vous donne un Avenir terminé, vous pouvez déterminer quel est le TaskIdentifier.

Si vous vous trouvez dans un état où les tâches sont toujours en attente d'exécution, mais rien ne fonctionne et rien ne peut être planifié, alors vous avez un problème de dépendance circulaire.

9

J'écris mon propre Executor qui garantit la commande des tâches pour les tâches avec la même clé. Il utilise la carte des files d'attente pour les tâches de commande avec la même clé. Chaque tâche à clé exécute la tâche suivante avec la même clé.

Cette solution ne gère pas RejectedExecutionException ou d'autres exceptions de l'Executor délégué! Ainsi, l'exécuteur délégué devrait être "illimité".

import java.util.HashMap; 
import java.util.LinkedList; 
import java.util.Map; 
import java.util.Queue; 
import java.util.concurrent.Executor; 

/** 
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly). 
*/ 
public class OrderingExecutor implements Executor{ 

    private final Executor delegate; 
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>(); 

    public OrderingExecutor(Executor delegate){ 
     this.delegate = delegate; 
    } 

    @Override 
    public void execute(Runnable task) { 
     // task without key can be executed immediately 
     delegate.execute(task); 
    } 

    public void execute(Runnable task, Object key) { 
     if (key == null){ // if key is null, execute without ordering 
      execute(task); 
      return; 
     } 

     boolean first; 
     Runnable wrappedTask; 
     synchronized (keyedTasks){ 
      Queue<Runnable> dependencyQueue = keyedTasks.get(key); 
      first = (dependencyQueue == null); 
      if (dependencyQueue == null){ 
       dependencyQueue = new LinkedList<Runnable>(); 
       keyedTasks.put(key, dependencyQueue); 
      } 

      wrappedTask = wrap(task, dependencyQueue, key); 
      if (!first) 
       dependencyQueue.add(wrappedTask); 
     } 

     // execute method can block, call it outside synchronize block 
     if (first) 
      delegate.execute(wrappedTask); 

    } 

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
     return new OrderedTask(task, dependencyQueue, key); 
    } 

    class OrderedTask implements Runnable{ 

     private final Queue<Runnable> dependencyQueue; 
     private final Runnable task; 
     private final Object key; 

     public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
      this.task = task; 
      this.dependencyQueue = dependencyQueue; 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      try{ 
       task.run(); 
      } finally { 
       Runnable nextTask = null; 
       synchronized (keyedTasks){ 
        if (dependencyQueue.isEmpty()){ 
         keyedTasks.remove(key); 
        }else{ 
         nextTask = dependencyQueue.poll(); 
        } 
       } 
       if (nextTask!=null) 
        delegate.execute(nextTask); 
      } 
     } 
    } 
} 
+0

+1. Merci pour ça.Je vais utiliser cette implantation, mais je ne sais vraiment pas comment cela n'est pas marqué comme la réponse finale à la question. –

0

Dans Habanero-Java library, il existe un concept de tâches basées sur les données qui peuvent être utilisées pour exprimer des dépendances entre les tâches et d'éviter les opérations de filetage de blocage. Sous les couvertures, la bibliothèque Habanero-Java utilise les JDKs ForkJoinPool (c'est-à-dire un ExecutorService).

Par exemple, votre cas d'utilisation pour les tâches A1, A2, A3, ... pourrait se traduire comme suit:

HjFuture a1 = future(() -> { doA1(); return true; }); 
HjFuture a2 = futureAwait(a1,() -> { doA2(); return true; }); 
HjFuture a3 = futureAwait(a2,() -> { doA3(); return true; }); 

Notez que a1, a2, et a3 ne sont que des références à des objets de type HjFuture et peut être conservé dans vos structures de données personnalisées pour spécifier les dépendances au fur et à mesure que les tâches A2 et A3 arrivent à l'exécution.

Il existe quelques tutorial slides available. Vous trouverez d'autres documentations sous les références javadoc, API summary et primers.

0

Vous pouvez utiliser Executors.newSingleThreadExecutor(), mais il n'utilisera qu'un seul thread pour exécuter vos tâches. Une autre option consiste à utiliser CountDownLatch. Voici un exemple simple:

public class Main2 { 

public static void main(String[] args) throws InterruptedException { 

    final CountDownLatch cdl1 = new CountDownLatch(1); 
    final CountDownLatch cdl2 = new CountDownLatch(1); 
    final CountDownLatch cdl3 = new CountDownLatch(1); 

    List<Runnable> list = new ArrayList<Runnable>(); 
    list.add(new Runnable() { 
     public void run() { 
      System.out.println("Task 1"); 

      // inform that task 1 is finished 
      cdl1.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 1 is finished 
      try { 
       cdl1.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 2"); 

      // inform that task 2 is finished 
      cdl2.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 2 is finished 
      try { 
       cdl2.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 3"); 

      // inform that task 3 is finished 
      cdl3.countDown(); 
     } 
    }); 

    ExecutorService es = Executors.newFixedThreadPool(200); 
    for (int i = 0; i < 3; i++) { 
     es.submit(list.get(i)); 
    } 

    es.shutdown(); 
    es.awaitTermination(1, TimeUnit.MINUTES); 
} 
} 
0

J'ai créé un OrderingExecutor pour ce problème. Si vous passez la même clé à la méthode execute() avec des runnables différents, l'exécution des runnables avec la même clé sera dans l'ordre où execute() est appelée et ne se chevauchera jamais.

import java.util.Arrays; 
import java.util.Collection; 
import java.util.Iterator; 
import java.util.Queue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.Executor; 

/** 
* Special executor which can order the tasks if a common key is given. 
* Runnables submitted with non-null key will guaranteed to run in order for the same key. 
* 
*/ 
public class OrderedExecutor { 

    private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
      new ConcurrentLinkedQueue<Runnable>()); 

    private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>(); 
    private Executor delegate; 
    private volatile boolean stopped; 

    public OrderedExecutor(Executor delegate) { 
     this.delegate = delegate; 
    } 

    public void execute(Runnable runnable, Object key) { 
     if (stopped) { 
      return; 
     } 

     if (key == null) { 
      delegate.execute(runnable); 
      return; 
     } 

     Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> { 
      v.add(runnable); 
      return v; 
     }); 
     if (queueForKey == null) { 
      // There was no running task with this key 
      Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>()); 
      newQ.add(runnable); 
      // Use putIfAbsent because this execute() method can be called concurrently as well 
      queueForKey = taskMap.putIfAbsent(key, newQ); 
      if (queueForKey != null) 
       queueForKey.add(runnable); 
      delegate.execute(new InternalRunnable(key)); 
     } 
    } 

    public void shutdown() { 
     stopped = true; 
     taskMap.clear(); 
    } 

    /** 
    * Own Runnable used by OrderedExecutor. 
    * The runnable is associated with a specific key - the Queue&lt;Runnable> for this 
    * key is polled. 
    * If the queue is empty, it tries to remove the queue from taskMap. 
    * 
    */ 
    private class InternalRunnable implements Runnable { 

     private Object key; 

     public InternalRunnable(Object key) { 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      while (true) { 
       // There must be at least one task now 
       Runnable r = taskMap.get(key).poll(); 
       while (r != null) { 
        r.run(); 
        r = taskMap.get(key).poll(); 
       } 
       // The queue emptied 
       // Remove from the map if and only if the queue is really empty 
       boolean removed = taskMap.remove(key, EMPTY_QUEUE); 
       if (removed) { 
        // The queue has been removed from the map, 
        // if a new task arrives with the same key, a new InternalRunnable 
        // will be created 
        break; 
       } // If the queue has not been removed from the map it means that someone put a task into it 
        // so we can safely continue the loop 
      } 
     } 
    } 

    /** 
    * Special Queue implementation, with equals() and hashCode() methods. 
    * By default, Java SE queues use identity equals() and default hashCode() methods. 
    * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()). 
    * 
    * @param <E> The type of elements in the queue. 
    */ 
    private static class QueueWithHashCodeAndEquals<E> implements Queue<E> { 

     private Queue<E> delegate; 

     public QueueWithHashCodeAndEquals(Queue<E> delegate) { 
      this.delegate = delegate; 
     } 

     public boolean add(E e) { 
      return delegate.add(e); 
     } 

     public boolean offer(E e) { 
      return delegate.offer(e); 
     } 

     public int size() { 
      return delegate.size(); 
     } 

     public boolean isEmpty() { 
      return delegate.isEmpty(); 
     } 

     public boolean contains(Object o) { 
      return delegate.contains(o); 
     } 

     public E remove() { 
      return delegate.remove(); 
     } 

     public E poll() { 
      return delegate.poll(); 
     } 

     public E element() { 
      return delegate.element(); 
     } 

     public Iterator<E> iterator() { 
      return delegate.iterator(); 
     } 

     public E peek() { 
      return delegate.peek(); 
     } 

     public Object[] toArray() { 
      return delegate.toArray(); 
     } 

     public <T> T[] toArray(T[] a) { 
      return delegate.toArray(a); 
     } 

     public boolean remove(Object o) { 
      return delegate.remove(o); 
     } 

     public boolean containsAll(Collection<?> c) { 
      return delegate.containsAll(c); 
     } 

     public boolean addAll(Collection<? extends E> c) { 
      return delegate.addAll(c); 
     } 

     public boolean removeAll(Collection<?> c) { 
      return delegate.removeAll(c); 
     } 

     public boolean retainAll(Collection<?> c) { 
      return delegate.retainAll(c); 
     } 

     public void clear() { 
      delegate.clear(); 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if (!(obj instanceof QueueWithHashCodeAndEquals)) { 
       return false; 
      } 
      QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj; 
      return Arrays.equals(toArray(), other.toArray()); 
     } 

     @Override 
     public int hashCode() { 
      return Arrays.hashCode(toArray()); 
     } 

    } 

} 
0

J'ai écrit mon service d'exécuteur win qui est sensible à la séquence. Il enchaîne les tâches qui contiennent certaines références liées et actuellement en cours.

Vous pouvez passer par la mise en œuvre à https://github.com/nenapu/SequenceAwareExecutorService

Questions connexes