2010-02-15 4 views
20

Très bien, possible une question naïve ici. J'ai un service qui doit se connecter à plusieurs périphériques réseau, exécuter une commande sur chacun et collecter les résultats. Pour la rapidité, plutôt que de collecter les informations sur chaque périphérique en séquence, j'ai besoin d'y accéder simultanément et de consommer les résultats une fois qu'ils sont terminés. En utilisant Spring framework et Jsch, je suis assez facilement capable d'interroger chaque périphérique correctement. Où je suis dans une certaine confusion est en essayant de recâbler les beans pour utiliser TaskExecutor pour accomplir cela. Ce que je n'arrive pas à comprendre c'est comment savoir quand le fil est fini.En utilisant Spring threading et TaskExecutor, comment savoir quand un thread est terminé?

Ce que j'ai à ce jour est le suivant:

public class RemoteCommand { 

    private String user; 
    private String host; 
    private String password; 
    private String command; 
    private List<String> commandResults; 
    private TaskExecutor taskExecutor; 

    public RemoteCommand(String user, String host, String password, TaskExecutor taskExecutor) { 

     setUser(user); 
     setHost(host); 
     setPassword(password); 
     setTaskExecutor(taskExecutor); 
    } 

    /** 
    * @param user the user to set 
    */ 
    public void setUser(String user) { 
     this.user = user; 
    } 

    /** 
    * @return the user 
    */ 
    public String getUser() { 
     return user; 
    } 

    /** 
    * @param host the host to set 
    */ 
    public void setHost(String host) { 
     this.host = host; 
    } 

    /** 
    * @return the host 
    */ 
    public String getHost() { 
     return host; 
    } 

    /** 
    * @param password the password to set 
    */ 
    public void setPassword(String password) { 
     this.password = password; 
    } 

    /** 
    * @return the password 
    */ 
    public String getPassword() { 
     return password; 
    } 

    /** 
    * @param command the command to set 
    */ 
    private void setCommand(String command) { 
     this.command = command; 
    } 

    /** 
    * @return the command 
    */ 
    private String getCommand() { 
     return command; 
    } 

    /** 
    * @param commandResults the commandResults to set 
    */ 
    private void setCommandResults(List<String> commandResults) { 
     this.commandResults = commandResults; 
    } 

    /** 
    * @return the commandResults 
    */ 
    public List<String> getCommandResults(String command) { 
     taskExecutor.execute(new CommandTask(command)); 

     return commandResults; 
    } 

    /** 
    * @param taskExecutor the taskExecutor to set 
    */ 
    public void setTaskExecutor(TaskExecutor taskExecutor) { 
     this.taskExecutor = taskExecutor; 
    } 

    /** 
    * @return the taskExecutor 
    */ 
    public TaskExecutor getTaskExecutor() { 
     return taskExecutor; 
    } 

    private class CommandTask implements Runnable { 

     public CommandTask(String command) { 
      setCommand(command); 
      System.out.println("test: " + getCommand()); 
     } 

     /** 
     * 
     * @param command 
     */ 
     public void run() { 

      List<String> results = new LinkedList<String>(); 
      String command = getCommand(); 

      try { 
       System.out.println("running"); 
       JSch jsch = new JSch(); 

       String user = getUser(); 
       String host = getHost(); 

       java.util.Properties config = new java.util.Properties(); 
       config.put("StrictHostKeyChecking", "no"); 

       host = host.substring(host.indexOf('@') + 1); 
       Session session = jsch.getSession(user, host, 22); 

       session.setPassword(getPassword()); 
       session.setConfig(config); 
       session.connect(); 

       Channel channel = session.openChannel("exec"); 
       ((ChannelExec) channel).setCommand(command); 

       channel.setInputStream(null); 

       ((ChannelExec) channel).setErrStream(System.err); 

       InputStream in = channel.getInputStream(); 

       channel.connect(); 
       byte[] tmp = new byte[1024]; 
       while (true) { 
        while (in.available() > 0) { 
         int i = in.read(tmp, 0, 1024); 
         if (i < 0) 
          break; 
         results.add(new String(tmp, 0, i)); 
         System.out.print(new String(tmp, 0, i)); 
        } 
        if (channel.isClosed()) { 
         //System.out.println("exit-status: " 
         //  + channel.getExitStatus()); 
         break; 
        } 
        try { 
         Thread.sleep(1000); 
        } catch (Exception ee) { 
         ee.printStackTrace(); 
        } 
       } 
       channel.disconnect(); 
       session.disconnect(); 
      } catch (Exception e) { 
       System.out.println(e); 
      } 
      setCommandResults(results); 
      System.out.println("finished running"); 
     } 
    } 
} 

