Je travaille avec Apache Storm cette topologie:Apache Storm (Java): Bolt ne recevant pas d'autres tuple Bolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("socketspout", new SocketSpout(IP_HOST,PORT));
builder.setBolt("filterone", new FilterOne()).shuffleGrouping("socketspout");
builder.setBolt("filtertwo", new FilterTwo()).shuffleGrouping("filterone");
Les méthodes du premier boulon sont (FilteOne), cette classe s'étend BaseRichBolt:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("ID1","signal1"));
}
public void execute(Tuple input) {
int sig;
try {
sig=input.getInteger(1)*2;
System.out.println("Filter one.."+Integer.toString(sig));
collector.emit("ack1", new Values(input.getString(0), sig));
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
}
}
les procédés de la deuxième culasse sont (FilteTwo), cette classe se prolonge trop BaseRichBolt:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple input) {
int sig;
try {
sig=input.getInteger(1)+1;
System.out.println("Filter two.."+Integer.toString(sig));
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
}
}
Lorsque le mode execute programme localcluster je peux voir le premier boulon émettre tuple mais le second ne reçoivent jamais le tuple ......
Le problème a été résolu de modifier le filtre d'un code de 'collector.emit (» ack1 ", nouvelles valeurs (input.getString (0), sig));' à 'collector.emit (nouvelles valeurs (input.getString (0), sig));' – amoto