J'ai appliqué l'agrégation sur la base de données en streaming en utilisant le mode complet. Pour enregistrer les données en local, j'ai implémenté foreach sink. Je suis capable de sauvegarder des données sous forme de texte. Mais j'ai besoin de le sauvegarder en format parquet.Comment enregistrer l'agrégation en flux continu dans le mode de sortie Complet vers le parquet?
val writerForText = new ForeachWriter[Row] {
var fileWriter: FileWriter = _
override def process(value: Row): Unit = {
fileWriter.append(value.toSeq.mkString(","))
}
override def close(errorOrNull: Throwable): Unit = {
fileWriter.close()
}
override def open(partitionId: Long, version: Long): Boolean = {
FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
true
}
}
val columnName = "col1"
frame.select(count(columnName),count(columnName),min(columnName),mean(columnName),max(columnName),first(columnName), last(columnName), sum(columnName))
.writeStream.outputMode(OutputMode.Complete()).foreach(writerForText).start()
Comment puis-je y parvenir? Merci d'avance!
avez-vous trouvé un moyen de sauver au parquet en mode complet/mise à jour? –
Oui, j'ai écrit écrivain personnalisé en utilisant https://github.com/chtefi/parquet-custom-reader-writer –