2015-10-19 2 views
1

J'écris un petit programme pour mettre des tweets du flux public Twitter dans une base de données HBase. Le programme utilise deux threads, un pour collecter les tweets et un pour les traiter. Le premier thread utilise twitter4j StatusListener pour obtenir les tweets et les place dans une ArrayBlockingQueue d'une capacité de 100. Le deuxième thread prend un statut de la file d'attente, filtre les données nécessaires et les déplace vers la base de données. Le traitement prend plus de temps que la collecte de l'état.java blockingqueue Bloc consommateur en file d'attente complète

Le producteur se présente comme suit:

public void onStatus(Status status) { 
    try { 
     this.queue.put(status); 
    } catch(Exception ex) { 
     ex.printStackTrace(); 
    } 
} 

Le consommateur utilise prendre et appelle une fonction pour traiter le nouveau statut:

public void run() { 
    try { 
     while(true) { 
      // Get new status to process 
      this.status = this.queue.take(); 
      this.analyse(); 
     } 
    } catch(Exception ex) { 
     ex.printStackTrace(); 
    } 
} 

Dans la fonction principale des deux fils ont été créés et ont commencé:

ArrayBlockingQueue<Status> queue_public = new ArrayBlockingQueue<Status>(100); 

Thread ta_public = new Thread(new TweetAnalyser(cl.getOptionValue("config"), queue_public)); 
Thread st_public = new Thread(new RunPublicStream(cl.getOptionValue("config"), queue_public)); 

ta_public.start(); 
st_public.start(); 

Le programme se déroule pendant un certain temps sans aucun problème, mais arrête s brusquement. À ce moment-là, la file d'attente est pleine et il semble que le consommateur ne soit pas en mesure de prendre un nouveau statut. J'ai essayé plusieurs variantes du modèle producteur/consommateur sans succès. Aucune exception n'est levée.

Je ne sais pas qui devait chercher l'échec. J'espère que quelqu'un pourrait me donner un indice ou une solution.

+0

Combien de temps dure un certain temps? L'échec se produit-il immédiatement lorsque la file d'attente se remplit ou est-elle heureuse depuis un certain temps? Avez-vous des appels de type 'System.exit' dans' analyser' (ou des méthodes appelées à partir de là)? –

+0

Non, il filtre simplement les hashtags, le nom d'utilisateur et le texte du tweet et les place dans la base de données. –

+0

Echoue-t-il toujours si vous n'appelez pas 'analyse'? Et qu'entendez-vous exactement par "stop" - les sorties de la JVM, ça se bloque, ou autre chose? –

Répondre

0

Si vous travaillez avec des files d'attente bloquantes, vérifiez s'il y a des commandes de blocage (mettre et prendre pour ArrayBlockingQueue) dans le code et des fautes de frappe si vous travaillez avec plusieurs listes.