2010-07-19 8 views
7

System.Collections.Concurrent possède de nouvelles collections qui fonctionnent très bien dans les environnements multithread. Cependant, ils sont un peu limités. Soit ils bloquent jusqu'à ce qu'un élément devienne disponible, soit ils renvoient default(T) (méthodes TryXXX).Collecte simultanée non bloquante?

J'ai besoin d'une collection thread-safe, mais au lieu de bloquer le thread appelant, elle utilise un rappel pour m'informer qu'au moins un élément est disponible.

Ma solution actuelle consiste à utiliser un BlockingCollection, mais d'utiliser l'APM avec un délégué pour obtenir l'élément suivant. En d'autres termes, je crée un délégué à une méthode Take s de la collection et exécute ce délégué en utilisant BeginInvoke.

Malheureusement, je dois garder beaucoup d'état dans ma classe pour y arriver. Pire, la classe n'est pas sûre pour les threads; il ne peut être utilisé que par un seul thread. Je suis en train de contourner la limite de la maintenabilité, ce que je préférerais ne pas faire. Je sais qu'il y a quelques bibliothèques qui rendent ce que je fais ici assez simple (je crois que le Reactive Framework en fait partie), mais je voudrais accomplir mes objectifs sans ajouter de références en dehors de la version 4 du cadre.

Y a-t-il de meilleurs modèles que je peux utiliser et qui ne nécessitent pas de références externes pour atteindre mon objectif?


tl; dr:

Y at-il des modèles qui répondent à l'exigence:

« Je dois signaler une collection que je suis prêt pour le prochain élément, et avoir la collection exécuter un rappel lorsque l'élément suivant est arrivé, sans qu'aucun thread ne soit bloqué. "

+0

Est-ce que ce sera thread-safe? Qu'est-ce qui empêche l'élément disponible de devenir indisponible avant l'appel du délégué? Et quel est votre objectif global (c.-à-d. Un système de file d'attente)? –

+0

@Adam Bon point sur la consommation de l'article. Le délégué prend l'article retiré de la collection. Ainsi, l'exécution du délégué est bloquée jusqu'à ce qu'un élément soit 'Take'-en de la collection, et que cet objet soit le' object' passé à EndInvoke. L'objectif global est un peu compliqué; Essentiellement, je dois désactiver un flux de travail jusqu'à ce que l'élément devienne disponible. Vous ne pouvez pas bloquer l'exécution du flux de travail, donc simplement «Prendre» un élément ne fonctionnera pas comme l'appel bloque. Je dois créer un signet, puis passer à une extension. L'extension appelle le délégué, en reprenant le signet dans le rappel. – Will

+0

malheureusement j'ai peu d'expérience avec les flux de travail - essayez d'ajouter que des détails à votre question et il pourrait susciter l'intérêt de quelqu'un :-) –

Répondre

4

Je pense avoir deux solutions possibles. Je ne suis pas particulièrement satisfait non plus, mais ils fournissent au moins une alternative raisonnable à l'approche APM. Le premier ne répond pas à votre exigence de thread de blocage, mais je pense qu'il est plutôt élégant parce que vous pouvez enregistrer des rappels et ils seront appelés en mode round-robin, mais vous avez toujours la possibilité d'appeler Take ou TryTake comme vous le feriez normalement pour un BlockingCollection. Ce code force les rappels à être enregistrés chaque fois qu'un élément est demandé. C'est le mécanisme de signalisation pour la collection. La bonne chose à propos de cette approche est que les appels à Take ne deviennent pas affamés comme ils le font dans ma deuxième solution.

public class NotifyingBlockingCollection<T> : BlockingCollection<T> 
{ 
    private Thread m_Notifier; 
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
     m_Notifier = new Thread(Notify); 
     m_Notifier.IsBackground = true; 
     m_Notifier.Start(); 
    } 

    private void Notify() 
    { 
     while (true) 
     { 
      Action<T> callback = m_Callbacks.Take(); 
      T item = Take(); 
      callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
     } 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     m_Callbacks.Add(callback); 
    } 
} 

La seconde ne répond pas à votre exigence de thread de blocage. Notez comment il transfère l'appel du rappel au pool de threads. Je l'ai fait parce que je pense que s'il était exécuté de manière synchrone, les verrous seraient maintenus plus longtemps, ce qui entraînerait un goulot d'étranglement de Add et de RegisterForTake. Je l'ai examiné de près et je ne pense pas qu'il puisse être verrouillé en direct (à la fois un objet et un callback sont disponibles, mais le callback ne sera jamais exécuté) mais vous voudrez peut-être le vérifier par vous-même. Le seul problème ici est qu'un appel à Take serait affamé que les rappels ont toujours la priorité.

