2017-09-14 2 views
5

Scénario: J'ai un service qui enregistre des événements comme dans cet exemple CSV:Faire bouillir vers le bas des événements à des intervalles de temps

#TimeStamp, Name, ColorOfPullover 
TimeStamp01, Peter, Green 
TimeStamp02, Bob, Blue 
TimeStamp03, Peter, Green 
TimeStamp04, Peter, Red 
TimeStamp05, Peter, Green 

événements qui par exemple Peter porte Vert se produira très souvent dans une rangée.

J'ai deux objectifs:

  1. Conserver les données aussi faible que possible
  2. Conserver tous les pertinentes données

moyens pertinents: Je dois savoir, dans lequel temps traves une personne était portant quelle couleur. Par exemple:

#StartTime, EndTime, Name, ColorOfPullover 
TimeStamp01, TimeStamp03, Peter, Green 
TimeStamp02, TimeStamp02, Bob, Blue 
TimeStamp03, TimeStamp03, Peter, Green 
TimeStamp04, TimeStamp04, Peter, Red 
TimeStamp05, TimeStamp05, Peter, Green 

Dans ce format, je peux répondre à des questions comme: Quelle couleur a été Peter portait au moment TimeStamp02? (Je peux supposer que chaque personne porte la même couleur entre les deux événements pour la connecté même couleur.)

question principale: Puis-je utiliser une technologie déjà existante pour y parvenir? C'est à dire. Je peux le fournir avec un flux continu d'événements et il extrait et stocke les données pertinentes?


Pour être précis, je dois mettre en œuvre un algorithme comme celui-ci (pseudo-code). La méthode OnNewEvent est appelée pour chaque ligne de l'exemple CSV. Le paramètre event contient déjà les données de la ligne en tant que variables membres.

def OnNewEvent(even) 
    entry = Database.getLatestEntryFor(event.personName) 
    if (entry.pulloverColor == event.pulloverColor) 
     entry.setIntervalEndDate(event.date) 
     Database.store(entry) 
    else 
     newEntry = new Entry 
     newEntry.setIntervalStartDate(event.date) 
     newEntry.setIntervalEndDate(event.date) 
     newEntry.setPulloverColor(event.pulloverColor)) 
     newEntry.setName(event.personName) 
     Database.createNewEntry(newEntry) 
    end 
end 
+0

Il devrait être possible de le faire avec logstash, mais le problème est que vous devez faire une demande de ElasticSearch pour chaque ligne pour récupérer la dernière entrée, qui fera la processus très lent. C'est pourquoi je ne pense pas que logstash soit le bon outil pour cela. – baudsp

+0

Quels sont vos volumes de données et à quelle vitesse devez-vous réagir lorsqu'un nouvel événement se produit? Est-ce correct si certains événements sont perdus? – ffeast

+0

La réaction aux événements peut être lente. Par exemple. Un jour de retard est acceptable. Donc, un emploi cron un par jour pourrait être une option. Les événements ne peuvent pas être perdus, c'est critique. – fex

Répondre

0
This is typical scenario of any streaming architecture. 

There are multiple existing technologies which work in tandem to get what you want. 


1. NoSql Database (Hbase, Aerospike, Cassandra) 
2. streaming jobs Like Spark streaming(micro batch), Storm 
3. Run mapreduce in micro batch to insert into NoSql Database. 
4. Kafka Distriuted queue 

The end to end flow. 

Data -> streaming framework -> NoSql Database. 
OR 
Data -> Kafka -> streaming framework -> NoSql Database. 


IN NoSql database there are two ways to model your data. 
1. Key by "Name" and for every event for that given key, insert into Database. 
    While fetching u get back all events corresponding to that key. 

2. Key by "name", every time a event for key is there, do a UPSERT into a existing blob(Object saved as binary), Inside the blob you maintain the time range and color seen. 

Code sample to read and write to Hbase and Aerospike 

Hbase: http://bytepadding.com/hbase/

aérospike: http://bytepadding.com/aerospike/

+0

les deux liens sont cassés – ffeast

+0

Désolé compagnon, les hackers s'amusaient, juste eu le site corrigé. N'hésitez pas à parcourir les exemples. et le moi savoir si vous avez besoin de plus de clarification – KrazyGautam

0

Une façon de le faire est d'utiliser HiveMQ. HiveMQ est une technologie de file d'attente de messages basée sur MQTT. La bonne chose à ce sujet est que vous pouvez écrire des plugins personnalisés pour traiter le message entrant. Pour obtenir la dernière entrée d'un événement pour une personne, une table de hachage dans le plugin HiveMQ fonctionnerait correctement. Si le nombre de personnes différentes est très grand, je considérerais utiliser un cache comme Redis pour mettre en cache le dernier événement pour chaque personne.

Votre service publie des événements dans HiveMQ. Le plugin HiveMQ traite les événements entrants et met à jour votre base de données.

HiveMQ Plugin

Redis