Dans mon test JUnit je:

@Test 
    public void testRemoteExecution() { 

     remoteCommand = (RemoteCommand) applicationContext.getBean("remoteCommand"); 
     remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx"); 

      //List<String> results = remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx"); 
     //for (String line : results) { 
     // System.out.println(line.trim()); 
     //} 
    } 

Mon fichier applicationContext.xml:

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
     <property name="corePoolSize" value="5" /> 
     <property name="maxPoolSize" value="10" /> 
     <property name="queueCapacity" value="25" /> 
    </bean>   

<!-- ******************** --> 
<!--  Utilities  --> 
<!-- ******************** --> 

    <bean name="remoteCommand" class="com.xxx.ncc.sonet.utilities.RemoteCommand" scope="prototype"> 
     <description>Remote Command</description> 
     <constructor-arg><value>${remote.user}</value></constructor-arg> 
     <constructor-arg><value>${remote.host}</value></constructor-arg> 
     <constructor-arg><value>${remote.password}</value></constructor-arg> 
     <constructor-arg ref="taskExecutor" /> 
    </bean> 

j'aller aussi loin comme premier println dans la méthode run(). Ensuite, le test se termine proprement sans erreurs. Je n'arrive jamais à la deuxième impression au bas de cette routine. J'ai regardé ce fil here, qui était très utile, mais pas mis en œuvre dans un mode spécifique de printemps. Je suis sûr qu'il me manque quelque chose de simple, ou que j'ai complètement fui les rails ici. Toute aide est appréciée.

Répondre

11
public List<String> getCommandResults(String command) { 
    FutureTask task = new FutureTask(new CommandTask(command)) 
    taskExecutor.execute(task); 

    return task.get(); //or task.get(); return commandResults; - but it not a good practice 
} 
+1

'FutureTask' n'a pas un tel constructeur ... il prend soit' Callable', soit 'Runnable' avec un résultat pré-calculé. – skaffman

+1

Oh, oui, c'est vrai. J'ai juste oublié de dire qu'il est préférable de rendre CommandTask comme Callable. Pardon –

45

L'interface TaskExecutor est une interface fire-and-forget, à utiliser lorsque vous ne vous souciez pas de la fin de la tâche. C'est l'abstraction async la plus simple que le printemps offre.

Il y a, cependant, une interface améliorée, AsyncTaskExecutor, qui fournit des méthodes supplémentaires, y compris submit() méthodes qui renvoient un Future qui vous permettent d'attendre le résultat.

Le ressort fournit la classe ThreadPoolTaskExecutor, qui implémente à la fois TaskExecutor et AsyncTaskExecutor.

Dans votre cas, je serais réimplémenter la Runnable comme Callable, et retourner le commandResults de la méthode Callable.call(). La méthode getCommandResults peut alors être réimplémentée comme:

public List<String> getCommandResults(String command) { 
    Future<List<String>> futureResults = taskExecutor.submit(new CommandTask(command)); 
    return futureResults.get(); 
} 

Cette méthode soumettra la tâche de manière asynchrone, puis attendez qu'elle soit terminée avant de retourner les résultats renvoyés par la méthode Callable.call(). Cela vous permet également de vous débarrasser du champ commandResults.

+2

Une réponse très utile. C'était la réponse finale ci-dessus qui a complété le correctif, mais je ne l'aurais pas complètement compris sans votre réponse, merci. – Bill

Questions connexes