2011-10-23 2 views
23

Je me demande s'il existe une implémentation/wrapper pour ConcurrentQueue, similaire à BlockingCollection où prélèvement de la collection ne bloque pas, mais est plutôt asynchrone et provoquera une attente asynchrone jusqu'à ce qu'un élément est placé dans le queue.attente Tâche basée sur la file d'attente

J'ai trouvé ma propre implémentation, mais elle ne semble pas fonctionner comme prévu. Je me demande si je réinvente quelque chose qui existe déjà.

Voici mon implémentation:

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 

    object queueSyncLock = new object(); 

    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> Dequeue() 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs=null; 
     T firstItem=default(T); 
     while (true) 
     { 
      bool ok; 
      lock (queueSyncLock) 
      { 
       ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); 
       if (ok) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        queue.TryDequeue(out firstItem); 
       } 
      } 
      if (!ok) break; 
      tcs.SetResult(firstItem); 
     } 
    } 
} 
+0

oh beurk .... ...... –

+21

@AdamSack: en effet, mais votre commentaire ne m'aide pas. – spender

Répondre

36

Je ne sais pas d'une solution sans verrou, mais vous pouvez jeter un oeil à la nouvelle Dataflow library, une partie de la Async CTP. Un simple BufferBlock<T> devrait suffire, .: par exemple

BufferBlock<int> buffer = new BufferBlock<int>(); 

Production et consommation sont les plus faciles à faire par des méthodes d'extension sur les types de blocs de flux de données.

La production est aussi simple que:

buffer.Post(13); 

et la consommation est async prêt:

int item = await buffer.ReceiveAsync(); 

Je ne vous recommande d'utiliser Dataflow si possible; rendre un tel tampon à la fois efficace et correct est plus difficile qu'il n'y paraît d'abord.

+0

Cela semble très prometteur ... va le vérifier demain. Merci. Cela ressemble beaucoup à un port CCR. – spender

+2

A pris un coup d'oeil avant le coucher à la place! Il semble que Dataflow s'adapte très bien à mes besoins. Il semble combler le fossé entre ce qui est offert par TPL et ce qui est offert dans CCR (que j'ai utilisé avec beaucoup de succès). Cela me laisse un sentiment positif que l'excellent travail dans CCR n'a pas été gaspillé. C'est la bonne réponse (et quelque chose de brillant et de nouveau à enfoncer mes dents!) Merci @StephenCleary. – spender

1

Il peut être surpuissant pour votre cas d'utilisation (compte tenu de la courbe d'apprentissage), mais Reactive Extentions fournit toute la colle que vous pourriez jamais vouloir pour la composition asynchrone.

Vous êtes essentiellement abonné aux modifications et elles vous sont transmises dès qu'elles sont disponibles, et vous pouvez demander au système d'insérer les modifications dans un fil distinct.

+0

Je suis au moins partiellement versé dans Reactive, mais c'est un peu ésotérique à utiliser dans la production car d'autres peuvent avoir à maintenir le code.Je suis en train de creuser la simplicité qu'Async/Wait apporte à un produit de serveur auparavant très compliqué, et j'essaie de garder toute la technologie async sous une seule technologie. – spender

-1

Vous pouvez simplement utiliser un BlockingCollection (en utilisant la valeur par défaut ConcurrentQueue) et envelopper l'appel à Take dans un Task afin que vous puissiez await il:

var bc = new BlockingCollection<T>(); 

T element = await Task.Run(() => bc.Take()); 
+4

Bonne idée, mais je ne suis pas content de bloquer. Je vais avoir quelques milliers de clients chacun avec leur propre file d'attente de messages. Tout blocage va couler le navire parce qu'il va attacher des threads ne rien faire. La raison pour laquelle je veux une tâche non-bloquante, c'est que je peux garder toutes les opérations dans le pool de threads sans provoquer la famine du pool de threads. – spender

0

est ici la mise en œuvre que je utilise actuellement.

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 
    object queueSyncLock = new object(); 
    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> DequeueAsync(CancellationToken ct) 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     ct.Register(() => 
     { 
      lock (queueSyncLock) 
      { 
       tcs.TrySetCanceled(); 
      } 
     }); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs = null; 
     T firstItem = default(T); 
     lock (queueSyncLock) 
     { 
      while (true) 
      { 
       if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        if (tcs.Task.IsCanceled) 
        { 
         continue; 
        } 
        queue.TryDequeue(out firstItem); 
       } 
       else 
       { 
        break; 
       } 
       tcs.SetResult(firstItem); 
      } 
     } 
    } 
} 

Il fonctionne assez bien, mais il y a beaucoup de discorde sur queueSyncLock, comme je fais pas mal d'utilisation du CancellationToken d'annuler certaines des tâches d'attente. Bien sûr, cela conduit à beaucoup moins que je verrais le blocage avec un BlockingCollection mais ...

Je me demande s'il y a une plus lisse, verrouiller des moyens libres d'obtenir le même résultat

2

Mon atempt (il un événement déclenché lorsqu'une « promesse » est créé, et il peut être utilisé par un producteur externe pour savoir quand pour produire plus d'articles):

public class AsyncQueue<T> 
{ 
    private ConcurrentQueue<T> _bufferQueue; 
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue; 
    private object _syncRoot = new object(); 

    public AsyncQueue() 
    { 
     _bufferQueue = new ConcurrentQueue<T>(); 
     _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); 
    } 

    /// <summary> 
    /// Enqueues the specified item. 
    /// </summary> 
    /// <param name="item">The item.</param> 
    public void Enqueue(T item) 
    { 
     TaskCompletionSource<T> promise; 
     do 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return;          
      } 
     } 
     while (promise != null); 

     lock (_syncRoot) 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return; 
      } 

      _bufferQueue.Enqueue(item); 
     }    
    } 

    /// <summary> 
    /// Dequeues the asynchronous. 
    /// </summary> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    /// <returns></returns> 
    public Task<T> DequeueAsync(CancellationToken cancellationToken) 
    { 
     T item; 

     if (!_bufferQueue.TryDequeue(out item)) 
     { 
      lock (_syncRoot) 
      { 
       if (!_bufferQueue.TryDequeue(out item)) 
       { 
        var promise = new TaskCompletionSource<T>(); 
        cancellationToken.Register(() => promise.TrySetCanceled()); 

        _promisesQueue.Enqueue(promise); 
        this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); 

        return promise.Task; 
       } 
      } 
     } 

     return Task.FromResult(item); 
    } 

    /// <summary> 
    /// Gets a value indicating whether this instance has promises. 
    /// </summary> 
    /// <value> 
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>. 
    /// </value> 
    public bool HasPromises 
    { 
     get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } 
    } 

    /// <summary> 
    /// Occurs when a new promise 
    /// is generated by the queue 
    /// </summary> 
    public event EventHandler PromiseAdded; 
} 
+0

Je pense que c'est la meilleure solution. J'ai implémenté ceci et l'ai testé intensivement. Quelques notes: l'appel à! Promise.Task.IsCanceled est inutile. J'ai ajouté un ManualResetEventSlim pour suivre quand bufferQueue est vide afin qu'un appelant puisse bloquer pour attendre que la file d'attente soit vide. –

+0

Vous devriez [disposer] (http://stackoverflow.com/a/21653382/298609) 'CancellationTokenRegistration' que vous avez reçu de l'appel' cancellationToken.Register'. – Paya

Questions connexes