Je cherche à implémenter un système de file d'attente de travail distribué simpliste avec StackExchange.Redis.Redis - approche correcte pour simple lecteur/graveur de file d'attente - StackExchange.Redis
Je comprends la raison de ne pas avoir BLPOP
etc., mais comme il se trouve l'interface que je travaille est basée sur TryRead
appels répétés avec un délai d'attente.
Je suis hésitant avec le ci-dessous, puisque je suis désinscription dans le gestionnaire, et en mettant un drapeau pour annuler le délai d'attente. Y a-t-il une chance que quelque chose soit manqué? Existe-t-il une approche différente pour y parvenir?
public string TryRead(string queueName, TimeSpan timeout)
{
string result = null;
var chanName = $"qnot_{queueName}";
var done = new ManualResetEvent(false);
void Handler(RedisChannel chan, RedisValue val)
{
_sub.Unsubscribe(chanName, Handler);
result = _database.ListRightPop($"qdata_{queueName}");
done.Set();
}
_sub.Subscribe(chanName, Handler);
done.WaitOne(timeout);
return result;
}
public void Write(string queueName, string text)
{
_database.ListLeftPush($"qdata_{queueName}", text);
_sub.Publish($"qnot_{queueName}", "");
}
La version ci-dessus sera toujours délai d'attente et retour null
dans le cas où il y a un élément existant sur la file d'attente (et rien de nouveau est ajouté). La version ci-dessous vérifie d'abord les données existantes, ce qui fonctionne. Mais il y a un bug, une condition de course: si le premier contrôle de lecture revient négatif, alors quelque chose est poussé et une notification est envoyée, ALORS on s'abonne et on attend le timeout.
public string TryRead(string queueName, TimeSpan timeout)
{
var dataName = $"qdata_{queueName}";
var result = (string)_database.ListRightPop(dataName);
if (result != null)
{
return result;
}
var chanName = $"qnot_{queueName}";
var done = new ManualResetEvent(false);
void Handler(RedisChannel chan, RedisValue val)
{
_sub.Unsubscribe(chanName, Handler);
result = _database.ListRightPop(dataName);
done.Set();
}
_sub.Subscribe(chanName, Handler);
done.WaitOne(timeout);
return result;
}
que je peux faire RPOP
s dans une boucle, mais qui semble tout à fait téter. Quelqu'un d'autre a fait quelque chose de similaire?
Je suis confus pourquoi vous vous désabonnez lors de la manipulation - sûrement le lecteur devrait être essentiellement: tout lire? pas seulement un? –
Je suis en train de rééquiper ceci en un système qui utilise actuellement les files d'attente Azure et AWS, et pour garder les choses cohérentes, je cherche à coller la même signature pour 'TryRead'. C'est autonome et bloque juste jusqu'au timeout @MarcGravell –