J'ai une application de base scala akka http CRUD. Voir ci-dessous pour les classes pertinentes. J'aimerais simplement écrire un identifiant d'entité et des données (comme json) dans un sujet Kafka chaque fois qu'une entité est créée/mise à jour, par exemple.Comment intégrer les flux akka kafka (réactif-kafka) dans l'application akka http?
Je regarde http://doc.akka.io/docs/akka-stream-kafka/current/producer.html, mais je suis nouveau à scala et akka, et ne savez pas comment l'intégrer dans mon application? Par exemple, d'après les documents ci-dessus, ceci est l'exemple d'un producteur écrivant à Kafka, donc je pense que j'ai besoin de quelque chose de similaire, mais où se trouve ma demande? Puis-je ajouter un autre appel de carte dans la méthode create de mon service après avoir créé l'utilisateur?
Merci beaucoup!
val done = Source(1 to 100)
.map(_.toString)
.map { elem =>
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.runWith(Producer.plainSink(producerSettings))
Ou dois-je faire quelque chose comme l'exemple ici https://github.com/hseeberger/accessus dans la méthode bindAndHandle() dans mon Server.scala?
WebServer.scala
object System {
implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val actorMaterializer = ActorMaterializer()
}
object WebServer extends App {
import System._
val config = new ApplicationConfig() with ConfigLoader
ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig))
val injector = Guice.createInjector(new MyAppModule(config))
val routes = injector.getInstance(classOf[Router]).routes
Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port)
}
Router.scala
def routes(): Route = {
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) => {
throw new ValidationException(y)
}
case Valid(u: User) => {
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
} ~
// More routes here, left out for example
}
Service.scala
def create(user: User): Future[MaybeValid[User]] = {
for {
validating <- userValidation.validateCreate(user)
result <- validating match {
case Valid(x: User) =>
userRepo.create(x)
.map(dbUser => Valid(UserConverters.fromUserRow(x)))
case y: DefInvalid =>
Future{y}
}
} yield result
}
Repo.scala
def create(user: User): Future[User] = {
mutateDbProvider.db.run(
userTable returning userTable.map(_.userId)
into ((user, id) => user.copy(userId = id)) +=
user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now())))
)
}
Pour être clair sur ce que vous voulez: vous êtes à la recherche d'envoyer le 'user.userId' à chaque fois KAFKA' create' retourne utilisateur valide? –
Oui, et peut-être d'autres informations comme "entité": "Utilisateur" – Rory