2017-09-25 3 views
1

J'ai une hiérarchie de canal Kafka que je me sers dans mon projet:Scala Trait incompatibilité de type

Mon trait de base est:

trait SendChannel[A, B] extends CommunicationChannel { 
    def send(data:A): B 
} 

Maintenant, j'ai une kafka commune envoyer canal comme

trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] { 
val channelProps: KafkaSendChannelProperties 
val kafkaProducer: Producer[String, B] 
override def close(): Unit = kafkaProducer.close() 
} 

Maintenant il y a 2 variantes de CommanKafkaSendChannel, l'une est avec callback et l'autre avec Future:

trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] { 
override def send(data: A): Future[RecordMetadata] = Future { 
    kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get 
} 
} 

définition KafkaSendChannelWithCallback:

object KafkaSendChannelWithCallback { 

def apply[A, B](oChannelProps: KafkaSendChannelProperties, 
       oKafkaProducer: Producer[String, B], 
       oCallback: Callback): KafkaSendChannelWithCallback[A, B] = 
new KafkaSendChannelWithCallback[A,B] { 
    override val channelProps: KafkaSendChannelProperties = oChannelProps 
    override val kafkaProducer: Producer[String, B] = oKafkaProducer 
    override val callback: Callback = oCallback 
} 
} 

trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] { 

    val callback: Callback 

override def send(data: A): Unit = 
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback) 
} 

maintenant basé sur la valeur de configuration que je sélectionne le bon type de canal sur le temps d'exécution comme ci-dessous. Je crée acteur avec type de canal qui envoie les données à kafka:

val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
    error => { 
     logger.error("Exception while instantiating the KafkaSendChannel") 
     throw error 
    }, 
    success => success 
) 

actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME) 

définition Acteur:

object IngestionRouterActor { 
    def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props = 
Props(classOf[IngestionActor[V]], config, sendChannel) 
} 

Le problème est quand j'utilise KafkaSendChannelWithCallback mon code compile correctement mais quand j'utilise KafkaSendChannelWithFuture il me donne l'erreur ci-dessous sur actor = déclaration:

[erreur] IngestionActor.scala: 32: type de modèle est incompatible avec le type attendu; [error] trouvé: KafkaSendChannelWithFuture [String, V] [error] requis: SendChannel [V, Unité]

Comme les deux définitions de canal sont étendues à partir SendChannel, ce code aurait dû compilé sans aucune erreur. Je ne suis pas sûr de savoir pourquoi il ne compile pas. Merci

Répondre

1

Le Props pour IngestionActor prend un SendChannel[V, Unit]. Passer un KafkaSendChannelWithCallback à cet argument fonctionne parce que c'est un SendChannel[V, Unit]. Par contre, KafkaSendChannelWithFuture est un SendChannel[V, Future[RecordMetadata]]. Un SendChannel[V, Future[RecordMetadata]] est et non un SendChannel[V, Unit].

Une option consiste à redéfinir le Props de prendre une SendChannel[V, Any], puisque Any est un supertype des deux Unit et Future:

def props[V](config: Config, sendChannel: SendChannel[V, Any]): Props = ??? 

À ce stade, le compilateur est toujours malheureux parce SendChannel, étant un type générique , est invariant par défaut. En d'autres termes, ni SendChannel[V, Unit] ni SendChannel[V, Future[RecordMetadata]] sont de type SendChannel[V, Any].

Pour changer cela, faire SendChannel covariant sur le deuxième paramètre de type (en ajoutant un + devant la B):

trait SendChannel[A, +B] extends CommunicationChannel { 
    def send(data: A): B 
} 
+0

Salut @chunjef, merci pour votre réponse. Vous avez mentionné 'puisque Any est un supertype de Unit et Future' qui est essayer comme' Any' est supertype mais pourquoi 'ni SendChannel [V, Unit] ni SendChannel [V, Future [RecordMetadata]] sont de type SendChannel [V, Tout] 'c'est faux? – Explorer

+0

J'ai fait le changement que vous avez suggéré, y at-il un changement nécessaire dans le 'props' qui a toujours' SendChannel [V, Unit] 'parce que je reçois toujours la même erreur. – Explorer

+0

@Explorer: S'il vous plaît lire ma réponse plus attentivement. Je réponds à vos deux commentaires dans la réponse. – chunjef