Tout d'abord, toutes mes excuses pour le titre, je ne savais pas comment décrire cela de façon éloquente.Spark plusieurs chemins de sortie se traduisent par plusieurs lectures d'entrée
J'ai un travail d'étincelle qui analyse les journaux en JSON, puis en utilisant spark-sql convertit des colonnes spécifiques en ORC et écrit sur différents chemins. Par exemple:
val logs = sc.textFile("s3://raw/logs")
val jsonRows = logs.mapPartitions(partition => {
partition.map(log => {
logToJson.parse(log)
}
}
jsonRows.foreach(r => {
val contentPath = "s3://content/events/"
val userPath = "s3://users/events/"
val contentDf = sqlSession.read.schema(contentSchema).json(r)
val userDf = sqlSession.read.schema(userSchema).json(r)
val userDfFiltered = userDf.select("*").where(userDf("type").isin("users")
// Save Data
val contentWriter = contentDf.write.mode("append").format("orc")
eventWriter.save(contentPath)
val userWriter = userDf.write.mode("append").format("orc")
userWriter.save(userPath)
Quand j'ai écrit cela, j'attendre à ce que l'analyse syntaxique se produirait une fois, et il écrirait aux emplacements respectifs après. Cependant, il semble qu'il exécute tout le code dans le fichier deux fois - une fois pour content
et une fois pour users
. Est-ce prévu? Je préférerais ne pas transférer les données de S3 et analyser deux fois, car c'est le plus gros goulot d'étranglement. J'attache une image de l'interface utilisateur Spark pour montrer la duplication des tâches pour une seule fenêtre de streaming. Merci pour toute l'aide que vous pourrez fournir!