2017-09-26 3 views
4

J'utilise Java.Comment accéder aux chemins de fichiers dans les enregistrements de Kafka et créer des jeux de données?

Je reçois un chemin de fichier sur kafka-messages. Et j'ai besoin de charger ce fichier dans un RDD spark, le traiter et le vider dans HDFS.

Je suis capable de récupérer le chemin de fichier du message kafka. Et je souhaite créer un jeu de données/RDD sur ce fichier.

Je ne peux pas exécuter une fonction de carte sur un jeu de données de message kafka. Il commet une erreur avec un NPE car sparkContext n'est pas disponible sur le worker.

Je ne peux pas exécuter un foreach sur le jeu de données de messages kafka. Il émet une erreur avec le message "Les requêtes avec les sources de streaming doivent être exécutées avec writeStream.start();"

Je ne peux pas collecter les données reçues de l'ensemble de données de messages kafka, car il génère une erreur avec le message "Les requêtes avec des sources de diffusion doivent être exécutées avec writeStream.start() ;;"

Je suppose que cela doit être un cas d'utilisation très général et doit être exécuté dans de nombreuses configurations.

Comment puis-je charger le fichier en tant que RDD à partir des chemins que je reçois dans le message Kafka?

CODE CI-DESSOUS:

SparkSession spark = SparkSession.builder() 
.appName("MyKafkaStreamReader") 
    .master("local[4]") 
.config("spark.executor.memory", "2g") 
.getOrCreate(); 

// Create DataSet representing the stream of input lines from kafka 
Dataset<String> kafkaValues = spark.readStream() 
.format("kafka") 
    .option("spark.streaming.receiver.writeAheadLog.enable", true) 
    .option("kafka.bootstrap.servers", Configuration.KAFKA_BROKER) 
    .option("subscribe", Configuration.KAFKA_TOPIC) 
    .option("fetchOffset.retryIntervalMs", 100) 
    .option("checkpointLocation", "file:///tmp/checkpoint") 
.load() 
    .selectExpr("CAST(value AS STRING)").as(Encoders.STRING()); 

Dataset<String> messages = kafkaValues.map(x -> { 
    ObjectMapper mapper = new ObjectMapper(); 
    String m = mapper.readValue(x.getBytes(), String.class); 
    return m; 
}, Encoders.STRING()); 

// ==================== 
// TEST 1 : FAILS 
// ====================  
// CODE TRYING TO execute MAP on the received RDD 
// This fails with a Null pointer exception because "spark" is not available on worker node 

/* 
Dataset<String> statusRDD = messages.map(message -> { 

    // BELOW STATEMENT FAILS 
    Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message); 
    Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates(); 
    dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation()); 
    return getHdfsLocation(); 

}, Encoders.STRING()); 

    StreamingQuery query2 = statusRDD.writeStream().outputMode("append").format("console").start(); 
    */ 

// ====================  
// TEST 2 : FAILS 
// ====================  
// CODE BELOW FAILS WITH EXCEPTION 
// "Queries with streaming sources must be executed with writeStream.start();;" 
// Hence, processing the deduplication on the worker side using 
/* 
JavaRDD<String> messageRDD = messages.toJavaRDD(); 

messageRDD.foreach(message -> { 

    Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message); 
    Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates(); 
    dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation()); 

}); 
*/ 

// ====================  
// TEST 3 : FAILS 
// ==================== 
// CODE TRYING TO COLLECT ALSO FAILS WITH EXCEPTION 
// "Queries with streaming sources must be executed with writeStream.start();;" 
// List<String> mess = messages.collectAsList(); 

Toute idée sur comment puis-je lire créer les chemins de fichiers et de créer des RDD sur les fichiers?

+0

Pouvez-vous envoyer votre code aussi ..comment tu le fais. – nat

+0

Ajout du code pour mes essais. –

+0

Je ne pense pas que vous pouvez réaliser cette utilisation en utilisant le streaming structuré. Utilisez plutôt Spark Streaming avec le consommateur kafka 'Direct'. Vous pouvez implémenter votre logique de chargement de fichier personnalisée dans l'opération générale 'foreachRDD'. – maasg

Répondre

0

Dans Streaming structuré, je ne pense pas qu'il existe un moyen de réifier les données dans un flux à utiliser en tant que paramètre pour une opération Dataset.

Dans l'écosystème Spark, ceci est possible en combinant Spark Streaming et Spark SQL (Datasets). Nous pouvons utiliser Spark Streaming pour consommer le sujet Kafka et ensuite, en utilisant Spark SQL, nous pouvons charger les données correspondantes et appliquer le processus désiré.

Un tel travail ressemblerait à peu près comme ceci: (Ceci est en Scala, le code Java suit la même structure seulement que le code réel est un peu plus bavard.)

// configure and create spark Session 

val spark = SparkSession 
    .builder 
    .config(...) 
    .getOrCreate() 

// create streaming context with a 30-second interval - adjust as required 
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(30)) 

// this uses Kafka080 client. Kafka010 has some subscription differences 

val kafkaParams = Map[String, String](
    "metadata.broker.list" -> kafkaBootstrapServer, 
    "group.id" -> "job-group-id", 
    "auto.offset.reset" -> "largest", 
    "enable.auto.commit" -> (false: java.lang.Boolean).toString 
) 

// create a kafka direct stream 
val topics = Set("topic") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    streamingContext, kafkaParams, topics) 

// extract the values from the kafka message 
val dataStream = stream.map{case (id, data) => data}  

// process the data 
dataStream.foreachRDD { dataRDD => 
    // get all data received in the current interval 
    // We are assuming that this data fits in memory. 
    // We're not processing a million files per second, are we? 
    val files = dataRDD.collect() 
    files.foreach{ file => 
    // this is the process proposed in the question -- 
    // notice how we have access to the spark session in the context of the foreachRDD 
    val fileDataset = spark.read().option("header", "true").csv(file) 
    val dedupedFileDataset = fileDataset.dropDuplicates() 
    // this can probably be written in terms of the dataset api 
    //dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation()); 
    dedupedFileDataset.write.format("text").mode("overwrite").save(getHdfsLocation()) 
    } 
} 

// start the streaming process 
streamingContext.start() 
streamingContext.awaitTermination()