2009-09-18 5 views
3

comment utiliser plusieurs threads dans java pour traiter un grand nombre de fichiers stockés dans le répertoire disque local (en utilisant le verrouillage de fichier)comment utiliser plusieurs threads pour traiter un grand nombre de fichiers stockés sur le disque local (en utilisant le verrouillage de fichier)

+4

Je vous déconseille de le faire. Lors du traitement d'un grand nombre de fichiers, c'est probablement l'E/S du disque qui vous tue, pas le processeur. Plusieurs threads ne feront qu'aggraver ce goulot d'étranglement. – Joey

+0

@Johannes, bien que généralement vrai, cela dépend du traitement, de la mise en mémoire tampon du disque et même de la distribution des fichiers sur différents supports physiques. Il se peut que le traitement soit incroyablement complexe et dépasse de loin le temps d'E/S du disque. – paxdiablo

+0

Pax: C'est pourquoi le "probablement" est là. Mais "grand nombre de fichiers" commence à environ 10k pour moi et quand ils prennent chacun 3 minutes de traitement, vous avez probablement d'autres soucis (comme la recherche d'un autre ordinateur pour les prochains mois). – Joey

Répondre

4

La meilleure façon de le faire (dans n'importe quelle langue, pas seulement Java) est d'utiliser un paradigme producteur/multi-consommateur.

Avoir un thread créer une file d'attente puis démarrer N autres threads. Ce thread principal va ensuite énumérer tous les fichiers et placer leurs noms dans cette file d'attente. Ensuite, il placera N marqueurs de fin de file d'attente dans la file d'attente. Les autres threads lisent simplement le nom suivant de cette file d'attente et traitent le fichier. Lorsqu'ils lisent un marqueur de fin de file d'attente, ils quittent (et le thread principal peut récolter leur statut de sortie si nécessaire).

Cela simplifie la communication entre les threads à la file d'attente (qui devrait, bien sûr, être protégé par un mutex afin de ne pas provoquer des conditions de course avec tous les threads). Il permet également aux threads de contrôler leur propre condition de sortie (sous la direction du thread principal), un autre bon moyen d'éviter certains problèmes de multi-threading.

0

Une grande partie du travail a été fait pour vous dans les classes Java simultanées. Vous voulez probablement quelque chose comme ConcurrentLinkedQueue.

Une file d'attente thread-safe illimitée basée sur des nœuds liés. Cette file d'attente commande les éléments FIFO (premier entré, premier sorti). La tête de file d'attente est l'élément qui a été le plus longtemps dans la file d'attente. La queue de la file d'attente est l'élément qui a été le plus rapidement dans la file d'attente. De nouveaux éléments sont insérés à la fin de la file d'attente et les opérations de récupération de la file d'attente obtiennent des éléments en tête de la file d'attente. Un ConcurrentLinkedQueue est un choix approprié lorsque de nombreux threads partageront l'accès à une collection commune.

Vous utilisez la méthode offer() pour placer des entrées dans la file d'attente, soit dans le thread principal, soit dans un thread distinct. Ensuite, vous avez un tas d'abeilles ouvrières (idéalement créées dans quelque chose comme ExecutorService) qui utilisent la méthode poll() pour retirer l'entrée suivante de la file d'attente et la traiter. L'utilisation de cette conception vous donne une flexibilité incroyable pour déterminer combien de producteurs et combien de consommateurs fonctionnent simultanément, sans avoir à faire vous-même le code d'attente/d'interrogation. Vous pouvez créer votre pool de serviteurs en utilisant Executors.newFixedThreadPool().

0

Ce que vous voulez vraiment faire est d'avoir votre programme principal traverser le répertoire en obtenant File références. Utilisez ces références pour créer un objet qui implémente Runnable. La méthode run() de Runnable est l'ensemble de votre logique de traitement. Créez un ExecutorService et appelez execute (Runnable) pour soumettre les tâches au service d'exécution. L'Executor exécutera les tâches que les threads deviennent disponibles en fonction du type d'Executor que vous créez (Executors.newFixedThreadPool() est un bon choix.Si votre thread principal a trouvé tous les fichiers et les a soumis en tant que tâches, vous voulez appeler shutdown() sur l'Executor, puis appelez [awaitTermination()] [6] L'appel de shutdown() indique à l'exécuteur de terminer l'exécution des tâches qui lui ont été assignées et de les fermer, l'appel de waitTermination() provoque le blocage de votre thread principal jusqu'à la fermeture de l'Executor. cela suppose bien sûr que vous voulez attendre que toutes les tâches à terminer et faire plus de traitement

[6]. http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination(long, java.util.concurrent.TimeUnit)

3

Voici comment je le fais habituellement.

Vous pouvez créer une file d'attente de blocage comme ceci:

LinkedBlockingQueue<String> files; 
files = new LinkedBlockingQueue<String>(1000); 
AtomicBoolean done = new AtomicBoolean(false); 

La file d'attente ne peut contenir 1000 éléments, donc si vous avez certains comment un milliard de fichiers ou que ce soit, vous n'avez pas à vous soucier de manquer de la mémoire. Vous pouvez changer la taille de ce que vous voulez en fonction de la quantité de mémoire que vous voulez utiliser.

Dans votre thread principal vous faites quelque chose comme:

File directory = new File("path\to\folder"); 
for(File file : directory.listFiles()){ 
    files.put(file.getAbsolutePath()); 
} 
files.put(null);//this last entry tells the worker threads to stop 

Les blocs fonctionnels de vente jusqu'à ce que l'espace devient disponible dans la file d'attente, donc si vous remplissez les fichiers arrêtera la lecture. Bien sûr, parce que File.listFiles() retourne un tableau, plutôt qu'une Collection qui n'a pas besoin d'être entièrement chargée en mémoire, vous finissez par charger une liste complète de fichiers en mémoire si vous utilisez cette fonction. Si cela finit par poser problème, je suppose que vous devrez faire autre chose.

Mais ce modèle fonctionne également si vous avez une autre méthode de listage des fichiers (par exemple s'ils sont tous dans une base de données, ou autre) Remplacez simplement l'appel à directory.listFiles() par ce que vous utilisez pour obtenir votre liste de fichiers. De plus, si vous devez traiter des fichiers dans des sous-répertoires, vous devrez les parcourir récursivement, ce qui peut être ennuyeux (mais contourner le problème de mémoire pour les répertoires extrêmement grands)

puis dans vos threads de travail:

public void run(){ 
    while(!done.get()){ 
     String filename = files.take(); 
     if(filename != null){ 
     //do stuff with your file. 
     } 
     else{ 
     done.set(true);//signal to the other threads that we found the final element. 
     } 
    } 
} 

Si tous les fichiers de la file d'attente ont été traités, la phase d'attente attend jusqu'à ce que de nouveaux éléments apparaissent.

C'est l'idée de base de toute façon, ce code est hors de ma tête et n'a pas été testé exactement comme est.

+0

'files.put (null);' vous donnera une exception de pointeur nul selon les spécifications – VHS

5

Vous ne voulez pas lire les fichiers parallèlement (les E/S disque ne se parallélisent pas bien). Mieux vaut alors laisser un seul thread lire les fichiers, envoyer le contenu aux threads de travail pour le traitement en parallèle, puis collecter les résultats auprès des travailleurs. L'utilisation de l'excellent ExecutorService & c: o de java.util.concurrent vous épargne les détails sales du filetage et rend votre solution beaucoup plus flexible.

Voici un exemple simple. En supposant Foo est le résultat du traitement d'un fichier:

public List<Foo> processFiles(Iterable<File> files){ 
    List<Future<Foo>> futures = new ArrayList<Future<Foo>>(); 
    ExecutorService exec = Executors.newFixedThreadPool(
     Runtime.getRuntime().availableProcessors()); 
    for (File f : files){ 
     final byte[] bytes = readAllBytes(f); // defined elsewhere 
     futures.add(exec.submit(new Callable<Foo>(){ 
      public Foo call(){ 
       InputStream in = new ByteArrayInputStream(bytes); 
       // Read a Foo object from "in" and return it 
      } 
     })); 
    } 
    List<Foo> foos = new List<Foo>(futures.size()); 
    for (Future<Foo> f : futures) foos.add(f.get()); 
    exec.shutdown(); 
    return foos; 
} 

TODO: Ajouter la gestion des exceptions, etc. Vous pouvez également instancier le ExecutorService en dehors de processFiles afin que vous puissiez le réutiliser entre les appels.

0

Je travaille sur un problème similaire où je dois traiter quelques milliers de fichiers texte. J'ai un poller de fichier qui interroge le répertoire et prépare la liste des fichiers trouvés dans le répertoire (y compris les sous-répertoires), et appelle une méthode, disons, fileFound avec la liste comme argument.

Dans la méthode fileFound, je suis en train d'itérer sur la liste et de créer un nouveau thread pour chaque fichier. J'utilise ExecutorService pour contrôler le nombre de threads actifs. Code va comme ceci:

public void fileFound(List<File> fileList) { 
    for (File file : fileList) { 
     FileProcessor fprocessor = new FileProcessor(file); // run() method takes care of implementing business rules for the file. 
     EXECUTOR.submit(fprocessor); //ExecutorService EXECUTOR = Executors.newFixedThreadPool(10); 
    } 
} 

Mon observation:

  1. Lors du traitement des fichiers un par un, sans multi-threading, le traitement 3.5K fichiers (~ 32Go au total), il a fallu ~ 9 heures.
  2. Utilisation multi-threading:

    Lorsque nombre de fils fixés à 5 - 118 minutes.

    Lorsque le nombre de threads est fixé à 10 - 75 minutes.

    Lorsque le nombre de threads est fixé à 15 - 72 minutes.

+0

Pouvez-vous s'il vous plaît partager votre nombre de CORES de CPU pour lequel je vois 10 Threads est optimal? –

+0

@sunny_dev la machine avait 4 cœurs – Amarjeet

+0

est-ce des noyaux logiques ou des cœurs réels. –

1

Avec Java 8, vous pouvez facilement y parvenir en utilisant parallel streams. Voir l'extrait de code suivant:

try { 
     Files.walk(Paths.get("some-path")).parallel().forEach(file -> {/*do your processing*/}); 
    } catch (IOException e1) { 
     e1.printStackTrace(); 
    } 

Avec flux parallèle, le temps d'exécution engendrera le nombre requis de fils, ne dépassant pas le nombre de cœurs logiques CPU, pour traiter les éléments de collecte, les fichiers dans notre cas, en parallèle . Vous pouvez également contrôler le nombre de threads en les transmettant en tant qu'argument JVM. L'avantage de cette approche est que vous n'avez pas vraiment besoin de faire un travail de bas niveau pour créer et maintenir des threads. Vous vous concentrez simplement sur votre problème de haut niveau.

Questions connexes