ContexteSpark TaskNotSerializable lorsque vous utilisez la fonction anonyme
Voilà ma situation: je suis en train de créer une classe qui filtre un RDD basé sur une caractéristique du contenu, mais cette fonction peut être différente dans différents scénarios de sorte Je voudrais paramétrer cela avec une fonction. Malheureusement, il semble que je rencontre des problèmes avec la façon dont Scala capture ses fermetures. Même si ma fonction est sérialisable, la classe ne l'est pas. De l'exemple dans le spark source on closure cleaning, il semble suggérer que ma situation ne peut pas être résolue, mais je suis convaincu qu'il y a un moyen d'accomplir ce que j'essaye de faire en créant la bonne fermeture (plus petite).
Mon code
class MyFilter(getFeature: Element => String, other: NonSerializable) {
def filter(rdd: RDD[Element]): RDD[Element] = {
// All my complicated logic I want to share
rdd.filter { elem => getFeature(elem) == "myTargetString" }
}
Exemple simplifié
class Foo(f: Int => Double, rdd: RDD[Int]) {
def go(data: RDD[Int]) = data.map(f)
}
val works = new Foo(_.toDouble, otherRdd)
works.go(myRdd).collect() // works
val myMap = Map(1 -> 10d)
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x, 0)
val doesntWork = new Foo(complicatedButSerializableFunc, otherRdd)
doesntWork.go(myRdd).collect() // craps out
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: $iwC$$iwC$Foo
Serialization stack:
- object not serializable (class: $iwC$$iwC$Foo, value: [email protected])
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: foo, type: class $iwC$$iwC$Foo)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, [email protected])
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
// Even though
val out = new ObjectOutputStream(new FileOutputStream("test.obj"))
out.writeObject(complicatedButSerializableFunc) // works
Questions
- Pourquoi le premier exemple simpliee pas tenter de sérialiser tous les
Foo
mais le second fait? - Comment puis-je obtenir la référence à ma fonction sérialisable sans inclure une référence à
Foo
dans ma fermeture?
Utilisez-vous l'étincelle-shell pour tester votre code ou est-il dans une application d'allumage ? – eliasah
@eliasah c'est dans le spark-shell – Patrick
et qu'en est-il de votre version étincelle? – eliasah