2017-04-10 1 views
2

J'utilise 0.4.0 version de jeromq et j'essayais d'utiliser l'exemple ci-dessous à partir de ce link mais il donne une erreur de compilation sur cette ligne ZMQ.poll(items, 10);. Il semble que quelque chose a changé dans la version récente de jeromq mais la documentation et le code ne sont pas encore mis à jour. Quelqu'un peut-il m'aider à comprendre comment puis-je adapter mon code ci-dessous à utiliser avec la dernière version de jeromq.Client/serveur asynchrone utilisant Java jeromq

<dependency> 
     <groupId>org.zeromq</groupId> 
     <artifactId>jeromq</artifactId> 
     <version>0.4.0</version> 
    </dependency> 

Voici le code:

public class asyncsrv { 
    // --------------------------------------------------------------------- 
    // This is our client task 
    // It connects to the server, and then sends a request once per second 
    // It collects responses as they arrive, and it prints them out. We will 
    // run several client tasks in parallel, each with a different random ID. 
    private static Random rand = new Random(System.nanoTime()); 

    private static class client_task implements Runnable { 

    public void run() { 
     ZContext ctx = new ZContext(); 
     Socket client = ctx.createSocket(ZMQ.DEALER); 

     // Set random identity to make tracing easier 
     String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt()); 
     client.setIdentity(identity.getBytes()); 
     client.connect("tcp://localhost:5570"); 

     PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)}; 

     int requestNbr = 0; 
     while (!Thread.currentThread().isInterrupted()) { 
     // Tick once per second, pulling in arriving messages 
     for (int centitick = 0; centitick < 100; centitick++) { 
      // this line is giving compilation error as it says undefined 
      ZMQ.poll(items, 10); 
      if (items[0].isReadable()) { 
      ZMsg msg = ZMsg.recvMsg(client); 
      msg.getLast().print(identity); 
      msg.destroy(); 
      } 
     } 
     client.send(String.format("request #%d", ++requestNbr), 0); 
     } 
     ctx.destroy(); 
    } 
    } 

    // This is our server task. 
    // It uses the multithreaded server model to deal requests out to a pool 
    // of workers and route replies back to clients. One worker can handle 
    // one request at a time but one client can talk to multiple workers at 
    // once. 
    private static class server_task implements Runnable { 
    public void run() { 
     ZContext ctx = new ZContext(); 

     // Frontend socket talks to clients over TCP 
     Socket frontend = ctx.createSocket(ZMQ.ROUTER); 
     frontend.bind("tcp://*:5570"); 

     // Backend socket talks to workers over inproc 
     Socket backend = ctx.createSocket(ZMQ.DEALER); 
     backend.bind("inproc://backend"); 

     // Launch pool of worker threads, precise number is not critical 
     for (int threadNbr = 0; threadNbr < 5; threadNbr++) 
     new Thread(new server_worker(ctx)).start(); 

     // Connect backend to frontend via a proxy 
     ZMQ.proxy(frontend, backend, null); 

     ctx.destroy(); 
    } 
    } 

    // Each worker task works on one request at a time and sends a random number 
    // of replies back, with random delays between replies: 

    private static class server_worker implements Runnable { 
    private ZContext ctx; 

    public server_worker(ZContext ctx) { 
     this.ctx = ctx; 
    } 

    public void run() { 
     Socket worker = ctx.createSocket(ZMQ.DEALER); 
     worker.connect("inproc://backend"); 

     while (!Thread.currentThread().isInterrupted()) { 
     // The DEALER socket gives us the address envelope and message 
     ZMsg msg = ZMsg.recvMsg(worker); 
     ZFrame address = msg.pop(); 
     ZFrame content = msg.pop(); 
     assert (content != null); 
     msg.destroy(); 

     // Send 0..4 replies back 
     int replies = rand.nextInt(5); 
     for (int reply = 0; reply < replies; reply++) { 
      // Sleep for some fraction of a second 
      try { 
      Thread.sleep(rand.nextInt(1000) + 1); 
      } catch (InterruptedException e) { 
      } 
      address.send(worker, ZFrame.REUSE + ZFrame.MORE); 
      content.send(worker, ZFrame.REUSE); 
     } 
     address.destroy(); 
     content.destroy(); 
     } 
     ctx.destroy(); 
    } 
    } 

    // The main thread simply starts several clients, and a server, and then 
    // waits for the server to finish. 

    public static void main(String[] args) throws Exception { 
    ZContext ctx = new ZContext(); 
    new Thread(new client_task()).start(); 
    new Thread(new client_task()).start(); 
    new Thread(new client_task()).start(); 
    new Thread(new server_task()).start(); 

    // Run for 5 seconds then quit 
    Thread.sleep(5 * 1000); 
    ctx.destroy(); 
    } 
} 

Répondre

3

En 0.4.0 il n'y a pas de méthode de sondage. Mais vous pouvez utiliser ZPoller à la place.

Exemple:

- Vous devez nouvelle instance de Poller:

ZPoller zPoller = new ZPoller(ctx); 
zPoller.register(client, ZMQ.Poller.POLLIN); 

- sondage:

zPoller.poll(10); 

- et lecture si prise est lisible:

