J'ai un ensemble de données de diffusion en continu, la lecture de kafka et essayer d'écrire au format CSVComment définir dynamiquement un schéma de jeu de données en continu pour écrire en csv?
case class Event(map: Map[String,String])
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation
val eventDataset: Dataset[Event] = spark
.readStream
.format("kafka")
.load()
.select("value")
.as[Array[Byte]]
.map(decodeEvent)
Event
détient Map[String,String]
à l'intérieur et à écrire au format CSV je besoin d'un certain schéma.
Disons que tous les champs sont de type String
et donc j'ai essayé l'exemple de spark repo
val columns = List("year","month","date","topic","field1","field2")
val schema = new StructType() //Prepare schema programmatically
columns.foreach { field => schema.add(field, "string") }
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
columns.map(c => event.getOrElse(c, "")
)}
val df = spark.sqlContext.createDataFrame(rowRdd, schema)
Cela donne une erreur lors de l'exécution en ligne « eventDataset.rdd »:
Causée par: org.apache.spark.sql.AnalysisException: Les requêtes avec sources de diffusion doivent être exécutées avec writeStream.start() ;;
ci-dessous ne fonctionne pas parce que « .map » a une liste [chaîne] ne tuple
eventDataset.map(event => columns.map(c => event.getOrElse(c,""))
.toDF(columns:_*)
est-il un moyen d'y parvenir avec le schéma programmatique et des jeux de données de transmission en continu structurés?