J'ai un cluster Storm se connectant à Kinesis Stream. Le message ressemble à ceci.Comment envoyer un tuple à un autre boulon en fonction d'une valeur dans le message
{
_c: "a"
}
ou il devrait être
{
_c: "b"
}
Je voudrais envoyer un tuple avec _c = "a" à un boulon et _c = "b" à un boulon différent. Comment puis-je y parvenir?
C'est le boulon qui analyse le message de Kinesis à JSON objet en utilisant GSon
@Override
public void execute(Tuple tuple) {
String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
ByteBuffer buffer = ByteBuffer.wrap(payload);
String data = null;
try {
data = decoder.decode(buffer).toString();
HashMap < String, String > map = new Gson().fromJson(data, new TypeToken < HashMap < String, Object >>() {}.getType());
this.outputCollector.emit(tuple, new Values(map));
this.outputCollector.ack(tuple);
} catch (CharacterCodingException e) {
this.outputCollector.fail(tuple);
}
}
Merci