2010-11-15 3 views
1

Je crée une bibliothèque composée d'un applicateur Log4J qui envoie des événements de manière asynchrone à un serveur distant. Lorsqu'une instruction de journal est créée, l'appender enregistre l'événement de manière asynchrone dans une file d'attente locale qu'un groupe de consommateurs récupère ensuite et envoie à la télécommande.JMS est-il la réponse à la nécessité d'une file d'attente de blocage persistante?

La solution complètement en mémoire serait de créer un BlockingQueue qui gérerait le problème de la concurrence. Cependant, j'aimerais que la file d'attente soit conservée de façon à ce que, si le serveur distant n'est pas disponible, je ne développe pas la file d'attente sans limite ou commence à rejeter les messages dans le cas d'une file d'attente limitée. Je pensais à utiliser une base de données H2 intégrée pour stocker les événements localement, puis utiliser un mécanisme d'interrogation pour récupérer les événements et les envoyer à la télécommande. Je préfèrerais utiliser un BlockingQueue plutôt que d'interroger une table de base de données.

Est-ce que JMS est la réponse?

EDIT:

Si JMS est la réponse, et il semble aller de cette façon, quelqu'un at-il des recommandations sur une solution JMS léger, intégrable qui peut être configuré pour accepter uniquement les messages en cours de traitement? En d'autres termes, je ne veux pas, et je ne le ferai peut-être pas, ouvrir un socket TCP sur lequel écouter.

EDIT:

J'ai ActiveMQ intégrés maintenant et il semble fonctionner. Merci a tous.

+0

juste être conscient de la mise en œuvre de JMS que vous utilisent et comment l'infrastructure est mise en place. Certaines implémentations de JMS permettent à plusieurs threads de lire et de traiter des messages simultanément. Cela préserve FIFO du point de vue de se faire ramasser par la file d'attente, mais ne conserve pas nécessairement le FIFO du point de vue du moment où les messages sont traités. Encore une fois, vérifiez pour voir votre implémentation. Si vous exécutez JMS qui prend en charge les lectures simultanées, assurez-vous simplement qu'il est réglé pour les interdire. Ou si vous ne vous souciez pas, alors vous pouvez ignorer ce que je viens de dire. :) –

+0

Merci. C'est OK si certaines données sont légèrement non ordonnées car elles peuvent être triées sur des données d'événement. Ma préoccupation avec JMS est la surcharge de l'intégration d'une autre bibliothèque.Il y a beaucoup d'implémentations JMS différentes - est-ce qu'il y en a une qui est plus facile à implémenter que d'autres et qui peut fonctionner de manière intégrée? – Collin

+1

Je chercherais une implémentation JMS qui ne supporte que le besoin d'y accéder via la spécification JMS 1.1 et/ou 1.2. De cette façon, vous pouvez simplement lire et écrire par programmation à partir des files d'attente en utilisant uniquement l'API. La seule raison pour laquelle j'ai mentionné l'implémentation est que parfois vous devez configurer certaines choses avec l'implémentation du fournisseur pour activer/désactiver la concurrence, etc. Dans mon cas, nous nous en tenons aux produits IBM, parce que mon entreprise aime Big Blue. Mais vous pourriez probablement vous en sortir avec n'importe quelle implémentation. Dans notre cas, je dis toujours à nos développeurs de s'en tenir à l'API et de ne rien faire de spécifique à l'implémentation. –

Répondre

1

Vous pouvez utiliser JMS pour envoyer de manière asynchrone des messages à un ordinateur distant (en supposant qu'il puisse les recevoir bien sûr), Log4j dispose d'un Appender JMS que vous pouvez utiliser pour cela.

+0

La mise en œuvre sera spécifique au fournisseur, mais les documents log4j devraient vous aider à démarrer, je crois qu'ils ont un exemple de configuration/application dans leurs documents –

1

Vous pouvez certainement utiliser JMS à cet effet. Autant que je comprends, vous utilisez l'appender Log4J JMS. Ce composant envoie des messages à destination JMS préconfigurée (généralement en file d'attente). Vous pouvez configurer cette file d'attente pour qu'elle soit persistante. Dans ce cas, tous les messages insérés dans la file d'attente seront automatiquement stockés dans un magasin persistant (généralement une base de données.). Malheureusement, cette configuration est spécifique au fournisseur (dépend du fournisseur JMS), mais est généralement très simple. Veuillez vous reporter à la documentation de votre fournisseur JMS.

0

Voir si cela fonctionne

Ce code devrait fonctionner pour vous - son une mémoire file d'attente de blocage persistant - a besoin des réglages de fichiers, mais devrait fonctionner

 package test; 

    import java.io.BufferedReader; 
    import java.io.BufferedWriter; 
    import java.io.File; 
    import java.io.FileReader; 
    import java.io.FileWriter; 
    import java.io.IOException; 
    import java.util.ArrayList; 
    import java.util.Collections; 
    import java.util.LinkedList; 
    import java.util.List; 

    public class BlockingQueue { 

    //private static Long maxInMenorySize = 1L; 
    private static Long minFlushSize = 3L; 

    private static String baseDirectory = "/test/code/cache/"; 
    private static String fileNameFormat = "Table-"; 

    private static String currentWriteFile = ""; 

    private static List<Object> currentQueue = new LinkedList<Object>(); 
    private static List<Object> lastQueue = new LinkedList<Object>(); 

    static{ 
     try { 
      load(); 
     } catch (IOException e) { 
      System.out.println("Unable To Load"); 
      e.printStackTrace(); 
     } 
    } 

    private static void load() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()==0){ 
      //currentQueue = lastQueue = new ArrayList<Object>(); 
      currentWriteFile = baseDirectory + "Table-1"; 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString()+ "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 
     }else{ 
      if(fileList.size()>0){ 
        BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
        String line=null; 
        while ((line=reader.readLine())!=null){ 
         currentQueue.add(line); 
        } 
        reader.close(); 
        File toDelete = new File(fileList.get(0)); 
        toDelete.delete(); 
      } 

      if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1))); 
       currentWriteFile = fileList.get(fileList.size()-1); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        lastQueue.add(line); 
       } 
       reader.close(); 
       //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9)); 
      } 
     } 

    } 

    private void loadFirst() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        currentQueue.add(line); 
       } 
       reader.close(); 
       File toDelete = new File(fileList.get(0)); 
       toDelete.delete(); 
     } 
    } 

    public Object pop(){ 
     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 

     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 
     else 
      return null; 
    } 

    public synchronized Object waitTillPop() throws InterruptedException{ 
     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      if(currentQueue.size()==0) 
       wait(); 
     } 
     return currentQueue.remove(0); 
    } 

    public synchronized void push(Object data) throws IOException{ 
     lastQueue.add(data); 
     this.notifyAll(); 
     if(lastQueue.size()>=minFlushSize){ 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString() + "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 

      currentWriteFile = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
        (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1); 
     } 
    } 

    public static void main(String[] args) { 
     try { 
      BlockingQueue bq = new BlockingQueue(); 

      for(int i =0 ; i<=8 ; i++){ 
       bq.push(""+i); 
      } 

      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 

      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 



     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
Questions connexes