2009-06-28 10 views
1

J'ai créé un programme de parcours de répertoire et de traitement de fichiers récursif et concurrent, qui se bloque parfois après tous les calculs parallèles, mais le thread principal ne se poursuit jamais avec d'autres tâches. Le code est fondamentalement un agrégateur concurrent de style join-fork, et une fois l'agrégation parallèle terminée, il doit afficher les résultats dans une fenêtre Swing. Le problème avec l'agrégation est qu'il doit générer un arbre et agréger les statistiques des nœuds feuilles dans la hiérarchie. Je suis sûr que j'ai fait une erreur de concurrence mais je ne peux pas le trouver. J'ai inclus la partie pertinente de mon code à la fin du message (les commentaires du code ont été supprimés par souci de concision, désolé pour les 150 lignes, si nécessaire, je pourrais le déplacer vers un emplacement externe).Problème de blocage de l'algorithme de traversée de répertoire simultané

Contexte: UC Java 6u13, Windows XP SP3, Core 2 Duo.

Mes questions sont les suivantes:

Quelle pourrait être la cause de ce blocage au hasard?

Existe-t-il une meilleure façon de faire une traversée de répertoire simultanée, peut-être sous la forme d'une bibliothèque déjà existante? Est-ce que le framework fork-join de Doug Lea (ou Java 7) serait un meilleur framework pour la traversée agrégation/répertoire, et si oui, comment repenser mon implémentation - au niveau conceptuel?

Nous vous remercions de votre temps.

Et l'extrait de code:

