J'ai une dataframe comme ceci:étincelle variable de diffusion scala ne fonctionne pas: tâche ne serializble
val temp = sc.parallelize(Seq(Array(43,53,266),Array(69,160,166),Array(266)))
.toDF("value")
Je veux sélectionner la ligne qui croise le tableau suivant:
val goodValue = Array(231, 266)
val broadcastGood = sc.broadcast(goodValue)
val containGood = udf((array:scala.collection.mutable.WrappedArray[Int]) =>
broadcastGood.value.intersect(array).size>0)
Et quand je a essayé cette udf,
display(temp.filter(containGood(col("value"))))
j'ai eu l'erreur infâme: Tâche non Serializable
La chose étrange est que cela marchait bien pour moi. Je ne sais pas ce qui a changé .. J'apprécierais vraiment de l'aide.
Éditer: En fait, le code ci-dessus devrait fonctionner normalement normalement, plus la variable de diffusion n'est pas nécessaire ici. Certains d'entre vous ont mentionné que "l'une des valeurs est dans la classe Scala qui n'est pas sérialisable", je suis d'accord que cela devrait être le problème, mais je ne suis pas sûr de savoir comment le résoudre ..
Voici les informations générales: J'utilise Allocation latent Dirichlet (LDA) pour effectuer une analyse thématique sur un corpus:
val ldaModel = lda.fit(dfVectorizer)
dfVectorizer est la version vectorisée de mon jeu de données d'origine. Avec ce modèle LDA, je produis l'ensemble de données suivant:
val topic = ldaModel.describeTopics(50) //with three columns[topic:int, termIndices: array<Int>, termWeights: array<Double>]
val interestTerms = Seq(1,2,3,4,5,6,7)
val interestUDF = udf((terms:Seq[Int]) =>terms.filter(r=>interestTerms.contains(r)))
val topicTmp = topic.withColumn("InterestTerm",interestUDF(col("termIndices")))
val sumVec = udf((terms: Seq[Int]) => terms.sum)
val topicDF = topicTmp.select('topic,sumVec('InterestTerm).as('order)).sort('order.desc)
Ainsi, la trame de données finale « topicDF » ressembler à ceci:
Topic | Order
111 | 7
69 | 7
248 | 5
......
Cependant, si je suis en train d'effectuer un simple filtre comme celui-ci :
display(topicDF.filter("order>3"))
Il me donnera l'erreur "task not Serializable"
. Dans le message d'erreur, il indique très clairement que cela est « causé par »
java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel.
Le message d'erreur ressemble à ceci:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2135)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:133)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2807)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
at org.apache.spark.sql.Dataset$$anonfun$60.apply(Dataset.scala:2791)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87)
at org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2790)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2132)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2345)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:81)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:42)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:263)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:254)
at scala.Option.map(Option.scala:145)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:254)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:228)
at scala.Option.map(Option.scala:145)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.getResultBuffer(ScalaDriverLocal.scala:228)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:209)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:230)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:211)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:173)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
at scala.util.Try$.apply(Try.scala:161)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel
Serialization stack:
- object not serializable (class:
org.apache.spark.mllib.clustering.DistributedLDAModel, value:
[email protected])
- writeObject data (class: scala.collection.mutable.HashMap)
- object (class scala.collection.mutable.HashMap, Map(lda_1da3e45afeaa__subsamplingRate -> 0.05, lda_1da3e45afeaa__k -> 320, lda_1da3e45afeaa__keepLastCheckpoint -> true, lda_1da3e45afeaa__maxIter -> 100, lda_1da3e45afeaa__optimizer -> em, lda_1da3e45afeaa__optimizeDocConcentration -> true, lda_1da3e45afeaa__learningDecay -> 0.51, lda_1da3e45afeaa__topicConcentration -> 1.1, lda_1da3e45afeaa__learningOffset -> 1024.0, lda_1da3e45afeaa__checkpointInterval -> 10, lda_1da3e45afeaa__featuresCol -> features, lda_1da3e45afeaa__seed -> 12345, lda_1da3e45afeaa__docConcentration -> [[email protected], lda_1da3e45afeaa__topicDistributionCol -> topicDistribution))
- field (class: org.apache.spark.ml.param.ParamMap, name: org$apache$spark$ml$param$ParamMap$$map, type: interface scala.collection.mutable.Map)
Merci beaucoup!
quelle version étincelle utilisez-vous? – mtoto
ce code fonctionne bien pour moi en utilisant Spark 2.1 –
Je suppose que l'une des valeurs que vous avez définies se trouve dans la classe Scala qui n'est pas sérialisable. Pouvez-vous fournir un fichier Scala complet? –