2016-12-20 4 views
2

Je voudrais adapter un IEnumerable, IDisposable (source) dans un Observable et aimerait connaître la meilleure façon de faire cela et avoir la méthode source.Dispose s'appelle lors de la désinscription. Il y a un example sur introtorx.com d'adapter un IEnumerable, mais il indique explicitement qu'il a beaucoup de défauts tels que le modèle inadmissible de disposition, modèle pauvre de concurrence, aucune gestion des erreurs, etc ... et que la version intégrée gère ces. Mais la version intégrée ne semble pas appeler Dispose sur la source IEnumerable lors de la désinscription.C# Rx Comment disposer correctement de la source Enumerable dans créé Observable

Idéalement, je voudrais utiliser le modèle .Publish().RefCount() pour avoir plusieurs abonnés sur la même source et avoir seulement la source Dispose() appelée quand ils sont tous désabonnés.

Voici le code de ma tentative, bien que cela ne fonctionne pas.

static void FromEnumerableTest() { 
    var observable = Observable.Create<int>(
     observer => { 
      var source = new JunkEnumerable(); 
      foreach (int i in source) { 
       observer.OnNext(i); 
      } 
      return() => { 
       source.Dispose(); 
      }; 
     }) 
     .SubscribeOn(Scheduler.Default) 
     .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running 
     .Publish() 
     .RefCount(); 

    //var observable = Observable.ToObservable(new JunkEnumerable()) 
    // .SubscribeOn(Scheduler.Default) 
    // .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running 
    // .Publish() 
    // .RefCount(); 

    Console.WriteLine("Press any key to subscribe"); 
    Console.ReadKey(); 

    var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i)); 
    Console.WriteLine("Press any key to unsubscribe"); 
    Console.ReadKey(); 
    subscription.Dispose(); 

    Console.WriteLine("Press any key to exit"); 
    Console.ReadKey(); 
} 


class JunkEnumerable : IEnumerable<int>, IDisposable { 
    public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); } 

    public IEnumerator<int> GetEnumerator() { return new Enumerator(); } 

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } 

    class Enumerator : IEnumerator<int> { 
     private int counter = 0; 
     public int Current { 
      get { 
       Thread.Sleep(1000); 
       return counter++; 
      } 
     } 

     object IEnumerator.Current { get { return Current; } } 

     public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); } 

     public bool MoveNext() { return true; } 

     public void Reset() { counter = 0; } 
    } 
} 
+0

double possible de http://stackoverflow.com/questions/7322395/creating-a-weak-subscription-to-an -iobservable –

Répondre

1

Il y a trois étapes dans un Rx abonnement à vie:

  1. souscription
  2. Observation
  3. désabonnement

Si l'abonnement ne se termine jamais, le code de désabonnement n » T arrive. Après tout, si vous ne vous êtes jamais complètement inscrit, pourquoi devriez-vous vous désabonner? Votre exemple de code a une boucle infinie dans le code d'abonnement, donc il ne se termine jamais, donc le code de désabonnement ne se produira jamais. La manière normale de gérer un IDisposable est avec Observable.Using. La façon normale de gérer un IEnumerable est avec .ToObservable. Si vous essayez d'introduire asynchronisme à code synchrone, dénombrable (comme votre exemple), vous pouvez le faire comme suit:

var observable = Observable.Using(() => new JunkEnumerable(), junk => 
    Observable.Generate(junk.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(20)) 
); 

Tant que le TimeSpan est supérieure à 15 Millis, Rx va le transformer async, compléter l'abonnement. Les valeurs suivantes font partie de la phase d'observation, et la désinscription aura entièrement lieu.

+0

Ah! c'est bien. J'ai passé la méthode 'Observable.Generate' dans la documentation. [ici] (http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html#ObservableTimer) est un lien pour les curieux. –

1

Voici un opérateur pour exécuter l'énumération sur un planificateur spécifié. Nous programmons chaque énumération de l'énumérable afin que les jetables puissent retourner correctement.

public static IObservable<T> ToObservableOn<T>(this IEnumerable<T> source, IScheduler scheduler = default(IScheduler)) 
    { 
     scheduler = scheduler ?? Scheduler.Default; 
     return Observable.Create<T>(
      (observer) => 
      { 
       var disposed = new BooleanDisposable(); 
       var enumerator = source.GetEnumerator(); 

       Action scheduleNext = default(Action); 
       scheduleNext =() => 
       { 
        if (disposed.IsDisposed) 
         return; 

        if (!enumerator.MoveNext()) 
        { 
         observer.OnCompleted(); 
         return; 
        } 

        observer.OnNext(enumerator.Current); 

        scheduler.Schedule(scheduleNext); 
       }; 

       scheduler.Schedule(scheduleNext); 
       return StableCompositeDisposable.Create(disposed, enumerator); 
      }); 
    } 

De votre exemple, nous changeons simplement le SubscribeOn à:

 var observable = 
      new JunkEnumerable() 
      .ToObservableOn(Scheduler.Default)     
      .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running 
      .Publish() 
      .RefCount();