2010-06-17 3 views
9

Je travaille sur une application et en utilisant ThreadPoolExecutor pour gérer diverses tâches. ThreadPoolExecutor est bloqué après une certaine durée. Pour simuler cela dans un environnement plus simple, j'ai écrit un code simple dans lequel je peux simuler le problème.Java ThreadPoolExecutor se bloque en utilisant ArrayBlockingQueue

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.RejectedExecutionHandler; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class MyThreadPoolExecutor { 
    private int poolSize = 10; 
    private int maxPoolSize = 50; 
    private long keepAliveTime = 10; 
    private ThreadPoolExecutor threadPool = null; 
    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
      100000); 

    public MyThreadPoolExecutor() { 
     threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, 
       keepAliveTime, TimeUnit.SECONDS, queue); 
     threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 

      @Override 
      public void rejectedExecution(Runnable runnable, 
        ThreadPoolExecutor threadPoolExecutor) { 
       System.out 
         .println("Execution rejected. Please try restarting the application."); 
      } 

     }); 
    } 

    public void runTask(Runnable task) { 
     threadPool.execute(task); 
    } 

    public void shutDown() { 
     threadPool.shutdownNow(); 
    } 
    public ThreadPoolExecutor getThreadPool() { 
     return threadPool; 
    } 

    public void setThreadPool(ThreadPoolExecutor threadPool) { 
     this.threadPool = threadPool; 
    } 

    public static void main(String[] args) { 
     MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor(); 
     for (int i = 0; i < 1000; i++) { 
      final int j = i; 
      mtpe.runTask(new Runnable() { 

       @Override 
       public void run() { 
        System.out.println(j); 
       } 

      }); 
     } 
    } 
} 

Essayez d'exécuter ce code plusieurs fois. Il imprime normalement le numéro sur la console et quand tous les threads finissent, il existe. Mais parfois, il a terminé toutes les tâches et ne se termine pas. Le vidage de fil est comme suit:

MyThreadPoolExecutor [Java Application] 
    MyThreadPoolExecutor at localhost:2619 (Suspended) 
     Daemon System Thread [Attach Listener] (Suspended) 
     Daemon System Thread [Signal Dispatcher] (Suspended)  
     Daemon System Thread [Finalizer] (Suspended)  
      Object.wait(long) line: not available [native method] 
      ReferenceQueue<T>.remove(long) line: not available  
      ReferenceQueue<T>.remove() line: not available  
      Finalizer$FinalizerThread.run() line: not available 
     Daemon System Thread [Reference Handler] (Suspended)  
      Object.wait(long) line: not available [native method] 
      Reference$Lock(Object).wait() line: 485 
      Reference$ReferenceHandler.run() line: not available  
     Thread [pool-1-thread-1] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-2] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-3] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-4] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-6] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-8] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-5] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-10] (Suspended) 
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-9] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-7] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [DestroyJavaVM] (Suspended) 
    C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM) 

Dans mon application réelle, les discussions ThreadPoolExecutor aller dans cet état, puis il cesse de répondre.

Cordialement, Ravi Rao

+0

Une idée est que vous essayez ExecutorService, un de mes favoris. –

+0

@Lars Andren, un ThreadPoolExecutor est un ExecutorService. ExecutorService est simplement une interface. Dans la bibliothèque Java 1.5, ThreadPoolExecutor est la seule implémentation directe de l'interface ExecutorService. Il existe un AbstractExecutorService et un DelegatedExecutorService qui ne sont pas des classes fonctionnelles autonomes. En outre, il existe une interface SheceduledExecutorService qui étend ExecutorService et possède une seule implémentation concrète. –

Répondre

9

Dans votre méthode main, vous n'appeler mtpe.shutdown(). Un ThreadPoolExecutor tentera de garder son corePoolSize en vie indéfiniment. Parfois, vous avez de la chance et vous avez plus de corePoolSize threads en vie, de sorte que chaque thread de travail ira dans une branche logique conditionnelle qui lui permet de se terminer après le délai d'attente spécifié de 10 secondes. Cependant, comme vous l'avez remarqué, parfois ce n'est pas le cas, donc chaque thread de l'exécuteur bloquera ArrayBlockingQueue.take() et attendra une nouvelle tâche. En outre, veuillez noter qu'il existe une différence significative entre ExecutorService.shutdown() et ExecutorService.shutdownNow(). Si vous appelez ExecutorService.shutdownNow() comme l'indique l'implémentation de l'encapsuleur, vous supprimerez à l'occasion certaines tâches qui n'ont pas été affectées à l'exécution.

Mise à jour: Depuis ma réponse originale, l'implémentation ThreadPoolExecutor a été modifiée de sorte que le programme dans le message d'origine ne devrait jamais quitter.

Questions connexes