Je cherchais tri secondaire en utilisant Spark et trouvé cette solution:Comment effectuer un tri secondaire dans Spark?
case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)
class RFMCPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, "Number of partitions ($partitions) cannot be negative.")
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[RFMCKey]
k.cId.hashCode() % numPartitions
}
}
object RFMCKey {
implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
}
}
Maintenant, c'est le code que je me sers pour mon RFMC (récence, de fréquence monétaire, clumpiness) programme. Dans le même code, à la fin, je suis en train de faire:
val rfmcTableSorted = rfmcTable.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
Mais quand je charge ce fichier dans spark-shell
, je reçois l'erreur suivante:
<console>:130: error: RFMCKey is already defined as (compiler-generated) case class companion object RFMCKey
object RFMCKey {
^
<console>:198: error: RFMCKey.type does not take parameters
case (custId, (((rVal, fVal), mVal),cVal)) => (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal+","+fVal+","+mVal+","+cVal)
^
<console>:200: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Nothing]
val rfmcTableSorted = rfmcTable.repartitionAndSortWithinPartitions(new RFMCPartitioner(1)).cache()
Comment puis-je contourner cette question?
Update 1
J'ai essayé de changer l'ordre de la déclaration de ma classe affaire et classe d'objets et de façon surprenante le shell chargé le fichier sans lancer des erreurs. Mais quand je courais mon programme, il a lancé une nouvelle erreur:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$.constructRFMC(<console>:113)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC.<init>(<console>:55)
at $iwC.<init>(<console>:57)
at <init>(<console>:59)
at .<init>(<console>:63)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$
Serialization stack:
- object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$, value: [email protected])
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$$anonfun$17, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$$anonfun$17, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 52 more
Mise à jour 2
La façon dont je suis en train de définir mes objets et des fonctions est comme ceci:
object rfmc {
def constructrfmc() = {
// Everything goes inside including the custom key and partitioner
// code defined above
}
}
Mise à jour 3
La façon dont je définis mon code dans eclipse qui fonctionne parfaitement est:
object rfmc extends App {
// Everything goes inside including the custom key and partitioner
// code defined above
}
J'ai aussi créé un fichier JAR pour ce code et lancé en utilisant spark-submit
et cela a parfaitement fonctionné.
Peut-être que votre problème est lié à ceci: https://issues.scala-lang.org/browse/SI-3772. Peut-être essayer de jouer avec l'ordre de déclaration de votre classe et de votre objet. –
Je n'ai pas le temps de répondre, mais avez-vous jeté un oeil à ceci: http://codingjunkie.net/spark-secondary-sort/ – Vale
@Vale J'ai effectivement fait la même chose. –