J'ai une hiérarchie de canal Kafka que je me sers dans mon projet:Scala Trait incompatibilité de type
Mon trait de base est:
trait SendChannel[A, B] extends CommunicationChannel {
def send(data:A): B
}
Maintenant, j'ai une kafka commune envoyer canal comme
trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] {
val channelProps: KafkaSendChannelProperties
val kafkaProducer: Producer[String, B]
override def close(): Unit = kafkaProducer.close()
}
Maintenant il y a 2 variantes de CommanKafkaSendChannel, l'une est avec callback et l'autre avec Future:
trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] {
override def send(data: A): Future[RecordMetadata] = Future {
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get
}
}
définition KafkaSendChannelWithCallback
:
object KafkaSendChannelWithCallback {
def apply[A, B](oChannelProps: KafkaSendChannelProperties,
oKafkaProducer: Producer[String, B],
oCallback: Callback): KafkaSendChannelWithCallback[A, B] =
new KafkaSendChannelWithCallback[A,B] {
override val channelProps: KafkaSendChannelProperties = oChannelProps
override val kafkaProducer: Producer[String, B] = oKafkaProducer
override val callback: Callback = oCallback
}
}
trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] {
val callback: Callback
override def send(data: A): Unit =
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback)
}
maintenant basé sur la valeur de configuration que je sélectionne le bon type de canal sur le temps d'exécution comme ci-dessous. Je crée acteur avec type de canal qui envoie les données à kafka:
val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
error => {
logger.error("Exception while instantiating the KafkaSendChannel")
throw error
},
success => success
)
actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME)
définition Acteur:
object IngestionRouterActor {
def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props =
Props(classOf[IngestionActor[V]], config, sendChannel)
}
Le problème est quand j'utilise KafkaSendChannelWithCallback
mon code compile correctement mais quand j'utilise KafkaSendChannelWithFuture
il me donne l'erreur ci-dessous sur actor =
déclaration:
[erreur] IngestionActor.scala: 32: type de modèle est incompatible avec le type attendu; [error] trouvé: KafkaSendChannelWithFuture [String, V] [error] requis: SendChannel [V, Unité]
Comme les deux définitions de canal sont étendues à partir SendChannel
, ce code aurait dû compilé sans aucune erreur. Je ne suis pas sûr de savoir pourquoi il ne compile pas. Merci
Salut @chunjef, merci pour votre réponse. Vous avez mentionné 'puisque Any est un supertype de Unit et Future' qui est essayer comme' Any' est supertype mais pourquoi 'ni SendChannel [V, Unit] ni SendChannel [V, Future [RecordMetadata]] sont de type SendChannel [V, Tout] 'c'est faux? – Explorer
J'ai fait le changement que vous avez suggéré, y at-il un changement nécessaire dans le 'props' qui a toujours' SendChannel [V, Unit] 'parce que je reçois toujours la même erreur. – Explorer
@Explorer: S'il vous plaît lire ma réponse plus attentivement. Je réponds à vos deux commentaires dans la réponse. – chunjef