2017-08-24 2 views
2

J'ai une chaîne observable, l'observable initial est du réseau, et sera déclenché chaque fois que le message est prêt à être lu. Le gestionnaire suivant lit alors le message et le désérialise. Maintenant, j'ai une fourchette de l'observable, l'un est gestionnaire de messages et l'autre enregistre le message.f # effet de fourche observable et effet secondaire

Le problème est que parce que je suis à l'aide observable je vais essayer effectivement de lire le message deux fois.

Je comprends que l'utilisation de l'événement au lieu de Observable résoudra la question, mais je devrai alors un problème de collecte des ordures qui pourraient provoquer des prises de ne pas être collectées.

Une solution à laquelle j'ai pensé est d'insérer une sorte de séparateur qui termine une chaîne d'observables et en crée une nouvelle, une telle fonction existe-t-elle déjà dans fsharp ou une autre bibliothèque de bibliothèques.

Existe-t-il d'autres solutions au problème?

Edit:

Exemple de code

qui ne fonctionne pas correctement

let messagesStream = 
    socket.observable |> 
    Observable.map (fun() -> socket.read()) |> 
    Observable.map (fun m -> deserialize m) 

messagesStream |> Observable.add (fun m -> printf m) 
messagesStream |> Observable.add (fun m -> handle m) 
+3

Du côté: Vous devez mettre les tuyaux avant * avant * la fonction que vous tuyau dans, non * derrière * la valeur tuyau. Les aligner de cette façon rend beaucoup plus clair ce qui se passe. – TeaDrivenDev

+2

Chaque abonnement à ab observable pipeline provoque un abonnement à la source observable. Vous créez deux abonnements à 'socket.observable'. Vous devriez essayer de mettre un opérateur de publication à la fin de l'observable 'messagesStream'. – Enigmativity

+0

Merci, j'ai trouvé sur une publication, connaissez-vous une bibliothèque pour f # avec fonction de publication? Si vous pouvez ajouter un exemple de réponse et de code, je le marquerai comme la réponse. Merci – somdoron

Répondre

0

On dirait que vous pouvez créer une observable qui gère la lecture du message du réseau et désérialise il. En supposant que cela soit fait avec des opérateurs Rx standard, cela devrait renvoyer une observation qui pousse de nouveaux messages réseau désérialisés.

Vous pouvez avoir 2 abonnés à cette observable, qui réagit aux nouveaux messages avec ce logique métier que vous souhaitez et la journalisation du second abonné les messages.

Cela devrait éliminer l'effet secondaire de la lecture à plusieurs reprises à partir du réseau. Pousser 2 copies du message désérialisé ne devrait pas avoir d'effet secondaire.

+0

J'ai modifié le code avec un exemple qui ne fonctionne pas. – somdoron

+0

Je ne suis pas sûr de comprendre votre réponse. Ce que vous suggérez est de ne pas utiliser Observable.map mais de vous abonner au réseau et ensuite de produire de nouveaux observables? pouvez-vous donner un exemple de code? – somdoron

+0

En voyant le code que vous avez posté, il semble que vous ayez essayé ce que je suggérais dans ma réponse. Qu'est-ce qui ne fonctionne pas exactement? –

0

La meilleure façon d'ajouter un peu de l'exploitation forestière est d'utiliser Observable.iter comme suit:

let messagesStream = 
    socket.observable 
    |> Observable.map (fun() -> socket.read()) 
    |> Observable.map (fun m -> deserialize m) 
    |> Observable.iter (printfn "%A") 

messagesStream |> Observable.add (fun m -> handle m)