2017-09-26 6 views
0

Je cherche à implémenter un système de file d'attente de travail distribué simpliste avec StackExchange.Redis.Redis - approche correcte pour simple lecteur/graveur de file d'attente - StackExchange.Redis

Je comprends la raison de ne pas avoir BLPOP etc., mais comme il se trouve l'interface que je travaille est basée sur TryRead appels répétés avec un délai d'attente.

Je suis hésitant avec le ci-dessous, puisque je suis désinscription dans le gestionnaire, et en mettant un drapeau pour annuler le délai d'attente. Y a-t-il une chance que quelque chose soit manqué? Existe-t-il une approche différente pour y parvenir?

public string TryRead(string queueName, TimeSpan timeout) 
    { 
     string result = null; 

     var chanName = $"qnot_{queueName}"; 
     var done = new ManualResetEvent(false); 

     void Handler(RedisChannel chan, RedisValue val) 
     { 
      _sub.Unsubscribe(chanName, Handler); 
      result = _database.ListRightPop($"qdata_{queueName}"); 
      done.Set(); 
     } 

     _sub.Subscribe(chanName, Handler); 
     done.WaitOne(timeout); 

     return result; 
    } 

    public void Write(string queueName, string text) 
    { 
     _database.ListLeftPush($"qdata_{queueName}", text); 
     _sub.Publish($"qnot_{queueName}", ""); 
    } 

La version ci-dessus sera toujours délai d'attente et retour null dans le cas où il y a un élément existant sur la file d'attente (et rien de nouveau est ajouté). La version ci-dessous vérifie d'abord les données existantes, ce qui fonctionne. Mais il y a un bug, une condition de course: si le premier contrôle de lecture revient négatif, alors quelque chose est poussé et une notification est envoyée, ALORS on s'abonne et on attend le timeout.

public string TryRead(string queueName, TimeSpan timeout) 
    { 
     var dataName = $"qdata_{queueName}"; 

     var result = (string)_database.ListRightPop(dataName); 
     if (result != null) 
     { 
      return result; 
     } 

     var chanName = $"qnot_{queueName}"; 
     var done = new ManualResetEvent(false); 

     void Handler(RedisChannel chan, RedisValue val) 
     { 
      _sub.Unsubscribe(chanName, Handler); 
      result = _database.ListRightPop(dataName); 
      done.Set(); 
     } 

     _sub.Subscribe(chanName, Handler); 
     done.WaitOne(timeout); 

     return result; 
    } 

que je peux faire RPOP s dans une boucle, mais qui semble tout à fait téter. Quelqu'un d'autre a fait quelque chose de similaire?

+0

Je suis confus pourquoi vous vous désabonnez lors de la manipulation - sûrement le lecteur devrait être essentiellement: tout lire? pas seulement un? –

+0

Je suis en train de rééquiper ceci en un système qui utilise actuellement les files d'attente Azure et AWS, et pour garder les choses cohérentes, je cherche à coller la même signature pour 'TryRead'. C'est autonome et bloque juste jusqu'au timeout @MarcGravell –

Répondre

0

J'ai fini avec ce qui fonctionne, mais je voudrais encore accueillir d'autres réponses avec une approche viable:

public string TryRead(string queueName, TimeSpan timeout) 
    { 
     var timer = Stopwatch.StartNew(); 
     var dataName = $"{_keyPrefix}qdata_{queueName}"; 
     var chanName = $"{_keyPrefix}qnot_{queueName}"; 
     var done = new AutoResetEvent(false); 
     string result; 

     // subscribe - sets the 'done' flag when a new item is pushed 
     void Handler(RedisChannel chan, RedisValue val) 
     { 
      done.Set(); 
     } 

     _sub.Subscribe(chanName, Handler); 

     do 
     { 
      // try to read right away (before waiting), in case there was data already there 
      result = _database.ListRightPop(dataName); 
      if (result != null) 
      { 
       continue; 
      } 

      // there wasn't an item right away, so wait for the timeout to expire 
      // or the subscription to be fired. if it fired, try the read again 
      var remainingTime = timeout - timer.Elapsed; 
      if (remainingTime.TotalMilliseconds <= 1.0) 
      { 
       break; 
      } 
      if (done.WaitOne(remainingTime)) 
      { 
       result = _database.ListRightPop(dataName); 
      } 
     } while (result == null && timer.Elapsed < timeout); 

     _sub.Unsubscribe(chanName, Handler); 

     return result; 
    } 

Edit: mise à jour w/AutoResetEvent et retiré Unsubscribe du gestionnaire. Note à ceux qui trouvent ceci, cela semble fonctionner pour moi comme remplacement d'une seule lecture bloquante, mais ce ne sera pas l'approche recommandée. J'utilise seulement ceci parce que je cherche à garder la cohérence avec d'autres implémentations de file d'attente et je travaille à cette signature spécifique TryRead.