J'ai un flux de charge important de données d'utilisateurs. Je veux déterminer si c'est un nouvel utilisateur par son identifiant. Afin de réduire les appels à la base de données je maintiens plutôt un état dans la mémoire des utilisateurs précédents.Maintien de l'état dans un flux
val users = mutable.set[String]()
//init the state from db
user = db.getAllUsersIds()
val source: Source[User, NotUsed]
val dbSink: Sink[User, NotUsed] //goes to db
//if the user is added to the set it will return true
val usersFilter = Flow[User].filter(user => users.add(user.id))
maintenant je peux créer un graphique
source ~> usersFilter ~> dbSink
mon problème est que l'état mutable est partagé et dangereux. Y at-il une option pour maintenir l'état dans le flux?
Il y a un défaut majeur dans votre suggestion. Il faudra beaucoup de demandes à la db, ce qui signifie beaucoup de io. Je le maintiens plutôt dans la mémoire (je mettrai à jour mon article) – igx
Au moins dans mon cas je fais seulement 1 appel à la DB et charge tous les identifiants existants dans une seule requête dans une liste. puis recherchez simplement la liste (ou la carte) dans le flux lors de l'exécution. –
pensez-vous que quelque chose comme ça va fonctionner? par exemple 'def = {alreadyExists val alreadyExistingIds = mutable.set [chaîne] (init de db) Flow [utilisateur] .filterNot (user => alreadyExistingIds.add (user.id)) }' – igx