2017-10-19 3 views
0

Tout d'abord, toutes mes excuses pour le titre, je ne savais pas comment décrire cela de façon éloquente.Spark plusieurs chemins de sortie se traduisent par plusieurs lectures d'entrée

J'ai un travail d'étincelle qui analyse les journaux en JSON, puis en utilisant spark-sql convertit des colonnes spécifiques en ORC et écrit sur différents chemins. Par exemple:

val logs = sc.textFile("s3://raw/logs") 
val jsonRows = logs.mapPartitions(partition => { 
    partition.map(log => { 
    logToJson.parse(log) 
    } 
} 

jsonRows.foreach(r => { 
    val contentPath = "s3://content/events/" 
    val userPath = "s3://users/events/" 
    val contentDf = sqlSession.read.schema(contentSchema).json(r) 
    val userDf = sqlSession.read.schema(userSchema).json(r) 
    val userDfFiltered = userDf.select("*").where(userDf("type").isin("users") 
    // Save Data 
    val contentWriter = contentDf.write.mode("append").format("orc") 
    eventWriter.save(contentPath) 
    val userWriter = userDf.write.mode("append").format("orc") 
    userWriter.save(userPath) 

Quand j'ai écrit cela, j'attendre à ce que l'analyse syntaxique se produirait une fois, et il écrirait aux emplacements respectifs après. Cependant, il semble qu'il exécute tout le code dans le fichier deux fois - une fois pour content et une fois pour users. Est-ce prévu? Je préférerais ne pas transférer les données de S3 et analyser deux fois, car c'est le plus gros goulot d'étranglement. J'attache une image de l'interface utilisateur Spark pour montrer la duplication des tâches pour une seule fenêtre de streaming. Merci pour toute l'aide que vous pourrez fournir! Spark Application UI

Répondre

0

D'accord, ce type de DFs imbriqué est un non. DataFrames sont censés être une structure de données pour big jeux de données qui ne rentrent pas dans des structures de données normales (comme Seq ou List) et qui doivent être traitées de manière distribuée. C'est pas juste un autre type de tableau. Ce que vous essayez de faire ici est de créer un DataFrame par ligne de journal, ce qui n'a pas de sens. Dans la mesure où je peux dire à partir du code (incomplet) que vous avez posté ici, vous voulez créer deux nouveaux DataFrames de votre entrée d'origine (les journaux) que vous souhaitez ensuite stocker dans deux endroits différents. Quelque chose comme ceci:

val logs = sc.textFile("s3://raw/logs") 
val contentPath = "s3://content/events/" 
val userPath = "s3://users/events/" 

val jsonRows = logs 
    .mapPartitions(partition => { 
    partition.map(log => logToJson.parse(log)) 
    } 
    .toDF() 
    .cache() // Or use persist() if dataset is larger than will fit in memory 

jsonRows 
    .write 
    .format("orc") 
    .save(contentPath) 

jsonRows 
    .filter(col("type").isin("users")) 
    .write 
    .format("orc") 
    .save(userPath) 

Espérons que cela aide.