2017-09-29 9 views
0

Je travaille dans Pyspark avec une fonction lambda comme les suivantes:Les variables globales non reconnues dans les fonctions lambda dans Pyspark

udf_func = UserDefinedFunction(lambda value: method1(value, dict_global), IntegerType()) 
result_col = udf_func(df[atr1]) 

La mise en œuvre de la method1 est la suivante:

def method1(value, dict_global): 
    result = len(dict_global) 
    if (value in dict_global): 
     result = dict_global[value] 
    return result 

' dict_global 'est un dictionnaire global qui contient des valeurs.

Le problème est que lorsque j'exécute la fonction lambda, le résultat est toujours None. Pour quelque raison que ce soit, la fonction 'method1' n'interprète pas la variable 'dict_global' comme une variable externe. Pourquoi? "Que pouvais-je faire?"

+0

Eh bien, 'method1' interprète' dict_global' comme un argument de fonction. La question est, où est la fonction 'lambda' à la recherche de' dict_global'? Tant qu'elle n'est pas masquée par une définition locale, la fonction 'lambda' devrait alors la rechercher dans le contexte global dans lequel' lambda' a été défini. –

+0

Je passe le dictionnaire en paramètre parce que quand j'ai essayé le même sans passer le dict comme paramètre j'obtiens le même résultat ... 'Aucun' – jartymcfly

+0

Si vous le passez comme argument, vous devriez le réécrire 'udf_func = lambda dict_global: UserDefinedFunction (valeur lambda: méthode1 (valeur, dict_global), IntegerType()) ' ' result_col = udf_func (dict_global) (df [atr1]) ' – MaFF

Répondre

0

Finalement, j'ai trouvé une solution. Je l'écris ci-dessous:

Les fonctions Lambda (ainsi que les fonctions de mappage et de réduction) exécutées dans SPARK planifient les exécutions entre les différents exécuteurs et fonctionnent dans différents threads d'exécution. Donc, le problème dans mon code pourrait être des variables globales parfois ne sont pas attrapés par les fonctions exécutées en parallèle dans différents threads, donc j'ai cherché une solution pour essayer de le résoudre.

Heureusement, dans SPARK il y a un élément appelé "Broadcast" qui permet de passer des variables à l'exécution d'une fonction organisée parmi les exécuteurs pour travailler avec eux sans problème. Il existe 2 types de variables partageables: Broadcast (variables inmutables, uniquement pour la lecture) et accumulateurs (variables mutables, mais valeurs numériques uniquement acceptées).

Je réécris mon code pour vous montrer comment ai-je résoudre le problème:

broadcastVar = sc.broadcast(dict_global) 
udf_func = UserDefinedFunction(lambda value: method1(value, boradcastVar), IntegerType()) 
result_col = udf_func(df[atr1]) 

Hope it helps!