0

Comment sauver Kafka-Spark messages de données en continu cadre dans seul fichierComment économiser Kafka-Spark messages de données en continu cadre dans seul fichier

J'ai développé une application qui consomme les messages à l'aide de processus Streaming Spark Kafka-.

Une fois les données reçues, elles sont transformées en trame de données. Ensuite, la trame de données en continu est sauvegardée en tant que fichier texte, ici la trame de données est sauvegardée dans chaque fichier pour chaque message kafka, ci-dessous est le code que j'ai utilisé pour enregistrer en fichier texte, c'est enregistrer les données pour multiplier le fichier texte pour chaque message.

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append") 
           .save("path") 

Voici l'exigence que je voudrais obtenir le streaming cadre de ce jour doivent être enregistré en tant que fichier unique pour chaque message kafka, si possible s'il vous plaît me aider à la solution.

Merci d'avance

Répondre

0

Le code ci-dessous pourrait vous aider. Il suffit de générer la liste de RDD, puis l'union.

var dStreamRDDList = new ListBuffer[RDD[String]] 
dStream.foreachRDD(rdd => 
    { 
     dStreamRDDList += rdd 
    }) 
val joinRDD = ssc.sparkContext.union(dStreamRDDList) 
//then convert joinRDD to DataFrame (DF) 
DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append") 
          .save("path")