public class NotifyingBlockingCollection<T> 
{ 
    private BlockingCollection<T> m_Items = new BlockingCollection<T>(); 
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
    } 

    public void Add(T item) 
    { 
     lock (m_Callbacks) 
     { 
      if (m_Callbacks.Count > 0) 
      { 
       Action<T> callback = m_Callbacks.Dequeue(); 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Items.Add(item); 
      } 
     } 
    } 

    public T Take() 
    { 
     return m_Items.Take(); 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     lock (m_Callbacks) 
     { 
      T item; 
      if (m_Items.TryTake(out item)) 
      { 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Callbacks.Enqueue(callback); 
      } 
     } 
    } 
} 
+0

Merci pour la réponse, mais ce n'est pas ce que je cherche. C'est ce que je fais actuellement, mais avec l'APM poussé dans la collection (le code que vous avez fourni). Je suppose que le point crucial de mon problème est que l'APM ne correspond pas à mes exigences, c'est juste la mise en œuvre que j'ai utilisée. Mes exigences exigent un modèle qui apporte une solution à la question: «Comment puis-je signaler à une collection que je suis prêt pour l'élément suivant et que la collection exécute un rappel lorsque l'élément suivant est arrivé, sans qu'aucun thread ne soit bloqué? – Will

+0

J'ai un peu pensé que ce n'était pas ce que vous recherchiez. C'est un problème intéressant cependant. Dommage que "Add" ne soit pas "virtuel" sinon vous auriez pu injecter la notification d'une manière ou d'une autre. Peut-être que vous pourriez utiliser l'une des implémentations de la file d'attente de blocage comme point de départ. Le problème est que vous devez faire attention à la façon dont vous envoyez cette notification, sinon un autre consommateur aura saisi l'élément en premier. Je pourrais jouer avec ça aujourd'hui si j'ai le temps. Poster une réponse vous-même si vous le comprendre. Je ne sais pas ... il se peut que vous trouviez plus facile de lancer et de faire référence à une autre bibliothèque. –

+0

La notification doit contenir l'élément suivant et doit être contrôlée par le notifiant. Peut-être que l'idée que ce soit une collection est erronée; ce n'est que par ce mécanisme que l'on peut fournir l'article suivant, évitant ainsi la délivrance de deux observateurs sur un seul article. En d'autres termes, un observateur ne peut pas utiliser le mécanisme A pour obtenir l'élément suivant (à savoir 'T Pop()') tandis que l'autre a enregistré pour un rappel. – Will

3

Que diriez-vous quelque chose comme ça? (Le nom pourrait probablement utiliser un peu de travail et notez que ceci n'est pas testé.)

public class CallbackCollection<T> 
{ 
    // Sychronization object to prevent race conditions. 
    private object _SyncObject = new object(); 

    // A queue for callbacks that are waiting for items. 
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>(); 

    // A queue for items that are waiting for callbacks. 
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>(); 

    public void Add(T item) 
    { 
     Action<T> callback; 
     lock (_SyncObject) 
     { 
      // Try to get a callback. If no callback is available, 
      // then enqueue the item to wait for the next callback 
      // and return. 
      if (!_Callbacks.TryDequeue(out callback)) 
      { 
       _Items.Enqueue(item); 
       return; 
      } 
     } 

     ExecuteCallback(callback, item); 
    } 

    public void TakeAndCallback(Action<T> callback) 
    { 
     T item; 
     lock(_SyncObject) 
     { 
      // Try to get an item. If no item is available, then 
      // enqueue the callback to wait for the next item 
      // and return. 
      if (!_Items.TryDequeue(out item)) 
      { 
       _Callbacks.Enqueue(callback); 
       return; 
      } 
     } 
     ExecuteCallback(callback, item); 
    } 

    private void ExecuteCallback(Action<T> callback, T item) 
    { 
     // Use a new Task to execute the callback so that we don't 
     // execute it on the current thread. 
     Task.Factory.StartNew(() => callback.Invoke(item)); 
    } 
} 
+0

Juste rafraîchi et vu @ NotifyingBlockingCollection de Brian. On dirait que lui et moi avons eu à peu près la même solution en même temps. –

+0

Oui, nous pensions vraiment dans le même sens ici, en particulier la partie à obtenir l'invocation du rappel hors du thread courant. –

Questions connexes