2017-07-17 1 views
1

enter image description hereMa méthode d'échec dans la classe de bec ne fonctionne que pour le premier boulon, à partir du deuxième boulon et ne fonctionne pas.

Note:
Bolt1 contient une liste des trois premiers nombres premiers (2,3,5).
Bolt2 contient une liste des trois premiers nombres de nombres premiers (7,11,13).
Dans Bolt3 il suffit de vérifier le nombre est premier ou non.
À partir du premier boulon, je suis capable d'appeler Fail() à partir de la classe de bec mais à partir du deuxième boulon je ne peux pas appeler Fail() de la classe de bec.

classe Topologie:

 ...... 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("spout", new SpoutClass(), 1); 
     builder.setBolt("bolt1", new Bolt1(), 1).shuffleGrouping("spout"); 
     builder.setBolt("bolt2", new Bolt2(), 1).shuffleGrouping("bolt1"); 
     builder.setBolt("bolt3", new Bolt3(), 1).shuffleGrouping("bolt2"); 

classe Spout:

SpoutClass implements IRichSpout{ 
    private SpoutOutputCollector collector; 
    private TopologyContext context; 

    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     this.context = context; 
     this.collector = collector; 
     } 

    public void nextTuple() { 
     try { 
      //messageQueue is blocking queue which contains data 
      String msg = messageQueue.take(); 
      String ackId = msg; 
      this.collector.emit(new Values(msg), ackId); 

     }catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
    public void ack(Object msgId) { 

     System.out.println("Acknowledges that this tuple has been processed ........... " + msgId); 

    } 

    public void fail(Object msgId) { 

     System.out.println("FAILED To Process Message :-" + msgId); 

    } 
} 

classe Bolt1:

public class Bolt1 extends BaseRichBolt { 
private OutputCollector collector; 
ArrayList<Integer> firstthreePrime = new ArrayList<Integer>(); 
     firstthreePrime.add(2); 
     firstthreePrime.add(3); 
     firstthreePrime.add(5); 
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 
    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt1."); 
     Integer number = Integer.valueOf(message); 
     if (check this number contains bolt1 or not) { 
      //if number is contains 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt1"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("msg")); 
    } 
} 

classe Bolt2:

public class Bolt2 extends BaseRichBolt { 
private OutputCollector collector; 
ArrayList<Integer> secondthreePrime = new ArrayList<Integer>(); 
     secondthreePrime.add(7); 
     secondthreePrime.add(11); 
     secondthreePrime.add(13); 
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 

    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt2."); 
     Integer number = Integer.valueOf(message); 
     if (check this number contains bolt2 or not) { 
      //if number is contains 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt2"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("msg")); 
    } 
} 

classe Bolt3:

public class Bolt3 extends BaseRichBolt { 
private OutputCollector collector; 

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 

    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt3."); 
     Integer number = Integer.valueOf(message); 
     if (check this number is prime or not) { 
      //if number is prime 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt3"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    } 
} 

Répondre

1

Puisque vous utilisez BaseRichBolt, ne voulez-vous pas ancrer vos tuples sortants?

_collector.emit(tuple, new Values(message)); 

Si vous ne les ancrez pas, ils n'ont aucun lien avec le tuple provenant du bec. Découvrez les docs:

+0

Dans la classe de bec, nous ne pouvons pas émettre avec tuple. Cela fonctionne après avoir ajouté dans la classe de boulon: collector.emit (tuple, nouvelles valeurs (message)); – Ashish

+0

Oui désolé que c'était une faute de frappe signifiait BaseRichBolt. Si cela a réglé votre problème, acceptez ma réponse. –