0

Lorsque j'ai implémenté mon propre partitionneur et essayé de mélanger le rdd d'origine, je rencontre un problème. Je sais cela est causé par référence des fonctions qui ne sont pas Serializable, mais après avoir ajoutéorg.apache.spark.SparkException: Tâche non sérialisable, wh

Prolonge Serializable

à toutes les classes relevent ce problème existe toujours. Que devrais-je faire?

Exception dans le thread "principal" org.apache.spark.SparkException: Tâche non sérialisable à org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 166) à org.apache.spark .util.ClosureCleaner $ .Nettoyer (ClosureCleaner.scala: 158) à org.apache.spark.SparkContext.clean (SparkContext.scala: 1622)

object STRPartitioner extends Serializable{ 
    def apply(expectedParNum: Int, 
     sampleRate: Double, 
     originRdd: RDD[Vertex]): Unit= { 
    val bound = computeBound(originRdd) 
    val rdd = originRdd.mapPartitions(
     iter => iter.map(row => { 
     val cp = row 
     (cp.coordinate, cp.copy()) 
     } 
    ) 
    ) 
    val partitioner = new STRPartitioner(expectedParNum, sampleRate, bound, rdd) 
    val shuffled = new ShuffledRDD[Coordinate, Vertex, Vertex](rdd, partitioner) 
    shuffled.setSerializer(new KryoSerializer(new SparkConf(false))) 
    val result = shuffled.collect() 
    } 

class STRPartitioner(expectedParNum: Int, 
        sampleRate: Double, 
        bound: MBR, 
        rdd: RDD[_ <: Product2[Coordinate, Vertex]]) 
    extends Partitioner with Serializable { 
    ... 
} 

Répondre

0

je résous tout le problème! Ajoutez -Dsun.io.serialization.extendedDebugInfo = true à votre config VM, vous allez cibler la classe unserializable!