0

J'ai le code suivant qui tente de joindre 2 tables cassandra dans spark.Valeur null invalide pour la partie de clé de partition url

val imageKeywords = sc.cassandraTable[ImageMetadata]("images", "metadata") 
val imageAndPageKeywords = imageKeywords 
    .joinWithCassandraTable[PagesMetadata]("pages2", "metadata") 
    .on(SomeColumns("tid", "url" as "pu")) 

Les classes de cas j'utilise pour cartographier les données sont comme ci-dessous

case class ImageMetadata(tid: String, iu: String, pu: Option[String], 
mk: List[String], fk: List[String], ak: List[String], ipk: List[String], pk: List[String], ik: List[String], ck: List[String]) 

case class PagesMetadata(tid: String, url: String, pk: List[String], uk: List[String], hk: List[String], ok: List[String], tc: List[String]) 

Je reçois une erreur lorsque je tente de faire des opérations comme ci-dessous

imageAndPageKeywords.collect.toList.sortBy(_._1.tid).take(10).foreach(println) 

erreur stacktrace -

Causé par: com.datastax.driver.core.e xceptions.InvalidQueryException: valeur null non valide pour l'URL de la partie de clé de partition at com.datastax.driver.core.Responses $ Error.asException (Responses.java:103) at com.datastax.driver.core.DefaultResultSetFuture.onSet (DefaultResultSetFuture. java: 140) à com.datastax.driver.core.RequestHandler.setFinalResult (RequestHandler.java:293) à com.datastax.driver.core.RequestHandler.onSet (RequestHandler.java:455) à com.datastax. driver.core.Connection $ Dispatcher.messageReceived (Connection.java:734) à org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream (SimpleChannelUpstreamHandler.java:70) à org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler. handleUpstream (IdleStateAwareChannelUpstreamHandler.java:36) à ou g.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) à org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java:791) à org.jboss.netty.handler. timeout.IdleStateHandler.messageReceived (IdleStateHandler.java:294) à org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream (SimpleChannelUpstreamHandler.java:70) à org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java: 564) à org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java:791) à org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:296) à org.jboss. netty.handler.codec.oneone.OneToOneDecod er.handleUpstream (OneToOneDecoder.java:70) à org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) à org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java: 791) à org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:296) à org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived (FrameDecoder.java:462) à org. jboss.netty.handler.codec.frame.FrameDecoder.callDecode (FrameDecoder.java:443) à org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived (FrameDecoder.java:303) à org.jboss. netty.channel.SimpleChannelUpstreamHandler.handleUpstream (SimpleChannelUpstreamHandler.java:70) à org. jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) à org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:559) à org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:268) à org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:255) à org.jboss.netty.channel.socket.nio.NioWorker.read (NioWorker.java:88) à org.jboss.netty.channel.socket.nio.AbstractNioWorker.process (AbstractNioWorker.java:108) à org.jboss.netty.channel.socket.nio.AbstractNioSelector.run (AbstractNioSelector.java: 318) à org.jboss.netty.channel.socket.nio.AbstractNioWorker.run (AbstractNioWorker.java:89) à org.jboss.netty.channel.socket.nio.NioWorker.run (NioWorker.java: 178) à org.jboss.netty.util.ThreadRenamingRunnable.run (ThreadRenamingRunnable.java:108) à org.jboss.netty.util.internal.DeadLockProofWorker $ 1.run (DeadLockProofWorker.java:42) ... 3 plus

Répondre

2

simple, l'exception vous dit qu'il ne peut pas effectuer la jointure parce que la colonne utilisée pour rejoindre ImageMetadata avec PagesMetadata sont nuls.

Dans votre cas, certaines valeurs dans ImageMetadataurl (pu) sont nuls.

Ce qui est étrange est que vous définissez le PagesMetadata avec url annulable (Option [String]) et il semble que cela fait partie de la solution clé

Un primaire de la table pour le faire fonctionner serait :

val imageAndPageKeywords = imageKeywords 
    .filter(im -> im.pu.isDefined) 
    .joinWithCassandraTable[PagesMetadata]("pages2", "metadata") 
    .on(SomeColumns("tid", "url" as "pu")) 
+0

Y a-t-il un moyen de faire fonctionner la jointure? J'essayais de voir si Option (String) pour url résoudrait mon problème. – l0n3r4ng3r

+0

Voir les photos rédigées – doanduyhai

+0

Voici ce que je fais en gardant votre suggestion à l'esprit. 'val imageKeywords = sc.cassandraTable [ImageMetadata] ("images", "méta-données")' ' val pageKeywordsByTidUrl = imageKeywords.joinWithCassandraTable [PagesMetadata] ("pages2", "méta-données") .sur (SomeColumns (« TID », "url" comme "pu")) .filter (f => f._1.pu.isDefined) .keyBy {x => Joinkey (x._1.tid, x._1.iu)}' Mais toujours la même erreur – l0n3r4ng3r