Comme nous le savons, ThreadPoolExecutor utilise certains BlockingQueue comme une file d'attente de tâches entrantes. Ce que je veux, c'est avoir ThreadPoolExecutor qui a une seconde file d'attente pour les résultats de la tâche qui sont prêts. Je veux utiliser cette file d'attente comme source pour les services d'entrée/sortie qui envoient ou stockent ces résultats. Pourquoi je veux créer une file d'attente séparée?Concurrence: comment implémenter un exécuteur avec des files d'attente entrantes et sortantes?
Parce que je veux découpler l'action d'envoyer des résultats de l'action d'obtenir des résultats. En outre, je suppose que les exceptions et les délais qui accompagnent les opérations d'entrée/sortie ne devraient pas affecter mon ThreadPoolExecutor qui calcule le résultat.
J'ai créé une implémentation naïve de cela. J'aimerais avoir des critiques à ce sujet. Peut-être, il peut être mis en œuvre avec des classes Java out-of-the-box mieux? J'utilise Java 7.
public class ThreadPoolWithResultQueue {
interface Callback<T> {
void complete(T t);
}
public abstract static class CallbackTask<T> implements Runnable {
private final Callback callback;
CallbackTask(Callback callback) {
this.callback = callback;
}
public abstract T execute();
final public void run() {
T t = execute();
callback.complete(t);
}
}
public static class CallBackTaskString extends CallbackTask<String> {
public CallBackTaskString(Callback callback) {
super(callback);
}
@Override
public String execute() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
return hashCode() + "-" + System.currentTimeMillis();
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
final BlockingQueue<String> resultQueue = new LinkedBlockingQueue<String>();
Callback<String> addToQueueCallback = new Callback<String>() {
@Override
public void complete(String s) {
System.out.println("Adding Result To Queue " + s);
resultQueue.add(s); //adding to outgoing queue. some other executor (or same one?) will process it
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1000l, TimeUnit.DAYS, workQueue);
for (int i = 0; i <= 5; i++) {
executor.submit(new CallBackTaskString(addToQueueCallback));
};
System.out.println("All submitted.");
executor.shutdown();
executor.awaitTermination(10l, TimeUnit.SECONDS);
System.out.println("Result queue size " + resultQueue.size());
}
}
Avez-vous mis tout ce 'Callback' supplémentaire là-dedans donc le code semble plus intelligent, ou avez-vous besoin/vouloir le faire de cette façon? Une implémentation simple implique simplement une file d'attente supplémentaire, et si vous savez comment mettre les choses dans des files d'attente, cela devrait être évident. – Kayaman
Un mot: 'ExecutorCompletionService'. –
Kayaman, oui c'est plus simple d'ajouter des résultats à la file d'attente sortante juste dans le Runnable à exécuter. Mais j'essaie de découpler autant que possible. Quoi qu'il en soit, Marko a suggéré une bonne chose. – MiamiBeach