J'ai un pyspark Dataframe et maintenant je veux itérer sur chaque ligne et insérer/mettre à jour à la collection mongoDB.itérer sur pyspark Dataframe et ensuite pour chaque ligne interagir avec mongoDB
#Did every required imports
#dataframe
+---+----+
|age|name|
+---+----+
| 30| c|
| 5| e|
| 6| f|
+---+----+
db = mongodbclient['mydatabase']
collection = db['mycollection']
#created below function to insert/update
def customFunction(row):
key = {'name':row.name}
data = dict(zip(columns,[row.x for x in columns]))
collection.update(key, data, {upsert:true})
#return a_flag #commented it as of now, a_flag can be 0 or 1
Si un nom existe dans la collection MongoDB « MaCollection » il devrait mettre à jour cette ligne/enregistrement autre insert nouveau record.
Je reçois erreur suivant lorsque essayé de cartographier cette fonction sur étincelle dataframe
result = my_dataframe.rdd.map(customFunction)
#.....TypeError: can't pickle _thread.lock objects....
#AttributeError: 'TypeError' object has no attribute 'message'
Quelqu'un peut-il s'il vous plaît comprendre « ce qui ne va pas ici dans cette fonction et/ou nulle part ailleurs » ou s'il vous plaît suggérer le cas échéant autre alternative est là pour ce type de tâche.
Fondamentalement itérer chaque ligne (sans appel à frais virés est que même possible ??)
Et, sur chaque ligne appliquer une fonction à exécuter le travail en dehors étincelle.
S'il vous plaît suggérer, Merci à l'avance .. :)
Mes données MongoDB
name age
a 1
b 2
c 3 #new update should make age as 30 and 2 more new recs should inserted
Quelle est la taille du jeu de données 'my_dataframe'? L'exportation doit-elle être effectuée en parallèle? Parce qu'il y a au moins 3 façons valides de réaliser ce dont vous avez besoin, selon le nombre d'enregistrements à mettre à jour. – Mariusz
@Mariusz: Base_collection dans mongoDB a environ 150mln + enregistrements et spark-dataframe aura des données incrémentielles pas plus de 500000 enregistrements. Plz faites-moi savoir quelles sont les options disponibles. – Satya
@Mariusz: Je ne préfère généralement pas recueillir ou convoquer dans pandas dataframe, puis upsert dans MongoDB. – Satya