2017-01-11 2 views
0

J'essaye d'accéder à un HashMap via une fonction dans Spark 2.0, mais si je parallélise la liste, elle échoue. Si je ne le fais pas, cela fonctionne, et si je n'utilise pas une classe de cas, cela fonctionne.Scala case class object 'clé non trouvée' lorsqu'utilisé comme clé HashMap dans Spark

Voici quelques exemples de code de ce que je suis en train de faire:

case class TestData(val s: String) 

def testKey(testData: TestData) { 
    println(f"Current Map: $myMap") 
    println(f"Key sent into function: $testData") 
    println("Key isn't found in Map:") 
    println(myMap(testData)) // fails here 
} 

val myList = sc.parallelize(List(TestData("foo"))) 
val myMap = Map(TestData("foo") -> "bar") 
myList.collect.foreach(testKey) // collect to see println 

Voici la sortie exacte:

Current Map: Map(TestData(foo) -> bar) 
Key sent into function: TestData(foo) 
Key isn't found in Map: 
java.util.NoSuchElementException: key not found: TestData(foo) 

Le code ci-dessus est similaire à ce que je suis en train de faire, sauf que la classe de cas est plus compliquée et que HashMap a des listes comme valeurs. Toujours dans l'exemple ci-dessus, j'utilise 'collect' pour que les instructions d'impression soient sorties. L'exemple donne toujours la même erreur sans collecte, mais pas d'impression.

Le hashCodes correspond déjà, mais j'ai essayé de redéfinir equals et hashCode pour la classe de cas, même problème. Ceci utilise des Databricks, donc je ne crois pas avoir accès à REPL ou spark-submit.

Répondre

0

Merci aux commentaires pour remarquer the similar question, qui est passé à la question de Spark, ce qui m'a amené à cette solution pour mon cas:

case class TestData(val s: String) { 
    override def equals(obj: Any) = obj.isInstanceOf[TestData] && obj.asInstanceOf[TestData].s == this.s 
} 

Redéfinition les égaux à inclure isInstanceOf résout le problème. Ce n'est peut-être pas la meilleure solution, mais c'est certainement la solution de contournement la plus simple.

0

Votre logique est cyclique & incorrect. Vous passez le même RDD à la carte & en appelant avec TestData. Mise à jour pour le rendre séquentiel comme ci-dessous:

case class TestData(val s: String) 

def testKey(testData: TestData) { 
    val myMap = Map(testData -> "bar") 
    println(f"Current Map: $myMap") 
    println(f"Key sent into function: $testData") 
    println("Key isn't found in Map:") 
    println(myMap(testData)) // fails here 
} 

val myList = sc.parallelize(List(TestData("foo"))) 
myList.collect.foreach(testKey) 

La sortie car il est:

Current Map: Map(TestData(foo) -> bar) 
Key sent into function: TestData(foo) 
Key isn't found in Map: 
bar 

J'espère que c'est ce que vous attendez ...

+0

ci-dessus était juste le code que j'ai écrit pour recréer le problème que j'avais. Dans mon code de production, je dois envoyer des clés différentes pour récupérer leurs valeurs, parfois elles correspondent, parfois non. Et je voulais le garder parallèle sur différents nœuds sans trop utiliser de données/réseau. Mais à cause du bug Spark, je n'ai jamais eu de match. J'ai dû remplacer les égales dans la classe de cas pour obtenir des correspondances. – TBhimdi

+0

Même dans ce cas, votre code est erroné. Vous passez un RDD et invocation avec TestData dans myMap. Et une mise à jour du code ci-dessus, définissez myMap juste après la déclaration TestData (au lieu de testKey()) pour éviter les déclarations répétitives. – KiranM