J'utilise un ConcurrentLinkedQueue
pour stocker des étapes de calcul et un ExecutorService
créé par Executors.newFixedThreadPool
pour les exécuter. Mon problème est que l'application ne se termine jamais. Voici un code:Executors.newFixedThreadPool ne se termine pas
public class Run {
public static void main (String[] args) throws Exception {
ParallelExecutor executor = new ParallelExecutor();
executor.execute();
// manual shutdown
Thread.sleep(30 * 1000);
executor.stop();
// the main thread dies, the application keeps running
}
public class ParallelExecutor implements Runnable {
// stores executions steps to
private ConcurrentLinkedQueue<ExecutionStep> queue = new ConcurrentLinkedQueue<ExecutionStep>();
private ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10);
private Thread feedingThread = new Thread(this);
public void execute() {
ExecutionStep step = new ConcreteExecutionStep();
this.queue.add(step);
this.feedingThread.start();
}
public void stop() {
if (log.isInfoEnabled()) {
log.info("Shutting down");
}
this.queue = new ConcurrentLinkedQueue<ExecutionStep>();
try {
this.feedingThread.join(0);
this.threadPoolExecutor.shutdownNow();
} catch (InterruptedException e) {
log.warn("Something happened!", e);
}
}
public void run() {
while (true) {
if (this.threadPoolExecutor.isShutdown()) {
return;
}
try {
Thread.sleep(this.waitMillis);
} catch (InterruptedException e) {
// NOP
}
if (!this.queue.isEmpty()) {
this.threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
ExecutionStep step = this.queue.poll();
List<ExecutionStep> nextSteps = Collections.emptyList();
try {
nextSteps = step.execute();
} catch (Exception e) {
// NOP
}
// here we feed the queue
this.queue.addAll(nextSteps);
}
});
}
}
}
EDIT: Voici la décharge de fil après le fil conducteur est mort:
2011-09-06 17:20:56
Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.1-b02-384 mixed mode):
"DestroyJavaVM" prio=5 tid=10f874800 nid=0x100501000 waiting on condition [00000000]
java.lang.Thread.State: RUNNABLE
"Poller SunPKCS11-Darwin" daemon prio=1 tid=101a4d800 nid=0x11362e000 waiting on condition [11362d000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at sun.security.pkcs11.SunPKCS11$TokenPoller.run(SunPKCS11.java:692)
at java.lang.Thread.run(Thread.java:680)
"pool-2-thread-1" prio=5 tid=103db3000 nid=0x11341a000 waiting on condition [113419000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <7f42e8f38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:680)
"Timer-0" daemon prio=5 tid=101a63000 nid=0x113317000 in Object.wait() [113316000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <7f49cc200> (a java.util.TaskQueue)
at java.util.TimerThread.mainLoop(Timer.java:509)
- locked <7f49cc200> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:462)
"Low Memory Detector" daemon prio=5 tid=10180a000 nid=0x10f607000 runnable [00000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" daemon prio=9 tid=101809800 nid=0x10f504000 waiting on condition [00000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" daemon prio=9 tid=103951000 nid=0x10f401000 waiting on condition [00000000]
java.lang.Thread.State: RUNNABLE
"JDWP Command Reader" daemon prio=5 tid=103950000 nid=0x10df01000 runnable [00000000]
java.lang.Thread.State: RUNNABLE
"JDWP Event Helper Thread" daemon prio=5 tid=10394f800 nid=0x10dc0a000 runnable [00000000]
java.lang.Thread.State: RUNNABLE
"JDWP Transport Listener: dt_socket" daemon prio=5 tid=10394e800 nid=0x10db07000 runnable [00000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=9 tid=10e000000 nid=0x10da04000 waiting on condition [00000000]
java.lang.Thread.State: RUNNABLE
"Surrogate Locker Thread (Concurrent GC)" daemon prio=5 tid=101808800 nid=0x10d901000 waiting on condition [00000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=8 tid=10393f000 nid=0x10c204000 in Object.wait() [10c203000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <7f44e3d70> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <7f44e3d70> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
"Reference Handler" daemon prio=10 tid=10393e000 nid=0x10c101000 in Object.wait() [10c100000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <7f44e5220> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:485)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
- locked <7f44e5220> (a java.lang.ref.Reference$Lock)
"VM Thread" prio=9 tid=103939800 nid=0x10b6fc000 runnable
"Gang worker#0 (Parallel GC Threads)" prio=9 tid=103801800 nid=0x102601000 runnable
"Gang worker#1 (Parallel GC Threads)" prio=9 tid=103802000 nid=0x102704000 runnable
"Gang worker#2 (Parallel GC Threads)" prio=9 tid=103803000 nid=0x107102000 runnable
"Gang worker#3 (Parallel GC Threads)" prio=9 tid=103803800 nid=0x107205000 runnable
"Gang worker#4 (Parallel GC Threads)" prio=9 tid=103804000 nid=0x107308000 runnable
"Gang worker#5 (Parallel GC Threads)" prio=9 tid=103804800 nid=0x10740b000 runnable
"Gang worker#6 (Parallel GC Threads)" prio=9 tid=103805800 nid=0x10750e000 runnable
"Gang worker#7 (Parallel GC Threads)" prio=9 tid=103806000 nid=0x107611000 runnable
"Concurrent Mark-Sweep GC Thread" prio=9 tid=1038e3800 nid=0x10b408000 runnable
"Gang worker#0 (Parallel CMS Threads)" prio=9 tid=1038e2000 nid=0x10aa02000 runnable
"Gang worker#1 (Parallel CMS Threads)" prio=9 tid=1038e2800 nid=0x10ab05000 runnable
"VM Periodic Task Thread" prio=10 tid=10181c000 nid=0x10f70a000 waiting on condition
"Exception Catcher Thread" prio=10 tid=103801000 nid=0x1017f9000 runnable
JNI global references: 13614
Heap
par new generation total 19136K, used 16017K [7f3000000, 7f44c0000, 7f44c0000)
eden space 17024K, 81% used [7f3000000, 7f3d944a8, 7f40a0000)
from space 2112K, 100% used [7f42b0000, 7f44c0000, 7f44c0000)
to space 2112K, 0% used [7f40a0000, 7f40a0000, 7f42b0000)
concurrent mark-sweep generation total 63872K, used 6053K [7f44c0000, 7f8320000, 7fae00000)
concurrent-mark-sweep perm gen total 30656K, used 30476K [7fae00000, 7fcbf0000, 800000000)
EDIT: Le problème était dû à un autre exécuteur démarré (en silence) dans le code que j'ai découpé avant de poster la question, car je pensais que ce n'était pas pertinent. Quoi qu'il en soit, Peter Lawrey réponse est encore une entrée précieuse, d'où il a été accepté.
Si vous avez un ExecutorService qui est un pool de threads et une file d'attente, pourquoi avez-vous un autre thread et une autre file d'attente pour y ajouter des tâches? Je voudrais que le pool d'exécuteurs fasse tout ce qui simplifie la fermeture. Sans le fil supplémentaire et la file d'attente, le code serait beaucoup plus simple. –
'nextSteps = étape.execute();' : cette classe (non montrée) est-elle libre de tout soupçon? –