2

J'ai un ensemble de données de diffusion en continu, la lecture de kafka et essayer d'écrire au format CSVComment définir dynamiquement un schéma de jeu de données en continu pour écrire en csv?

case class Event(map: Map[String,String]) 
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation 
val eventDataset: Dataset[Event] = spark 
    .readStream 
    .format("kafka") 
    .load() 
    .select("value") 
    .as[Array[Byte]] 
    .map(decodeEvent) 

Event détient Map[String,String] à l'intérieur et à écrire au format CSV je besoin d'un certain schéma.

Disons que tous les champs sont de type String et donc j'ai essayé l'exemple de spark repo

val columns = List("year","month","date","topic","field1","field2") 
val schema = new StructType() //Prepare schema programmatically 
columns.foreach { field => schema.add(field, "string") } 
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
    columns.map(c => event.getOrElse(c, "") 
)} 
val df = spark.sqlContext.createDataFrame(rowRdd, schema) 

Cela donne une erreur lors de l'exécution en ligne « eventDataset.rdd »:

Causée par: org.apache.spark.sql.AnalysisException: Les requêtes avec sources de diffusion doivent être exécutées avec writeStream.start() ;;

ci-dessous ne fonctionne pas parce que « .map » a une liste [chaîne] ne tuple

eventDataset.map(event => columns.map(c => event.getOrElse(c,"")) 
.toDF(columns:_*) 

est-il un moyen d'y parvenir avec le schéma programmatique et des jeux de données de transmission en continu structurés?

Répondre

1

J'utilise l'approche beaucoup plus simple:

import org.apache.spark.sql.functions._ 

eventDataset.select(columns.map(
    c => coalesce($"map".getItem(c), lit("")).alias(c) 
): _*).writeStream.format("csv").start(path) 

mais si vous voulez quelque chose de plus proche de la solution actuelle sauter la conversion RDD

import org.apache.spark.sql.catalyst.encoders.RowEncoder 

eventDataset.rdd.map(event => 
    Row.fromSeq(columns.map(c => event.getOrElse(c,""))) 
)(RowEncoder(schema)).writeStream.format("csv").start(path)