2017-10-08 2 views
2

J'ai un cluster Storm se connectant à Kinesis Stream. Le message ressemble à ceci.Comment envoyer un tuple à un autre boulon en fonction d'une valeur dans le message

{ 
    _c: "a" 
} 

ou il devrait être

{ 
    _c: "b" 
} 

Je voudrais envoyer un tuple avec _c = "a" à un boulon et _c = "b" à un boulon différent. Comment puis-je y parvenir?

C'est le boulon qui analyse le message de Kinesis à JSON objet en utilisant GSon

@Override 
public void execute(Tuple tuple) { 
    String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY); 
    String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER); 
    byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA); 

    ByteBuffer buffer = ByteBuffer.wrap(payload); 
    String data = null; 
    try { 
    data = decoder.decode(buffer).toString(); 

    HashMap < String, String > map = new Gson().fromJson(data, new TypeToken < HashMap < String, Object >>() {}.getType()); 

    this.outputCollector.emit(tuple, new Values(map)); 
    this.outputCollector.ack(tuple); 

    } catch (CharacterCodingException e) { 
    this.outputCollector.fail(tuple); 
    } 

} 

Merci

Répondre

0

Vous pouvez définir deux cours d'eau dans votre boulon puis déclarer deux OutputStreams:

@Override 
public void execute(Tuple tuple) { 
    // ... 
    // Some Code 
    // ... 
    if (_c =="a") { 
    collector.emit("stream1", tuple, new Values(_c)); 
    } else { 
    collector.emit("stream2", tuple, new Values(_c)); 
    } 

} 

@Override 
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    outputFieldsDeclarer.declareStream("stream1", new Fields("_c")); 
    outputFieldsDeclarer.declareStream("stream2", new Fields("_c")); 
} } 

Dans votre topologie, vous pouvez utiliser l'option ShuffleGrouping pour transmettre un ID de flux.

topology.setBolt("FirstBolt",new FirstBolt(),1);  
topology.setBolt("newBolt1", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream1"); 
topology.setBolt("newBolt2", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream2"); 

Une autre possibilité est d'envoyer simplement aux deux boulons puis vérifier la valeur dans les deux boulons et exécuter le code requis.