J'ai un problème avec NetMQ 4.0.0.1 sur Mono 4.8 sur Debian Wheezy.Pourquoi NetMQ DealerSocket sur Mono n'envoie aucun message au serveur sur Debian Wheezy, mais sur Windows?
Où la prise du concessionnaire n'envoie aucun message jusqu'à ce que je n'arrête pas de l'appeler pour envoyer un nouveau message. Quand je vais mettre Thread.Sleep(1000)
entre la création d'une tâche avec que tout est ok. Je voudrais admettre que tout fonctionne sur Windows dans .Net Framework 4.5 et sur .Net Core 1.1 sans aucun Thread.Sleep()
.
J'ai ajouté des messages de débogage et je peux voir que je suis en train de créer 100 REQ
prises dans les tâches dans une boucle, et le routeur reçoit des demandes dans une file d'attente, que Je les envoie par le concessionnaire, et rien ne se passe de l'autre côté de TCP jusqu'à ce que je vais arrêter l'appel d'envoyer sur les prises REQ. Un simple Thread.Sleep()
toutes les 5 tâches fonctionne. Cela ressemble à un bug de Poller, ou à un bug de Dealer, ou je fais quelque chose de mal.
Voici un code de boîte du milieu:
public class CollectorDevice : IDisposable
{
private NetMQPoller _poller;
private RouterSocket _frontendSocket;
private DealerSocket _backendSocket;
private readonly string _backEndAddress;
private readonly string _frontEndAddress;
private readonly int _expectedFrameCount;
private readonly ManualResetEvent _startSemaphore = new ManualResetEvent(false);
private readonly Thread _localThread;
private static Logger _logger = LogManager.GetCurrentClassLogger();
/// <summary>
/// Constructor
/// </summary>
/// <param name="backEndAddress"></param>
/// <param name="frontEndAddress"></param>
/// <param name="expectedFrameCount"></param>
public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount)
{
_expectedFrameCount = expectedFrameCount;
_backEndAddress = backEndAddress;
_frontEndAddress = frontEndAddress;
_localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" };
}
public void Start()
{
_localThread.Start();
_startSemaphore.WaitOne();
}
public void Stop()
{
_poller.Stop();
}
#region Implementation of IDisposable
public void Dispose()
{
Stop();
}
#endregion
#region Private Methods
private void DoWork()
{
try
{
using (_poller = new NetMQPoller())
using (_frontendSocket = new RouterSocket(_frontEndAddress))
using (_backendSocket = new DealerSocket(_backEndAddress))
{
_backendSocket.ReceiveReady += OnBackEndReady;
_frontendSocket.ReceiveReady += OnFrontEndReady;
_poller.Add(_frontendSocket);
_poller.Add(_backendSocket);
_startSemaphore.Set();
_poller.Run();
}
}
catch (Exception e)
{
_logger.Error(e);
}
}
private void OnBackEndReady(object sender, NetMQSocketEventArgs e)
{
NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount);
_frontendSocket.SendMultipartMessage(message);
}
private void OnFrontEndReady(object sender, NetMQSocketEventArgs e)
{
NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount);
_backendSocket.SendMultipartMessage(message);
}
#endregion
}
Voici un côté client:
class Program
{
private static Logger _logger = LogManager.GetCurrentClassLogger();
private static void Main(string[] args)
{
Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server");
Console.ReadKey();
using (var collectorDevice = new CollectorDevice(">tcp://localhost:5556", "inproc://broker", 3))
{
collectorDevice.Start();
List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
Console.WriteLine(i);
int j = i;
Task t = Task.Factory.StartNew(() =>
{
try
{
using (var req = new RequestSocket("inproc://broker"))
{
req.SendFrame(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
_logger.Debug(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
Console.WriteLine(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
string responseMessage = req.ReceiveFrameString();
_logger.Debug(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
Console.WriteLine(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
}
}
catch (Exception e)
{
Console.WriteLine(e);
_logger.Error(e);
}
});
tasks.Add(t);
//Thread.Sleep (100);//<- This thread sleep is fixing problem?
}
Task.WaitAll(tasks.ToArray());
}
}
}
Et côté serveur:
class Program
{
private static Logger _logger = LogManager.GetCurrentClassLogger();
static void Main(string[] args)
{
try{
using (var routerSocket = new RouterSocket("@tcp://*:5556"))
{
var poller = new NetMQPoller();
routerSocket.ReceiveReady += RouterSocketOnReceiveReady;
poller.Add(routerSocket);
poller.Run();
}
}
catch(Exception e)
{
Console.WriteLine (e);
}
Console.ReadKey();
}
private static void RouterSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
{
NetMQMessage clientMessage = new NetMQMessage();
bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5),
ref clientMessage, 5);
if (result == false)
{
Console.WriteLine ("Something went wrong?!");
}
var address = clientMessage[0];
var address2 = clientMessage[1];
var clientMessageString = clientMessage[3].ConvertToString();
//_logger.Debug("Message from client received: '{0}'", clientMessageString);
Console.WriteLine (String.Format ("Message from client received: '{0}'", clientMessageString));
netMqSocketEventArgs
.Socket.SendMoreFrame(address.Buffer)
.SendMoreFrame(address2.Buffer)
.SendMoreFrameEmpty()
.SendFrame("I have received your message");
}
}
Tout le monde a une idée? Je pensais que je suis peut-être en utilisant socket de différents threads, donc je l'ai emballé dans la structure ThreadLocal, mais il n'a pas été aidé.Comme j'ai lu des problèmes dans l'unité avec NetMQ, j'ai ajouté 'AsyncIO.ForceDotNet .Obliger();' avant chaque appel de constructeur de socket, et cela n'a pas été aidé aussi. Que j'ai mis à jour mon mono à 4.8 à partir de la version 4.4 et qu'il a toujours la même apparence.
J'ai vérifié que Thread.Sleep (100) entre les tâches corrige le problème. C'est étrange
Y a-t-il des chances que vous utilisiez une socket à partir de plusieurs threads? – somdoron
En outre, pouvez-vous partager le code de la socket de demande? – somdoron
J'ai mis socket dans la structure ThreadLocal pour être sûr que je ne fais pas cette erreur. J'ai ajouté plus de code comme Vous voulez. En passant, merci pour la réponse rapide – bzyku