Dans mon code ci-dessous, j'essaie d'instancier la connexion redis-py en utilisant la variable d'environnement de l'URL. Le problème est que lorsque j'utilise foreach or foreachPartition la variable d'environnement n'est pas reconnue dans la méthode #save_on_redis.Pyspark ne reconnaît pas les variables d'environnement sur la méthode transmise comme argument à foreach ou foreachPartition
J'essaie juste de créer une connexion Redis à l'extérieur, mais je reçois « pickle.PicklingError: Ne peut pas décaper objet « lock » », parce que l'étincelle essayer d'exécuter ces deux méthodes, en même temps, sur tous les nœuds .
Question: How I can use env variables on the method passed as argument to foreach or foreachPartition ?
import os
from pyspark.sql import SparkSession
import redis
spark = (SparkSession
.builder
.getOrCreate())
print "---------"
print os.getenv("REDIS_REPORTS_URL")
print "---------"
def save_on_redis(row):
redis_ = redis.StrictRedis(host=os.getenv("REDIS_REPORTS_URL"), port=6379, db=0)
print os.getenv("REDIS_REPORTS_URL")
print redis_
redis_.set("#teste#", "fagner")
df = spark.createDataFrame([(0,1), (0,1), (0,2)], ["id", "score"])
df.foreach(save_on_redis)