2010-08-25 5 views
2

De Don Syme blog (http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus-twitter-sample.aspx) J'ai essayé d'implémenter un écouteur de flux Twitter. Mon but est de suivre les directives de la documentation de l'API de Twitter qui dit "que les tweets doivent souvent être sauvegardés ou mis en file d'attente avant d'être traités lors de la construction d'un système à haute fiabilité".Twitter flux api avec des agents en F #

Donc mon code doit avoir deux composantes:

  • Une file d'attente qui empile et traite chaque JSON
  • statut/Tweet Quelque chose à lire le flux Twitter qui déverse à la file d'attente le tweet dans les chaînes de JSON

Je choisis les éléments suivants:

  • Un agent auquel je posterai chaque tweet , Qui décode le JSON, et décharges à la base de données
  • Un simple http WebRequest

Je voudrais également jeter dans un fichier texte d'erreur, d'insérer dans la base de données. (Je vais probablement passer à un agent superviseur pour toutes les erreurs).

Deux problèmes:

  • est ma stratégie ici tout bon? Si je comprends bien, l'agent se comporte comme une file d'attente intelligente et traite ses messages de manière asynchrone (s'il a 10 gars dans sa file d'attente, il en traitera un certain nombre au lieu d'attendre que le 1 er termine, etc. ...), correct ?
  • Selon la publication de Don Syme, tout ce qui est avant est isolé, StreamWriter et le vidage de la base de données sont alors isolés. Mais parce que j'en ai besoin, je ne ferme jamais ma connexion à la base de données ...?

Le code ressemble à quelque chose comme:

let dumpToDatabase databaseName = 
    //opens databse connection 
    fun tweet -> inserts tweet in database 

type Agent<'T> = MailboxProcessor<'T> 



let agentDump = 
      Agent.Start(fun (inbox: MailboxProcessor<string>) -> 
       async{ 
        use w2 = new StreamWriter(@"\Errors.txt") 
        let dumpError =fun (error:string) -> w2.WriteLine(error) 
        let dumpTweet = dumpToDatabase "stream" 
        while true do 
         let! msg = inbox.Receive() 
         try 
          let tw = decode msg 
          dumpTweet tw 
         with 
         | :? MySql.Data.MySqlClient.MySqlException as ex -> 
    dumpError (msg+ex.ToString()) 
         | _ as ex ->() 



          } 
          ) 

    let filter_url = "http://stream.twitter.com/1/statuses/filter.json" 
    let parameters = "track=RT&" 
    let stream_url = filter_url 

    let stream = twitterStream MyCredentials stream_url parameters 


    while true do 
     agentDump.Post(stream.ReadLine()) 

Merci beaucoup!

Modifier code avec l'agent de traitement:

let dumpToDatabase (tweets:tweet list)= 
    bulk insert of tweets in database  

let agentProcessor = 
     Agent.Start(fun (inbox: MailboxProcessor<string list>) -> 
      async{ 
       while true do 
         let! msg = inbox.Receive() 
         try 
          msg 
          |> List.map(decode) 
          |> dumpToDatabase 
         with 
         | _ as ex -> Console.WriteLine("Processor "+ex.ToString())) 
       } 
       ) 



let agentDump = 
     Agent.Start(fun (inbox: MailboxProcessor<string>) -> 
        let rec loop messageList count = async{ 
         try 
          let! newMsg = inbox.Receive() 
          let newMsgList = newMsg::messageList 
          if count = 10 then 
           agentProcessor.Post(newMsgList) 
           return! loop [] 0 
          else      
           return! loop newMsgList (count+1) 
         with 
         | _ as ex -> Console.WriteLine("Dump "+ex.ToString()) 

        } 
        loop [] 0) 

let filter_url = "http://stream.twitter.com/1/statuses/filter.json" 
let parameters = "track=RT&" 
let stream_url = filter_url 

let stream = twitterStream MyCredentials stream_url parameters 


while true do 
    agentDump.Post(stream.ReadLine()) 

Répondre

5

Je pense que la meilleure façon de décrire l'agent est qu'il est est un processus en cours d'exécution qui maintient un état et peut communiquer avec d'autres agents (ou web pages ou base de données). Lors de l'écriture d'une application basée sur un agent, vous pouvez souvent utiliser plusieurs agents qui s'envoient des messages les uns aux autres. Je pense que l'idée de créer un agent qui lit les tweets du web et les stocke dans une base de données est un bon choix (bien que vous puissiez également garder les tweets en mémoire comme état de l'agent).

  • Je ne garderais pas la connexion ouverte tout le temps - MSSQL (et MySQL probablement trop) met en œuvre la mise en commun de connexion, il ne fermera pas la connexion automatiquement lorsque vous le relâchez. Cela signifie qu'il est plus sûr et tout aussi efficace de rouvrir la connexion chaque fois que vous devez écrire des données dans la base de données.À moins que vous ne vous attendiez à recevoir un grand nombre de messages d'erreur, je ferais probablement la même chose pour le flux de fichier (en écrivant, vous pouvez l'ouvrir, de sorte que le nouveau contenu soit ajouté à la fin).

La file d'attente moyen de F agents # travail est qu'il traite les messages un par un (dans votre exemple, vous attendez un message en utilisant inbox.Receive(). Lorsque la file d'attente contient plusieurs messages, vous les obtenez un par un (dans une boucle).

  • Si vous voulez traiter plusieurs messages à la fois, vous pouvez écrire un agent qui attend, par exemple, 10 messages et les envoie comme une liste à un autre agent (qui procéderait alors au traitement en vrac.)

  • Vous pouvez également spécifier le paramètre timeout à la méthode Receive, de sorte que vous pouvez attendre au plus 10 messages tant qu'ils arrivent tous en une seconde - vous pouvez ainsi mettre en œuvre de manière élégante un traitement en bloc ne contenant pas de messages pendant longtemps. temps.

+0

merci pour votre réponse tomas. Quand le travail est terminé, j'essaierai d'écrire le traitement en bloc, cela me semblera amusant (cela nécessitera probablement beaucoup plus d'études ...)! – jlezard

+0

a finalement eu le temps d'essayer d'ajouter un autre agent, semble bien fonctionner. Je suis un peu troublé par votre dernier point: à moins que je me trompe Recevoir tue l'agent après le temps spécifié, donc je ne vois pas comment l'utiliser? Merci! – jlezard

+0

@jlezard: Je pense que l'exception vous avertit qu'aucun message n'a été reçu, mais qu'il maintient l'agent dans un état utilisable. Cependant, c'est une meilleure idée d'utiliser 'TryReceive' qui renvoie' None' quand il expire (je ne me suis pas rendu compte qu'il existe quand j'ai écrit la réponse!) –