2017-10-16 4 views
0

J'ai un flux de charge important de données d'utilisateurs. Je veux déterminer si c'est un nouvel utilisateur par son identifiant. Afin de réduire les appels à la base de données je maintiens plutôt un état dans la mémoire des utilisateurs précédents.Maintien de l'état dans un flux

val users = mutable.set[String]() 
//init the state from db 
user = db.getAllUsersIds() 
val source: Source[User, NotUsed] 
val dbSink: Sink[User, NotUsed] //goes to db 
//if the user is added to the set it will return true 
val usersFilter = Flow[User].filter(user => users.add(user.id)) 

maintenant je peux créer un graphique

source ~> usersFilter ~> dbSink 

mon problème est que l'état mutable est partagé et dangereux. Y at-il une option pour maintenir l'état dans le flux?

Répondre

0

Il y a deux façons de procéder.

Si vous obtenez un flux d'enregistrements et que vous souhaitez dédoublonner le flux (car certains identifiants sont déjà traités). Vous pouvez le faire

http://janschulte.com/2016/03/08/deduplicate-akka-stream/

L'autre façon de le faire est via la base de données où lookups vous vérifiez si l'ID existe déjà.

val alreadyExists : Flow[User, NotUsed] = { 
    // build a cache of known ids 
    val knownIdList = ... // query database and get list of IDs 
    Flow[User].filterNot(user => knownIdList.contains(user.id)) 
} 
+0

Il y a un défaut majeur dans votre suggestion. Il faudra beaucoup de demandes à la db, ce qui signifie beaucoup de io. Je le maintiens plutôt dans la mémoire (je mettrai à jour mon article) – igx

+0

Au moins dans mon cas je fais seulement 1 appel à la DB et charge tous les identifiants existants dans une seule requête dans une liste. puis recherchez simplement la liste (ou la carte) dans le flux lors de l'exécution. –

+0

pensez-vous que quelque chose comme ça va fonctionner? par exemple 'def = {alreadyExists val alreadyExistingIds = mutable.set [chaîne] (init de db) Flow [utilisateur] .filterNot (user => alreadyExistingIds.add (user.id)) }' – igx