2017-09-13 1 views
0

J'utilise RxJs pour écouter un flux de données provenant d'une socket Web. Pour utiliser cette socket pubsub, je dois d'abord m'authentifier sur une autre socket qui retournera l'URL du socket pubsub.Comment faire attendre les observateurs pour la création de WebSocket après l'authentification

Je travaille maintenant avec un seul abonné. Je dois en ajouter un autre, mais il y a des problèmes avec mon approche actuelle car les deux abonnés vont déclencher l'authentification et créer une seconde socket pubsub. Pour être clair, il ne devrait y avoir qu'un seul socket d'authentification et un seul socket pubsub pour l'ensemble de l'application. Mais j'ai besoin que les abonnés "attendent" que l'authentification se produise avant d'essayer d'utiliser le socket pubsub. Sinon, le socket pubsub sera indéfini (puisque nous savons seulement qu'il s'agit d'une URL au moment de l'exécution).

Ceci est ma tentative actuelle:

Websocket.service.ts

private connect(): Observable<IConnectionInfo> { 

    if (!this.authObservable) { 
     var credentials = { 
     "username": "bob", 
     "password": "slob" 
     } 

     this.authObservable = Observable.create((observer) => { 
     const socket = new WebSocket(this.authurl); 
     socket.onopen =() => { 
      console.log("Auth socket opened"); 
      socket.send(JSON.stringify(credentials)); 
     } 
     socket.onmessage = (event) => { 
      var connection_info = <IConnectionInfo>JSON.parse(event.data); 
      observer.next(connection_info); 
     } 

     return() => { 
      socket.close(); //invoked on unsubscribe 
      console.log("Auth socket closed"); 
     } 
     }) 
    } 
    return this.authObservable; 
    } 

public open_pubsub(events: string[]): Observable<IPubSubMessage> { 

    return this.connect() 
     .flatMap((connection_info: IConnectionInfo) => { 
     var url = "ws://" + this.hostName + ":" + connection_info.port + "/" + connection_info.ps; 

     var subscription = { 
      "subscribe": events 
     } 

     var authenticate_request = { 
      "authenticate": connection_info['token'] 
     } 

     if (!this.psObservable) { 
      this.psObservable = Observable.create((observer) => { 
      const socket = new WebSocket(url); 

      socket.onopen =() => { 
       console.log("PS socket opened"); 
       socket.send(JSON.stringify(authenticate_request)); 
       socket.send(JSON.stringify(subscription)); 
      } 
      socket.onmessage = (event) => { 
       var psmsg = <IPubSubMessage>JSON.parse(event.data); 
       observer.next(psmsg); 
      } 

      return() => { 
       socket.close(); //invoked on unsubscribe 
       console.log("PS socked closed"); 
      } 
      }) 
     } 
     return this.psObservable; 
     } 
    ); 
    } 

Et l'observateur:

getSystemState(): Observable<string> { 

    return this._wsService.open_pubsub([MSG_SYSTEM_STATE_CHANGED]) 
     .map((response: IPubSubMessage): string => { 

       console.log(response.payload); 
       return "I wish this worked"; 
     }) 
     .catch(this.handleError); 
} 

Toute aide appréciée!

(EDIT Sur la base d'une réponse qui semble avoir été supprimé, mais il était vraiment très utile, j'ai modifié le code. Cette partie fixe du problème, mais se traduit par plusieurs prises)

Répondre

0

trouvé ce que Il me manquait - la magie de l'opérateur share().

Donc, pour les deux Observables, je fais ceci:

this.authObservable = Observable.create((observer) => { 
     ...(etc)... 
     }).share(); 

Et étonnamment, seulement 1 de chaque prise est créé.