2016-06-21 3 views
1

J'ai eu un bogue dans mon code de production que j'ai retrouvé et j'ai réussi à créer un cas de test à reproduire. Je créeComment arrêter les fuites IDisposables avec un IObservable <IDisposable>?

IObservable<IDisposable> 

instances et en utilisant un usage unique de série dans l'abonnement à garder au maximum un élément vivant à la fois. C'est un bon moyen d'ajouter des objets graphiques à une scène et de les supprimer lorsque des mises à jour sont disponibles. Cependant, le cas de test suivant montre le bogue subtil.

using System; 
using System.Reactive.Concurrency; 
using System.Reactive.Disposables; 
using System.Reactive.Linq; 
using FluentAssertions; 
using Microsoft.Reactive.Testing; 
using Xunit; 

namespace WeinCadSW.Spec 
{ 
    /// <summary> 
    /// This test case demonstrates problems with streams of IDisposables. 
    /// http://stackoverflow.com/questions/37936537/how-to-stop-leaking-idisposables-with-an-iobservableidisposable 
    /// </summary> 
    public class ObservableDisposableSpec : ReactiveTest 
    { 
     TestScheduler _Scheduler = new TestScheduler(); 
     [Fact] 
     public void ShouldWork() 
     { 
      var o = _Scheduler.CreateHotObservable 
       (OnNext(100, "A") 
       , OnNext(200, "B") 
       , OnNext(250, "C") 
       , OnNext(255, "D") 
       , OnNext(258, "E") 
       , OnNext(600, "F") 
       ); 

      var disposablesCreated = 0; 
      var disposabledDisposed = 0; 
      var oo = o.Select 
       (s => 
       { 
        disposablesCreated++; 
        return Disposable.Create(() => disposabledDisposed++); 
       }) 
       .Delay(TimeSpan.FromTicks(10), _Scheduler); 


      IDisposable sub = Disposable.Empty; 
      _Scheduler.ScheduleAbsolute(null, 0, (Func<IScheduler, object, IDisposable>)((scheduler, state) => 
      { 
       sub = oo.SubscribeDisposable(); 
       return Disposable.Empty; 
      })); 
      _Scheduler.ScheduleAbsolute(null, 605, (Func<IScheduler, object, IDisposable>)((scheduler, state) => 
      { 
       sub.Dispose(); 
       return Disposable.Empty; 
      })); 

      _Scheduler.Start(); 

      // This test will fail here because 6 disposables are created. 
      disposablesCreated.Should().Be(6); 
      disposabledDisposed.Should().Be(6); // but is actually 5 

     } 

    } 

et la méthode SubscribeDisposable qui est au coeur du problème. Lorsque je dispose de l'abonnement, un IDisposable de plus est généré et n'est jamais envoyé à l'abonnement.

6 produits jetables sont générés mais il y a une fuite. C'est à cause du retard que j'ai mis dans le système pour modéliser les retards de planification dans le système réel.

Ma question est.

Est-il possible d'écrire un SubscribeDisposable similaire à ce qui précède qui ne fuit pas les IDisposables.

+0

Je ne comprends pas réactif, mais où 'SubscribeDisposable' est-il utilisé? – Euphoric

+0

J'ai mis à jour le scénario de test pour démontrer complètement SubscribeDisposable. – bradgonesurfing

+0

N'est-ce pas parce que le dernier événement est à l'heure 600, mais que le sous-marin est disposé à 605, ce qui signifie qu'il ne sera jamais éliminé? – Euphoric

Répondre

0

Je suis fermement convaincu qu'il n'y a pas de solution à la question ci-dessus en raison des contrats que le RX impose. J'ai donc mis à jour mon code pour éviter le motif ci-dessus. Au lieu de cela j'utilise.

/// <summary> 
    /// Subscribes to the observable sequence and manages the disposables 
    /// with a serial disposable. That 
    /// is before the function is called again the previous disposable is disposed. 
    /// </summary> 

    public static IDisposable SubscribeDisposable<T> 
    (this IObservable<T> o, Func<T, IDisposable> fn, Action<Exception> errHandler) 
    { 
     var d = new SerialDisposable(); 

     var s = o.Subscribe(v => 
     { 
       d.Disposable = Disposable.Empty; 
       d.Disposable = fn(v) ?? Disposable.Empty; 
     }, onError:errHandler); 

     return new CompositeDisposable(s,d); 

    } 

Ainsi

IObservable<IDisposable> 

est retiré de mon code comme il semble dangereux.