2017-09-29 11 views
1

Utilisation de Java 8Terminate autres tâches futures en cours d'exécution lorsque la condition est remplie dans l'un pour l'avenir des tâches

Je suis en train d'écrire un prog pour télécharger les fichiers journaux de serveurs diff et la recherche d'un texte donné dans ces fichiers journaux . Je fais est synchrone en ce moment. Je veux le faire en parallèle et a découvert que cela peut être fait en utilisant Future dans Java. J'utilise apache.commons.io pour télécharger le fichier depuis l'URL. Voici l'extrait de code:

ExecutorService executorService = Executors.newCachedThreadPool(); 
    List<Future<XCluster>> clusterFutures = new ArrayList<>(); 
    for(XCluster cluster: clusters) { 
     clusterFutures.add(executorService.submit(() -> { 
      return downloadAndSearch(textToSearch, cluster); 
     })); 
    } 
    //For now I'm not doing anything with returned value from Future 

Mais maintenant, je veux mettre fin à une autre opération de téléchargement de recherche a commencé dans l'avenir que la recherche donnée devrait être trouvée que dans l'un des serveurs. Il n'est donc pas nécessaire de poursuivre les autres tâches futures que j'ai commencées. quelqu'un peut-il suggérer un moyen de le faire? J'utilise java 8, d'autres options sont également les bienvenues. Merci d'avance!

Répondre

1

Le service ExecutorService possède une méthode shutdownNow qui arrête tous les threads et arrête le service.

Edit:

j'ai fait quelques expériences avec shutdownNow et que je vois qu'il ne peut pas arrêter les fils que je pensais. AFAIK Il utilise interrupts() mais tous les threads ne réagissent pas à l'interruption.

Donc, la meilleure alternative que je peux venir avec:

Tout d'abord, créez une classe d'indicateur:

public static class Indicator{ 

    private boolean isReady = false; 

    public void ready(){ 
     isReady = true; 
    } 

    public boolean isReady(){ 
     return isReady; 
    } 
} 

Les fils doivent commencer à partager une instance Indicateur de communiquer. vous pouvez donc créer un appelable comme ceci:

public static class Processor implements Callable<Integer> { 

    private volatile Indicator indicator; 

    private Integer number; 

    public Processor(Integer integer, Indicator isReady){ 
     this.number = integer; 
     this.indicator = isReady; 
    } 

    @Override 
    public Integer call() throws Exception { 
     System.out.println("Thread started:" + Thread.currentThread().getId()); 
     int counter = 0; 
     while (!indicator.isReady &&counter < number) { 
      // Make complicated things 
      Math.sin(counter); 
      counter++; 
     } 
     if(indicator.isReady){ 
      //another thread finished 
      //delete resources 
      System.out.println("Thread interrupted: " + Thread.currentThread().getId() + " " + counter); 
      return -1; 
     } else { 
      System.out.println("Thread finished: " + Thread.currentThread().getId() + " " + counter); 
      indicator.ready(); 
      return counter; 
     } 
    } 
} 

De cette façon, lorsque le premier fil est prêt, il peut arrêter les autres et ils nettoyer après himselves.

J'ai essayé comme suit:

public static void main(String[] args) throws ExecutionException, InterruptedException { 
    ExecutorService executorService = Executors.newCachedThreadPool(); 
    List<Future<Integer>> clusterFutures = new ArrayList<>(); 
    Indicator indicator = new Indicator(); 
    clusterFutures.add(executorService.submit(new Processor(100, indicator))); 
    clusterFutures.add(executorService.submit(new Processor(10000, indicator))); 
    clusterFutures.add(executorService.submit(new Processor(10000000,indicator))); 
} 

Un exemple de sortie:

Thread started:11 
Thread started:12 
Thread finished: 11 100 
Thread interrupted: 12 1001 
Thread started:13 
Thread interrupted: 13 0 

Sidenote: les classes référencées ne doivent pas être les classes internes statiques juste, il était plus facile de faire des expériences en un fichier.

+0

merci, encore un point cependant. Y at-il un moyen d'avoir un bloc de code comme finalement qui sera exécuté quand j'arrêterai d'autres threads. J'ai besoin de nettoyer les fichiers téléchargés par d'autres futurs. – user3539951

0

En termes de code, la solution la plus simple est d'avoir un fil d'arrêt qui annule tous les contrats à terme:

final ExecutorService executorService = Executors.newCachedThreadPool(); 
    final ExecutorService shutdownService = Executors.newSingleThreadExecutor(); 
    List<Future<XCluster>> clusterFutures = new ArrayList<>(); 
    for(XCluster cluster: clusters) { 
     clusterFutures.add(executorService.submit(() -> { 
      boolean cancelOthers = false; 
      try { 
       XCluster result = downloadAndSearch(textToSearch, cluster); 
       cancelOthers = yourPredicateOfSuccess(); 
       return result; 
      } finally { 
       if (cancelOthers) { 
       shutdownService.execute(() -> { 
        executorService.shutdownNow(); 
       }); 
       } 
      } 
     })); 
    } 

L'autre fil et l'try-finally est important car cela assure que vous ne serez pas annuler l'exécution de la méthode presque réussie.

+0

merci, j'ai essayé cette approche où j'arrête d'autres futurs, mais ça ne fonctionne pas. Les threads continuent à s'exécuter même après l'appel de shutdownNow() – user3539951