2017-08-23 1 views
0

J'ai commencé à jouer autour de scala et je suis arrivé à cette page d'accueil en particulier de la chatroom de socket web en scala.Scala & Play Websockets: Stockage des messages échangés

Ils utilisent MessageHub.source() et BroadcastHub.sink() comme Source et Sink pour envoyer les messages à tous les clients connectés.

L'exemple fonctionne correctement pour échanger des messages tels quels. Je souhaite stocker les messages échangés dans le salon de discussion dans un DB.

J'ai essayé d'ajouter des fonctions map et fold à la source et à la mémoire pour récupérer les messages envoyés mais je n'ai pas pu.

J'ai essayé d'ajouter une étape d'écoulement entre MergeHub et BroadcastHub comme ci-dessous

val flow = Flow[WSMessage].map(element => println(s"Message: $element")) 
source.via(flow).toMat(sink)(Keep.both).run() 

Mais il génère une erreur de compilation qui ne peut pas faire référence à toMat avec une telle signature. Est-ce que quelqu'un peut m'aider ou me signaler comment puis-je obtenir des messages qui sont envoyés et les stocker dans la base de données.?

Lien pour le modèle complet:

https://github.com/playframework/play-scala-chatroom-example

+0

S'il vous plaît ajouter un code (prototype) pour montrer comment vous avez essayé d'ajouter une étape de flux. Montre également la définition de l'étape de flux en prenant soin d'enregistrer les données dans la base de données. –

+0

@FredericA. Ajouté le code où j'ai essayé d'ajouter l'étape de flux. Je peux me tromper comme je suis nouveau à ceci. – practice2perfect

+0

Avez-vous vraiment utilisé ce code pour le flux? Il est clairement faux et expliquerait l'erreur de compilation –

Répondre

1

Regardons votre flux:

val flow = Flow[WSMessage].map(element => println(s"Message: $element")) 

Il prend des éléments de type WSMessage, et ne retourne rien (Unit). Ici, il est à nouveau avec le bon type:

val flow: Flow[Unit] = Flow[WSMessage].map(element => println(s"Message: $element")) 

Cela ne fonctionnera pas clairement que l'évier attend WSMessage et non Unit.

Voici comment vous pouvez résoudre le problème ci-dessus:

val flow = Flow[WSMessage].map { element => 
    println(s"Message: $element") 
    element 
} 

Pas que pour les messages persistants dans la base de données, vous souhaiterez probablement utiliser une étape async, à peu près:

val flow = Flow[WSMessage].mapAsync(parallelism) { element => 
    println(s"Message: $element") 
    // assuming DB.write() returns a Future[Unit] 
    DB.write(element).map(_ => element) 
} 
+0

Merci @Fredric A. Cela m'aide à comprendre comment cela fonctionne. – practice2perfect

+0

J'ai encore un doute, dans l'exemple que j'ai donné, cela fonctionne comme envoyer les messages d'un client à d'autres clients. Si je veux diffuser un message qui est généré dynamiquement de temps en temps dans le serveur à tous les clients, comment le ferais-je? – practice2perfect

+0

Je pense que vous pouvez y parvenir en créant une nouvelle 'Source' (cette source est le serveur comme vous l'avez décrit). Maintenant, si vous comprenez que votre source 'source.via (flow)' est une source, vous avez maintenant 2 sources à fusionner. Vous pouvez par exemple faire 'source.via (flow) .combine (serverEventsSource) (Fusionner).toMat (sink) (Keep.both) .run() ' –