2016-02-20 1 views
3

J'essaie d'exécuter un compte de mots python sur un cluster Spark HDInsight et je l'exécute depuis Jupyter. Je ne suis pas vraiment sûr que ce soit la bonne façon de le faire, mais je n'ai rien trouvé d'utile sur la façon de soumettre une application python autonome sur un cluster HDInsight Spark.Comment soumettre un compte de mots en python sur un cluster HDInsight Spark de Jupyter

Le code:

import pyspark 
import operator 
from pyspark import SparkConf 
from pyspark import SparkContext 
import atexit 
from operator import add 
conf = SparkConf().setMaster("yarn-client").setAppName("WC") 
sc = SparkContext(conf = conf) 
atexit.register(lambda: sc.stop()) 

input = sc.textFile("wasb:///example/data/gutenberg/davinci.txt") 
words = input.flatMap(lambda x: x.split()) 
wordCount = words.map(lambda x: (str(x),1)).reduceByKey(add) 

wordCount.saveAsTextFile("wasb:///example/outputspark") 

Et le message d'erreur que je reçois et ne comprends pas:

ValueError        Traceback (most recent call last) 
<ipython-input-2-8a9d4f2cb5e8> in <module>() 
     6 from operator import add 
     7 import atexit 
----> 8 sc = SparkContext('yarn-client') 
     9 
    10 input = sc.textFile("wasb:///example/data/gutenberg/davinci.txt") 

/usr/hdp/current/spark-client/python/pyspark/context.pyc in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls) 
    108   """ 
    109   self._callsite = first_spark_call() or CallSite(None, None, None) 
--> 110   SparkContext._ensure_initialized(self, gateway=gateway) 
    111   try: 
    112    self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, 

/usr/hdp/current/spark-client/python/pyspark/context.pyc in _ensure_initialized(cls, instance, gateway) 
    248       " created by %s at %s:%s " 
    249       % (currentAppName, currentMaster, 
--> 250        callsite.function, callsite.file, callsite.linenum)) 
    251     else: 
    252      SparkContext._active_spark_context = instance 

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=yarn-client) created by __init__ at <ipython-input-1-86beedbc8a46>:7 

Est-il réellement possible d'exécuter python travail de cette façon? Si oui - il semble être le problème avec la définition SparkContext ... J'ai essayé différentes façons:

sc = SparkContext('spark://headnodehost:7077', 'pyspark') 

et

conf = SparkConf().setMaster("yarn-client").setAppName("WordCount1") 
sc = SparkContext(conf = conf) 

mais sans succès. Quelle serait la bonne façon d'exécuter le travail ou de configurer SparkContext?

Répondre

0

Il me semble que je peux répondre moi-même à ma question. Quelques changements dans le code se sont révélées être utiles:

conf = SparkConf() 
conf.setMaster("yarn-client") 
conf.setAppName("pyspark-word-count6") 
sc = SparkContext(conf=conf) 
atexit.register(lambda: sc.stop()) 

data = sc.textFile("wasb:///example/data/gutenberg/davinci.txt") 
words = data.flatMap(lambda x: x.split()) 
wordCount = words.map(lambda x: (x.encode('ascii','ignore'),1)).reduceByKey(add) 

wordCount.saveAsTextFile("wasb:///output/path") 
0

Je viens résolu un bug similaire dans mon code pour trouver qu'il était au fait que pyspark accepte qu'un seul objet de SparkContext(). Une fois soumis, les modifications et l'exécution du code rencontrent ce problème et renvoient l'initialisation du message d'erreur. Ma solution était simplement de redémarrer le noyau de la plate-forme et lorsque le cahier est redémarré pour réexécuter mon script portable. Il a ensuite couru sans erreur.

1

Si vous exécutez à partir du bloc-notes de Jupyter, le contexte Spark est pré-créé pour vous et il serait incorrect de créer un contexte séparé. Pour résoudre le problème simplement supprimer les lignes qui créent le contexte et commencer directement à partir de:

input = sc.textFile("wasb:///example/data/gutenberg/davinci.txt") 

Si vous devez exécuter le programme autonome, vous pouvez l'exécuter à partir de la ligne de commande en utilisant pyspark ou soumettre à l'aide d'API REST en utilisant serveur exécutant Livy sur le cluster.