2010-11-24 2 views
13

J'évalue Rx pour un projet de plate-forme de négociation qui devra traiter des milliers de messages par seconde. La plate-forme existante possède un système de routage d'événements complexe (délégués de multidiffusion) qui répond à ces messages et effectue de nombreux traitements ultérieurs.Les extensions réactives semblent très lentes - est-ce que je fais quelque chose de mal?

J'ai regardé les extensions réactives pour les avantages évidents, mais j'ai remarqué qu'il est un peu plus lent, habituellement 100 fois plus lent.

J'ai créé un test unitaire pour illustrer ceci qui exécute un incrément simple d'un million de fois, en utilisant divers arômes Rx et un test de "contrôle" de délégué direct.

Voici les résultats:

Delegate         - (1000000) - 00:00:00.0410000 
Observable.Range()      - (1000000) - 00:00:04.8760000 
Subject.Subscribe() - NewThread   - (1000000) - 00:00:02.7630000 
Subject.Subscribe() - CurrentThread  - (1000000) - 00:00:03.0280000 
Subject.Subscribe() - Immediate   - (1000000) - 00:00:03.0030000 
Subject.Subscribe() - ThreadPool   - (1000000) - 00:00:02.9800000 
Subject.Subscribe() - Dispatcher   - (1000000) - 00:00:03.0360000 

Comme vous pouvez le voir, toutes les méthodes Rx sont environ 100 fois plus lent qu'un équivalent délégué. Évidemment Rx fait beaucoup sous les couvertures qui seront utiles dans un exemple plus complexe, mais cela semble incroyablement lent.

Est-ce normal ou mes hypothèses de test sont-elles invalides? Code nunit pour ce qui précède ci-dessous -

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Linq; 
using System.Text; 
using NUnit.Framework; 
using System.Concurrency; 

namespace RxTests 
{ 
    [TestFixture] 
    class ReactiveExtensionsBenchmark_Tests 
    { 
     private int counter = 0; 

     [Test] 
     public void ReactiveExtensionsPerformanceComparisons() 
     { 
      int iterations = 1000000; 

      Action<int> a = (i) => { counter++; }; 

      DelegateSmokeTest(iterations, a); 
      ObservableRangeTest(iterations, a); 
      SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread"); 
      SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread"); 
      SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate"); 
      SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool"); 
      SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher"); 
     } 

     public void ObservableRangeTest(int iterations, Action<int> action) 
     { 
      counter = 0; 

      long start = DateTime.Now.Ticks; 

      Observable.Range(0, iterations).Subscribe(action); 

      OutputTestDuration("Observable.Range()", start); 
     } 


     public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
     { 
      counter = 0; 

      var eventSubject = new Subject<int>(); 
      var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb 
      events.Subscribe(action); 

      long start = DateTime.Now.Ticks; 

      Enumerable.Range(0, iterations).ToList().ForEach 
       (
        a => eventSubject.OnNext(1) 
       ); 

      OutputTestDuration("Subject.Subscribe() - " + mode, start); 
     } 

     public void DelegateSmokeTest(int iterations, Action<int> action) 
     { 
      counter = 0; 
      long start = DateTime.Now.Ticks; 

      Enumerable.Range(0, iterations).ToList().ForEach 
       (
        a => action(1) 
       ); 

      OutputTestDuration("Delegate", start); 
     } 


     /// <summary> 
     /// Output helper 
     /// </summary> 
     /// <param name="test"></param> 
     /// <param name="duration"></param> 
     public void OutputTestDuration(string test, long duration) 
     { 
      Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration))); 
     } 

     /// <summary> 
     /// Test timing helper 
     /// </summary> 
     /// <param name="elapsedTicks"></param> 
     /// <returns></returns> 
     public string ElapsedDuration(long elapsedTicks) 
     { 
      return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString(); 
     } 

    } 
} 
+0

SubjectSubscribeTest n'utilise pas l'argument 'scheduler', donc je suis surpris que vous obteniez des résultats différents. – dtb

+0

Et pourquoi utilisez-vous un sujet au lieu de souscrire l'action directement à l'observable? Les sujets d'Afaik font beaucoup de choses sous couverture, vous devriez donc vérifier si le fait de les enlever fait une différence. – dtb

+0

Je suis surpris que le ObservableRangeTest soit * vraiment * mauvais, même comparé aux tests avec sujet. Wtf? – dtb

Répondre

16

Je pense que l'équipe Rx se concentre sur la construction de la fonctionnalité première et ne se soucie pas encore sur l'optimisation des performances.

Utilisez un profileur pour déterminer les goulots d'étranglement et remplacer les classes Rx lentes par vos propres versions optimisées.

Voici deux exemples.

Résultats:

 
Delegate         - (1000000) - 00:00:00.0368748 

Simple - NewThread      - (1000000) - 00:00:00.0207676 
Simple - CurrentThread     - (1000000) - 00:00:00.0214599 
Simple - Immediate      - (1000000) - 00:00:00.0162026 
Simple - ThreadPool      - (1000000) - 00:00:00.0169848 

FastSubject.Subscribe() - NewThread  - (1000000) - 00:00:00.0588149 
FastSubject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0508842 
FastSubject.Subscribe() - Immediate  - (1000000) - 00:00:00.0513911 
FastSubject.Subscribe() - ThreadPool  - (1000000) - 00:00:00.0529137 

