2017-09-26 5 views
7

Je suis en train d'exécuter un travail d'étincelle dans EMR avec YARN en tant que gestionnaire de ressources et sur 2 nœuds. J'ai besoin d'échouer volontairement l'étape si ma condition n'est pas remplie, donc l'étape suivante ne s'exécute pas selon la configuration. Pour ce faire, je lance une exception personnalisée, après l'insertion d'un message de journal dans dynamoDB.Spark, Comportement incorrect lors du lancement de SparkException dans EMR

Cela fonctionne bien mais l'enregistrement dans Dynamo est inséré deux fois.

Voici mon code. Si je supprime la ligne pour lancer une exception, cela fonctionne correctement, mais l'étape est terminée.

Comment puis-je faire échouer l'étape, sans recevoir le message de journal deux fois.

Merci pour l'aide.

Cordialement, Sorabh

Répondre

2

probablement la raison pour laquelle votre message dynamo a été inséré deux fois était parce que votre condition d'erreur a été frappé et traité par deux exécuteurs différents. Spark divise le travail à faire parmi ses travailleurs, et ces travailleurs ne partagent aucune connaissance. Je ne suis pas sûr de ce qui motive votre exigence d'avoir le Spark step FAIL, mais je suggère plutôt de suivre ce cas de défaillance dans votre code de l'application au lieu d'essayer d'avoir une étincelle directement. En d'autres termes, écrivez un code qui détecte l'erreur et la renvoie à votre pilote d'étincelle, puis agissez en conséquence. Une façon de le faire serait d'utiliser un accumulateur pour compter les erreurs qui se produisent pendant que vous traitez vos données. Il ressemblerait à quelque chose à peu près comme ça (je suppose scala et DataFrames, mais vous pouvez adapter à de RDD et/ou python au besoin):

val accum = sc.longAccumulator("Error Counter") 
def doProcessing(a: String, b: String): String = { 
    if(condition) { 
    accum.add(1) 
    null 
    } 
    else { 
    doComputation(a, b) 
    } 
} 
val doProcessingUdf = udf(doProcessing _) 

df = df.withColumn("result", doProcessing($"a", $"b")) 

df.write.format(..).save(..) // Accumulator value not computed until an action occurs! 

if(accum.value > 0) { 
    // An error detected during computation! Do whatever needs to be done. 
    <insert dynamo message here> 
} 

Une bonne chose au sujet de cette approche est que si vous cherchez des commentaires dans l'interface utilisateur Spark, vous pourrez voir les valeurs de l'accumulateur pendant son exécution. Pour référence, voici la documentation sur les accumulateurs: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators