2017-07-09 1 views
2

J'ai un système utilisant Akka qui gère actuellement les données de flux entrant sur les files d'attente de messages. Lorsqu'un enregistrement arrive, il est traité, mq est acquitté et l'enregistrement est transmis pour une manipulation ultérieure dans le système.Enregistrement de flux à partir de DataBase à l'aide d'Akka Stream

Maintenant, je voudrais ajouter le support pour l'utilisation des DB en entrée.
Quel serait le chemin à parcourir pour que la source d'entrée puisse gérer la DB (devrait-elle diffuser dans des enregistrements> 100M à la vitesse que le récepteur peut gérer - donc je suppose que les flux réactifs/akka?)?

Répondre

2

Slick

Slick streaming comment cela se fait habituellement.

Extension de la documentation lisse un peu pour inclure akka flux:

//SELECT Name from Coffees 
val q = for (c <- coffees) yield c.name 

val action = q.result 

type Name = String 

val databasePublisher : DatabasePublisher[Name] = db stream action 

import akka.stream.scaladsl.Source 

val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher 

Maintenant akkaSourceFromSlick est comme tout autre flux akka Source.

ResultSet

Il est également possible d'utiliser une ResultSet simple, sans marée noire, comme le "moteur" pour un flux de Akka.

d'abord créer le ResultSet en utilisant des techniques standard jdbc:

import java.sql._ 

val statement : Statement = ??? 

val resultSet : ResultSet = statement executeQuery "SELECT Name from Coffees" 

Maintenant, nous pouvons construire un Iterator de la ResultSet qui itérer les lignes et récupérer la colonne "Name":

val nameIterator : (ResultSet) =>() => Iterator[Name] = 
    (resultSet) =>() => 
    Try { 
     resultSet.beforeFirst() 
    } 
    match { 
     case Success(_) => 
     new Iterator[Try[Name]] { 
      override def hasNext : Boolean = resultSet.next 
      override def next() : Try[Name] = Try{resultSet getString "Name"} 
     } flatMap (_.toOption) 
     case Failure(_) => Iterator.empty 
    } 

Cette Iterator peut maintenant alimenter un Source:

+0

Merci - cela semble bon! –

+0

@EvanM. Vous êtes les bienvenus, joyeux piratage. –