2017-10-11 3 views
1

Lors de l'exécution suivant morceau de code PySpark:Utilisation d'objets Python personnalisé dans Pyspark UDF

nlp = NLPFunctions() 

def parse_ingredients(ingredient_lines): 
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0] 
    return list(chain.from_iterable(parsed_ingredients)) 


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType())) 

Je reçois l'erreur suivante: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

J'imagine que c'est parce que PySpark ne peut pas sérialiser cette classe personnalisée . Mais comment puis-je éviter le surcoût d'instanciation de cet objet cher à chaque exécution de la fonction parse_ingredients_line?

Répondre

0

Éditer: Cette réponse est erronée. L'objet est toujours sérialisé, puis désérialisé lorsqu'il est diffusé, ce qui évite toute sérialisation. (Tips for properly using large broadcast variables?)


Essayez d'utiliser un broadcast variable.

sc = SparkContext() 
nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format. 

def parse_ingredients(ingredient_lines): 
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0] 
    return list(chain.from_iterable(parsed_ingredients)) 
+0

Cette solution proposée me donne la même erreur. –

1

Disons que vous voulez utiliser Identity classe définie comme celui-ci (identity.py):

class Identity(object):     
    def __getstate__(self): 
     raise NotImplementedError("Not serializable") 

    def identity(self, x): 
     return x 

vous pouvez par exemple utiliser un objet appelable (f.py) et stocker une instance Identity en tant que membre de la classe:

from identity import Identity 

class F(object):       
    identity = None 

    def __call__(self, x): 
     if not F.identity: 
      F.identity = Identity() 
     return F.identity.identity(x) 

et les utiliser comme indiqué ci-dessous:

from pyspark.sql.functions import udf 
import f 

sc.addPyFile("identity.py") 
sc.addPyFile("f.py") 

f_ = udf(f.F()) 

spark.range(3).select(f_("id")).show() 
+-----+ 
|F(id)| 
+-----+ 
| 0| 
| 1| 
| 2| 
+-----+ 

ou de la fonction autonome et fermeture:

from pyspark.sql.functions import udf 
import identity 

sc.addPyFile("identity.py") 

def f(): 
    dict_ = {}     
    @udf()    
    def f_(x):     
     if "identity" not in dict_: 
      dict_["identity"] = identity.Identity() 
     return dict_["identity"].identity(x) 
    return f_ 


spark.range(3).select(f()("id")).show() 
+------+ 
|f_(id)| 
+------+ 
|  0| 
|  1| 
|  2| 
+------+