2010-02-11 3 views
22

J'utilise ExecutorService pour faciliter le programme multithread concurrent. Prenez le code suivant:ExecutorService, moyen standard pour éviter que la file d'attente des tâches ne soit trop pleine

 

while(xxx) 
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
... 
Future<..> ... = exService.submit(..); 
... 
} 
 

Dans mon cas, le problème est que submit() ne bloque pas si tous les NUMBER_THREADS sont occupés. La conséquence est que la file d'attente des tâches est inondée par de nombreuses tâches. La conséquence de ceci est que l'arrêt du service d'exécution avec ExecutorService.shutdown() prend des âges (ExecutorService.isTerminated() sera faux pendant longtemps). Raison est que la file d'attente de tâches est encore assez complète.

Pour l'instant ma solution de contournement est de travailler avec des sémaphores pour désavouer avoir à plusieurs entrées dans la file d'attente des tâches de ExecutorService:

 

... 
Semaphore semaphore=new Semaphore(NUMBER_THREADS); 

while(xxx) 
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
... 
semaphore.aquire(); 
// internally the task calls a finish callback, which invokes semaphore.release() 
// -> now another task is added to queue 
Future<..> ... = exService.submit(..); 
... 
} 
 

Je suis sûr qu'il ya une meilleure solution plus encapsulées?

Répondre

4

Il est préférable de créer le ThreadPoolExecutor vous-même (ce que fait Executors.newXXX() de toute façon).

Dans le constructeur, vous pouvez passer une BlockingQueue à utiliser par l'Executor comme file d'attente de tâches. Si vous passez une taille bloquée BlockingQueue (comme LinkedBlockingQueue), il devrait obtenir l'effet que vous voulez.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize)); 
+0

je vois. J'ai négligé un détail à http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29. là, il est mentionné, qu'en standard une file d'attente illimitée est utilisée. –

+6

je l'ai essayé. Malheureusement, votre solution ne bloque pas (comme je le veux) mais lance une exception RejectedExecutionException. également trouvé: http://www.velocityreviews.com/forums/t389526-threadpoolexecutor-with-blocking-execute.html. les solutions de contournement présentées semblent être plus compliquées que mon exemple de sémaphore, putain! –

+3

cela ne fonctionne pas à cause de RejectedExecutionException si la file d'attente est pleine – user249654

5

Vous pouvez ThreadPoolExecutor.getQueue() taille.() Pour connaître la taille de la file d'attente. Vous pouvez effectuer une action si la file d'attente est trop longue. Je suggère d'exécuter la tâche dans le thread actuel si la file d'attente est trop longue pour ralentir le producteur (si cela est approprié)

4

Un vrai blocage ThreadPoolExecutor a été sur la liste de souhaits de beaucoup, il y a même un bogue JDC ouvert dessus . Je suis face au même problème, et suis tombé sur ceci: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

Il est une mise en œuvre d'un BlockingThreadPoolExecutor, mis en œuvre au moyen d'un RejectionPolicy qui utilise l'offre pour ajouter la tâche à la file d'attente, en attendant que la file d'attente d'avoir de la place. Ça à l'air bon.

+0

[Cette réponse] (http://stackoverflow.com/a/4522411/394431) à une autre question suggère d'utiliser une sous-classe personnalisée 'BlockingQueue' qui bloque sur 'offer()' en déléguant à 'put()'. Je pense que cela finit par fonctionner plus ou moins comme ce 'RejectedExecutionHandler'. –

1

Vous pouvez ajouter une autre file d'attente bloquante qui a une taille limitée pour contrôler la taille de la file d'attente interne dans executorService, certains pensent comme un sémaphore mais très facile. avant l'exécuteur que vous mettez() et quand la tâche achive take(). prendre() doit être à l'intérieur du code de tâche

21

L'astuce consiste à utiliser une taille de file d'attente fixe et:

new ThreadPoolExecutor.CallerRunsPolicy() 

Je recommande également d'utiliser ListeningExecutorService de goyave. Voici un exemple de files d'attente consommateur/producteur.

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); 
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); 

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 
            5000L, TimeUnit.MILLISECONDS, 
            new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); 
} 

quelque chose de mieux et vous voudrez peut-être envisager un MQ comme RabbitMQ ou ActiveMQ car ils ont la technologie QoS.

+0

Grande réponse, avec un exemple complet. –

Questions connexes