2017-07-28 4 views
0

Mon attente est d'ajouter des observables à la volée (par exemple, le téléchargement d'images), de les laisser démarrer et, quand j'ai fini de tout mettre en file d'attente, d'attendre tout observable à être fini.Comment attendre que tout ce qui est observable dans un tableau mutable se termine dans RxSwift

Voici ma classe:

open class InstantObservables<T> { 
    lazy var disposeBag = DisposeBag() 

    public init() { } 

    lazy var observables: [Observable<T>] = [] 
    lazy var disposables: [Disposable] = [] 

    open func enqueue(observable: Observable<T>) { 
     observables.append(observable) 

     let disposable = observable 
      .subscribe() 

     disposables.append(disposable) 

     disposable 
      .addDisposableTo(disposeBag) 
    } 

    open func removeAndStop(atIndex index: Int) { 
     guard observables.indices.contains(index) 
      && disposables.indices.contains(index) else { 
      return 
     } 
     let disposable = disposables.remove(at: index) 
     disposable.dispose() 

     _ = observables.remove(at: index) 
    } 

    open func waitForAllObservablesToBeFinished() -> Observable<[T]> { 
     let multipleObservable = Observable.zip(observables) 
     observables.removeAll() 
     disposables.removeAll() 
     return multipleObservable 
    } 

    open func cancelObservables() { 
     disposeBag = DisposeBag() 
    } 
} 

Mais quand je souscris à l'observable envoyé par waitForAllObservablesToBeFinished(), tous sont réexécutée (ce qui est logique, sur la façon dont fonctionne Rx).

Comment pourrais-je garantir que chacun est exécuté une fois, quel que soit le numéro de l'abonnement?

Répondre

0

En écrivant la question, j'ai eu la réponse! En modifiant l'observable à travers shareReplay(1), et en mettant en file d'attente et en vous abonnant à cette observable altérée .. Cela fonctionne!

Voici le code mis à jour:

open class InstantObservables<T> { 
    lazy var disposeBag = DisposeBag() 

    public init() { } 

    lazy var observables: [Observable<T>] = [] 
    lazy var disposables: [Disposable] = [] 

    open func enqueue(observable: Observable<T>) { 
     let shared = observable.shareReplay(1) 
     observables.append(shared) 

     let disposable = shared 
      .subscribe() 

     disposables.append(disposable) 

     disposable 
      .addDisposableTo(disposeBag) 
    } 

    open func removeAndStop(atIndex index: Int) { 
     guard observables.indices.contains(index) 
      && disposables.indices.contains(index) else { 
      return 
     } 
     let disposable = disposables.remove(at: index) 
     disposable.dispose() 

     _ = observables.remove(at: index) 
    } 

    open func waitForAllObservablesToBeFinished() -> Observable<[T]> { 
     let multipleObservable = Observable.zip(observables) 
     observables.removeAll() 
     disposables.removeAll() 
     return multipleObservable 
    } 

    open func cancelObservables() { 
     disposeBag = DisposeBag() 
    } 
}