Salut J'essaie d'écrire un code dans pyspark pour créer une liste à partir d'une base de données. J'utilise collect() fonction dans mon code mais je ne sais pas si c'est une façon correcte d'obtenir une liste de filtre de valeurs d'une colonne de données. Puisque collect() amène les données dans le noeud de données, ce sera une mauvaise option dans le cas de données de grande taille, disons 10 Go.Quelle est la meilleure façon d'éviter d'utiliser la fonction collect() dans le code pyspark? Les meilleures façons d'écrire optimiser le code pyspark?
Ci-dessous mon dataframe d'entrée -
[Row(parent=u'p1', child=u'c1'), Row(parent=u'p11', child=u'p1'),
Row(parent=u'p111', child=u'p11'), Row(parent=u'p2', child=u'c2'),
Row(parent=u'p22', child=u'p2'), Row(parent=u'p222', child=u'p22'),
Row(parent=u'p2222', child=u'p222')]
Je veux obtenir dataframe de sortie comme ci-dessous -
[Row(parent=u'p2222', child1=u'p222', child2=u'p22', child3=u'p2',
child4=u'c2'), Row(parent=u'p111', child1=u'p11', child2=u'p1',
child3=u'c1', child4=None)]
Voici le code de travail que je l'ai écrit, mais pas sûr si optimisé comme étincelle est connu pour un traitement optimisé
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
customSchema = StructType([StructField('parent',StringType(),True),\
StructField('child',StringType(),True)])
#loading data from a CSV file and creating a dataframe
mydata = sqlContext.load(source='com.databricks.spark.csv',path='/FileStore/tables/34v0qouq1507635707462/parent_child_input.csv',header=True,schema=customSchema)
mydata.registerTempTable('mydata')
#creating a list of values of column "Child" from the dataframe "mydata"
childlist = [x[1] for x in mydata.collect()]
#creating another dataframe with filter values of "Parent" column which are not present in childlist
level1 = mydata.selectExpr('parent','child as child1').where(~mydata.parent.isin(childlist))
i=1
#Function to create dataframe containing desired output as mentioned above
def getChild(level1,i):
cname = 'child'+str(i)
tmp = [x[i] for x in level1.collect() if x[i]]
tmp = list(set(tmp))
if tmp.count(None)==1:
tmp.remove(None)
level1.registerTempTable('level1')
if len(tmp)>0:
i+=1
ccname = 'child'+str(i)
querystr='select level1.*,mydata.child as ' +ccname+\
' from level1 left outer join mydata on level1.'+cname+'=mydata.parent'
level1 = sqlContext.sql(querystr)
level1 = getChild(level1,i)
return level1
level1 = getChild(level1,i)
level1.drop('child5').show()