2017-03-29 2 views
1

J'ai un problème avec NetMQ 4.0.0.1 sur Mono 4.8 sur Debian Wheezy.Pourquoi NetMQ DealerSocket sur Mono n'envoie aucun message au serveur sur Debian Wheezy, mais sur Windows?

Où la prise du concessionnaire n'envoie aucun message jusqu'à ce que je n'arrête pas de l'appeler pour envoyer un nouveau message. Quand je vais mettre Thread.Sleep(1000) entre la création d'une tâche avec que tout est ok. Je voudrais admettre que tout fonctionne sur Windows dans .Net Framework 4.5 et sur .Net Core 1.1 sans aucun Thread.Sleep().

J'ai motif comme ceci: enter image description here

J'ai ajouté des messages de débogage et je peux voir que je suis en train de créer 100 REQ prises dans les tâches dans une boucle, et le routeur reçoit des demandes dans une file d'attente, que Je les envoie par le concessionnaire, et rien ne se passe de l'autre côté de TCP jusqu'à ce que je vais arrêter l'appel d'envoyer sur les prises REQ. Un simple Thread.Sleep() toutes les 5 tâches fonctionne. Cela ressemble à un bug de Poller, ou à un bug de Dealer, ou je fais quelque chose de mal.

Voici un code de boîte du milieu:

public class CollectorDevice : IDisposable 
{ 
    private NetMQPoller _poller; 
    private RouterSocket _frontendSocket; 
    private DealerSocket _backendSocket; 
    private readonly string _backEndAddress; 
    private readonly string _frontEndAddress; 
    private readonly int _expectedFrameCount; 
    private readonly ManualResetEvent _startSemaphore = new ManualResetEvent(false); 
    private readonly Thread _localThread; 
    private static Logger _logger = LogManager.GetCurrentClassLogger(); 

    /// <summary> 
    /// Constructor 
    /// </summary> 
    /// <param name="backEndAddress"></param> 
    /// <param name="frontEndAddress"></param> 
    /// <param name="expectedFrameCount"></param> 
    public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount) 
    { 
     _expectedFrameCount = expectedFrameCount; 

     _backEndAddress = backEndAddress; 
     _frontEndAddress = frontEndAddress; 

     _localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" }; 
    } 

    public void Start() 
    { 
     _localThread.Start(); 
     _startSemaphore.WaitOne(); 


    } 

    public void Stop() 
    { 
     _poller.Stop(); 
    } 

    #region Implementation of IDisposable 

    public void Dispose() 
    { 
     Stop(); 
    } 

    #endregion 


    #region Private Methods 
    private void DoWork() 
    { 
     try 
     { 
      using (_poller = new NetMQPoller()) 
      using (_frontendSocket = new RouterSocket(_frontEndAddress)) 
      using (_backendSocket = new DealerSocket(_backEndAddress)) 
      { 
       _backendSocket.ReceiveReady += OnBackEndReady; 
       _frontendSocket.ReceiveReady += OnFrontEndReady; 


       _poller.Add(_frontendSocket); 
       _poller.Add(_backendSocket); 

       _startSemaphore.Set(); 

       _poller.Run(); 
      } 
     } 
     catch (Exception e) 
     { 
      _logger.Error(e); 
     } 
    } 

    private void OnBackEndReady(object sender, NetMQSocketEventArgs e) 
    { 
     NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount); 
     _frontendSocket.SendMultipartMessage(message); 
    } 

    private void OnFrontEndReady(object sender, NetMQSocketEventArgs e) 
    { 
     NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount); 
     _backendSocket.SendMultipartMessage(message); 
    } 

    #endregion 
} 

Voici un côté client:

class Program 
{ 
    private static Logger _logger = LogManager.GetCurrentClassLogger(); 


    private static void Main(string[] args) 
    { 
     Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server"); 
     Console.ReadKey(); 


     using (var collectorDevice = new CollectorDevice(">tcp://localhost:5556", "inproc://broker", 3)) 
     { 
      collectorDevice.Start(); 

      List<Task> tasks = new List<Task>(); 
      for (int i = 0; i < 100; i++) 
      { 
       Console.WriteLine(i); 
       int j = i;  
       Task t = Task.Factory.StartNew(() => 
       { 
        try 
        { 
         using (var req = new RequestSocket("inproc://broker")) 
         { 
          req.SendFrame(String.Format("Request client: {0} id: {1}", j, Task.CurrentId)); 
          _logger.Debug(String.Format("Request client: {0} id: {1}", j, Task.CurrentId)); 
          Console.WriteLine(String.Format("Request client: {0} id: {1}", j, Task.CurrentId)); 

          string responseMessage = req.ReceiveFrameString(); 
          _logger.Debug(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage)); 
          Console.WriteLine(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage)); 
         } 
        } 
        catch (Exception e) 
        { 
         Console.WriteLine(e); 
         _logger.Error(e); 
        } 
       }); 
       tasks.Add(t); 
       //Thread.Sleep (100);//<- This thread sleep is fixing problem? 
      } 

