2017-06-25 1 views
1

J'ai une dataframe comme ceci:étincelle variable de diffusion scala ne fonctionne pas: tâche ne serializble

val temp = sc.parallelize(Seq(Array(43,53,266),Array(69,160,166),Array(266))) 
.toDF("value") 

Je veux sélectionner la ligne qui croise le tableau suivant:

val goodValue = Array(231, 266) 
val broadcastGood = sc.broadcast(goodValue) 
val containGood = udf((array:scala.collection.mutable.WrappedArray[Int]) => 
broadcastGood.value.intersect(array).size>0) 

Et quand je a essayé cette udf,

display(temp.filter(containGood(col("value")))) 

j'ai eu l'erreur infâme: Tâche non Serializable

La chose étrange est que cela marchait bien pour moi. Je ne sais pas ce qui a changé .. J'apprécierais vraiment de l'aide.

Éditer: En fait, le code ci-dessus devrait fonctionner normalement normalement, plus la variable de diffusion n'est pas nécessaire ici. Certains d'entre vous ont mentionné que "l'une des valeurs est dans la classe Scala qui n'est pas sérialisable", je suis d'accord que cela devrait être le problème, mais je ne suis pas sûr de savoir comment le résoudre ..

Voici les informations générales: J'utilise Allocation latent Dirichlet (LDA) pour effectuer une analyse thématique sur un corpus:

val ldaModel = lda.fit(dfVectorizer) 

dfVectorizer est la version vectorisée de mon jeu de données d'origine. Avec ce modèle LDA, je produis l'ensemble de données suivant:

val topic = ldaModel.describeTopics(50) //with three columns[topic:int, termIndices: array<Int>, termWeights: array<Double>] 
val interestTerms = Seq(1,2,3,4,5,6,7) 
val interestUDF = udf((terms:Seq[Int]) =>terms.filter(r=>interestTerms.contains(r))) 
val topicTmp = topic.withColumn("InterestTerm",interestUDF(col("termIndices"))) 
val sumVec = udf((terms: Seq[Int]) => terms.sum) 
val topicDF = topicTmp.select('topic,sumVec('InterestTerm).as('order)).sort('order.desc) 

Ainsi, la trame de données finale « topicDF » ressembler à ceci:

Topic | Order 

111 | 7 

69 | 7 

248 | 5 

...... 

Cependant, si je suis en train d'effectuer un simple filtre comme celui-ci :

display(topicDF.filter("order>3")) 

Il me donnera l'erreur "task not Serializable". Dans le message d'erreur, il indique très clairement que cela est « causé par »

java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel.

Le message d'erreur ressemble à ceci:

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2135) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840) 
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) 
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:133) 
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2807) 
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132) 
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132) 
at org.apache.spark.sql.Dataset$$anonfun$60.apply(Dataset.scala:2791) 
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87) 
at org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53) 
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70) 
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2790) 
at org.apache.spark.sql.Dataset.head(Dataset.scala:2132) 
at org.apache.spark.sql.Dataset.take(Dataset.scala:2345) 
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:81) 
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:42) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:263) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:254) 
at scala.Option.map(Option.scala:145) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:254) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:228) 
at scala.Option.map(Option.scala:145) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal.getResultBuffer(ScalaDriverLocal.scala:228) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:209) 
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:230) 
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:211) 
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:173) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168) 
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39) 
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206) 
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39) 
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211) 
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589) 
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589) 
at scala.util.Try$.apply(Try.scala:161) 
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584) 
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488) 
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391) 
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348) 
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel 
Serialization stack: 
- object not serializable (class: 
org.apache.spark.mllib.clustering.DistributedLDAModel, value: 
[email protected]) 
- writeObject data (class: scala.collection.mutable.HashMap) 
- object (class scala.collection.mutable.HashMap, Map(lda_1da3e45afeaa__subsamplingRate -> 0.05, lda_1da3e45afeaa__k -> 320, lda_1da3e45afeaa__keepLastCheckpoint -> true, lda_1da3e45afeaa__maxIter -> 100, lda_1da3e45afeaa__optimizer -> em, lda_1da3e45afeaa__optimizeDocConcentration -> true, lda_1da3e45afeaa__learningDecay -> 0.51, lda_1da3e45afeaa__topicConcentration -> 1.1, lda_1da3e45afeaa__learningOffset -> 1024.0, lda_1da3e45afeaa__checkpointInterval -> 10, lda_1da3e45afeaa__featuresCol -> features, lda_1da3e45afeaa__seed -> 12345, lda_1da3e45afeaa__docConcentration -> [[email protected], lda_1da3e45afeaa__topicDistributionCol -> topicDistribution)) 
- field (class: org.apache.spark.ml.param.ParamMap, name: org$apache$spark$ml$param$ParamMap$$map, type: interface scala.collection.mutable.Map) 

Merci beaucoup!

+0

quelle version étincelle utilisez-vous? – mtoto

+0

ce code fonctionne bien pour moi en utilisant Spark 2.1 –

+2

Je suppose que l'une des valeurs que vous avez définies se trouve dans la classe Scala qui n'est pas sérialisable. Pouvez-vous fournir un fichier Scala complet? –

Répondre

0

Probablement si vous utilisez ceci dans une classe et que la classe subit un processus de carte alors le retour de toute la carte devrait être sérialisé. Je suppose que vous n'avez pas sérialisé votre classe de mappeur. Essayez ceci avec votre classe dans laquelle vous avez défini votre fichier udf.

+0

J'ajoute quelques informations supplémentaires dans la description de la question. Comme vous pouvez le voir, le problème est avec spark mllib DistributedLDAModel. Je ne suis pas sûr de savoir comment contourner cela .. Merci! –