Problème de dépendance avec l'API Kafka Spark Consumer : je suis en train de créer un consommateur Spark pour recevoir des données depuis Kafka, mais dans mon code consommateur je ne trouve aucun jar/dépendance fournissant ces deux classes : `import org.apache.spark.streaming.scheduler.ReceiverLauncher;` et `import org.apache.spark.streaming.Scheduler;`.
J'utilise Kafka 0.11.0.1 et Spark 2.2.0 sur ma machine locale. Voici mon code de consommateur :
package kafkatest2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// ReceiverLauncher and MessageAndMetadata come from the third-party
// kafka-spark-consumer artifact (package consumer.kafka), NOT from Spark
// itself. There is no public org.apache.spark.streaming.scheduler.ReceiverLauncher
// or org.apache.spark.streaming.Scheduler class in any Spark jar, which is
// why those two imports could never be resolved.
import consumer.kafka.MessageAndMetadata;
import consumer.kafka.ReceiverLauncher;
import kafka.consumer.Consumer;
// import org.apache.spark.streaming.scheduler.ReceiverLauncher; // class does not exist in Spark
// import org.apache.spark.streaming.Scheduler;                  // class does not exist in Spark
/**
 * Spark Streaming consumer that receives Kafka messages through the
 * kafka-spark-consumer receiver ({@code consumer.kafka.ReceiverLauncher})
 * and prints the record count of every 1-second batch.
 *
 * <p>Connection details (ZooKeeper host/port, topic, consumer id) are
 * hard-coded for a local test setup (Kafka 0.11.x, Spark 2.2.x).
 */
public class ConsumerTest implements Serializable {

    private static final long serialVersionUID = 4332618245650072140L;

    /**
     * Public entry point: builds the streaming context and blocks until the
     * streaming job terminates.
     *
     * @throws InstantiationException declared for API compatibility with callers
     * @throws IllegalAccessException declared for API compatibility with callers
     * @throws ClassNotFoundException declared for API compatibility with callers
     */
    public void start() throws InstantiationException, IllegalAccessException,
            ClassNotFoundException {
        run();
    }

    /** Configures the receiver, wires the stream, and waits for termination. */
    private void run() {
        Properties props = new Properties();
        // Mandatory receiver properties (see kafka-spark-consumer README).
        props.put("zookeeper.hosts", "localhost");
        props.put("zookeeper.port", "2181");
        props.put("zookeeper.broker.path", "/brokers");
        props.put("kafka.topic", "test-topic");
        props.put("kafka.consumer.id", "test-id");
        props.put("zookeeper.consumer.connection", "localhost:2182");
        props.put("zookeeper.consumer.path", "/spark-kafka");
        // Optional tuning properties.
        props.put("consumer.forcefromstart", "true");      // replay topic from the beginning
        props.put("consumer.fetchsizebytes", "1048576");   // 1 MiB per fetch
        props.put("consumer.fillfreqms", "250");           // fetch every 250 ms
        props.put("consumer.backpressure.enabled", "true");

        SparkConf sparkConf = new SparkConf()
                .set("spark.streaming.receiver.writeAheadLog.enable", "false");
        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(1000));

        // Number of parallel receivers to launch.
        int numberOfReceivers = 3;
        JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
                jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());

        // Spark 2.x: foreachRDD takes VoidFunction/VoidFunction2, not the old
        // Function2<..., Void> shape (removed after early Spark releases), so
        // the original anonymous class did not compile against Spark 2.2.
        // rdd.count() is an action that runs on the executors; the former
        // rdd.collect() pulled the entire batch onto the driver only to be
        // discarded, so it is dropped.
        unionStreams.foreachRDD(new VoidFunction2<JavaRDD<MessageAndMetadata>, Time>() {
            @Override
            public void call(JavaRDD<MessageAndMetadata> rdd, Time time) {
                System.out.println(" Number of records in this batch "
                        + rdd.count());
            }
        });

        jsc.start();
        try {
            // Spark 2.x awaitTermination() throws checked InterruptedException;
            // the original call neither caught nor declared it.
            jsc.awaitTermination();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        new ConsumerTest().start();
    }
}