2017-08-07 1 views
0

Comment analyser uv, pv, ip un jour toutes les 5 minutes, et stocké Mysql. Les données sont de Kafka dans le format suivant:comment utiliser spark pour analyser pv, uv, ip toutes les 5 min

Message sent: {"cookie":"a95f22eabc4fd4b580c011a3161a9d9d","ip":"125.119.144.252","event_time":"2017-08-07 10:50:16"} 
Message sent: {"cookie":"6b67c8c700427dee7552f81f3228c927","ip":"202.109.201.181","event_time":"2017-08-07 10:50:26"} 

Il est comme 00: 00-00: 05 00: 05 à -00: 10 et ainsi de suite, je :

val write=new JDBCSink() 
     val query=counts.writeStream.foreach(write).outputMode("complete") 
      .trigger(ProcessingTime("5 minutes"))  
      .start() 

mais quand je le commets sur 00:01 ou sa panne, comment puis-je être sûr qu'il ne sera pas analyser comme 00: 01-00: 06 et ainsi de suite.

Répondre

0

Utilisation de la fonction window:

query = counts.groupBy(window('event_time', '5 second')).agg() 
query.writeStream.start() 
+0

pv, calculate uv est le dernier jour, et la fenêtre n'est pas stateful, si j'utilise la fenêtre comme cette fenêtre ($ "unix_timestamp", "1 jour", « 5 minutes ") il devrait également exécuter le programme à 00:00 et non 00:01 – Aaron