0

J'ai une LinkedBlockingQueue avec une capacité arbitrairement choisi de 10, et un fichier d'entrée avec 1000 lignes. J'ai une variable de type ExecutorService dans la méthode main de la classe de service qui, à ma connaissance, gère d'abord - en utilisant Executors.newSingleThreadExecutor() - un thread unique pour appeler buffer.readline() jusqu'au fichier line == null, puis gère - dans un boucle en utilisant Executors.newSingleThreadExecutor() --en threads pour traiter les lignes et les écrire dans les fichiers de sortie, jusqu'à !queue.take().equals("Stop"). Cependant, après avoir écrit quelques lignes dans des fichiers, quand je suis en mode débogage, je vois que la capacité de la file atteint finalement max (10), et les threads de traitement n'exécutent pas queue.take(). Tous les threads sont dans l'état running, mais le processus s'arrête après queue.put(). Qu'est-ce qui causerait ce problème, et est-il soluble en utilisant une combinaison de thread-pooling ou plusieurs variables de gestionnaire ExecutorService, au lieu d'une seule variable?Un producteur Dix consommateurs de traitement de fichiers avec Executors.newSingleThreadExecutor()

Aperçu pour l'état actuel de la méthode main en service:

//app settings to get values for keys within a properties file 
AppSettings appSettings = new AppSettings(); 
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); 

maxProdThreads = 1; 
maxConsThreads = 10; 

ExecutorService execSvc = null; 

for (int i = 0; i < maxProdThreads; i++) { 

     execSvc = Executors.newSingleThreadExecutor(); 

     execSvc.submit(new ReadJSONMessage(appSettings,queue)); 

    } 

    for (int i = 0; i < maxConsThreads; i++) { 

     execSvc = Executors.newSingleThreadExecutor(); 

     execSvc.submit(new ProcessJSONMessage(appSettings,queue)); 

    } 

lecture Code de la méthode:

buffer = new BufferedReader(new FileReader(inputFilePath)); 

    while((line = buffer.readLine()) != null){ 

      line = line.trim(); 

      queue.put(line); 

     } 

Traitement et écriture de code:

while(!(line=queue.take()).equals("Stop")){ 

     if(line.length() > 10) 
     { 

      try { 
       if(processMessage(line, outputFilePath) == true) 
       { 
        ++count; 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
      }    
     } 
    } 


public boolean processMessage(String line, String outputFilePath){ 
    CustomObject cO = new CustomObject(); 
    cO.setText(line); 
    writeToFile1(cO,...); 
    writeToFile2(cO,...); 
} 

public void writeOutputAToFile(CustomObject cO,...){ 
    synchronized(cO){ 
     ... 
     org.apache.commons.io.FileUtils.writeStringToFile(...) 
    } 
} 


public void writeOutputBToFile(CustomObject cO,...){ 
     synchronized(cO){ 
      ... 
      org.apache.commons.io.FileUtils.writeStringToFile(...) 
     } 
    } 
+1

S'il vous plaît postez votre lire et écrire le code. J'ai l'impression que vous bloquez d'une façon ou d'une autre. –

+0

Pas votre réponse, mais je suggère de mettre un seul 'Executors.newCachedThreadPool()' en dehors de vos boucles et en utilisant cela. Cela vous permettra de fermer tous les threads avec un seul appel ('exeSvc.shutdown()') lorsque vous avez terminé. – teppic

+0

Effectuez un vidage de thread pour voir où vos threads sont bloqués. – teppic

Répondre

0

Dans le traitement et l'écriture de code Assurez-vous que toutes les ressources sont bien fermées. Probablement les ressources pourraient ne pas être fermé correctement en raison de laquelle le thread continue à fonctionner et le ExecutorService ne peut pas trouver un thread inactif ...