Vous pouvez absolument. Appeler setCorePoolSize(int)
va changer la taille de base de la piscine. Les appels à cette méthode sont thread-safe et remplacent les paramètres fournis au constructeur de ThreadPoolExecutor
. Si vous réduisez la taille du pool, les threads restants s'arrêteront une fois la file d'attente de travail en cours (s'ils sont inactifs, ils s'arrêteront immédiatement). Si vous augmentez la taille du pool, les nouveaux threads seront alloués dès que possible. Le délai pour l'allocation de nouveaux threads n'est pas documenté - mais dans l'implémentation, l'allocation de nouveaux threads est effectuée à chaque appel de la méthode execute
. Pour l'associer à une ferme d'emploi ajustable à l'exécution, vous pouvez exposer cette propriété (par wrapper ou à l'aide d'un exportateur dynamique MBean) en tant qu'attribut JMX en lecture-écriture pour créer une image plutôt agréable à la volée. processeur par lots réglable.
Pour réduire la taille du pool de manière forcée en cours d'exécution (ce qui est votre requête), vous devez sous-classer le ThreadPoolExecutor
et ajouter une interruption à la méthode beforeExecute(Thread,Runnable)
. Interrompre le thread n'est pas une interruption suffisante, car cela n'interagit qu'avec les états d'attente et pendant le traitement, les threads de tâche ThreadPoolExecutor
ne passent pas dans un état interrompu.
J'ai récemment rencontré le même problème en essayant de terminer un pool de threads avant que toutes les tâches soumises ne soient exécutées. Pour que cela se produise, j'ai interrompu le thread en lançant une exception d'exécution seulement après avoir remplacé le UncaughtExceptionHandler
du thread avec un qui attend mon exception spécifique et l'abandonne.
/**
* A runtime exception used to prematurely terminate threads in this pool.
*/
static class ShutdownException
extends RuntimeException {
ShutdownException (String message) {
super(message);
}
}
/**
* This uncaught exception handler is used only as threads are entered into
* their shutdown state.
*/
static class ShutdownHandler
implements UncaughtExceptionHandler {
private UncaughtExceptionHandler handler;
/**
* Create a new shutdown handler.
*
* @param handler The original handler to deligate non-shutdown
* exceptions to.
*/
ShutdownHandler (UncaughtExceptionHandler handler) {
this.handler = handler;
}
/**
* Quietly ignore {@link ShutdownException}.
* <p>
* Do nothing if this is a ShutdownException, this is just to prevent
* logging an uncaught exception which is expected. Otherwise forward
* it to the thread group handler (which may hand it off to the default
* uncaught exception handler).
* </p>
*/
public void uncaughtException (Thread thread, Throwable throwable) {
if (!(throwable instanceof ShutdownException)) {
/* Use the original exception handler if one is available,
* otherwise use the group exception handler.
*/
if (handler != null) {
handler.uncaughtException(thread, throwable);
}
}
}
}
/**
* Configure the given job as a spring bean.
*
* <p>Given a runnable task, configure it as a prototype spring bean,
* injecting any necessary dependencices.</p>
*
* @param thread The thread the task will be executed in.
* @param job The job to configure.
*
* @throws IllegalStateException if any error occurs.
*/
protected void beforeExecute (final Thread thread, final Runnable job) {
/* If we're in shutdown, it's because spring is in singleton shutdown
* mode. This means we must not attempt to configure the bean, but
* rather we must exit immediately (prematurely, even).
*/
if (!this.isShutdown()) {
if (factory == null) {
throw new IllegalStateException(
"This class must be instantiated by spring"
);
}
factory.configureBean(job, job.getClass().getName());
}
else {
/* If we are in shutdown mode, replace the job on the queue so the
* next process will see it and it won't get dropped. Further,
* interrupt this thread so it will no longer process jobs. This
* deviates from the existing behavior of shutdown().
*/
workQueue.add(job);
thread.setUncaughtExceptionHandler(
new ShutdownHandler(thread.getUncaughtExceptionHandler())
);
/* Throwing a runtime exception is the only way to prematurely
* cause a worker thread from the TheadPoolExecutor to exit.
*/
throw new ShutdownException("Terminating thread");
}
}
Dans votre cas, vous pouvez créer un sémaphores (juste pour être utilisé comme compteur de threadsafe) qui n'a pas permis, et lors de l'arrêt des fils vers le bas libérer à un certain nombre de permis qui correspond au delta du la taille du pool principal précédent et la nouvelle taille du pool (vous devez remplacer la méthode setCorePoolSize(int)
). Cela vous permettra de terminer vos discussions après la fin de leur tâche en cours.
private Semaphore terminations = new Semaphore(0);
protected void beforeExecute (final Thread thread, final Runnable job) {
if (terminations.tryAcquire()) {
/* Replace this item in the queue so it may be executed by another
* thread
*/
queue.add(job);
thread.setUncaughtExceptionHandler(
new ShutdownHandler(thread.getUncaughtExceptionHandler())
);
/* Throwing a runtime exception is the only way to prematurely
* cause a worker thread from the TheadPoolExecutor to exit.
*/
throw new ShutdownException("Terminating thread");
}
}
public void setCorePoolSize (final int size) {
int delta = getActiveCount() - size;
super.setCorePoolSize(size);
if (delta > 0) {
terminations.release(delta);
}
}
Cela devrait interrompre n fils pour f (n) = actif - demandé. En cas de problème, la stratégie d'allocation de ThreadPoolExecutor
est assez durable. Il livre sur la résiliation prématurée en utilisant un bloc finally
qui garantit l'exécution. Pour cette raison, même si vous terminez trop de threads, ils se repeupleront.
Pour les électeurs vers le bas. Si vous lisez la question de l'OP, la deuxième réponse hautement votée n'a pas répondu à l'origine. L'OP en dit autant. Les modifications suivantes implémentent les suggestions dans cette réponse. –