2011-09-06 9 views
3

J'utilise un ConcurrentLinkedQueue pour stocker des étapes de calcul et un ExecutorService créé par Executors.newFixedThreadPool pour les exécuter. Mon problème est que l'application ne se termine jamais. Voici un code:Executors.newFixedThreadPool ne se termine pas

public class Run { 

    public static void main (String[] args) throws Exception { 
     ParallelExecutor executor = new ParallelExecutor(); 
     executor.execute(); 

     // manual shutdown 
     Thread.sleep(30 * 1000); 
     executor.stop(); 

     // the main thread dies, the application keeps running 
} 

 

public class ParallelExecutor implements Runnable { 

    // stores executions steps to 
    private ConcurrentLinkedQueue<ExecutionStep> queue = new ConcurrentLinkedQueue<ExecutionStep>(); 

    private ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10); 

    private Thread feedingThread = new Thread(this); 

    public void execute() { 
     ExecutionStep step = new ConcreteExecutionStep(); 
     this.queue.add(step); 
     this.feedingThread.start(); 
    } 

    public void stop() { 
     if (log.isInfoEnabled()) { 
      log.info("Shutting down"); 
     } 
     this.queue = new ConcurrentLinkedQueue<ExecutionStep>(); 

     try { 
      this.feedingThread.join(0); 
      this.threadPoolExecutor.shutdownNow(); 
     } catch (InterruptedException e) { 
      log.warn("Something happened!", e); 
     } 
    } 

    public void run() { 
     while (true) { 

      if (this.threadPoolExecutor.isShutdown()) { 
       return; 
      } 

      try { 
       Thread.sleep(this.waitMillis); 
      } catch (InterruptedException e) { 
       // NOP 
      } 

      if (!this.queue.isEmpty()) { 
       this.threadPoolExecutor.execute(new Runnable() { 

        @Override 
        public void run() { 
         ExecutionStep step = this.queue.poll(); 
         List<ExecutionStep> nextSteps = Collections.emptyList(); 
         try { 
          nextSteps = step.execute(); 
         } catch (Exception e) { 
          // NOP 
         } 

         // here we feed the queue 
         this.queue.addAll(nextSteps); 
        } 
       }); 
      } 
     } 
    } 

EDIT: Voici la décharge de fil après le fil conducteur est mort:

2011-09-06 17:20:56 
Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.1-b02-384 mixed mode): 

"DestroyJavaVM" prio=5 tid=10f874800 nid=0x100501000 waiting on condition [00000000] 
    java.lang.Thread.State: RUNNABLE 

"Poller SunPKCS11-Darwin" daemon prio=1 tid=101a4d800 nid=0x11362e000 waiting on condition [11362d000] 
    java.lang.Thread.State: TIMED_WAITING (sleeping) 
    at java.lang.Thread.sleep(Native Method) 
    at sun.security.pkcs11.SunPKCS11$TokenPoller.run(SunPKCS11.java:692) 
    at java.lang.Thread.run(Thread.java:680) 

"pool-2-thread-1" prio=5 tid=103db3000 nid=0x11341a000 waiting on condition [113419000] 
    java.lang.Thread.State: WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <7f42e8f38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) 
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399) 
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) 
    at java.lang.Thread.run(Thread.java:680) 

"Timer-0" daemon prio=5 tid=101a63000 nid=0x113317000 in Object.wait() [113316000] 
    java.lang.Thread.State: TIMED_WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <7f49cc200> (a java.util.TaskQueue) 
    at java.util.TimerThread.mainLoop(Timer.java:509) 
    - locked <7f49cc200> (a java.util.TaskQueue) 
    at java.util.TimerThread.run(Timer.java:462) 

"Low Memory Detector" daemon prio=5 tid=10180a000 nid=0x10f607000 runnable [00000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread1" daemon prio=9 tid=101809800 nid=0x10f504000 waiting on condition [00000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread0" daemon prio=9 tid=103951000 nid=0x10f401000 waiting on condition [00000000] 
    java.lang.Thread.State: RUNNABLE 

"JDWP Command Reader" daemon prio=5 tid=103950000 nid=0x10df01000 runnable [00000000] 
    java.lang.Thread.State: RUNNABLE 

"JDWP Event Helper Thread" daemon prio=5 tid=10394f800 nid=0x10dc0a000 runnable [00000000] 
    java.lang.Thread.State: RUNNABLE 

"JDWP Transport Listener: dt_socket" daemon prio=5 tid=10394e800 nid=0x10db07000 runnable [00000000] 
    java.lang.Thread.State: RUNNABLE 

"Signal Dispatcher" daemon prio=9 tid=10e000000 nid=0x10da04000 waiting on condition [00000000] 
    java.lang.Thread.State: RUNNABLE 

"Surrogate Locker Thread (Concurrent GC)" daemon prio=5 tid=101808800 nid=0x10d901000 waiting on condition [00000000] 
    java.lang.Thread.State: RUNNABLE 

"Finalizer" daemon prio=8 tid=10393f000 nid=0x10c204000 in Object.wait() [10c203000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <7f44e3d70> (a java.lang.ref.ReferenceQueue$Lock) 
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118) 
    - locked <7f44e3d70> (a java.lang.ref.ReferenceQueue$Lock) 
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134) 
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159) 

"Reference Handler" daemon prio=10 tid=10393e000 nid=0x10c101000 in Object.wait() [10c100000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <7f44e5220> (a java.lang.ref.Reference$Lock) 
    at java.lang.Object.wait(Object.java:485) 
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116) 
    - locked <7f44e5220> (a java.lang.ref.Reference$Lock) 

