2017-07-25 1 views
0

Je travaille avec Apache Storm cette topologie:Apache Storm (Java): Bolt ne recevant pas d'autres tuple Bolt

TopologyBuilder builder = new TopologyBuilder(); 
    builder.setSpout("socketspout", new SocketSpout(IP_HOST,PORT)); 
    builder.setBolt("filterone", new FilterOne()).shuffleGrouping("socketspout"); 
    builder.setBolt("filtertwo", new FilterTwo()).shuffleGrouping("filterone"); 

Les méthodes du premier boulon sont (FilteOne), cette classe s'étend BaseRichBolt:

public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    declarer.declare(new Fields("ID1","signal1")); 
} 

public void execute(Tuple input) { 
    int sig; 
    try { 
     sig=input.getInteger(1)*2; 
     System.out.println("Filter one.."+Integer.toString(sig)); 
     collector.emit("ack1", new Values(input.getString(0), sig)); 
     collector.ack(input); 
    } catch (Exception e) { 
     collector.fail(input); 
    } 
} 

les procédés de la deuxième culasse sont (FilteTwo), cette classe se prolonge trop BaseRichBolt:

public void declareOutputFields(OutputFieldsDeclarer declarer) { 
} 

public void execute(Tuple input) { 
    int sig; 
    try { 
     sig=input.getInteger(1)+1; 
     System.out.println("Filter two.."+Integer.toString(sig)); 
     collector.ack(input); 
    } catch (Exception e) { 
     collector.fail(input); 
    } 
} 

Lorsque le mode execute programme localcluster je peux voir le premier boulon émettre tuple mais le second ne reçoivent jamais le tuple ......

enter image description here

+0

Le problème a été résolu de modifier le filtre d'un code de 'collector.emit (» ack1 ", nouvelles valeurs (input.getString (0), sig));' à 'collector.emit (nouvelles valeurs (input.getString (0), sig));' – amoto

Répondre

0

Le problème a été résolu de modifier le filtre d'un code de collector.emit("ack1", new Values(input.getString(0), sig)); à collector.emit(new Values(input.getString(0), sig));

0

la méthode du collecteur de émette pourrait mettre en place comme comme:

collector.emit(input, new Values(input.getString(0), sig)); 

ne pas oublier de définir le nom de champ pour cette valeur, dans la méthode declareOutpu TFields:

public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("myValue")); 
    } 

Ensuite, dans le deuxième boulon, essayez d'obtenir la valeur en utilisant le champ "maValeur":

sig = input.getValueByField("myValue").getInteger(1)+1;