J'ai un flux akka d'un socket web comme akka stream consume web socket et je voudrais construire une scène graphique réutilisable (inlet
: le flux, FlowShape
: ajouter un champ supplémentaire à l'origine en spécifiant JSON-à-direakka stade graphique personnalisé flux
{
...,
"origin":"blockchain.info"
}
et un outlet
à kafka
je faire face aux problèmes suivants: 3.
- incapable de envelopper la tête autour de la création une coutume
Inlet
du flux WebSocket - incapable d'intégrer kafka directement dans le flux (voir le code ci-dessous)
- pas sûr serait nécessaire transformateur pour ajouter le champ supplémentaire désérialiser le JSON d'ajouter l'origine
le projet d'échantillonnage (débit uniquement) ressemble à:
import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val incoming: Sink[Message, Future[Done]] =
Flow[Message].mapAsync(4) {
case message: TextMessage.Strict =>
println(message.text)
Future.successful(Done)
case message: TextMessage.Streamed =>
message.textStream.runForeach(println)
case message: BinaryMessage =>
message.dataStream.runWith(Sink.ignore)
}.toMat(Sink.last)(Keep.right)
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
// TODO not working integrating kafka here
// .map(_.toString)
// .map { elem =>
// println(s"PlainSinkProducer produce: ${elem}")
// new ProducerRecord[Array[Byte], String]("topic1", elem)
// }
// .runWith(Producer.plainSink(producerSettings))
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
system.terminate
}
}
// kafka that works/writes dummy data
val done1 = Source(1 to 100)
.map(_.toString)
.map { elem =>
println(s"PlainSinkProducer produce: ${elem}")
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.runWith(Producer.plainSink(producerSettings))
Cela fonctionne très bien comme une entrée! Mais comment puis-je 1) créer un transformateur qui ajoute «origine»: «blockchain.info» 'un champ et 2) créer une étape de graphique à partir de celui-ci? (ici un exemple de code exécutable https://github.com/geoHeil/akkaStreamsIngest) –
1) il suffit d'ajouter un flux après 'incoming' qui ajoute ce que vous voulez au message (voir le code modifié) 2) que voulez-vous créer une étape graphique à partir de? –