"VM Thread" prio=9 tid=103939800 nid=0x10b6fc000 runnable 

"Gang worker#0 (Parallel GC Threads)" prio=9 tid=103801800 nid=0x102601000 runnable 

"Gang worker#1 (Parallel GC Threads)" prio=9 tid=103802000 nid=0x102704000 runnable 

"Gang worker#2 (Parallel GC Threads)" prio=9 tid=103803000 nid=0x107102000 runnable 

"Gang worker#3 (Parallel GC Threads)" prio=9 tid=103803800 nid=0x107205000 runnable 

"Gang worker#4 (Parallel GC Threads)" prio=9 tid=103804000 nid=0x107308000 runnable 

"Gang worker#5 (Parallel GC Threads)" prio=9 tid=103804800 nid=0x10740b000 runnable 

"Gang worker#6 (Parallel GC Threads)" prio=9 tid=103805800 nid=0x10750e000 runnable 

"Gang worker#7 (Parallel GC Threads)" prio=9 tid=103806000 nid=0x107611000 runnable 

"Concurrent Mark-Sweep GC Thread" prio=9 tid=1038e3800 nid=0x10b408000 runnable 
"Gang worker#0 (Parallel CMS Threads)" prio=9 tid=1038e2000 nid=0x10aa02000 runnable 

"Gang worker#1 (Parallel CMS Threads)" prio=9 tid=1038e2800 nid=0x10ab05000 runnable 

"VM Periodic Task Thread" prio=10 tid=10181c000 nid=0x10f70a000 waiting on condition 

"Exception Catcher Thread" prio=10 tid=103801000 nid=0x1017f9000 runnable 
JNI global references: 13614 

Heap 
par new generation total 19136K, used 16017K [7f3000000, 7f44c0000, 7f44c0000) 
    eden space 17024K, 81% used [7f3000000, 7f3d944a8, 7f40a0000) 
    from space 2112K, 100% used [7f42b0000, 7f44c0000, 7f44c0000) 
    to space 2112K, 0% used [7f40a0000, 7f40a0000, 7f42b0000) 
concurrent mark-sweep generation total 63872K, used 6053K [7f44c0000, 7f8320000, 7fae00000) 
concurrent-mark-sweep perm gen total 30656K, used 30476K [7fae00000, 7fcbf0000, 800000000) 

EDIT: Le problème était dû à un autre exécuteur démarré (en silence) dans le code que j'ai découpé avant de poster la question, car je pensais que ce n'était pas pertinent. Quoi qu'il en soit, Peter Lawrey réponse est encore une entrée précieuse, d'où il a été accepté.

+0

Si vous avez un ExecutorService qui est un pool de threads et une file d'attente, pourquoi avez-vous un autre thread et une autre file d'attente pour y ajouter des tâches? Je voudrais que le pool d'exécuteurs fasse tout ce qui simplifie la fermeture. Sans le fil supplémentaire et la file d'attente, le code serait beaucoup plus simple. –

+0

'nextSteps = étape.execute();' : cette classe (non montrée) est-elle libre de tout soupçon? –

Répondre

2

Ceci s'arrêtera à condition qu'il n'y ait pas de travail en cours et qu'il ne s'arrêtera pas de temps en temps ce qui ralentit votre application et pourrait donner l'impression que rien ne se passe.

public class ParallelExecutor { 
    interface ExecutionStep { 
    List<ExecutionStep> execute(); 
    } 

    public static final int N_THREADS = Runtime.getRuntime().availableProcessors(); 
    private final ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS); 

    public void execute(final ExecutionStep step) { 
    executorService.submit(new Runnable() { 
     @Override 
     public void run() { 
     for (ExecutionStep es : step.execute()) 
      execute(es); 
     } 
    }); 
    } 

    public void stop() { 
    this.executorService.shutdownNow(); 
    } 
} 

course avec

ParallelExecutor executor = new ParallelExecutor(); 
executor.execute(new ConcreteExecutionStep()); 

// manual shutdown 
Thread.sleep(30 * 1000); 
executor.stop(); 
+1

Il est certainement logique de supprimer la file d'attente inutile. Je vais refactoriser mon code, tester votre proposition et vous le faire savoir. Merci. – skuro

+2

Merci pour votre contribution, le problème s'est avéré être causé par quelque chose d'invisible ici, car j'ai enlevé le code en pensant qu'il était .. non pertinent :-) En tout cas, votre réponse est toujours utile pour tous ceux qui recherchent . – skuro

3

Vous attendez que FeedThread se termine (feedingThread.join(0)) avant d'arrêter l'exécuteur et le thread d'alimentation continue dans son while while (true) -loop jusqu'à ce que l'exécuteur soit arrêté.

+0

mais ce comportement est en contradiction avec son commentaire à la fin de la main: "fil principal meurt". Donc il était incorrect en disant ça? – akappa

+0

Je confirme les filières principales. J'ai édité ma question originale pour montrer la décharge de fil pendant l'état d'attente sans fin à la fin de chaque exécution. – skuro

+0

Oui, le thread principal bloquera dans 'executor.stop();'. – jarnbjo

0

J'ai trouvé une réponse à cette question très simple. Si vous ne revenez pas simplement de votre fonction principale, mais que vous la terminez avec System.exit(0), l'exécuteur mourra comme prévu.

+3

Vous pouvez également débrancher votre ordinateur. –