Existe-t-il une bonne implémentation du traitement asynchrone des éléments de la file d'attente?Implémentation du traitement asynchrone des éléments
Répondre
assez vieux mais c'est le bon que je sais de http://www.codeproject.com/KB/cs/inprocessasynservicesincs.aspx
Si vous utilisez .NET 4, une grande partie est disponible gratuitement. Si vous avez déjà tous les éléments, vous pouvez utiliser Parallel.ForEach
. Si vous avez besoin d'une file d'attente producteur/consommateur, vous pouvez utiliser BlockingCollection<T>
pour envelopper l'une des collections simultanées (telles que ConcurrentQueue<T>
ou ConcurrentStack<T>
). Comment vous utilisez cela est à vous; Il y a un blog post here qui va dans un exemple détaillé, et il y a probablement d'autres publications similaires. (Vous voudrez peut-être regarder le Parallel Team Blog pour beaucoup plus de matériel.)
Utilisez les tâches .NET 4.
var t = Task<int>.Factory.StartNew(() => ProcessItem());
Utilisez les options ConcurrencyOptions pour définir le degré de parallélisation maximal sur ce traitement.
Si vous voulez le faire rouler vous-même, utilisez BlockingCollection<T>
qui fournit des fonctions de blocage et de limitation pour les collections sécurisées aux threads et implémente un thread séparé (ou des threads) pour le consommateur.
Vous pouvez jeter un oeil à un modèle producteur/consommateur si vous êtes assez malheureux de ne pas être en utilisant .net 4.
Voici mon code que j'ai démontées, mes excuses pour le désordre, mais vous devriez être en mesure pour l'utiliser en l'ajoutant à un projet et en le recompilant, puis en créant votre processus en utilisant la DLL résultante.
Enum pour ChannelState:
public enum ChannelState
{
WaitingForSend,
WaitingForReceive,
Open
}
Interfaces:
public interface IChannel<TMessage>
{
// Methods
TMessage Receive();
void Send(TMessage message);
// Properties
bool CanReceive { get; }
bool CanSend { get; }
ChannelState State { get; }
}
using System;
public interface IReceiver<TMessage>
{
// Events
event EventHandler<MessageReceivedEventArgs<TMessage>> MessageReceived;
// Methods
void Activate();
void Deactivate();
// Properties
bool IsActive { get; }
}
classes concrètes:
using System.Collections.Generic;
using System.Threading;
using System;
public class BufferedChannel<TMessage> : IChannel<TMessage>
{
// Fields
private int _blockedReceivers;
private int _blockedSenders;
private Queue<TMessage> _buffer;
private int _capacity;
private EventWaitHandle _capacityAvailableEvent;
private EventWaitHandle _messagesAvailableEvent;
// Methods
public BufferedChannel()
{
this._buffer = new Queue<TMessage>();
this._messagesAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
this._capacityAvailableEvent = new EventWaitHandle(true, EventResetMode.AutoReset);
this._capacity = 50;
}
public BufferedChannel(int bufferSize)
{
this._buffer = new Queue<TMessage>();
this._messagesAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
this._capacityAvailableEvent = new EventWaitHandle(true, EventResetMode.AutoReset);
this._capacity = 50;
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException("bufferSize", bufferSize, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero);
}
this._capacity = bufferSize;
}
public TMessage Receive()
{
Interlocked.Increment(ref this._blockedReceivers);
try
{
this._messagesAvailableEvent.WaitOne();
}
catch
{
lock (this._buffer)
{
Interlocked.Decrement(ref this._blockedReceivers);
}
throw;
}
lock (this._buffer)
{
Interlocked.Decrement(ref this._blockedReceivers);
this._capacityAvailableEvent.Set();
if ((this._buffer.Count - 1) > this._blockedReceivers)
{
this._messagesAvailableEvent.Set();
}
return this._buffer.Dequeue();
}
}
public void Send(TMessage message)
{
Interlocked.Increment(ref this._blockedSenders);
try
{
this._capacityAvailableEvent.WaitOne();
}
catch
{
lock (this._buffer)
{
Interlocked.Decrement(ref this._blockedSenders);
}
throw;
}
lock (this._buffer)
{
Interlocked.Decrement(ref this._blockedSenders);
this._buffer.Enqueue(message);
if (this._buffer.Count < this.BufferSize)
{
this._capacityAvailableEvent.Set();
}
this._messagesAvailableEvent.Set();
}
}
// Properties
public int BufferCount
{
get
{
lock (this._buffer)
{
return this._buffer.Count;
}
}
}
public int BufferSize
{
get
{
lock (this._buffer)
{
return this._capacity;
}
}
set
{
lock (this._buffer)
{
if (value <= 0)
{
throw new ArgumentOutOfRangeException("BufferSize", value, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero);
}
this._capacity = value;
if ((this._blockedSenders > 0) && (this._capacity > this._buffer.Count))
{
this._capacityAvailableEvent.Set();
}
}
}
}
public bool CanReceive
{
get
{
return true;
}
}
public bool CanSend
{
get
{
return true;
}
}
public ChannelState State
{
get
{
if (this._blockedSenders > 0)
{
return ChannelState.WaitingForReceive;
}
if (this._blockedReceivers > 0)
{
return ChannelState.WaitingForSend;
}
return ChannelState.Open;
}
}
}
using System;
using System.Collections.Generic;
using System.Threading;
using System.ComponentModel;
using System.Runtime.CompilerServices;
public sealed class Receiver<TMessage> : Component, IReceiver<TMessage>
{
// Fields
private volatile bool _continue;
private object _controlLock;
private volatile bool _disposed;
private Thread _receiverThread;
private bool _receiving;
private object _receivingLock;
private object _threadLock;
[CompilerGenerated]
private IChannel<TMessage> channel;
// Events
public event EventHandler<MessageReceivedEventArgs<TMessage>> MessageReceived;
// Methods
public Receiver(IChannel<TMessage> channel)
{
this._controlLock = new object();
this._threadLock = new object();
this._receivingLock = new object();
if (channel == null)
{
throw new ArgumentNullException("channel");
}
this.Channel = channel;
}
public void Activate()
{
this.CheckDisposed();
lock (this._controlLock)
{
if (this._receiverThread != null)
{
throw new InvalidOperationException();
}
this._continue = true;
this._receiverThread = new Thread(new ThreadStart(this.RunAsync));
this._receiverThread.IsBackground = true;
this._receiverThread.Start();
}
}
private void CheckDisposed()
{
if (this._disposed)
{
throw new ObjectDisposedException(base.GetType().Name);
}
}
public void Deactivate()
{
lock (this._controlLock)
{
if (this._continue)
{
this._continue = false;
lock (this._threadLock)
{
if (this._receiverThread != null)
{
this.SafeInterrupt();
this._receiverThread.Join();
this._receiverThread = null;
}
}
}
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
this.Deactivate();
this._disposed = true;
}
}
private void OnMessageReceived(TMessage message)
{
EventHandler<MessageReceivedEventArgs<TMessage>> messageReceived = this.MessageReceived;
if (messageReceived != null)
{
messageReceived(this, new MessageReceivedEventArgs<TMessage>(message));
}
}
private void RunAsync()
{
while (this._continue)
{
TMessage message = default(TMessage);
bool flag = false;
try
{
lock (this._receivingLock)
{
this._receiving = true;
}
message = this.Channel.Receive();
flag = true;
lock (this._receivingLock)
{
this._receiving = false;
}
Thread.Sleep(0);
}
catch (ThreadInterruptedException)
{
}
if (!this._continue)
{
if (flag)
{
this.Channel.Send(message);
return;
}
break;
}
this.OnMessageReceived(message);
}
}
private void SafeInterrupt()
{
lock (this._receivingLock)
{
lock (this._threadLock)
{
if (this._receiving && (this._receiverThread != null))
{
this._receiverThread.Interrupt();
}
}
}
}
// Properties
protected override bool CanRaiseEvents
{
get
{
return true;
}
}
public IChannel<TMessage> Channel
{
[CompilerGenerated]
get
{
return this.channel;
}
[CompilerGenerated]
private set
{
this.channel = value;
}
}
public bool IsActive
{
get
{
lock (this._controlLock)
{
return (this._receiverThread != null);
}
}
}
}
using System;
using System.Runtime.CompilerServices;
public class MessageReceivedEventArgs<TMessage> : EventArgs
{
// Fields
[CompilerGenerated]
private TMessage message;
// Methods
public MessageReceivedEventArgs(TMessage message)
{
this.Message = message;
}
// Properties
public TMessage Message
{
[CompilerGenerated]
get
{
return this.message;
}
[CompilerGenerated]
private set
{
this.message = value;
}
}
}
using System.Threading;
public class BlockingChannel<TMessage> : IChannel<TMessage>
{
// Fields
private TMessage _message;
private EventWaitHandle _messageReceiveEvent;
private EventWaitHandle _messageReceiveyEvent;
private object _sendLock;
private ChannelState _state;
private object _stateLock;
// Methods
public BlockingChannel()
{
this._state = ChannelState.Open;
this._stateLock = new object();
this._messageReceiveyEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
this._messageReceiveEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
this._sendLock = new object();
}
public TMessage Receive()
{
this.State = ChannelState.WaitingForSend;
this._messageReceiveyEvent.WaitOne();
this._messageReceiveEvent.Set();
this.State = ChannelState.Open;
return this._message;
}
public void Send(TMessage message)
{
lock (this._sendLock)
{
this._message = message;
this.State = ChannelState.WaitingForReceive;
this._messageReceiveyEvent.Set();
this._messageReceiveEvent.WaitOne();
}
}
// Properties
public bool CanReceive
{
get
{
return true;
}
}
public bool CanSend
{
get
{
return true;
}
}
public ChannelState State
{
get
{
lock (this._stateLock)
{
return this._state;
}
}
private set
{
lock (this._stateLock)
{
this._state = value;
}
}
}
}
- 1. Implémentation asynchrone
- 2. EJB et traitement asynchrone
- 3. Implémentation d'un service WCF asynchrone
- 4. Implémentation de la méthode asynchrone
- 5. Quelle bibliothèque implémente le traitement asynchrone des messages?
- 6. Traitement de flux asynchrone en Python
- 7. traitement asynchrone dans JBoss 6 ("Comet")
- 8. Traitement du problème de "double marge" entre les éléments GUI
- 9. Traitement asynchrone en Java à partir d'une servlet
- 10. Traitement de démon asynchrone/interaction ORM avec Django
- 11. Comment exécuter un post-traitement asynchrone dans CherryPy?
- 12. Postgres: Post-instruction (ou insertion) traitement asynchrone et non bloquant
- 13. Ruby concurrent/traitement asynchrone (avec un cas d'utilisation simple)
- 14. Comment implémenter le traitement asynchrone avec l'application J2EE
- 15. Comment faire une implémentation de requête asynchrone simple dans JQuery
- 16. Implémentation d'un scénario asynchrone imbriqué "call call" dans .NET
- 17. Implémentation d'une interface utilisant un service WCF asynchrone?
- 18. Rappel asynchrone du service WCF
- 19. Implémentation du filtrage des couleurs en C#
- 20. Traitement du signal en Python
- 21. Amélioration du traitement des messages UDP
- 22. ASP.NET MVC traitement des paramètres du contrôleur
- 23. Bibliothèque d'entreprise - configuration du traitement des exceptions
- 24. Le débogage d'Eclipse a une boîte de dialogue: Traitement des exceptions file d'attente asynchrone java.lang.NullPointerException
- 25. Implémentation du navigateur EventListenerList
- 26. Traitement du contenu HTML
- 27. Implémentation du serveur OpenSocial
- 28. fermeture du socket client asynchrone?
- 29. Personnalisation du traitement des erreurs du processus unmarshall de JAXB
- 30. C# - Traitement des fils d'aide
+1 Pour obtenir en 4.0 :), j'écrivais juste à ce sujet aussi. – TalentTuner
'Parallel.ForEach' bloque le thread appelant jusqu'à ce qu'il les termine tous, ce qui ne va pas vraiment aider à 'traiter les éléments de la file d'attente de façon asynchrone' maintenant c'est bien ça? ou la question a-t-elle vraiment signifié "en parallèle" plutôt que "asynchrone"? –
C'est génial mais malheureusement, j'utilise .Net 3.5 – Sumee