2017-01-06 2 views
1

J'ai 2 fichiers avec des données différentes. Je suis en train de les lire dans 2 RDD diff & puis les convertir en Dataframe & insérer dans la ruche. J'étais capable de faire ce code normal. Mais l'étincelle a traité un calcul de RDD après l'autre. Donc, le 2ème attendait que le 1er se termine même si j'ai assez de ressources dans le cluster. J'ai appris que le calcul des RDD peut être parallélisé en utilisant des méthodes Async. Donc j'essaye foreachPartitionAsync. Mais il jette une erreur que je ne suis pas en mesure de déboguer davantage. Exemple de code:throw foreachPartitionAsync ne peut pas appeler des méthodes sur SparkContext arrêté

object asynccode { 
    def main(args: Array[String]) = { 
    val conf = new SparkConf() 
     .setAppName("Parser") 
    val sc = new SparkContext(conf) 
    val hiveContext = new HiveContext(sc) 
    import hiveContext.implicits._ 

    val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt") 
    val test = ercs.map { k => 
     var rc = method1(k._2, k._1).toSeq 
     rc 
    } 
     .flatMap(identity) 
     .foreachPartitionAsync { f => 
     f.toSeq.toDF() 
      .write.insertInto("dbname.tablename1") 
     } 

    val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt") 
    val test2 = ercs2.map { k => 
     var rs = method2(k._2, k._1) 
     rs 
    } 
     .flatMap(identity) 
     .foreachPartitionAsync(f => f.toSeq.toDF() 
     .write.insertInto("dbname.tablename2") 

    ) 
    sc.stop() 
    } 

    def method1 = ??? 
    def method2 = ??? 
} 

Mais il renvoie le message d'erreur ci-dessous. Si je supprime foreachPartitionAsync du code, cela fonctionne très bien. Pas sûr de ce que je fais mal à propos de foreachPartitionAsync.

Échec de la sérialisation des tâches: java.lang.IllegalStateException: impossible d'appeler des méthodes sur un SparkContext arrêté.

MISE À JOUR: Merci pour votre suggestion. Je l'ai mis à jour comme ci-dessous. Mais maintenant, il ne fait rien du tout. Spark Web UI, je peux voir aucune étape est déclenchée (son vide). Aucune de mes tables n'est mise à jour aussi. Mais le travail est terminé sans erreur.

val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt") 
    val test = ercs.map { k => 
     var rc = method1(k._2, k._1).toSeq 
     rc 
    } 
     .flatMap(identity) 
    toDF() 
    val f1 = Future(test.write.insertInto("dbname.tablename1")) 
     } 

    val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt") 
    val test2 = ercs2.map { k => 
     var rs = method2(k._2, k._1) 
     rs 
    } 
     .flatMap(identity) 
     toSeq.toDF() 

val f2 = Future(test2.write.insertInto("dbname.tablename2")) 

    ) 
     Future.sequence(Seq(f1,f2)).onComplete(_ => sc.stop) 

Est-ce que tout ce que j'ai manqué?

Répondre

1

Vous arrêtez SparkContext sans attendre pour terminer. Vous devez attendre des actions pour compléter et arrêter le contexte en réponse:

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 
import scala.util.{Success, Failure} 

val f1: Future[Unit] = sc.range(1, 200).foreachAsync(_ => Thread.sleep(10)) 
val f2: Future[Unit] = sc.range(1, 200).foreachAsync(_ => Thread.sleep(10)) 

Future.sequence(Seq(f1, f2)).onComplete { 
    case Success(_) => sc.stop 
    case Failure(e) => 
    e.printStackTrace // or some other appropriate actions 
    sc.stop 
} 

Cela dit même votre code est invalide si nous négligeons les actions async. Vous pouvez pas utiliser distribué des structures de données dans une action ou une transformation:

.foreachPartitionAsync(
    f => f.toSeq.toDF().write.insertInto("dbname.tablename2") 
) 

Si vous voulez des actions d'écriture asynchrones utilisent Futures directement:

val df1: Dataframe = ??? 
val df2: Dataframe = ??? 

val f1: Future[Unit] = Future(df1.write.insertInto("dbname.tablename1")) 
val f2: Future[Unit] = Future(df2.write.insertInto("dbname.tablename2")) 

et attendre que les actions se terminent comme s Montré ci-dessus.