2017-05-10 1 views
0

Dans mon code ci-dessous, j'essaie d'instancier la connexion redis-py en utilisant la variable d'environnement de l'URL. Le problème est que lorsque j'utilise foreach or foreachPartition la variable d'environnement n'est pas reconnue dans la méthode #save_on_redis.Pyspark ne reconnaît pas les variables d'environnement sur la méthode transmise comme argument à foreach ou foreachPartition

J'essaie juste de créer une connexion Redis à l'extérieur, mais je reçois « pickle.PicklingError: Ne peut pas décaper objet « lock » », parce que l'étincelle essayer d'exécuter ces deux méthodes, en même temps, sur tous les nœuds .

Question: How I can use env variables on the method passed as argument to foreach or foreachPartition ?

import os 
from pyspark.sql import SparkSession 
import redis 

spark = (SparkSession 
     .builder 
     .getOrCreate()) 

print "---------" 
print os.getenv("REDIS_REPORTS_URL") 
print "---------" 

def save_on_redis(row): 
    redis_ = redis.StrictRedis(host=os.getenv("REDIS_REPORTS_URL"), port=6379, db=0) 
    print os.getenv("REDIS_REPORTS_URL") 
    print redis_ 
    redis_.set("#teste#", "fagner") 


df = spark.createDataFrame([(0,1), (0,1), (0,2)], ["id", "score"]) 
df.foreach(save_on_redis) 

Répondre

0

Je vous suggère d'obtenir variable d'environnement dans votre processus de pilote et passer comme une variable python pour les processus de travail, où vous pouvez définir votre environnement avec l'utilisation os.putenv

Exemple:

In [1]: import os 

In [2]: a = sc.parallelize(range(20)) 

In [3]: os.getenv('MY_VAR') 
Out[3]: 'some_value' 

In [4]: def f(iter): 
    import os 
    return (str(os.getenv('MY_VAR')),) 
    ...: 

In [5]: a.mapPartitions(f).collect() 
Out[5]: ['None', 'None'] 

In [6]: my_var = os.getenv('MY_VAR') 

In [6]: def f2(iter): 
    import os 
    from subprocess import check_output 
    os.putenv('MY_VAR', my_var) 
    return (check_output('env | grep MY_VAR', shell=True), my_var) 
    ....: 

In [7]: a.mapPartitions(f2).collect() 
Out[7]: 
['MY_VAR=some_value\n', 
'some_value', 
'MY_VAR=some_value\n', 
'some_value'] 

PS. Selon this answer, il est préférable de modifier directement os.environ objet de mappage plutôt que d'utiliser os.putenv