2017-09-07 8 views
0

J'ai un pyspark Dataframe et maintenant je veux itérer sur chaque ligne et insérer/mettre à jour à la collection mongoDB.itérer sur pyspark Dataframe et ensuite pour chaque ligne interagir avec mongoDB

#Did every required imports 
#dataframe 
+---+----+ 
|age|name| 
+---+----+ 
| 30| c| 
| 5| e| 
| 6| f| 
+---+----+ 
    db = mongodbclient['mydatabase'] 
    collection = db['mycollection'] 
    #created below function to insert/update 
    def customFunction(row): 
     key = {'name':row.name} 
     data = dict(zip(columns,[row.x for x in columns])) 
     collection.update(key, data, {upsert:true}) 
     #return a_flag #commented it as of now, a_flag can be 0 or 1 

Si un nom existe dans la collection MongoDB « MaCollection » il devrait mettre à jour cette ligne/enregistrement autre insert nouveau record.

Je reçois erreur suivant lorsque essayé de cartographier cette fonction sur étincelle dataframe

result = my_dataframe.rdd.map(customFunction) 
#.....TypeError: can't pickle _thread.lock objects.... 
#AttributeError: 'TypeError' object has no attribute 'message' 

Quelqu'un peut-il s'il vous plaît comprendre « ce qui ne va pas ici dans cette fonction et/ou nulle part ailleurs » ou s'il vous plaît suggérer le cas échéant autre alternative est là pour ce type de tâche.

Fondamentalement itérer chaque ligne (sans appel à frais virés est que même possible ??)

Et, sur chaque ligne appliquer une fonction à exécuter le travail en dehors étincelle.

S'il vous plaît suggérer, Merci à l'avance .. :)

Mes données MongoDB

name age 
a 1 
b 2 
c 3 #new update should make age as 30 and 2 more new recs should inserted 
+0

Quelle est la taille du jeu de données 'my_dataframe'? L'exportation doit-elle être effectuée en parallèle? Parce qu'il y a au moins 3 façons valides de réaliser ce dont vous avez besoin, selon le nombre d'enregistrements à mettre à jour. – Mariusz

+0

@Mariusz: Base_collection dans mongoDB a environ 150mln + enregistrements et spark-dataframe aura des données incrémentielles pas plus de 500000 enregistrements. Plz faites-moi savoir quelles sont les options disponibles. – Satya

+0

@Mariusz: Je ne préfère généralement pas recueillir ou convoquer dans pandas dataframe, puis upsert dans MongoDB. – Satya

Répondre

1

On dirait que l'objet de connexion ne peut pas être décapée. J'utilise foreachPartition:

def customFunction(rows): 
    db = mongodbclient['mydatabase'] 
    collection = db['mycollection'] 

    for row in rows: 
     key = {'name':row.name} 
     data = dict(zip(columns,[row.x for x in columns])) 
     collection.update(key, data, {upsert:true}) 

my_dataframe.rdd.foreachPartition(customFunction) 

mais gardez à l'esprit que l'échec fatal pourrait quitter la base de données dans un état incohérent.

1

Si vous avez 500k enregistrements à être retournés dans MongoDB, le mode bulk sera probablement plus efficace pour gérer cela. L'exécution de requêtes à l'intérieur de mongoDB nécessitera beaucoup plus de puissance comparée à ce que vous faites réellement dans l'étincelle (juste en créant des requêtes) et même l'exécution en parallèle peut provoquer des instabilités sur le côté mongo (et être plus lent que l'approche itérative).

Vous pouvez essayer le code suivant. Il n'utilise pas collect(), donc la mémoire est efficace sur le pilote:

bulk = collection.initialize_unordered_bulk_op() 
for row in rdd.toLocalIterator(): 
    key = {'name':row.name} 
    data = dict(zip(columns,[row.x for x in columns])) 
    bulk.update(key, data, {upsert:true}) 

print(bulk.execute())