J'utilise Spark streaming pour lire les données Kinesis en utilisant le cadre structuré en continu, ma connexion est la suivanteSpark en streaming Garantie spécifique Heure fenêtre de démarrage
val kinesis = spark
.readStream
.format("kinesis")
.option("streams", streamName)
.option("endpointUrl", endpointUrl)
.option("initialPositionInStream", "earliest")
.option("format", "json")
.schema(<my-schema>)
.load
Les données proviennent de plusieurs appareils IdO qui ont une expérience unique id, je dois agréger les données de cet identifiant et par une fenêtre culbute sur le champ d'horodatage, comme suit:
val aggregateData = kinesis
.groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
.agg(...)
le problème que je rencontre est que je dois garantir que toutes les fenêtres commence parfois rondes (comme 00:00:00, 00:15:00 et ainsi de suite), aussi je dois garantie da que seules les lignes contenant plein longues fenêtres de 15 minutes vont être sortie à mon évier, ce que je fais actuellement est
val query = aggregateData
.writeStream
.foreach(postgreSQLWriter)
.outputMode("update")
.start()
.awaitTermination()
Où ths postgreSQLWriter est un StreamWriter j'ai créé pour insérer chaque ligne dans un PostgreSQL SGBD. Comment puis-je forcer mes fenêtres à durer exactement 15 minutes et l'heure de début à arrondir les valeurs d'horodatage de 15 minutes pour chaque identifiant unique de périphérique?