2016-06-14 1 views
1

Je cherchais tri secondaire en utilisant Spark et trouvé cette solution:Comment effectuer un tri secondaire dans Spark?

case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double) 
class RFMCPartitioner(partitions: Int) extends Partitioner { 
    require(partitions >= 0, "Number of partitions ($partitions) cannot be negative.") 
    override def numPartitions: Int = partitions 
    override def getPartition(key: Any): Int = { 
    val k = key.asInstanceOf[RFMCKey] 
    k.cId.hashCode() % numPartitions 
    } 
} 
object RFMCKey { 
    implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = { 
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1)) 
    } 
} 

Maintenant, c'est le code que je me sers pour mon RFMC (récence, de fréquence monétaire, clumpiness) programme. Dans le même code, à la fin, je suis en train de faire:

val rfmcTableSorted = rfmcTable.repartitionAndSortWithinPartitions(new RFMCPartitioner(1)) 

Mais quand je charge ce fichier dans spark-shell, je reçois l'erreur suivante:

<console>:130: error: RFMCKey is already defined as (compiler-generated) case class companion object RFMCKey 
      object RFMCKey { 
        ^
<console>:198: error: RFMCKey.type does not take parameters 
           case (custId, (((rVal, fVal), mVal),cVal)) => (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal+","+fVal+","+mVal+","+cVal) 
                              ^
<console>:200: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Nothing] 
val rfmcTableSorted = rfmcTable.repartitionAndSortWithinPartitions(new RFMCPartitioner(1)).cache() 

Comment puis-je contourner cette question?

Update 1

J'ai essayé de changer l'ordre de la déclaration de ma classe affaire et classe d'objets et de façon surprenante le shell chargé le fichier sans lancer des erreurs. Mais quand je courais mon programme, il a lancé une nouvelle erreur:

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) 
at org.apache.spark.rdd.RDD.map(RDD.scala:286) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$.constructRFMC(<console>:113) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47) 
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49) 
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51) 
at $iwC$$iwC$$iwC.<init>(<console>:53) 
at $iwC$$iwC.<init>(<console>:55) 
at $iwC.<init>(<console>:57) 
at <init>(<console>:59) 
at .<init>(<console>:63) 
at .<clinit>(<console>) 
at .<init>(<console>:7) 
at .<clinit>(<console>) 
at $print(<console>) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) 
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) 
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) 
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) 
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) 
at org.apache.spark.repl.Main$.main(Main.scala:31) 
at org.apache.spark.repl.Main.main(Main.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$ 
Serialization stack: 
    - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$, value: [email protected]) 
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$$anonfun$17, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$) 
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$$anonfun$17, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
    ... 52 more 

Mise à jour 2

La façon dont je suis en train de définir mes objets et des fonctions est comme ceci:

object rfmc { 
    def constructrfmc() = { 
    // Everything goes inside including the custom key and partitioner 
    // code defined above 
    } 
} 

Mise à jour 3

La façon dont je définis mon code dans eclipse qui fonctionne parfaitement est:

object rfmc extends App { 
    // Everything goes inside including the custom key and partitioner 
    // code defined above 
} 

J'ai aussi créé un fichier JAR pour ce code et lancé en utilisant spark-submit et cela a parfaitement fonctionné.

+0

Peut-être que votre problème est lié à ceci: https://issues.scala-lang.org/browse/SI-3772. Peut-être essayer de jouer avec l'ordre de déclaration de votre classe et de votre objet. –

+0

Je n'ai pas le temps de répondre, mais avez-vous jeté un oeil à ceci: http://codingjunkie.net/spark-secondary-sort/ – Vale

+0

@Vale J'ai effectivement fait la même chose. –

Répondre

1

Pour résoudre le problème que RFMCKey est déjà défini, vous devez permuter l'ordre de votre classe de cas et de la déclaration d'objet comme expliqué dans this issue. En ce qui concerne vos mises à jour, il peut y avoir quelques limitations dans le spark-shell qui ne peuvent pas laisser exécuter du code arbitraire (comme avec des accumulateurs). Pour obtenir plus d'informations sur le mécanisme de sérialisation, vous devez passer l'option suivante -Dsun.io.serialization.extendedDebugInfo=true. Rappelez-vous que l'étincelle est plus un utilitaire exploratoire pour tester itérativement de petites portions de code ou de nouvelles fonctionnalités grâce au REPL, et non pas un utilitaire prêt à la production qui devrait être largement utilisé pour tester votre code.

Votre option la plus sûre ici est de mettre votre application dans un bocal et de configurer Spark en standalone mode, puis de lancer spark-submit avec votre bocal. Comme indiqué dans les mises à jour 3 et 4 de votre publication, vous devez mettre à jour votre code pour l'intégrer dans un objet afin qu'il soit le point d'entrée de votre travail. Cela vous permettra de vous assurer que votre code n'est pas en cause ici.