2010-05-13 6 views
1

Je travaille sur un programme Java qui est essentiellement un forum de discussion. Ceci est une tâche pour la classe donc pas de code s'il vous plaît, je suis juste avoir quelques problèmes qui déterminent la façon la plus réaliste de gérer ce que je dois faire. J'ai un programme serveur déjà configuré pour un seul client en utilisant des threads pour obtenir le flux d'entrée de données et un thread pour gérer l'envoi sur le flux de sortie de données. Ce que je dois faire maintenant est de créer un nouveau thread pour chaque requête entrante.Utilisation de threads pour gérer des sockets

Mon idée est de créer une liste chaînée pour contenir les sockets client, ou éventuellement le thread. Où je trébuche est de savoir comment gérer l'envoi des messages à tous les clients. Si j'ai un thread pour chaque message entrant, comment puis-je faire demi-tour et l'envoyer à chaque socket client.

Je pense que si j'avais une liste des clientsockets, je pourrais alors traverser la liste et l'envoyer à chacun, mais ensuite je devrais créer un flux de sortie de données à chaque fois. Est-ce que je peux créer une liste liée de dataoutputstreams? Je suis désolé si j'ai l'impression que je suis en train de divaguer, mais je ne veux pas commencer à le coder, ça pourrait être compliqué sans un bon plan. Merci!

EDIT J'ai décidé de publier le code que j'ai jusqu'à présent. Je n'ai pas encore eu l'occasion de le tester, donc tout commentaire serait génial. Merci!

import java.io.BufferedReader; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.Socket; 
import java.net.ServerSocket; 
import java.util.LinkedList; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

public class prog4_server { 

    // A Queue of Strings used to hold out bound Messages 
    // It blocks till on is available 
    static BlockingQueue<String> outboundMessages = new LinkedBlockingQueue<String>(); 

    // A linked list of data output streams 
    // to all the clients 
    static LinkedList<DataOutputStream> outputstreams; 

    // public variables to track the number of clients 
    // and the state of the server 
    static Boolean serverstate = true; 
    static int clients = 0; 

    public static void main(String[] args) throws IOException{ 

     //create a server socket and a clientSocket 
     ServerSocket serverSocket = null; 
     try { 
      serverSocket = new ServerSocket(6789); 
     } catch (IOException e) { 
      System.out.println("Could not listen on port: 6789"); 
      System.exit(-1); 
     }// try{...}catch(IOException e){...} 

     Socket clientSocket; 

     // start the output thread which waits for elements 
     // in the message queue 
     OutputThread out = new OutputThread(); 
     out.start(); 

     while(serverstate){ 

      try { 

       // wait and accept a new client 
       // pass the socket to a new Input Thread 
       clientSocket = serverSocket.accept(); 
       DataOutputStream ServerOut = new DataOutputStream(clientSocket.getOutputStream()); 
       InputThread in = new InputThread(clientSocket, clients); 
       in.start(); 
       outputstreams.add(ServerOut); 

      } catch (IOException e) { 

       System.out.println("Accept failed: 6789"); 
       System.exit(-1); 
      }// try{...}catch{..} 

      // increment the number of clients and report 
      clients = clients++; 

      System.out.println("Client #" + clients + "Accepted"); 

     }//while(serverstate){... 

    }//public static void main 

    public static class OutputThread extends Thread { 

     //OutputThread Class Constructor 
     OutputThread() { 
     }//OutputThread(...){... 

     public void run() { 

      //string variable to contain the message 
      String msg = null; 

      while(!this.interrupted()) { 

       try { 

        msg = outboundMessages.take(); 

        for(int i=0;i<outputstreams.size();i++){ 

         outputstreams.get(i).writeBytes(msg + '\n'); 

        }// for(...){... 

       } catch (IOException e) { 

        System.out.println(e); 

       } catch (InterruptedException e){ 

        System.out.println(e); 

       }//try{...}catch{...} 

      }//while(...){ 

     }//public void run(){... 

    }// public OutputThread(){... 

    public static class InputThread extends Thread { 

     Boolean threadstate = true; 
     BufferedReader ServerIn; 
     String user; 
     int threadID; 
     //SocketThread Class Constructor 
     InputThread(Socket clientSocket, int ID) { 

      threadID = ID; 

      try{ 
       ServerIn = new BufferedReader(
        new InputStreamReader(clientSocket.getInputStream())); 
        user = ServerIn.readLine(); 
      } 
      catch(IOException e){ 
       System.out.println(e); 
      } 

     }// InputThread(...){... 

     public void run() { 

      String msg = null; 

     while (threadstate) { 

       try { 

        msg = ServerIn.readLine(); 

        if(msg.equals("EXITEXIT")){ 

         // if the client is exiting close the thread 
         // close the output stream with the same ID 
         // and decrement the number of clients 
      threadstate = false; 
         outputstreams.get(threadID).close(); 
         outputstreams.remove(threadID); 
         clients = clients--; 
         if(clients == 0){ 
          // if the number of clients has dropped to zero 
          // close the server 
          serverstate = false; 
          ServerIn.close(); 
         }// if(clients == 0){... 
        }else{ 

         // add a message to the message queue 
         outboundMessages.add(user + ": " + msg); 

        }//if..else... 

       } catch (IOException e) { 

        System.out.println(e); 

       }// try { ... } catch { ...} 

     }// while 

     }// public void run() { ... 
    } 

    public static class ServerThread extends Thread { 

