J'utilise KafkaSpout. Veuillez trouver le programme de test ci-dessous. J'utilise Storm 0.8.1. La classe Multischeme est là dans Storm 0.8.2. Je vais l'utiliser. Je veux juste savoir comment les versions précédentes fonctionnaient juste en instanciant la classe StringScheme()? Où puis-je télécharger les versions antérieures de Kafka Spout? Mais je doute que ce serait une alternative correcte que de travailler sur Storm 0.8.2. ??? (Confus)Kafka Storm Intégration utilisant Kafka Spout
Lorsque j'exécute le code (donné ci-dessous) sur storm cluster (ie quand j'appuie sur ma topologie) j'obtiens l'erreur suivante (Cela arrive quand la partie Scheme est commentée bien sûr j'obtiendrai l'erreur du compilateur la classe n'est pas là en 0.8.1):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
dans le code ci-dessous vous pouvez trouver la spoutConfig.scheme = new StringScheme(); partie commentée. Je recevais une erreur de compilateur si je ne commente pas cette ligne qui est naturelle car il n'y a pas de constructeur là-bas. Aussi quand j'instancie MultiScheme j'obtiens l'erreur car je n'ai pas cette classe dans 0.8.1.
public class TestTopology {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.toString());
}
}
public static void main(String [] args) throws Exception {
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
}
Je suppose Je ne comprends pas le problème: ça marche juste si vous allez à 0.8.2? Si c'est le cas, pourquoi même essayer de courir dans 0.8.1: 0.8.2 le remplace par quelques corrections de bugs et d'autres améliorations. –