0

J'ai créé un serveur de socket Java qui crée un serveur de socket sur un port spécifié, puis génère un objet RecordWriter pour effectuer une opération sur le flux de données obtenu à partir de chaque connexion.Restriction du nombre de connexions dans le serveur socket Java

J'ai démarré le programme avec le port comme 61000 et numthreads comme 2. J'ai également commencé 3 clients pour se relier à lui. Du côté du client, je pouvais voir que tous les 3 d'entre eux connectés au récepteur, mais les journaux du récepteur indiqué que seulement deux d'entre eux connectés.

netstat -an|grep 61000|grep -i ESTABLISHED

total 6 liaisons indiquées que le client et le serveur sont exécutés sur la même machine. Mes doutes sont:

  1. Pourquoi le journal client pour la troisième émission de temps qu'il pourrait se connecter au programme sur 61000 alors que je me sers de l'arriéré de 2. Aussi Executors.newFixedThreadPool(numThreads); est ce qui permet seulement 2 clients à raccorder.
  2. Bien que le server.accept se passe dans le MyWriter.java et il n'y a aucune indication dans les journaux que le 3e client peut se connecter, pourquoi ne montre netstat cela comme une connexion établie

Voici mes codes:

MyReceiver.java

package com.vikas; 

import java.net.ServerSocket; 
import java.io.IOException; 
import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 

public class MyReceiver{ 

    protected int serverPort = -1; 
    protected int numThreads = -1; 

    protected boolean isStopped = false; 
    protected Thread runningThread = null; 
    protected ExecutorService threadPool = null; 

    protected static Logger logger = LogManager.getLogger(MyReceiver.class); 
    protected static ServerSocket serverSocket = null; 
    protected static Map<String, String> mapConnections = new ConcurrentHashMap<String, String>(); 

    public MyReceiver(int port){ 
     this.serverPort = port; 
    } 

    public void run(int numThreads){ 
     this.threadPool = Executors.newFixedThreadPool(numThreads); 

     try { 
      logger.info("Starting server on port " + this.serverPort); 
      MyReceiver.serverSocket = new ServerSocket(this.serverPort, numThreads); 
     } catch (IOException e) { 
      //throw new RuntimeException("Cannot open port " + this.serverPort, e); 
      logger.error("Cannot open port " + this.serverPort, e); 
     } 

     while(!isStopped()){ 
      this.threadPool.execute(new MyWriter()); 
     } 


     if(MyReceiver.mapConnections.isEmpty()){ 
      this.threadPool.shutdown(); 
      //System.out.println("Server Stopped after shutdown.") ; 
      logger.info("Server Stopped after shutdown."); 
     } 
    } 


    public synchronized boolean isStopped() { 
     return this.isStopped; 
    } 

    public synchronized void stop(){ 
     this.isStopped = true; 
     try { 
      MyReceiver.serverSocket.close(); 
     } catch (IOException e) { 
      //throw new RuntimeException("Error closing server", e); 
      logger.error("Error closing server", e); 
     } 
    } 

    public static void main(String[] args) { 
     if(args.length != 2){ 
      System.out.println("Number of input arguements is not equal to 4."); 
      System.out.println("Usage: java -cp YOUR_CLASSPATH -Dlog4j.configurationFile=/path/to/log4j2.xml com.vikas.MyReceiver <port> <number of threads>"); 
      System.out.println("java -cp \"$CLASSPATH:./MyReceiver.jar:./log4j-api-2.6.2.jar:./log4j-core-2.6.2.jar\" -Dlog4j.configurationFile=log4j2.xml com.vikas.MyReceiver 61000 2"); 
     } 
     int port = Integer.parseInt(args[0].trim()); 
     int numThreads = Integer.parseInt(args[1].trim()); 

     final MyReceiver myConnection = new MyReceiver(port, topic, brokers); 
     myConnection.run(numThreads); 

     /*Thread t = new Thread(myConnection); 
     t.start();*/ 


     try { 
      Thread.sleep(20000); 
     } catch (InterruptedException e) { 
      //e.printStackTrace(); 
      logger.error("Something went wrong", e); 
     } 
     //System.out.println("Stopping Server"); 
     Runtime.getRuntime().addShutdownHook(new Thread() 
     { 
      @Override 
      public void run() 
      { 
       logger.info("SocketServer - Receive SIGINT!!!"); 
       logger.info("Stopping Server"); 

       if(!myConnection.isStopped()){ 
        myConnection.stop(); 
       } 
       logger.info("Server Stopped successfully"); 

       try 
       { 
        Thread.sleep(1000); 
       } 
       catch (Exception e) {} 
      } 
     }); 
     //myConnection.stop(); 
    } 
} 