Tout d'abord, il semble à la matière beaucoup la façon dont la mise en œuvre est observable. Voici un observable qui ne peut pas être désinscrit, mais il est rapide:

private IObservable<int> CreateFastObservable(int iterations) 
{ 
    return Observable.Create<int>(observer => 
    { 
     new Thread(_ => 
     { 
      for (int i = 0; i < iterations; i++) 
      { 
       observer.OnNext(i); 
      } 
      observer.OnCompleted(); 
     }).Start(); 
     return() => { }; 
    }); 
} 

Test:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
{ 
    counter = 0; 

    var start = Stopwatch.StartNew(); 

    var observable = CreateFastObservable(iterations); 

    observable.SubscribeOn(scheduler).Run(action); 

    OutputTestDuration("Simple - " + mode, start); 
} 

sujets ajouter beaucoup de frais généraux. Voici un sujet qui est dépouillé de la plupart des fonctionnalités attendues d'un sujet, mais il est rapide:

class FastSubject<T> : ISubject<T> 
{ 
    private event Action onCompleted; 
    private event Action<Exception> onError; 
    private event Action<T> onNext; 

    public FastSubject() 
    { 
     onCompleted +=() => { }; 
     onError += error => { }; 
     onNext += value => { }; 
    } 

    public void OnCompleted() 
    { 
     this.onCompleted(); 
    } 

    public void OnError(Exception error) 
    { 
     this.onError(error); 
    } 

    public void OnNext(T value) 
    { 
     this.onNext(value); 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     this.onCompleted += observer.OnCompleted; 
     this.onError += observer.OnError; 
     this.onNext += observer.OnNext; 

     return Disposable.Create(() => 
     { 
      this.onCompleted -= observer.OnCompleted; 
      this.onError -= observer.OnError; 
      this.onNext -= observer.OnNext; 
     }); 
    } 
} 

Test:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
{ 
    counter = 0; 

    var start = Stopwatch.StartNew(); 

    var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount(); 

    observable.SubscribeOn(scheduler).Run(action); 

    OutputTestDuration("FastSubject.Subscribe() - " + mode, start); 
} 
+0

Wow. Merci beaucoup pour votre temps à ce sujet. –

+2

Observable.Range a une surcharge avec un Scheduler en tant que paramètre, donc, cela retourne (en utilisant Scheduler.Immediate) dans 00: 00: 00.6698080 pour moi: public void SimpleObserveTest (itérations, Action action, Planificateur IScheduler, mode chaîne) {compteur = 0; var start = Stopwatch.StartNew(); Observable.Range (0, itérations, planificateur) .Run (action); OutputTestDuration (mode "Simple -" +, démarrer); } –

+0

@Richard Hein: Belle trouvaille. Donc Observable.Range dépend beaucoup du planificateur utilisé. C'est quand même plusieurs fois plus lent que mon "FastObservable". Mais ça paye de savoir quels boutons tournent quoi. – dtb

10

Rappelez-vous que votre délégué ne garantit pas la sécurité de fil - littéralement appelle le délégué à partir de n'importe quel thread à partir duquel il est appelé, tandis que lorsque vous appelez Observable.ObserveOn pour marshall notifications à d'autres threads, Rx.NET doit faire le verrouillage pour s'assurer qu'il fait ce que vous pensez qu'il fait. Ainsi, les délégués peuvent se déplacer très rapidement, mais si vous voulez construire quelque chose de pratique en l'utilisant, vous allez finir par construire une synchronisation à la main, ce qui vous ralentira. Cela étant dit, Rx, tout comme LINQ, est une abstraction - si vous avez besoin d'être ridiculement rapide, vous devez commencer à écrire du code moche.

+0

Oui, commencer à réaliser les problèmes avec les hypothèses dans le code maintenant ;-) –

12

Mise à jour pour Rx 2.0: Je pris le code du message original avec (presque) la dernière version bêta de LINQPad 4.42.04 (bien il y a un 06, mais de toute façon): Rx Main assemblies

... et ajusté légèrement à utiliser la nouvelle Rx v2 syntaxe planificateur:

 public void ReactiveExtensionsPerformanceComparisons() 
    { 
     int iterations = 1000000; 

     Action<int> a = (i) => { counter++; }; 

     DelegateSmokeTest(iterations, a); 
     ObservableRangeTest(iterations, a); 
     SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread"); 
     SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread"); 
     SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate"); 
     SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool"); 
     // I *think* this is the same as the ThreadPool scheduler in my case 
     SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");     
     // doesn't work, as LinqPad has no Dispatcher attched to the Gui thread, maybe there's a workaround; the Instance property on it is obsolete 
     //SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "ThreadPool"); 
    } 

note: les résultats varient énormément, dans des cas rares bat Threadpool newThread, mais dans la plupart des cas newThread a un léger avantage au-dessus des ordonnanceurs ci-dessous dans la liste:

Delegate         - (1000000) - 00:00:00.0440025 
Observable.Range()      - (1000000) - 00:00:01.9251101 
Subject.Subscribe() - NewThread   - (1000000) - 00:00:00.0400023 
Subject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0530030 
Subject.Subscribe() - Immediate   - (1000000) - 00:00:00.0490028 
Subject.Subscribe() - ThreadPool   - (1000000) - 00:00:00.0490028 
Subject.Subscribe() - Default   - (1000000) - 00:00:00.0480028 

Il semble donc qu'ils aient travaillé assez dur sur les performances.

+0

Merci pour cette mise à jour! – film42

Questions connexes