J'essaye d'analyser une requête SQL, et je veux appeler une fonction pour chaque ligne d'une trame. La fonction est comme ci-dessous:__getattr__ erreur lors de l'appel foreach pour dataframe dans pyspark
def updateParser(df):
# update tab1 set value1 = 0.34 where id = 1111
# identify positions
setPos = df.select(instr(df.query, ' set ').alias('set')).collect()[0].set
wherePos = df.select(instr(df.query, ' where ').alias('where')).collect()[0].where
idPos = df.select(instr(df.query, ' id').alias('id')).collect()[0].id
# identify table, fields&values, id
df = df.withColumn('table', upper(trim(df.query.substr(7, setPos - 7))))
df = df.withColumn('fieldValueList', upper(trim(df.query.substr(setPos + 5, (wherePos - (setPos + 5) + 1)))))
df = df.withColumn('id', upper(trim(df.query.substr(idPos + 5, 10))))
#identify the column being updated and the value
df.show(n=5, truncate = False)
Et j'appelle cela via:
updateDF.foreach(updateParser)
mais je reçois l'erreur suivante:
File "/home/mapr/scripts/cdc.py", line 19, in updateParser
setPos = df.select(instr(df.query, ' set ').alias('set')).collect()[0].set
File "/opt/mapr/spark/spark-1.5.2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1257, in __getattr__
raise AttributeError(item)
AttributeError: select
Je n'utilise pas getattr partout. est-ce nécessaire? Si je n'utilise pas foreach et que je l'exécute directement sur le dataframe, alors ça fonctionne bien. Quelqu'un pourrait-il conseiller s'il vous plaît.
a) Ce code Python n'est pas valide (au moins une indentation de correction) b) Si 'updateDF' est un' DataFrame', ce code Spark n'est pas valide. – zero323
L'indentation a été perdue car elle a été copiée à partir de l'éditeur vi, et le code fonctionne correctement dans pyspark et il est testé dans CLI et dans le travail pyspark. – learning