J'ai 2 fichiers avec des données différentes. Je suis en train de les lire dans 2 RDD diff & puis les convertir en Dataframe & insérer dans la ruche. J'étais capable de faire ce code normal. Mais l'étincelle a traité un calcul de RDD après l'autre. Donc, le 2ème attendait que le 1er se termine même si j'ai assez de ressources dans le cluster. J'ai appris que le calcul des RDD peut être parallélisé en utilisant des méthodes Async. Donc j'essaye foreachPartitionAsync. Mais il jette une erreur que je ne suis pas en mesure de déboguer davantage. Exemple de code:throw foreachPartitionAsync ne peut pas appeler des méthodes sur SparkContext arrêté
object asynccode {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("Parser")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt")
val test = ercs.map { k =>
var rc = method1(k._2, k._1).toSeq
rc
}
.flatMap(identity)
.foreachPartitionAsync { f =>
f.toSeq.toDF()
.write.insertInto("dbname.tablename1")
}
val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt")
val test2 = ercs2.map { k =>
var rs = method2(k._2, k._1)
rs
}
.flatMap(identity)
.foreachPartitionAsync(f => f.toSeq.toDF()
.write.insertInto("dbname.tablename2")
)
sc.stop()
}
def method1 = ???
def method2 = ???
}
Mais il renvoie le message d'erreur ci-dessous. Si je supprime foreachPartitionAsync du code, cela fonctionne très bien. Pas sûr de ce que je fais mal à propos de foreachPartitionAsync.
Échec de la sérialisation des tâches: java.lang.IllegalStateException: impossible d'appeler des méthodes sur un SparkContext arrêté.
MISE À JOUR: Merci pour votre suggestion. Je l'ai mis à jour comme ci-dessous. Mais maintenant, il ne fait rien du tout. Spark Web UI, je peux voir aucune étape est déclenchée (son vide). Aucune de mes tables n'est mise à jour aussi. Mais le travail est terminé sans erreur.
val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt")
val test = ercs.map { k =>
var rc = method1(k._2, k._1).toSeq
rc
}
.flatMap(identity)
toDF()
val f1 = Future(test.write.insertInto("dbname.tablename1"))
}
val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt")
val test2 = ercs2.map { k =>
var rs = method2(k._2, k._1)
rs
}
.flatMap(identity)
toSeq.toDF()
val f2 = Future(test2.write.insertInto("dbname.tablename2"))
)
Future.sequence(Seq(f1,f2)).onComplete(_ => sc.stop)
Est-ce que tout ce que j'ai manqué?