2017-09-20 4 views
4

J'utilise Spark streaming pour lire les données Kinesis en utilisant le cadre structuré en continu, ma connexion est la suivanteSpark en streaming Garantie spécifique Heure fenêtre de démarrage

val kinesis = spark 
    .readStream 
    .format("kinesis") 
    .option("streams", streamName) 
    .option("endpointUrl", endpointUrl) 
    .option("initialPositionInStream", "earliest") 
    .option("format", "json") 
    .schema(<my-schema>) 
    .load 

Les données proviennent de plusieurs appareils IdO qui ont une expérience unique id, je dois agréger les données de cet identifiant et par une fenêtre culbute sur le champ d'horodatage, comme suit:

val aggregateData = kinesis 
    .groupBy($"uid", window($"timestamp", "15 minute", "15 minute")) 
    .agg(...) 

le problème que je rencontre est que je dois garantir que toutes les fenêtres commence parfois rondes (comme 00:00:00, 00:15:00 et ainsi de suite), aussi je dois garantie da que seules les lignes contenant plein longues fenêtres de 15 minutes vont être sortie à mon évier, ce que je fais actuellement est

val query = aggregateData 
    .writeStream 
     .foreach(postgreSQLWriter) 
     .outputMode("update") 
     .start() 
     .awaitTermination() 

Où ths postgreSQLWriter est un StreamWriter j'ai créé pour insérer chaque ligne dans un PostgreSQL SGBD. Comment puis-je forcer mes fenêtres à durer exactement 15 minutes et l'heure de début à arrondir les valeurs d'horodatage de 15 minutes pour chaque identifiant unique de périphérique?

Répondre

1

question1: pour commencer à des moments précis pour commencer, il y a une autre fonction de groupement d'étincelles paramètres qui est "offset". En spécifiant qu'il commence après le temps spécifié à partir d'une heure Exemple:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")) 

si la syntaxe ci-dessus groupe par column1 et créer des fenêtres de durée de 22 minutes avec une taille de fenêtre glissante de 1 minute et de décalage en tant que 15 minute

par exemple, il commence à partir de:

window1: 8:15(8:00 add 15 minute offset) to 8:37 (8:15 add 22 minutes) 
window2: 8:16(previous window start + 1 minute) to 8:38 (22 minute size again) 

question2: pour pousser uniquement les fenêtres ayant pleine taille de 15 minutes, créez une colonne de comptage qui compte le nombre d'événements ayant dans cette fenêtre. une fois qu'il atteint 15, le pousser à l'endroit où vous voulez en utilisant la commande de filtre

calcul count:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count")) 

filtre WriteStream contenant 15 seulement le nombre:

aggregateddata.filter($"count"===15).writeStream.format(....).outputMode("complete").start()