2010-12-03 3 views
6

J'ai, disons, 1000 observables. Maintenant, je veux agréger tous les événements dans une nouvelle observable qui déclenche OnNext une fois que tous les autres ont envoyé un événement. Quelle est la meilleure façon de faire cela en utilisant Rx?Agréger un grand nombre d'observables dans un nouveau observable

Mise à jour: Quelques bons commentaires sur le forum Rx, en particulier par Dave Sexton. Il a montré comment créer une méthode d'extension Zip qui prend plusieurs observables: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/

+0

Tous les types des 1000 observables sont-ils identiques? Qu'est-ce que vous tapez de l'agrégation observable à être? –

+0

Tous les 1000 observables sont du même type, le nouvel agrégat peut être un nouveau type. Par exemple. L'événement devient AggregateEvent. – lukebuehler

+0

Voulez-vous combiner leurs dernières valeurs uniquement? C'EST À DIRE. si Observable a déclenche deux événements et que b ne peut en déclencher qu'un, voulez-vous agréger le premier événement d'un événement, ou le dernier événement d'un, avec l'événement b? –

Répondre

2

Il existe un MailboxProcessor en F # ... J'utiliserais un SynchronizationContext en C# dans le même but. Donnez-moi quelques minutes et je vais écrire un exemple. A côté: Voici mon code en F # qui fait quelque chose de similaire ... Ce sera beaucoup plus d'effort, mais encore faisable en C# avec Rx.

open System.Diagnostics 

let numWorkers = 20 
let asyncDelay = 100 

type MessageForMailbox = 
    | DataMessage of AsyncReplyChannel<unit> 
    | GetSummary of AsyncReplyChannel<unit> 

let main = 
    let actor = 
     MailboxProcessor.Start(fun inbox -> 
     let rec loop acc = 
      async { 
       let! message = inbox.Receive() 
       match message with 
       | DataMessage replyChannel -> replyChannel.Reply(); return! loop acc 
       | GetSummary replyChannel -> replyChannel.Reply(); return! loop acc 
      } 

     loop 0 // seed for acc 
    ) 

    let codeBlocks = [for i in 1..numWorkers -> 
         async { 
          do! Async.Sleep asyncDelay 
          return! actor.PostAndAsyncReply DataMessage 
         } ] 

    while true do 
     printfn "Concurrent started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore 
     actor.PostAndReply GetSummary 
     sw.Stop() 
     printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * 100)/sw.ElapsedMilliseconds) 

     printfn "Synchronous started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore 
     sw.Stop() 
     printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100)/sw.ElapsedMilliseconds) 

main 
+0

hmm, donc vous voulez dire quelque chose dans le sens de l'utilisation de SynchronizationContext.Send() pour synchroniser toutes les observables en créant des événements? Je vois un peu ce que fait votre code F # mais je ne suis pas assez averti pour le comprendre complètement. – lukebuehler

+0

Je pense que vous l'avez. RunSynchronously implémente ForkJoin avec des flux de travail asynchrones. – GregC

+0

+1: Je n'ai jamais vu un bon exemple de MailboxProcessor auparavant. :) –

Questions connexes