2017-09-26 10 views
2

J'ai appliqué l'agrégation sur la base de données en streaming en utilisant le mode complet. Pour enregistrer les données en local, j'ai implémenté foreach sink. Je suis capable de sauvegarder des données sous forme de texte. Mais j'ai besoin de le sauvegarder en format parquet.Comment enregistrer l'agrégation en flux continu dans le mode de sortie Complet vers le parquet?

val writerForText = new ForeachWriter[Row] { 
    var fileWriter: FileWriter = _ 

    override def process(value: Row): Unit = { 
     fileWriter.append(value.toSeq.mkString(",")) 
    } 

    override def close(errorOrNull: Throwable): Unit = { 
     fileWriter.close() 
    } 

    override def open(partitionId: Long, version: Long): Boolean = { 
     FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}")) 
     fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp")) 
     true 

    } 
    } 

val columnName = "col1" 
frame.select(count(columnName),count(columnName),min(columnName),mean(columnName),max(columnName),first(columnName), last(columnName), sum(columnName)) 
       .writeStream.outputMode(OutputMode.Complete()).foreach(writerForText).start() 

Comment puis-je y parvenir? Merci d'avance!

+0

avez-vous trouvé un moyen de sauver au parquet en mode complet/mise à jour? –

+0

Oui, j'ai écrit écrivain personnalisé en utilisant https://github.com/chtefi/parquet-custom-reader-writer –

Répondre

-1

Pour enregistrer les données en local, j'ai implémenté foreach sink. Je suis capable de sauvegarder des données sous forme de texte. Mais j'ai besoin de le sauvegarder en format parquet.

Le format par défaut lors de l'enregistrement d'un dataset streaming est ... parquet. Cela dit, vous n'avez pas besoin d'utiliser un évier foreach assez avancé, mais simplement parquet.

La requête pourrait être comme suit:

scala> :type in 
org.apache.spark.sql.DataFrame 

scala> in.isStreaming 
res0: Boolean = true 

in.writeStream. 
    option("checkpointLocation", "/tmp/checkpoint-so"). 
    start("/tmp/parquets") 
+0

Streaming structuré ne nous permet pas d'écrire des données sur n'importe quel récepteur en mode complet sauf la mémoire. Pour l'enregistrer, nous devons implémenter foreach sink. Nous ne pouvons pas le faire en mode complet que vous avez suggéré. –

+0

Ouch ... vous avez peut-être raison ... été trop pressé d'y répondre. Laissez-moi y penser ... –

+0

@MaheshChandKandpal Comme votre récepteur est un fichier, il est logique d'utiliser le ** File Sink ** avec le mode ajout comme dans la réponse de Jacek. Votre solution avec le lavabo Foreach semble être surreprésentée –