     //public variable declaration 
     BufferedReader UserIn = 
       new BufferedReader(new InputStreamReader(System.in)); 

     //OutputThread Class Constructor 
     ServerThread() { 

     }//OutputThread(...){... 

     public void run() { 

      //string variable to contain the message 
      String msg = null; 

      try { 

       //while loop will continue until 
       //exit command is received 
       //then send the exit command to all clients 

       msg = UserIn.readLine(); 

       while (!msg.equals("EXITEXIT")) { 

        System.out.println("Enter Message: "); 
        msg = UserIn.readLine(); 

       }//while(...){ 

       outboundMessages.add(msg); 
       serverstate = false; 
       UserIn.close(); 

      } catch (IOException e) { 
       System.out.println(e); 

      }//try{...}catch{...} 


     }//public void run(){... 
    }// public serverThread(){... 

}// public class prog4_server 
+1

"thread par requête" ou "thread par socket" ne s'adapte pas - pensez à ce qui se passerait lorsque des clients 5K sont connectés à votre serveur. –

+0

Je ne suis pas sûr de ce que vous voulez dire, dites-vous que je dois limiter le nombre de threads? – Levi

+0

Nikolai, quel est le but de votre 'plainte' si vous ne proposez pas une meilleure idée? Je serais également intéressé de savoir comment «mettre à l'échelle» correctement :) –

Répondre

3

J'ai résolu ce problème dans le passé en définissant une classe « MessageHandler » par connexion client, responsable du trafic des messages entrants/sortants. En interne, le gestionnaire utilise une implémentation BlockingQueue sur laquelle les messages sortants sont placés (par les threads de travail internes). Le thread de l'expéditeur d'E/S tente continuellement de lire dans la file d'attente (en bloquant si nécessaire) et envoie chaque message récupéré au client.

Voici quelques exemples de code squelette (non testé):

/** 
* Our Message definition. A message is capable of writing itself to 
* a DataOutputStream. 
*/ 
public interface Message { 
    void writeTo(DataOutputStream daos) throws IOException; 
} 

/** 
* Handler definition. The handler contains two threads: One for sending 
* and one for receiving messages. It is initialised with an open socket. 
*/  
public class MessageHandler { 
    private final DataOutputStream daos; 
    private final DataInputStream dais; 
    private final Thread sender; 
    private final Thread receiver; 
    private final BlockingQueue<Message> outboundMessages = new LinkedBlockingQueue<Message>(); 

    public MessageHandler(Socket skt) throws IOException { 
    this.daos = new DataOutputStream(skt.getOutputStream()); 
    this.dais = new DataInputStream(skt.getInputStream()); 

    // Create sender and receiver threads responsible for performing the I/O. 
    this.sender = new Thread(new Runnable() { 
     public void run() { 
     while (!Thread.interrupted()) { 
      Message msg = outboundMessages.take(); // Will block until a message is available. 

      try { 
      msg.writeTo(daos); 
      } catch(IOException ex) { 
      // TODO: Handle exception 
      } 
     } 
     } 
    }, String.format("SenderThread-%s", skt.getRemoteSocketAddress())); 

    this.receiver = new Thread(new Runnable() { 
     public void run() { 
     // TODO: Read from DataInputStream and create inbound message. 
     } 
    }, String.format("ReceiverThread-%s", skt.getRemoteSocketAddress())); 

    sender.start(); 
    receiver.start(); 
    } 

    /** 
    * Submits a message to the outbound queue, ready for sending. 
    */ 
    public void sendOutboundMessage(Message msg) { 
    outboundMessages.add(msg); 
    } 

    public void destroy() { 
    // TODO: Interrupt and join with threads. Close streams and socket. 
    } 
} 

Notez que Nikolai est correct que le blocage d'E/S en utilisant 1 (ou 2) fils par connexion n'est pas une solution évolutive et typiquement des applications peut-être écrit en utilisant Java NIO pour contourner cela. Cependant, en réalité, sauf si vous écrivez un serveur d'entreprise auquel des milliers de clients se connectent simultanément, ce n'est pas vraiment un problème. Rédaction d'applications évolutives sans bug en utilisant Java NIO est difficile et certainement pas quelque chose que je recommanderais.

+0

Merci. Je n'ai jamais utilisé une file d'attente bloquante auparavant, mais je peux voir comment cela pourrait s'intégrer dans mon design original. Je sais que ce n'est pas évolutif mais ce n'est pas vraiment dans le cadre de cette mission. Ce que je vois est qu'avec les deux threads que j'avais à l'origine, je peux utiliser une variable de message global et une file d'attente de blocage dans mes threads de sortie pour envoyer le message sur tous les ports clients actifs. Je vais voir si ça va marcher. Je dois également trouver un moyen de fermer les deux threads associés à un socket. – Levi

+0

Oui exactement - Vous pouvez créer un message, puis le transmettre à chaque gestionnaire. Pour fermer vos threads, vous devrez appeler thread.interrupt() suivi de thread.join(). Cependant, pour que cela fonctionne, il est important que les deux threads vérifient régulièrement leur statut interrompu via Thread.interrupted(). – Adamski

+0

Ma version précédente s'est terminée en fonction d'une entrée utilisateur. C'est assez facile avec le thread d'entrée, mais je suis bloqué en voyant comment garder une association entre ce thread d'entrée et son thread de sortie correspondant. Merci encore pour votre aide à ce sujet, j'aime apprendre de nouvelles et meilleures façons de faire les choses. – Levi

Questions connexes