
Kafka Spark Consumer API problem with dependency

I am building a Spark consumer API to receive data from Kafka, but in my consumer code I cannot find a jar/dependency for these two classes:

import org.apache.spark.streaming.scheduler.ReceiverLauncher;
import org.apache.spark.streaming.Scheduler;
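
These classes do not resolve against the standard Spark artifacts; ReceiverLauncher and the consumer.kafka classes used below appear to come from the third-party kafka-spark-consumer receiver project rather than from Spark itself. A sketch of the Maven dependency that project is usually pulled in with (the coordinates and version are assumptions, so verify them on Maven Central):

<!-- Assumed coordinates for the receiver-based kafka-spark-consumer project -->
<dependency>
    <groupId>dibbhatt</groupId>
    <artifactId>kafka-spark-consumer</artifactId>
    <version>1.0.8</version>
</dependency>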

I am using Kafka 0.11.0.1 and Spark 2.2.0 on my local machine, and this is my consumer code:

package kafkatest2; 

import java.io.Serializable; 
import java.util.Properties; 
import java.util.Arrays; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.storage.StorageLevel; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.Time; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.scheduler.ReceiverLauncher; 

import consumer.kafka.MessageAndMetadata; 
import kafka.consumer.Consumer; 

import org.apache.spark.streaming.Scheduler; 

//import kafka.consumer.Consumer; 
//import kafka.message.MessageAndMetadata; 

public class ConsumerTest implements Serializable { 

private static final long serialVersionUID = 4332618245650072140L; 

public void start() throws InstantiationException, IllegalAccessException, 
    ClassNotFoundException { 

    run(); 
} 

private void run() { 

    Properties props = new Properties(); 
    props.put("zookeeper.hosts", "localhost"); 
    props.put("zookeeper.port", "2181"); 
    props.put("zookeeper.broker.path", "/brokers"); 
    props.put("kafka.topic", "test-topic"); 
    props.put("kafka.consumer.id", "test-id"); 
    props.put("zookeeper.consumer.connection", "localhost:2182"); 
    props.put("zookeeper.consumer.path", "/spark-kafka"); 
    // Optional Properties 
    props.put("consumer.forcefromstart", "true"); 
    props.put("consumer.fetchsizebytes", "1048576"); 
    props.put("consumer.fillfreqms", "250"); 
    props.put("consumer.backpressure.enabled", "true"); 

    SparkConf _sparkConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.enable", "false"); 
    JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf, new Duration(1000)); 
    // Specify number of Receivers you need. 
    int numberOfReceivers = 3; 
    JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY()); 
    unionStreams
        .foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
            @Override
            public Void call(JavaRDD<MessageAndMetadata> rdd, Time time) throws Exception {
                rdd.collect();
                System.out.println(" Number of records in this batch " + rdd.count());
                return null;
            }
        });

    jsc.start(); 
    jsc.awaitTermination(); 
} 

public static void main(String[] args) throws Exception { 

    ConsumerTest consumer = new ConsumerTest(); 
    consumer.start(); 
} 
} 

Answer


Maven. Here is a pom.xml that works (adjust the Kafka/Spark versions as needed). Hope this helps.

<dependencies> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.4</version> 
     <scope>provided</scope> 
    </dependency> 

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.1.1</version> 
     <scope>provided</scope> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.1</version> 
     <scope>provided</scope> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.3</version> 
     <scope>provided</scope> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.11</artifactId> 
     <version>2.1.1</version> 
     <scope>provided</scope> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql-kafka-0-10_2.11</artifactId> 
     <version>2.1.1</version> 
     <!--  
     <scope>provided</scope> 
     --> 
    </dependency> 


    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.2.1</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.11.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-databind</artifactId> 
     <version>2.6.3</version> 
    </dependency> 

</dependencies>
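
Since most of the Spark and Kafka artifacts above are marked as provided, the remaining dependencies typically have to be bundled into the application jar before running it with spark-submit. A minimal sketch of a shade-plugin setup for that (the plugin version is an assumption):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <executions>
                <execution>
                    <!-- Build an uber jar containing the non-provided dependencies at package time -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>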