2010-05-07 3 views
14

Je travaille avec un java.util.concurrent.ThreadPoolExecutor pour traiter un certain nombre d'éléments en parallèle. Bien que le thread lui-même fonctionne bien, nous avons parfois rencontré d'autres contraintes de ressources dues aux actions qui se produisaient dans les threads, ce qui nous a donné envie de réduire le nombre de Threads dans le pool.Redimensionnement dynamique de java.util.concurrent.ThreadPoolExecutor pendant qu'il a des tâches en attente

Je voudrais savoir s'il existe un moyen de réduire le nombre de threads pendant que les threads fonctionnent réellement. Je sais que vous pouvez appeler setMaximumPoolSize() et/ou setCorePoolSize(), mais cela ne fait que redimensionner le pool une fois que les threads sont inactifs, mais ils ne deviennent inactifs que lorsqu'aucune tâche n'est en attente dans la file d'attente.

Répondre

4

Pour autant que je peux dire, ce n'est pas possible dans une belle manière propre.

Vous pouvez implémenter la méthode beforeExecute pour vérifier certaines valeurs booléennes et forcer l'arrêt temporaire des threads. Gardez à l'esprit, ils contiendront une tâche qui ne sera pas exécutée jusqu'à ce qu'ils soient réactivés.

Vous pouvez également implémenter afterExecute pour lancer une exception RuntimeException lorsque vous êtes saturé. Cela entraînera effectivement la mort du Fil et puisque l'Exécuteur sera au-dessus du maximum, aucun nouveau ne sera créé.

Je ne vous recommande pas non plus. Au lieu de cela, essayez de trouver un autre moyen de contrôler l'exécution simultanée des tâches qui vous causent un problème. Peut-être en les exécutant dans un pool de threads séparé avec un nombre plus limité de travailleurs.

+0

Pour les électeurs vers le bas. Si vous lisez la question de l'OP, la deuxième réponse hautement votée n'a pas répondu à l'origine. L'OP en dit autant. Les modifications suivantes implémentent les suggestions dans cette réponse. –

0

Je lis la documentation de setMaximumPoolSize() et setCorePoolSize(), et il semble qu'ils peuvent produire le comportement dont vous avez besoin.

- EDIT -

Ma conclusion était erronée: S'il vous plaît voir la discussion ci-dessous pour plus de détails ...

+0

Les travailleurs ne quittera pas immédiatement. "Si la nouvelle valeur est inférieure à la valeur actuelle, les threads existants en excès seront terminés lorsqu'ils seront inactifs." Tant qu'il y a des tâches dans l'exécuteur, ou entrer dans l'exécuteur en temps opportun, les travailleurs ne seront pas résilier. –

+0

@Tim - oui, c'est ce que dit Eyal. Est-ce que l'OP veut que ses threads existants soient terminés sans égard? Ce ne serait pas quelque chose qu'un exécuteur pourrait faire même s'il le voulait.L'OP pourrait définir un booléen sur certaines tâches pour leur dire de se terminer tôt, et ce serait probablement une bonne idée d'envoyer une interruption lorsque ce booléen est défini. Mais c'est une logique au niveau de l'application - pas quelque chose qu'un exécuteur pourrait faire d'emblée. –

+0

Une autre idée aléatoire serait si l'Executor pouvait étrangler un certain nombre de threads en réduisant leur priorité. Cela pourrait donner le même effet global. C'est juste une idée aléatoire - il pourrait avoir d'énormes défauts auxquels je ne pense pas. –

32

Vous pouvez absolument. Appeler setCorePoolSize(int) va changer la taille de base de la piscine. Les appels à cette méthode sont thread-safe et remplacent les paramètres fournis au constructeur de ThreadPoolExecutor. Si vous réduisez la taille du pool, les threads restants s'arrêteront une fois la file d'attente de travail en cours (s'ils sont inactifs, ils s'arrêteront immédiatement). Si vous augmentez la taille du pool, les nouveaux threads seront alloués dès que possible. Le délai pour l'allocation de nouveaux threads n'est pas documenté - mais dans l'implémentation, l'allocation de nouveaux threads est effectuée à chaque appel de la méthode execute. Pour l'associer à une ferme d'emploi ajustable à l'exécution, vous pouvez exposer cette propriété (par wrapper ou à l'aide d'un exportateur dynamique MBean) en tant qu'attribut JMX en lecture-écriture pour créer une image plutôt agréable à la volée. processeur par lots réglable.

Pour réduire la taille du pool de manière forcée en cours d'exécution (ce qui est votre requête), vous devez sous-classer le ThreadPoolExecutor et ajouter une interruption à la méthode beforeExecute(Thread,Runnable). Interrompre le thread n'est pas une interruption suffisante, car cela n'interagit qu'avec les états d'attente et pendant le traitement, les threads de tâche ThreadPoolExecutor ne passent pas dans un état interrompu.

