2017-09-20 1 views
0
val enableJdbcStreaming: (java.sql.Statement) => Unit = { statement ⇒ 
    if (statement.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl])) { 
     statement.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults() 
    } 
} 

implicit val actorSystem = ActorSystem() 
implicit val actorMaterializer = ActorMaterializer() 
val config = ConfigFactory.load() 
val db = Database.forConfig("mysql") 
val query = Tables.Foo.map(r => (r.id, r.pid)).result 
val source = Source.fromPublisher[(Long, Option[Long])](db.stream(query.withStatementParameters(statementInit = enableJdbcStreaming))) 
val future = source.runForEach(x => println(x)) 
import actorSystem.distpatcher 
future.onComplete{ _ => 
    db.close() 
    actorSystem.terminate() 
} 
Await.result(future, Duration.Inf) 
Await.result(actorsystem.whenTerminated, Duration.Inf) 

dépendances de la bibliothèqueSlick 3 application MySQL en streaming se bloque juste ... (ne pas diffuser des données)

"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.11", 
"com.typesafe.slick" %% "slick" % "3.2.1", 
"com.typesafe.slick" %% "slick-hikaricp" % "3.2.1", 
"mysql" % "mysql-connector-java" % "5.1.44", 
"ch.qos.logback" % "logback-classic" % "1.2.3" 

config mysql

mysql { 
    profile = "slick.jdbc.MySQLProfile$" 
    dataSourceClass = "slick.jdbc.DatabaseUrlDataSource" 
    properties { 
    driver = "com.mysql.jdbc.Driver" 
    url = "jdbc:mysql://server:3306/db" 
    user = "user" 
    password = "password" 
    } 
    connectionTimeout = 300 
} 

Quand je lance ce code. il continue juste à imprimer ces lignes sur la console

[info] 19:20:31.647 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:21:01.649 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:21:31.656 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:22:01.661 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:22:31.668 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:23:01.674 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:23:31.680 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:24:01.687 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:24:31.693 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:25:01.699 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:25:31.706 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 
[info] 19:26:01.708 [mysql housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - mysql - Pool stats (total=21, active=1, idle=20, waiting=0) 

J'ai attendu longtemps mais il a continué à imprimer ces lignes.

Je m'attendais à ce que je commence immédiatement à lire la table rangée par rangée pour moi.

Répondre

0

J'ai résolu le problème moi-même. Je pense que je ne construisais pas le graphique correctement. J'ai réécrit mon graphique en utilisant RunnableGraph. et maintenant cela fonctionne parfaitement

val query = sql"select id from foo".as[Long] 
val publisher = db.stream(query.withStatementParameters(statementInit = enableJdbcStreaming)) 
val source = Source.fromPublisher[Long](publisher) 
val sink = Sink.foreach[Long]{x => println(x)} 
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit b => s => 
    import GraphDSL.Implicits._ 
    source ~> s.in 
    ClosedShape 
}) 
val future = graph.run() 
Await.result(future, Duration.Inf) 

Bien qu'il soit intéressant de savoir pourquoi l'approche précédente n'a pas fonctionné.