2015-10-01 1 views
4

ContexteSpark TaskNotSerializable lorsque vous utilisez la fonction anonyme

Voilà ma situation: je suis en train de créer une classe qui filtre un RDD basé sur une caractéristique du contenu, mais cette fonction peut être différente dans différents scénarios de sorte Je voudrais paramétrer cela avec une fonction. Malheureusement, il semble que je rencontre des problèmes avec la façon dont Scala capture ses fermetures. Même si ma fonction est sérialisable, la classe ne l'est pas. De l'exemple dans le spark source on closure cleaning, il semble suggérer que ma situation ne peut pas être résolue, mais je suis convaincu qu'il y a un moyen d'accomplir ce que j'essaye de faire en créant la bonne fermeture (plus petite).

Mon code

class MyFilter(getFeature: Element => String, other: NonSerializable) { 
    def filter(rdd: RDD[Element]): RDD[Element] = { 
    // All my complicated logic I want to share 
    rdd.filter { elem => getFeature(elem) == "myTargetString" }  
} 

Exemple simplifié

class Foo(f: Int => Double, rdd: RDD[Int]) { 
    def go(data: RDD[Int]) = data.map(f) 
} 

val works = new Foo(_.toDouble, otherRdd) 
works.go(myRdd).collect() // works 

val myMap = Map(1 -> 10d) 
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x, 0) 
val doesntWork = new Foo(complicatedButSerializableFunc, otherRdd) 
doesntWork.go(myRdd).collect() // craps out 

org.apache.spark.SparkException: Task not serializable 
Caused by: java.io.NotSerializableException: $iwC$$iwC$Foo 
Serialization stack: 
    - object not serializable (class: $iwC$$iwC$Foo, value: [email protected]) 
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: foo, type: class $iwC$$iwC$Foo) 
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, [email protected]) 
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC) 
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>) 

// Even though 
val out = new ObjectOutputStream(new FileOutputStream("test.obj")) 
out.writeObject(complicatedButSerializableFunc) // works 

Questions

  1. Pourquoi le premier exemple simpliee pas tenter de sérialiser tous les Foo mais le second fait?
  2. Comment puis-je obtenir la référence à ma fonction sérialisable sans inclure une référence à Foo dans ma fermeture?
+0

Utilisez-vous l'étincelle-shell pour tester votre code ou est-il dans une application d'allumage ? – eliasah

+0

@eliasah c'est dans le spark-shell – Patrick

+0

et qu'en est-il de votre version étincelle? – eliasah

Répondre

8

J'ai trouvé la réponse avec l'aide de this article. Essentiellement, lors de la création de la fermeture pour une fonction donnée, Scala inclura l'objet entier pour tout champ complexe référencé (si quelqu'un a une bonne explication de pourquoi cela ne se produit pas dans le premier exemple simple, j'accepterai cette réponse). La solution est de passer la valeur sérialisable à une fonction différente de sorte que seule la référence minimale soit conservée, très similaire au paradigme ol 'javascript for-loop pour les écouteurs d'événements.

Exemple

def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed) 

class Foo(f: Int => Double, somethingNonserializable: RDD[String]) { 
def go(data: RDD[Int]) = enclose(f) { actualFunction => data.map(actualFunction) } 
} 

ou avec l'auto-exécution fonction anonyme JS style

def go(data: RDD[Int]) = ((actualFunction: Int => Double) => data.map(actualFunction))(f)