      Task.WaitAll(tasks.ToArray()); 
     } 

    } 
} 

Et côté serveur:

class Program 
{ 
    private static Logger _logger = LogManager.GetCurrentClassLogger(); 

    static void Main(string[] args) 
    { 
     try{ 
     using (var routerSocket = new RouterSocket("@tcp://*:5556")) 
     { 
      var poller = new NetMQPoller(); 
      routerSocket.ReceiveReady += RouterSocketOnReceiveReady; 
      poller.Add(routerSocket); 
      poller.Run(); 
     } 
     } 
     catch(Exception e) 
     {  
      Console.WriteLine (e); 
     } 

     Console.ReadKey(); 
    } 

    private static void RouterSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs) 
    { 
     NetMQMessage clientMessage = new NetMQMessage(); 
     bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5), 
      ref clientMessage, 5); 

     if (result == false) 
     { 
      Console.WriteLine ("Something went wrong?!"); 
     } 

     var address = clientMessage[0]; 
     var address2 = clientMessage[1]; 
     var clientMessageString = clientMessage[3].ConvertToString(); 

     //_logger.Debug("Message from client received: '{0}'", clientMessageString); 
     Console.WriteLine (String.Format ("Message from client received: '{0}'", clientMessageString)); 

     netMqSocketEventArgs 
      .Socket.SendMoreFrame(address.Buffer) 
      .SendMoreFrame(address2.Buffer) 
      .SendMoreFrameEmpty() 
      .SendFrame("I have received your message"); 
    } 
} 

Tout le monde a une idée? Je pensais que je suis peut-être en utilisant socket de différents threads, donc je l'ai emballé dans la structure ThreadLocal, mais il n'a pas été aidé.Comme j'ai lu des problèmes dans l'unité avec NetMQ, j'ai ajouté 'AsyncIO.ForceDotNet .Obliger();' avant chaque appel de constructeur de socket, et cela n'a pas été aidé aussi. Que j'ai mis à jour mon mono à 4.8 à partir de la version 4.4 et qu'il a toujours la même apparence.

J'ai vérifié que Thread.Sleep (100) entre les tâches corrige le problème. C'est étrange

+0

Y a-t-il des chances que vous utilisiez une socket à partir de plusieurs threads? – somdoron

+0

En outre, pouvez-vous partager le code de la socket de demande? – somdoron

+0

J'ai mis socket dans la structure ThreadLocal pour être sûr que je ne fais pas cette erreur. J'ai ajouté plus de code comme Vous voulez. En passant, merci pour la réponse rapide – bzyku

Répondre

0

J'ai testé le code, ça prend beaucoup de temps mais finalement le serveur reçoit tous les messages (ça prend environ une minute).

Le problème est la quantité de threads, toutes les opérations asynchrones qui doivent être effectuées sur le thread de complétion io complique beaucoup de temps quand il y a 100 threads. J'ai pu le reproduire sans NetMQ avec le code suivant

public static void Main(string[] args) 
    { 
     ManualResetEvent resetEvent = new ManualResetEvent(false); 

     List<Task> tasks = new List<Task>(); 

     for (int i = 0; i < 100; i++) 
     { 
      tasks.Add(Task.Run(() => 
      { 
       resetEvent.WaitOne(); 
      })); 
     } 

     Thread.Sleep(100); 

     Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 
     listener.Bind(new IPEndPoint(IPAddress.Any, 5556)); 
     listener.Listen(1); 

     SocketAsyncEventArgs args1 = new SocketAsyncEventArgs(); 
     args1.Completed += (sender, eventArgs) => 
     { 
      Console.WriteLine($"Accepted {args1.SocketError}"); 
      resetEvent.Set(); 
     }; 
     listener.AcceptAsync(args1); 

     SocketAsyncEventArgs args2 = new SocketAsyncEventArgs(); 
     args2.RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, 5556); 
     args2.Completed += (sender, eventArgs) => Console.WriteLine("Connected"); 
     Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 
     client.ConnectAsync(args2); 

     Task.WaitAll(tasks.ToArray()); 

     Console.WriteLine("all tasks completed"); 
    } 

Vous pouvez voir qui est aussi prend environ une minute. Avec seulement 5 threads, il s'est terminé immédiatement.

De toute façon, vous pourriez vouloir démarrer moins de threads et/ou reoort un bug dans un projet mono.

+0

Merci, je vais probablement réduire le nombre maximum de threads dans le pool de threads dans le serveur web à un nombre raisonnable – bzyku