if (zPoller.isReadable(client)) { 
    ZMsg msg = ZMsg.recvMsg(client); 
    msg.getLast().print(identity); 
    msg.destroy(); 
} 

donc votre code ressemblera à ceci:

public class asyncsrv { 
    // --------------------------------------------------------------------- 
    // This is our client task 
    // It connects to the server, and then sends a request once per second 
    // It collects responses as they arrive, and it prints them out. We will 
    // run several client tasks in parallel, each with a different random ID. 
    private static Random rand = new Random(System.nanoTime()); 

    private static class client_task implements Runnable { 

     public void run() { 
      ZContext ctx = new ZContext(); 
      ZMQ.Socket client = ctx.createSocket(ZMQ.DEALER); 

      // Set random identity to make tracing easier 
      String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt()); 
      client.setIdentity(identity.getBytes()); 
      client.connect("tcp://localhost:5570"); 

      //ZMQ.PollItem[] items = new ZMQ.PollItem[] {new ZMQ.PollItem(client, ZMQ.Poller.POLLIN)}; 
      ZPoller zPoller = new ZPoller(ctx); 
      zPoller.register(client, ZMQ.Poller.POLLIN); 

      int requestNbr = 0; 
      while (!Thread.currentThread().isInterrupted()) { 
       // Tick once per second, pulling in arriving messages 
       for (int centitick = 0; centitick < 100; centitick++) { 
        // this line is giving compilation error as it says undefined 
        //ZMQ.poll(items, 10); 
        zPoller.poll(10); 
        /*if (items[0].isReadable()) { 
         ZMsg msg = ZMsg.recvMsg(client); 
         msg.getLast().print(identity); 
         msg.destroy(); 
        }*/ 
        if (zPoller.isReadable(client)) { 
         ZMsg msg = ZMsg.recvMsg(client); 
         msg.getLast().print(identity); 
         msg.destroy(); 
        } 
       } 
       client.send(String.format("request #%d", ++requestNbr), 0); 
      } 
      ctx.destroy(); 
     } 
    } 

    // This is our server task. 
    // It uses the multithreaded server model to deal requests out to a pool 
    // of workers and route replies back to clients. One worker can handle 
    // one request at a time but one client can talk to multiple workers at 
    // once. 
    private static class server_task implements Runnable { 
     public void run() { 
      ZContext ctx = new ZContext(); 

      // Frontend socket talks to clients over TCP 
      ZMQ.Socket frontend = ctx.createSocket(ZMQ.ROUTER); 
      frontend.bind("tcp://*:5570"); 

      // Backend socket talks to workers over inproc 
      ZMQ.Socket backend = ctx.createSocket(ZMQ.DEALER); 
      backend.bind("inproc://backend"); 

      // Launch pool of worker threads, precise number is not critical 
      for (int threadNbr = 0; threadNbr < 5; threadNbr++) 
       new Thread(new server_worker(ctx)).start(); 

      // Connect backend to frontend via a proxy 
      ZMQ.proxy(frontend, backend, null); 

      ctx.destroy(); 
     } 
    } 

    // Each worker task works on one request at a time and sends a random number 
    // of replies back, with random delays between replies: 

    private static class server_worker implements Runnable { 
     private ZContext ctx; 

     public server_worker(ZContext ctx) { 
      this.ctx = ctx; 
     } 

     public void run() { 
      ZMQ.Socket worker = ctx.createSocket(ZMQ.DEALER); 
      worker.connect("inproc://backend"); 

      while (!Thread.currentThread().isInterrupted()) { 
       // The DEALER socket gives us the address envelope and message 
       ZMsg msg = ZMsg.recvMsg(worker); 
       ZFrame address = msg.pop(); 
       ZFrame content = msg.pop(); 
       assert (content != null); 
       msg.destroy(); 

       // Send 0..4 replies back 
       int replies = rand.nextInt(5); 
       for (int reply = 0; reply < replies; reply++) { 
        // Sleep for some fraction of a second 
        try { 
         Thread.sleep(rand.nextInt(1000) + 1); 
        } catch (InterruptedException e) { 
        } 
        address.send(worker, ZFrame.REUSE + ZFrame.MORE); 
        content.send(worker, ZFrame.REUSE); 
       } 
       address.destroy(); 
       content.destroy(); 
      } 
      ctx.destroy(); 
     } 
    } 

    // The main thread simply starts several clients, and a server, and then 
    // waits for the server to finish. 

    public static void main(String[] args) throws Exception { 
     ZContext ctx = new ZContext(); 
     new Thread(new client_task()).start(); 
     new Thread(new client_task()).start(); 
     new Thread(new client_task()).start(); 
     new Thread(new server_task()).start(); 

     // Run for 5 seconds then quit 
     Thread.sleep(5 * 1000); 
     ctx.destroy(); 
    } 
} 
+0

obtenu maintenant .. De plus, quelle est la différence entre '' close' et destroy'. Devrais-je appeler 'close' sur' zPoller' ou 'destroy'? – user1234

+0

@ La méthode close de user1234 ne gère pas les exceptions IOException et destroy (elle appelle effectivement la méthode close). aussi, c'est assez drôle, mais ces deux méthodes sont inutiles, parce que nous n'avons pas de sélecteurs ici)) – DontPanic