MyWriter.java

package com.vikas; 

import java.io.InputStreamReader; 
import java.io.BufferedReader; 
import java.io.IOException; 
import java.net.Socket; 
import java.util.Properties; 

import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 



public class MyWriter implements Runnable{ 

    protected String topic = null; 
    protected String brokers = null; 

    protected static Logger logger = LogManager.getLogger(MyWriter.class); 

    public MyWriter() { 

    } 


    public void run() { 
     while(!MyReceiver.serverSocket.isClosed()){ 
      Socket server = null; 
      try { 
       server = MyReceiver.serverSocket.accept(); 
       //System.out.println("Just connected to " + server.getRemoteSocketAddress()); 
       logger.info("Just connected to " + server.getRemoteSocketAddress()); 
       MyReceiver.mapConnections.put(server.getRemoteSocketAddress().toString().trim(), ""); 

       //change for prod deployment //change implemented 
       String key = null; 
       String message = null; 

       char ch; 
       StringBuilder msg = new StringBuilder(); 
       int value = 0; 


       try { 
        BufferedReader in = new BufferedReader(new InputStreamReader(server.getInputStream())); 
        while((value = in.read()) != -1){ 
         ch = (char)value; 
         if(ch == 0x0a){ 
          //msg.append(ch); 
          //System.out.println(msg); 

          message = msg.toString().trim(); 

          //code change as part of testing in prod 
          if(message.length() != 0){ 
           //do something 
           msg.setLength(0); 
          } 
          else{ 
           logger.error("Blank String received"); 
           msg.setLength(0); 
          } 
         } 
         else{ 
          msg.append(ch); 
         } 
        } 
        logger.info("Closing connection for client :" + server.getRemoteSocketAddress()); 
        //System.out.println("Closing connection for client :" + this.getClientSocket().getRemoteSocketAddress()); 
        server.close(); 
        MyReceiver.mapConnections.remove(server.getRemoteSocketAddress()); 
       } catch (IOException e) { 
        //report exception somewhere. 
        //e.printStackTrace(); 
        logger.error("Something went wrong!!", e); 
       } 
       finally{ 
        producer.close(); 
       } 

      } catch (IOException e) { 
       if(MyReceiver.serverSocket.isClosed()) { 
        //System.out.println("Server was found to be Stopped."); 
        logger.error("Server was found to be Stopped."); 
        logger.error("Error accepting client connection", e); 
        break; 
       } 
      }   
     } 
    } 
} 
+0

'myConnection.run (numThreads);' - Ce ia * jamais * un bon signe - vous devez utiliser 'myConnection.start (numThreads),' 'noter le start'. – OldCurmudgeon

+0

@OldCurmudgeon Dans ce cas, c'est juste un nom de méthode. myConnection est de type 'MyReceiver' qui n'implémente pas Runnable. Je ne le nommerais pas "courir" écrou ce n'est pas faux. – Fildor

+0

@Fildor - Compris - pas le problème en question. Nécessité de souligner cependant. – OldCurmudgeon

Répondre

3

Le paramètre backlog du constructeur ServerSocket limite la taille de la file d'attente de connexion entrante pas le nombre total de fois que vous êtes autorisé à appeler avec succès accept(). Si vous souhaitez limiter le nombre de connexions actives, vous devez garder une trace du nombre de connexions que vous avez acceptées, puis lorsque vous atteignez votre seuil, n'appelez pas de nouveau accept() jusqu'à ce qu'au moins l'une des connexions actives ait été fermée.

while(!MyReceiver.serverSocket.isClosed()){ 
     Socket server = null; 
     try { 
      server = MyReceiver.serverSocket.accept(); 
      //System.out.println("Just connected to " + server.getRemoteSocketAddress()); 
      logger.info("Just connected to " + server.getRemoteSocketAddress()); 
      MyReceiver.mapConnections.put(server.getRemoteSocketAddress().toString().trim(), ""); 

      if (activeConnections == maxConnections) break; // exit accept loop 
+0

Merci @SpaceghostAli !! Je ai juste sur le doute, quand vous avez dit "Le paramètre backlog du constructeur ServerSocket limite la taille de la file d'attente de connexion entrante pas le nombre total de fois que vous êtes autorisé à appeler avec succès()" ..... quelle file sommes-nous parler et comment cette file d'attente affecte mon serveur socket? –

+1

@VikasSaxena Il parle de la file d'attente de connexion entrante, ou file d'attente 'backlog', et il affecte votre serveur en limitant le nombre de connexions entrantes qui peuvent exister sans avoir été retourné par 'accept()' encore. – EJP