2017-05-02 9 views
0

J'ai un flux akka d'un socket web comme akka stream consume web socket et je voudrais construire une scène graphique réutilisable (inlet: le flux, FlowShape: ajouter un champ supplémentaire à l'origine en spécifiant JSON-à-direakka stade graphique personnalisé flux

{ 
..., 
"origin":"blockchain.info" 
} 

et un outlet à kafka

je faire face aux problèmes suivants: 3.

  • incapable de envelopper la tête autour de la création une coutume Inlet du flux WebSocket
  • incapable d'intégrer kafka directement dans le flux (voir le code ci-dessous)
  • pas sûr serait nécessaire transformateur pour ajouter le champ supplémentaire désérialiser le JSON d'ajouter l'origine

le projet d'échantillonnage (débit uniquement) ressemble à:

import system.dispatcher 
implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

val incoming: Sink[Message, Future[Done]] = 
    Flow[Message].mapAsync(4) { 
     case message: TextMessage.Strict => 
     println(message.text) 
     Future.successful(Done) 
     case message: TextMessage.Streamed => 
     message.textStream.runForeach(println) 
     case message: BinaryMessage => 
     message.dataStream.runWith(Sink.ignore) 
    }.toMat(Sink.last)(Keep.right) 

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
    .withBootstrapServers("localhost:9092") 

val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right) 

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv")) 

val ((completionPromise, upgradeResponse), closed) = 
    outgoing 
     .viaMat(webSocketFlow)(Keep.both) 
     .toMat(incoming)(Keep.both) 
     // TODO not working integrating kafka here 
     // .map(_.toString) 
     // .map { elem => 
     //  println(s"PlainSinkProducer produce: ${elem}") 
     //  new ProducerRecord[Array[Byte], String]("topic1", elem) 
     // } 
     // .runWith(Producer.plainSink(producerSettings)) 
     .run() 

val connected = upgradeResponse.flatMap { upgrade => 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
     Future.successful(Done) 
    } else { 
     throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
     system.terminate 
    } 
    } 

// kafka that works/writes dummy data 
val done1 = Source(1 to 100) 
    .map(_.toString) 
    .map { elem => 
     println(s"PlainSinkProducer produce: ${elem}") 
     new ProducerRecord[Array[Byte], String]("topic1", elem) 
    } 
    .runWith(Producer.plainSink(producerSettings)) 

Répondre

1

Une question est autour de la scène incoming, qui est modélisé comme un Sink. où il devrait être modélisé comme Flow. pour ensuite envoyer des messages à Kafka. Parce que les messages texte entrants peuvent être Streamed

vous pouvez utiliser flatMapMerge Combinator comme suit pour éviter la nécessité de stocker messages entiers (potentiellement importants) en mémoire:

val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) { 
    case msg: BinaryMessage => 
     msg.dataStream.runWith(Sink.ignore) 
     Future.successful(None) 
    case TextMessage.Streamed(src) => 
     src.runFold("")(_ + _).map { msg => Some(msg) } 
    }.collect { 
    case Some(msg) => msg 
    } 

À ce stade, vous avez quelque chose qui produit des chaînes, et peut être connecté à Kafka:

val addOrigin: Flow[String, String, NotUsed] = ??? 

    val ((completionPromise, upgradeResponse), closed) = 
    outgoing 
     .viaMat(webSocketFlow)(Keep.both) 
     .via(incoming) 
     .via(addOrigin) 
     .map { elem => 
     println(s"PlainSinkProducer produce: ${elem}") 
     new ProducerRecord[Array[Byte], String]("topic1", elem) 
     } 
     .toMat(Producer.plainSink(producerSettings))(Keep.both) 
     .run() 
+0

Cela fonctionne très bien comme une entrée! Mais comment puis-je 1) créer un transformateur qui ajoute «origine»: «blockchain.info» 'un champ et 2) créer une étape de graphique à partir de celui-ci? (ici un exemple de code exécutable https://github.com/geoHeil/akkaStreamsIngest) –

+0

1) il suffit d'ajouter un flux après 'incoming' qui ajoute ce que vous voulez au message (voir le code modifié) 2) que voulez-vous créer une étape graphique à partir de? –