2017-09-07 3 views
0

J'ai une application de base scala akka http CRUD. Voir ci-dessous pour les classes pertinentes. J'aimerais simplement écrire un identifiant d'entité et des données (comme json) dans un sujet Kafka chaque fois qu'une entité est créée/mise à jour, par exemple.Comment intégrer les flux akka kafka (réactif-kafka) dans l'application akka http?

Je regarde http://doc.akka.io/docs/akka-stream-kafka/current/producer.html, mais je suis nouveau à scala et akka, et ne savez pas comment l'intégrer dans mon application? Par exemple, d'après les documents ci-dessus, ceci est l'exemple d'un producteur écrivant à Kafka, donc je pense que j'ai besoin de quelque chose de similaire, mais où se trouve ma demande? Puis-je ajouter un autre appel de carte dans la méthode create de mon service après avoir créé l'utilisateur?

Merci beaucoup!

val done = Source(1 to 100) 
    .map(_.toString) 
    .map { elem => 
    new ProducerRecord[Array[Byte], String]("topic1", elem) 
    } 
    .runWith(Producer.plainSink(producerSettings)) 

Ou dois-je faire quelque chose comme l'exemple ici https://github.com/hseeberger/accessus dans la méthode bindAndHandle() dans mon Server.scala?

WebServer.scala

object System { 

    implicit val system = ActorSystem() 
    implicit val dispatcher = system.dispatcher 
    implicit val actorMaterializer = ActorMaterializer() 

} 

object WebServer extends App { 

    import System._ 

    val config = new ApplicationConfig() with ConfigLoader 
    ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig)) 
    val injector = Guice.createInjector(new MyAppModule(config)) 
    val routes = injector.getInstance(classOf[Router]).routes 
    Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port) 

} 

Router.scala

def routes(): Route = { 
    post { 
     entity(as[User]) { user => 
     val createUser = userService.create(user) 
     onSuccess(createUser) { 
      case Invalid(y: NonEmptyList[Err]) => { 
      throw new ValidationException(y) 
      } 
      case Valid(u: User) => { 
       complete(ToResponseMarshallable((StatusCodes.Created, u))) 
      } 
     } 
     } 
    } ~ 
    // More routes here, left out for example 
} 

Service.scala

def create(user: User): Future[MaybeValid[User]] = { 
    for { 
     validating <- userValidation.validateCreate(user) 
     result <- validating match { 
     case Valid(x: User) => 
      userRepo.create(x) 
      .map(dbUser => Valid(UserConverters.fromUserRow(x))) 
     case y: DefInvalid => 
      Future{y} 
     } 
    } yield result 
    } 

Repo.scala

def create(user: User): Future[User] = { 
    mutateDbProvider.db.run(
     userTable returning userTable.map(_.userId) 
     into ((user, id) => user.copy(userId = id)) += 
     user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now()))) 
    ) 
    } 
+0

Pour être clair sur ce que vous voulez: vous êtes à la recherche d'envoyer le 'user.userId' à chaque fois KAFKA' create' retourne utilisateur valide? –

+0

Oui, et peut-être d'autres informations comme "entité": "Utilisateur" – Rory

Répondre

1

Depuis que vous avez écrit votre Route à unmarshall juste 1 User du Entity Je ne pense pas que vous avez besoin de Producer.plainSink. Au contraire, je pense que Producer.send fonctionnera aussi bien. Aussi, en note, lancer des exceptions n'est pas un scala "idiomatique". Alors j'ai changé la logique pour l'utilisateur invalide:

val producer : KafkaProducer = new KafkaProducer(producerSettings) 

val routes : Route = 
    post { 
    entity(as[User]) { user => 
     val createUser = userService.create(user) 
     onSuccess(createUser) { 
     case Invalid(y: NonEmptyList[Err]) => 
      complete(BadRequest -> "invalid user") 
     case Valid(u: User) => { 
      val producerRecord = 
      new ProducerRecord[Array[Byte], String]("topic1",s"""{"userId" : ${u.userId}, "entity" : "User"}""") 

      onComplete(producer send producerRecord) { _ => 
      complete(ToResponseMarshallable((StatusCodes.Created, u))) 
      } 
     } 
     } 
    } 
    } 
+0

@Rory Vous êtes les bienvenus. Je ne connais pas de kit de test similaire à l'exemple que vous avez fourni. –

+0

Voyez-vous des problèmes avec * pas * en utilisant onComplete pour appeler 'producer send producerRecord' (donc nous n'attendons pas que l'envoi à kafka se termine avant d'envoyer la réponse de l'API HTTP à l'utilisateur)? Fondamentalement juste faire ceci: producteur envoyer producteurRecord; complete (ToResponseMarshallable ((StatusCodes.Created, u))) – Rory

+0

@Rory En n'utilisant pas onComplete, il devient possible que le client reçoive un code de réponse 'Created' mais le message n'envoie pas. Si cet état bifurqué est correct, il n'y a pas de problèmes, mais le modèle habituel consiste à renvoyer une réponse valide seulement après que l'enregistrement a été soumis. C'est un choix de conception: voulez-vous des réponses rapides où le code donné peut être incorrect, ou voulez-vous attendre un code validé? –