2014-09-17 2 views
1

Je teste ZeroMQ et je ne reçois que 1227-1276 messages par seconde. J'ai lu cependant que ceux-ci sont censés être plus de 100x ce montant.ZeroMQ produisant des résultats maigres

Qu'est-ce que je fais mal? Y at-il une configuration que je peux spécifier pour résoudre ce problème?

J'utilise les fonctionnalités suivantes:

public static final String SERVER_LOCATION = "127.0.0.1"; 
public static final int SERVER_BIND_PORT = 5570; 

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{ 
    ZContext ctx = new ZContext(); 

    Socket frontend = ctx.createSocket(ZMQ.PULL); 
    frontend.bind("tcp://*:"+SERVER_BIND_PORT); 

    int i = 1; 
    do{ 
     ZMsg msg = ZMsg.recvMsg(frontend); 
     ZFrame content = msg.pop(); 
     if(content!= null){ 
      msg.destroy(); 
      System.out.println("Received: "+i); 
      i++; 
      content.destroy(); 
     } 
    }while(true); 
} 

public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{ 
    ZContext ctx = new ZContext(); 
    Socket client = ctx.createSocket(ZMQ.PUSH); 

    client.setIdentity("i".getBytes()); 
    client.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT); 

    PollItem[] items = new PollItem[] { new PollItem(client, Poller.POLLIN) }; 
    int i = 1; 
    Timer t = new Timer(timeToSpendSending); 
    t.start(); 
    do{ 
     client.send(/* object to send*/ , 0); 
     i++; 
    }while(!t.isDone()); 

    System.out.println("Done with "+i); 
} 

classe Timer utilisée pour limiter le temps du programme court pour:

class Timer extends Thread{ 
    int time; 
    boolean done; 
    public Timer(int time){ 
     this.time = time; 
     done = false; 
    } 
    public void run(){ 
     try { 
      this.sleep(time); 
      done = true; 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
    public boolean isDone(){ 
     return done; 
    } 
} 

Edit: J'utilise jeroMQ

<dependency> 
    <groupId>org.zeromq</groupId> 
    <artifactId>jeromq</artifactId> 
    <version>0.3.4</version> 
</dependency> 

Répondre

0

I a dû remplacer la méthode connect et supprimé High Water Mark (mis à 0 pour les messages illimités dans la mémoire)

Le code serait la suivante:

public static final String SERVER_LOCATION = "127.0.0.1"; 
public static final int SERVER_BIND_PORT = 5570; 
public static final String TOPIC = "topic1"; 

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{ 
    // Prepare our context and subscribe 
     Context context = ZMQ.context(1); 
     Socket subscriber = context.socket(ZMQ.SUB); 

     subscriber.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT); 
     subscriber.setRcvHWM(0); 
     subscriber.subscribe(TOPIC.getBytes()); 
     System.out.println("subscribed to "+TOPIC); 
     int i = 1; 
     boolean started = false; 
     Timer t = new Timer(timeToSpendSending); 
     do{ 
      String msg = subscriber.recvStr(); 
      if(!TOPIC.equals(msg)){ 
       if(!started){ 
        t.start(); 
        started = true; 
       } 
       i++; 
      } 
     }while(!t.isDone()); 
     System.out.println("Done with: "+i); 
     subscriber.close(); 
     context.term(); 
    } 
    public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{ 
     Context context = ZMQ.context(1); 
     Socket publisher = context.socket(ZMQ.PUSH); 
     publisher.bind("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT); 
     publisher.setHWM(0); 
     publisher.setSndHWM(0); 

     int i = 1; 
     Timer t = new Timer(timeToSpendSending); 
     t.start(); 
     do{ 
      publisher.sendMore(TOPIC); 
      publisher.send("Test Data number "+i); 
      i++; 
     }while(!t.isDone()); 
     System.out.println("Done with: "+i); 
     publisher.close(); 
     context.term(); 
    } 

Comme cela, je suis le nombre de messages allant dans les 250 000 par seconde lors de l'envoi et 145000 par seconde lors de la réception.

Questions connexes