private static JavaFileEvaluator[] processFiles(File[] files) 
throws InterruptedException { 
    CountUpDown count = new CountUpDown(); 
    ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors 
    .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 

    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length]; 
    for (int i = 0; i < jfes.length; i++) { 
     count.increment(); 
     jfes[i] = new JavaFileEvaluator(files[i], count, ex); 
     ex.execute(jfes[i]); 
    } 
    count.await(); 
    for (int i = 0; i < jfes.length; i++) { 
     count.increment(); 
     final JavaFileEvaluator jfe = jfes[i]; 
     ex.execute(new Runnable() { 
      public void run() { 
       jfe.aggregate(); 
      } 
     }); 

    } 
    // ------------------------------------- 
    // this await sometimes fails to wake up 
    count.await(); // <--------------------- 
    // ------------------------------------- 
    ex.shutdown(); 
    ex.awaitTermination(0, TimeUnit.MILLISECONDS); 
    return jfes; 
} 
public class JavaFileEvaluator implements Runnable { 
    private final File srcFile; 
    private final Counters counters = new Counters(); 
    private final CountUpDown count; 
    private final ExecutorService service; 
    private List<JavaFileEvaluator> children; 
    public JavaFileEvaluator(File srcFile, 
      CountUpDown count, ExecutorService service) { 
     this.srcFile = srcFile; 
     this.count = count; 
     this.service = service; 
    } 
    public void run() { 
     try { 
      if (srcFile.isFile()) { 
       JavaSourceFactory jsf = new JavaSourceFactory(); 
       JavaParser jp = new JavaParser(jsf); 
       try { 
        counters.add(Constants.FILE_SIZE, srcFile.length()); 
        countLines(); 
        jp.parse(srcFile); 
        Iterator<?> it = jsf.getJavaSources(); 
        while (it.hasNext()) { 
         JavaSource js = (JavaSource)it.next(); 
         js.toString(); 
         processSource(js); 
        } 
       // Some catch clauses here 
       } 
      } else 
      if (srcFile.isDirectory()) { 
       processDirectory(srcFile); 
      } 
     } finally { 
      count.decrement(); 
     } 
    } 
    public void processSource(JavaSource js) { 
     // process source, left out for brevity 
    } 
    public void processDirectory(File dir) { 
     File[] files = dir.listFiles(new FileFilter() { 
      @Override 
      public boolean accept(File pathname) { 
       return 
       (pathname.isDirectory() && !pathname.getName().startsWith("CVS") 
       && !pathname.getName().startsWith(".")) 
       || (pathname.isFile() && pathname.getName().endsWith(".java") 
       && pathname.canRead()); 
      } 
     }); 
     if (files != null) { 
      Arrays.sort(files, new Comparator<File>() { 
       @Override 
       public int compare(File o1, File o2) { 
        if (o1.isDirectory() && o2.isFile()) { 
         return -1; 
        } else 
        if (o1.isFile() && o2.isDirectory()) { 
         return 1; 
        } 
        return o1.getName().compareTo(o2.getName()); 
       } 
      }); 
      for (File f : files) { 
       if (f.isFile()) { 
        counters.add(Constants.FILE, 1); 
       } else { 
        counters.add(Constants.DIR, 1); 
       } 
       JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service); 
       if (children == null) { 
        children = new ArrayList<JavaFileEvaluator>(); 
       } 
       children.add(ev); 
       count.increment(); 
       service.execute(ev); 
      } 
     } 
    } 
    public Counters getCounters() { 
     return counters; 
    } 
    public boolean hasChildren() { 
     return children != null && children.size() > 0; 
    } 
    public void aggregate() { 
     // recursively aggregate non-leaf nodes 
     if (!hasChildren()) { 
      count.decrement(); 
      return; 
     } 
     for (final JavaFileEvaluator e : children) { 
      count.increment(); 
      service.execute(new Runnable() { 
       @Override 
       public void run() { 
        e.aggregate(); 
       } 
      }); 
     } 
     count.decrement(); 
    } 
} 
public class CountUpDown { 
    private final Lock lock = new ReentrantLock(); 
    private final Condition cond = lock.newCondition(); 
    private final AtomicInteger count = new AtomicInteger(); 
    public void increment() { 
     count.incrementAndGet(); 
    } 
    public void decrement() { 
     int value = count.decrementAndGet(); 
     if (value == 0) { 
      lock.lock(); 
      try { 
       cond.signalAll(); 
      } finally { 
       lock.unlock(); 
      } 
     } else 
     if (value < 0) { 
      throw new IllegalStateException("Counter < 0 :" + value); 
     } 
    } 
    public void await() throws InterruptedException { 
     lock.lock(); 
     try { 
      if (count.get() > 0) { 
       cond.await(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 

Modifier Ajout de la méthode hasChildren() dans JavaSourceEvaluator.

Répondre

1

Dans la méthode agrégée de JavaFileEvaluator, count.decrement() n'est pas appelé dans un bloc finally. Si des RuntimeExceptions sont lancées à l'intérieur de la fonction d'agrégat (éventuellement dans la méthode hasChildren, dont je ne vois pas le corps?), L'appel à décrément ne se produira jamais et CountUpDown restera indéfiniment en attente. Cela peut être la cause du blocage aléatoire que vous voyez. Pour la deuxième question, je ne connais pas de bibliothèques en java pour cela, mais je n'ai pas vraiment regardé, désolé pour la non-réponse mais ce n'est pas quelque chose que j'ai eu l'occasion de utiliser avant. En ce qui concerne la troisième question, je pense que si vous utilisez un framework fork-join fourni par quelqu'un d'autre, ou si vous continuez à fournir votre propre framework de concurrence, le plus gros gain serait de séparer la logique qui fait le travail de traverser les répertoires de la logique impliquée dans la gestion du parrallélisme. Le code que vous avez fourni utilise la classe CountUpDown pour garder une trace de quand tous les threads sont terminés, et vous vous retrouvez avec des appels à incrémenter/décrémenter saupoudré dans les méthodes traitant le répertoire traversant, ce qui conduira à des bugs cauchemardesques. Le passage au framework java7 fork-join vous forcera à créer une classe qui ne traitera que la logique de traversée et laissera la logique de concurrence au framework, ce qui peut être un bon moyen pour vous d'y aller.L'autre option est de continuer avec ce que vous avez ici, mais faites une délimitation claire entre la logique de gestion et la logique de travail, ce qui vous aiderait à traquer et corriger ces types de bogues.

+0

Wow, vous avez raison! J'ai ajouté les hasChildren manquants. Je vais également envelopper le corps dans un essai et enfin exécuter le code en boucle pendant un certain temps, merci! +1 Pourriez-vous également réfléchir aux deuxième et troisième questions? – akarnokd

+0

Je n'ai pas trouvé de bibliothèque non plus lorsque j'ai commencé ce code il y a un an. Il semble que ce type de traversée parallèle est simplement externalisé dans le cadre fj. Je vous remercie. – akarnokd

Questions connexes