J'ai récemment rencontré le même problème en essayant de terminer un pool de threads avant que toutes les tâches soumises ne soient exécutées. Pour que cela se produise, j'ai interrompu le thread en lançant une exception d'exécution seulement après avoir remplacé le UncaughtExceptionHandler du thread avec un qui attend mon exception spécifique et l'abandonne.

/** 
* A runtime exception used to prematurely terminate threads in this pool. 
*/ 
static class ShutdownException 
extends RuntimeException { 
    ShutdownException (String message) { 
     super(message); 
    } 
} 

/** 
* This uncaught exception handler is used only as threads are entered into 
* their shutdown state. 
*/ 
static class ShutdownHandler 
implements UncaughtExceptionHandler { 
    private UncaughtExceptionHandler handler; 

    /** 
    * Create a new shutdown handler. 
    * 
    * @param handler The original handler to deligate non-shutdown 
    * exceptions to. 
    */ 
    ShutdownHandler (UncaughtExceptionHandler handler) { 
     this.handler = handler; 
    } 
    /** 
    * Quietly ignore {@link ShutdownException}. 
    * <p> 
    * Do nothing if this is a ShutdownException, this is just to prevent 
    * logging an uncaught exception which is expected. Otherwise forward 
    * it to the thread group handler (which may hand it off to the default 
    * uncaught exception handler). 
    * </p> 
    */ 
    public void uncaughtException (Thread thread, Throwable throwable) { 
     if (!(throwable instanceof ShutdownException)) { 
      /* Use the original exception handler if one is available, 
      * otherwise use the group exception handler. 
      */ 
      if (handler != null) { 
       handler.uncaughtException(thread, throwable); 
      } 
     } 
    } 
} 
/** 
* Configure the given job as a spring bean. 
* 
* <p>Given a runnable task, configure it as a prototype spring bean, 
* injecting any necessary dependencices.</p> 
* 
* @param thread The thread the task will be executed in. 
* @param job The job to configure. 
* 
* @throws IllegalStateException if any error occurs. 
*/ 
protected void beforeExecute (final Thread thread, final Runnable job) { 
    /* If we're in shutdown, it's because spring is in singleton shutdown 
    * mode. This means we must not attempt to configure the bean, but 
    * rather we must exit immediately (prematurely, even). 
    */ 
    if (!this.isShutdown()) { 
     if (factory == null) { 
      throw new IllegalStateException(
       "This class must be instantiated by spring" 
       ); 
     } 

     factory.configureBean(job, job.getClass().getName()); 
    } 
    else { 
     /* If we are in shutdown mode, replace the job on the queue so the 
     * next process will see it and it won't get dropped. Further, 
     * interrupt this thread so it will no longer process jobs. This 
     * deviates from the existing behavior of shutdown(). 
     */ 
     workQueue.add(job); 

     thread.setUncaughtExceptionHandler(
      new ShutdownHandler(thread.getUncaughtExceptionHandler()) 
      ); 

     /* Throwing a runtime exception is the only way to prematurely 
     * cause a worker thread from the TheadPoolExecutor to exit. 
     */ 
     throw new ShutdownException("Terminating thread"); 
    } 
} 

Dans votre cas, vous pouvez créer un sémaphores (juste pour être utilisé comme compteur de threadsafe) qui n'a pas permis, et lors de l'arrêt des fils vers le bas libérer à un certain nombre de permis qui correspond au delta du la taille du pool principal précédent et la nouvelle taille du pool (vous devez remplacer la méthode setCorePoolSize(int)). Cela vous permettra de terminer vos discussions après la fin de leur tâche en cours.

private Semaphore terminations = new Semaphore(0); 

protected void beforeExecute (final Thread thread, final Runnable job) { 
    if (terminations.tryAcquire()) { 
     /* Replace this item in the queue so it may be executed by another 
     * thread 
     */ 
     queue.add(job); 

     thread.setUncaughtExceptionHandler(
      new ShutdownHandler(thread.getUncaughtExceptionHandler()) 
      ); 

     /* Throwing a runtime exception is the only way to prematurely 
     * cause a worker thread from the TheadPoolExecutor to exit. 
     */ 
     throw new ShutdownException("Terminating thread"); 
    } 
} 

public void setCorePoolSize (final int size) { 
    int delta = getActiveCount() - size; 

    super.setCorePoolSize(size); 

    if (delta > 0) { 
     terminations.release(delta); 
    } 
} 

Cela devrait interrompre n fils pour f (n) = actif - demandé. En cas de problème, la stratégie d'allocation de ThreadPoolExecutor est assez durable. Il livre sur la résiliation prématurée en utilisant un bloc finally qui garantit l'exécution. Pour cette raison, même si vous terminez trop de threads, ils se repeupleront.

+1

Oui, je sais que vous pouvez faire le changement et le faire prendre effet une fois que les threads sont inactifs. Cependant ce que j'ai trouvé était qu'être "inactif" ne se produit pas quand un thread termine sa tâche, à moins qu'il n'y ait aucune autre tâche en attente. Je demandais s'il y avait un moyen pour que la modification prenne effet immédiatement, de sorte que même s'il y a des tâches en attente, elles seront traitées par le nouveau nombre désiré de threads (comme dans mon exemple, où nous voulions immédiatement quantité de ressources concurrentes utilisées). –

+0

J'ai mis à jour mon article pour décrire une solution dérivée d'un problème que j'avais récemment et qui était de nature similaire. Regardez ceci, s'il vous plaît. –

+0

Je ne comprends pas, pourquoi je reçois zéro karma pour cette réponse? :-( –

6

La solution consiste à vider la file d'attente ThreadPoolExecutor, à définir la taille ThreadPoolExecutor selon les besoins et à rajouter les threads, un par un, dès que les autres se terminent. La méthode pour vider la file d'attente dans la classe ThreadPoolExecutor est privée. Vous devez donc la créer vous-même. Voici le code:

/** 
* Drains the task queue into a new list. Used by shutdownNow. 
* Call only while holding main lock. 
*/ 
public static List<Runnable> drainQueue() { 
    List<Runnable> taskList = new ArrayList<Runnable>(); 
    BlockingQueue<Runnable> workQueue = executor.getQueue(); 
    workQueue.drainTo(taskList); 
    /* 
    * If the queue is a DelayQueue or any other kind of queue 
    * for which poll or drainTo may fail to remove some elements, 
    * we need to manually traverse and remove remaining tasks. 
    * To guarantee atomicity wrt other threads using this queue, 
    * we need to create a new iterator for each element removed. 
    */ 
    while (!workQueue.isEmpty()) { 
     Iterator<Runnable> it = workQueue.iterator(); 
     try { 
      if (it.hasNext()) { 
       Runnable r = it.next(); 
       if (workQueue.remove(r)) 
        taskList.add(r); 
      } 
     } catch (ConcurrentModificationException ignore) { 
     } 
    } 
    return taskList; 
} 

Avant d'appeler cette méthode, vous devez récupérer le verrou principal. Pour ce faire, vous devez utiliser la réflexion java car le champ "mainLock" est privé. Encore une fois, voici le code:

private Field getMainLock() throws NoSuchFieldException { 
    Field mainLock = executor.getClass().getDeclaredField("mainLock"); 
    mainLock.setAccessible(true); 
    return mainLock; 
} 

Où "exécuteur testamentaire" est votre ThreadPoolExecutor.

Maintenant, vous devez verrouiller/déverrouiller les méthodes:

public void lock() { 
    try { 
     Field mainLock = getMainLock(); 
     Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null); 
     lock.invoke(mainLock.get(executor), (Object[])null); 
    } catch { 
     ... 
    } 
} 

public void unlock() { 
    try { 
     Field mainLock = getMainLock(); 
     mainLock.setAccessible(true); 
     Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null); 
     lock.invoke(mainLock.get(executor), (Object[])null); 
    } catch { 
     ... 
    } 
} 

Enfin, vous pouvez écrire votre méthode "setThreadsNumber", et cela fonctionnera à la fois augmenter et diminuer la taille de ThreadPoolExecutor:

public void setThreadsNumber(int intValue) { 
    boolean increasing = intValue > executor.getPoolSize(); 
    executor.setCorePoolSize(intValue); 
    executor.setMaximumPoolSize(intValue); 
    if(increasing){ 
     if(drainedQueue != null && (drainedQueue.size() > 0)){ 
      executor.submit(drainedQueue.remove(0)); 
     } 
    } else { 
     if(drainedQueue == null){ 
      lock(); 
      drainedQueue = drainQueue(); 
      unlock(); 
     } 
    } 
} 

Note: évidemment, si vous exécutez N threads parallèles et que vous modifiez ce nombre en N-1, tous les N threads continueront à s'exécuter. Lorsque le premier thread se termine, aucun nouveau thread ne sera exécuté. A partir de maintenant, le nombre de threads parallèles sera celui que vous avez choisi.

0

J'avais aussi besoin de la même solution, et il semble que dans le JDK8, setCorePoolSize() et setMaximumPoolSize() produisent effectivement le résultat souhaité. J'ai fait un cas de test où je soumets 4 tâches au pool et elles s'exécutent de manière concurente, je réduis la taille du pool pendant qu'elles sont en cours d'exécution et je soumets encore un autre exécutable que je veux être seul. Ensuite, je restaure la piscine à sa taille d'origine. Voici la source de test https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

Il produit la sortie suivante (fil « 50 » est celui qui doit être exécuté de manière isolée)

run: 
test thread 2 enter 
test thread 1 enter 
test thread 3 enter 
test thread 4 enter 
test thread 1 exit 
test thread 2 exit 
test thread 3 exit 
test thread 4 exit 
test thread 50 enter 
test thread 50 exit 
test thread 1 enter 
test thread 2 enter 
test thread 3 enter 
test thread 4 enter 
test thread 1 exit 
test thread 2 exit 
test thread 3 exit 
test thread 4 exit 
Questions connexes